Home Running a Spark notebook in AWS EMR
Post
Cancel

Running a Spark notebook in AWS EMR

The following blog will demonstrate how to execute a spark notebook programatically in AWS EMR via the aws cli. All the bash scripts and json configurations referenced in this blog can be accessed here

Create the EMR cluster

This assumes you have data in S3 bucket and want to copy into hdfs as a step when creating the EMR cluster (based on s3_hdfs_copy_step.json). This enables s3-transfer acceleration by using s3-accelerate.amazonaws.com endpoint (which assumes the respective S3 bucket has transfer acceleartion enabled (refer to S3 uploads for large datasets section). To disable this, modify to –s3Endpoint=s3.amazonaws.com in json. Run the bash script below and pass in first arg as true or false (if you want script to also start notebook execution after cluster created). Second arg is desired timeout threhsold in secs after which the cluster auto-terminates if idle.

  • with no notebook execution (this is also the default setting and equivalent to just running sh aws_emr/create_cluster.sh)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
$ sh aws_emr/create_cluster.sh false 3600


 Running emr create cluster command: 
{
    "ClusterId": "j-3GOQA3U911OMF",
    "ClusterArn": "arn:aws:elasticmapreduce:us-east-1:376337229415:cluster/j-3GOQA3U911OMF"
}

Cluster Creation time: 2022-04-28T00:11:12
Script complete !


  • with notebook execution set to true - script will continue running till notebook start execution complete. To do this, it needs to check cluster create status and wait for this to change to ‘WAITING’. This can be between 7-12 mins depending on number of steps added to cluster creation action
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
$ sh aws_emr/create_cluster.sh true 3600


changing to dir containing config files /Users/rk1103/Documents/databricks_spark_certification/aws_emr/

 Running emr create cluster command: 
{
    "ClusterId": "j-1DE5WQUU4DNHZ",
    "ClusterArn": "arn:aws:elasticmapreduce:us-east-1:376337229415:cluster/j-1DE5WQUU4DNHZ"
}

Cluster Creation time: 2022-04-26T23:25:10

Checking cluster status is in 'WAITING' state before notebook start execution ....

Cluster status still in STARTING state. Waiting for a minute, before checking again
Cluster status still in STARTING state. Waiting for a minute, before checking again
Cluster status still in STARTING state. Waiting for a minute, before checking again
Cluster status still in STARTING state. Waiting for a minute, before checking again
Cluster status still in STARTING state. Waiting for a minute, before checking again
Cluster status still in STARTING state. Waiting for a minute, before checking again
Cluster status still in STARTING state. Waiting for a minute, before checking again
Cluster status still in STARTING state. Waiting for a minute, before checking again
Cluster status still in STARTING state. Waiting for a minute, before checking again
Cluster status still in STARTING state. Waiting for a minute, before checking again
Cluster status still in RUNNING state. Waiting for a minute, before checking again
Cluster status still in RUNNING state. Waiting for a minute, before checking again

CLuster status now in WAITING state, so starting notebook execution
{
    "NotebookExecutionId": "ex-J02SKBUZSN8BD2YCTXSA7MW022RP0"
}

Script complete !
  • or to run without steps - pass in third arg as empty string
1
$ sh aws_emr/create_cluster.sh true 3600 ""

SSH into the master node

Once cluster has been setup - we can ssh into master node by following command. we will need to first change permissions on file to allow only read-only access. Otherwise you will get an error like: Permissions 0644 for ‘youramazon.pem’ are too open. It is recommended that your private key files are NOT accessible by others. This private key will be ignored.

Then run the ssh command with path to where you private key is stored and the dns of the master node (which you can get from the cluster summary on console or via cli)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
$ chmod 400 <PRIVATE-KEY>
$ ssh -i <PATH-TO-PRIVATE-KEY> hadoop@<MASTER-PUBLIC_DNS>



Last login: Tue Apr 26 00:16:33 2022

       __|  __|_  )
       _|  (     /   Amazon Linux 2 AMI
      ___|\___|___|

https://aws.amazon.com/amazon-linux-2/
42 package(s) needed for security, out of 80 available
Run "sudo yum update" to apply all updates.
                                                                    
EEEEEEEEEEEEEEEEEEEE MMMMMMMM           MMMMMMMM RRRRRRRRRRRRRRR    
E::::::::::::::::::E M:::::::M         M:::::::M R::::::::::::::R   
EE:::::EEEEEEEEE:::E M::::::::M       M::::::::M R:::::RRRRRR:::::R 
  E::::E       EEEEE M:::::::::M     M:::::::::M RR::::R      R::::R
  E::::E             M::::::M:::M   M:::M::::::M   R:::R      R::::R
  E:::::EEEEEEEEEE   M:::::M M:::M M:::M M:::::M   R:::RRRRRR:::::R 
  E::::::::::::::E   M:::::M  M:::M:::M  M:::::M   R:::::::::::RR   
  E:::::EEEEEEEEEE   M:::::M   M:::::M   M:::::M   R:::RRRRRR::::R  
  E::::E             M:::::M    M:::M    M:::::M   R:::R      R::::R
  E::::E       EEEEE M:::::M     MMM     M:::::M   R:::R      R::::R
EE:::::EEEEEEEE::::E M:::::M             M:::::M   R:::R      R::::R
E::::::::::::::::::E M:::::M             M:::::M RR::::R      R::::R
EEEEEEEEEEEEEEEEEEEE MMMMMMM             MMMMMMM RRRRRRR      RRRRRR
                                                                    
[hadoop@ip-10-0-0-58 ~]$ 

The command above creates an EMR cluster with spark and configuration for master and core nodes as speciifed in instancegroupconfig.json along with auto scaling limits. The VPC subnet and availability zones are speciifed in –ec2-attributes values. Finally, we set –no-termination-protected to allow manual termination of cluster and set –auto-termination-policy to just over 1.5 hrs so the cluster terminates automatically if in idle state for this time. For more options and variations of this command please refer to the AWS docs here

To automate execution of an existing EMR notebook, run the following command. Each notebook execution also writes the updated notebook file in a folder titled executions inside notebook storage location in the aws-emr-resources bucket configured for EMR notebooks in the tree structure as below:

aws-emr-resources-376337229415-us-east-1
      - notebooks
         - e-1VLA7UDB2TM65N23MXOLDAA48 (notebook-id)
             - executions
                  - ex-J02SWU1BN9LI8XJ2U56LIYY7ZYPYU (execution-id)

The result of each execution is stored in a different folder named with execution id

1
2
3
4
5
6
7
8
9
10
11
12
aws emr --region us-east-1 \
start-notebook-execution \
--editor-id e-1VLA7UDB2TM65N23MXOLDAA48 \
--relative-path parking_ticket_violations.ipynb \
--notebook-execution-name test \
--execution-engine '{"Id" : "j-3UZJRU19QI2AM"}' \
--service-role EMR_Notebooks_DefaultRole

{
    "NotebookExecutionId": "ex-J02SMG5VBOA7L6QMRR1D710S64V7S"
}

Monitoring the cluster metrics

The Monitor Metrics with CloudWatch docs lists all the available EMR metrics on CloudWatch.

You can view the metrics that Amazon EMR reports to CloudWatch using the Amazon EMR console or the CloudWatch console. From the cloudwatch console, navigate to ‘all metrics’, ‘EMR’ and in the ‘jobflowmetrics’ column, filter by cluster id e.g. j-B857LVXYQNH5. Tick the corresponding metrics to see the graph being updated with the metric overlayed (as in the screenshots below).You can see the selected metrics in graph metrics tab or run queries to get further insights

S3 uploads for large datasets

Data in S3 can be accessed directly from spark in EMR via spark.read.csv() command. The following workflow is just for getting data into hdfs.

S3 Transfer Acceleration helps you fully utilize your bandwidth, minimize the effect of distance on throughput, and is designed to ensure consistently fast data transfer to Amazon S3 regardless of your source’s location. The amount of acceleration primarily depends on your available bandwidth, the distance between the source and destination, and packet loss rates on the network path. Generally, you will see more acceleration when the source is farther from the destination, when there is more available bandwidth, and/or when the object size is bigger.

Therefore, in order to test it, one could use the Amazon S3 Transfer Acceleration Speed Comparison tool to compare accelerated and non-accelerated upload speeds across Amazon S3 regions. The Speed Comparison tool uses multipart uploads to transfer a file from your browser to various Amazon S3 regions with and without using Transfer Acceleration. You can access the Speed Comparison tool using the link

Alternatively, for testing the upload speed of Amazon S3 Transfer Acceleration for a specific file size, follow the instructions here: https://aws.amazon.com/premiumsupport/knowledge-center/upload-speed-s3-transfer-acceleration/ to install the required dependencies from linux command line. and then execute the bash script downloaded from https://github.com/awslabs/aws-support-tools/blob/master/S3/S3_Transfer_Acceleration/Bash-script/test-upload.sh Pass in required file path and s3 destination file name in the subsequent input prompts as below

1
2
3
4
5
6
7
8
9
10
$ sh aws_etl/datasets/test-upload.sh 

Enter the local path of the file you want to upload: /Users/rk1103/Documents/databricks_spark_certification/datasets/parking_violations/parking_violations_2017.csv
Enter the destination file name: parking_violations_2017.csv
Enter the name of the bucket: big-datasets-over-gb
Enter the region of your Bucket (e.g. us-west-2): us-east-1
Do you want to upload via Transfer Acceleration: (y/n) y
Completed 1.1 GiB/1.9 GiB (2.2 MiB/s) with 1 file(s) remaining    

If S3 transfer acceleration is improving results then use https://docs.aws.amazon.com/AmazonS3/latest/dev/transfer-acceleration-examples.html#transfer-acceleration-examples-aws-cli-2 upload data to S3 bucket using transfer acceleration endpoint and check if you are getting expected results.

1
2
aws s3 cp ~/Downloads/archive/parking_violations_2017.csv s3://big-datasets-over-gb/parking_violations/2017/ --endpoint-url https://s3-accelerate.amazonaws.com 

Further, if transfer acceleration is not giving improved speed for your region, then you can use S3 Multipart uploads to upload on object to S3 bucket. Multipart upload allows you to upload a single object as a set of parts. In general, we recommend that, when your object size reaches 100 MB, you should consider using multipart uploads instead of uploading the object in a single operation. https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html https://aws.amazon.com/premiumsupport/knowledge-center/s3-multipart-upload-cli/

So, you can make use of AWS CLI in order to upload objects using multipart upload. Also, kindly know that, by default ‘multipart_threshold’ value is set to 8MB so, when uploading, downloading, or copying a file, S3 will automatically switch to multipart operations if the file reaches a given size threshold. However, in order to improve upload performance you can customize the values for multipart upload configuration i.e the values for multipart_threshold, multipart_chunk size etc. accordingly, in the default location of ~/.aws/config file. So, the chunk size can be increased in the CLI in the ~/.aws/config file so that you have bigger chunk sizes and therefore less parts. Please refer the below document for more information: https://docs.aws.amazon.com/cli/latest/topic/s3-config.html

1
2
3
4
$ aws configure set default.s3.multipart_chunksize 1GB
$ aws s3 cp ~/Documents/databricks_spark_certification/datasets/seattle_library/library-collection-inventory.csv s3://big-datasets-over-gb/seattle_library/ \
--endpoint-url https://s3-accelerate.amazonaws.com

Note: upload speed depends on many factors such as internet speed, file size and concurrency including network slowness or congestion from client’s end etc

To spot check file location is all ok in hdfs and contents, ssh into master node as described in the earlier section. Use Hadoop HDFS commands as described in this tutorial

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
[hadoop@ip-10-0-0-58 ~]$ hadoop fs -ls hdfs:///output
Found 2 items
drwxr-xr-x   - hadoop hadoop          0 2022-04-26 00:02 hdfs:///output/2016
drwxr-xr-x   - hadoop hadoop          0 2022-04-26 00:02 hdfs:///output/2017


[hadoop@ip-10-0-0-58 ~]$ hadoop fs -ls hdfs:///output/2016
Found 1 items
-rw-r--r--   1 hadoop hadoop 2151937808 2022-04-26 00:03 hdfs:///output/2016/parking_violations_2016.csv

[hadoop@ip-10-0-0-58 ~]$ hadoop fs -tail hdfs:///output/2016
tail: `hdfs:///output/2016': Is a directory
[hadoop@ip-10-0-0-58 ~]$ hadoop fs -tail hdfs:///output/2016/parking_violations_2016.csv
9500927,FFM1969,NY,PAS,06/08/2016,38,SUBN,TOYOT,T,25590,56890,78820,20161128,0043,43,43,362197,T201,P,1150A,,BX,O,1416,East Ave,,0,408,h1,,Y,0900A,0700P,BLUE,,2013,,0,04 2,38-Failure to Display Muni Rec,,,,,,,,,,,
8359500940,2482082,IN,PAS,06/08/2016,51,VAN,FRUEH,T,73980,40404,40404,88880088,0043,43,43,362197,T201,P,1207P,,BX,F,1499,West St,,0,408,e3,,,,,WHITE,,0,,0,04 2,51-Sidewalk,,,,,,,,,,,
8359500952,65919JW,NY,COM,06/08/2016,52,VAN,FRUEH,T,0,0,0,88888888,0043,43,43,362197,T201,P,1212P,,BX,I,W,Metropolitan Ave,0ft S/of Wood Rd,0,408,e4,,,,,BROWN,,2007,,0,04 2,52-Intersection,,,,,,,,,,,
8359500964,68718MG,NY,COM,06/08/2016,82,PICK,DODGE,T,0,0,0,20170531,0043,43,43,362197,T201,P,1219P,,BX,I,S,Wood Ave,85ft W/of Virginia A,0,408,k1,,,,,WH,,2015,,0,04 2,82-Unaltered Commerc Vehicle,,,,,,,,,,,
8359500976,FZX4974,NY,PAS,06/08/2016,38,4DSD,HONDA,T,0,0,0,20160711,0043,43,43,362197,T201,P,1222P,,BX,I,S,Wood Ave,95ft W/of Virginia A,0,408,h1,,Y,0830A,0700P,BK,,2013,,0,04 2,38-Failure to Display Muni Rec,,,,,,,,,,,
[hadoop@ip-10-0-0-58 ~]$ 

Tearing down cluster resources

For terminating clusters to avoid incurring extra costs, run the bash script teardown_resources.sh. This checks for existing list of active clusters (in waiting status) and terminates them.

1
2
3
4
5
6
$ sh aws_etl/datasets/teardown_resources.sh 

Deleting cluster with id: j-1DE5WQUU4DNHZ
Running command: 'emr terminate-clusters --cluster-id j-1DE5WQUU4DNHZ' ....
Done

If no active clusters, then should return the following message

1
2
3
4
$ sh aws_etl/datasets/teardown_resources.sh 

No clusters to be deleted.

This post is licensed under CC BY 4.0 by the author.

Orchestrating ETL workflow using AWS Step Function

Using AWS Data Pipeline to automate movement of data between AWS services

Comments powered by Disqus.