AWS Big Data Blog

Automating EMR workloads using AWS Step Functions

Amazon EMR allows you to process vast amounts of data quickly and cost-effectively at scale. Using open-source tools such as Apache Spark, Apache Hive, and Presto, and coupled with the scalable storage of Amazon Simple Storage Service (Amazon S3), Amazon EMR gives analytical teams the engines and elasticity to run petabyte-scale analysis for a fraction of the cost of traditional on-premises clusters. Developers and analysts can use Jupyter-based Amazon EMR notebooks for iterative development, collaboration, and access to data stored across AWS data products.

What happens if you have Amazon EMR code that needs to run automatically on a regular basis? Maybe the job only runs for certain events, like new data arriving in Amazon S3. Or maybe you want to run a job every Friday afternoon at 2:00 PM. What if there is a multi-step process?

To run Amazon EMR workloads on a schedule, you can automate everything with AWS Step Functions. This post walks through how to use Step Functions state machines and the callback pattern to automate EMR jobs. You can download the code examples from the GitHub repo.

Prerequisites

To follow along with this walkthrough, you must have the following:

Solution overview

For this use case, I want to run two applications on my EMR cluster. The start of the second application depends on the successful completion and output of the first. At a high level, I want to launch an EMR cluster automatically, run the code, and remove the cluster. Specifically, when the first program successfully completes, I want to run the second program.

In some cases, after the second application finishes, I may want to run both programs again (with different dataset sizes, perhaps). I need a way to decide whether to run the process again with the same cluster. Whether the steps succeed or fail, at the conclusion, I always want to delete the CloudFormation stack that contains my EMR cluster to reduce cost. The following diagram illustrates this high-level overview of the pipeline operation.

Workflow details

I run two programs, and I need the first program to complete before running the second one. I optionally want to repeat those two programs with different datasets to get the final state of the data. To orchestrate the jobs, I can run through the same steps multiple times with the same active EMR cluster.

To facilitate automating the pipeline, I use an inner state machine to check the cluster status and submit EMR job steps. I then wrap that inner state machine in an outer state machine. The outer state machine starts the cluster and submits information to the inner state machine. It waits for all steps to complete, then deletes the EMR cluster.

The following flow chart illustrates the steps and checkpoints that make up the pipeline.
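
As a rough illustration of the control flow described above, the following Python sketch mimics the outer loop with plain functions. It is not the state machine definition from the repo: run_pipeline and the three callables passed into it are hypothetical names, and the real pipeline expresses this logic as Step Functions states that invoke Lambda functions.

```python
from typing import Callable, Dict, List


def run_pipeline(
    payload: Dict,
    launch_cluster: Callable[[Dict], str],
    run_step: Callable[[str, str, str], None],
    delete_cluster: Callable[[str], None],
) -> List[str]:
    """Run preprocessing, then the model, once per processing mode.

    The cluster is deleted whether the steps succeed or fail.
    """
    completed: List[str] = []
    stack_id = launch_cluster(payload)
    try:
        remaining = list(payload["ProcessingMode"])
        while remaining:
            mode = remaining[0]
            # The second program only starts after the first one returns.
            run_step(stack_id, payload["PreProcessingProgram"], mode)
            run_step(stack_id, payload["ModelProgram"], mode)
            completed.append(remaining.pop(0))
    finally:
        # Always clean up the stack to avoid paying for an idle cluster.
        delete_cluster(stack_id)
    return completed
```

The try/finally block plays the role of the "always delete the stack" path: an exception raised by either step still reaches delete_cluster before it propagates.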

Deploying the pipeline state machines

To simplify pipeline deployment, I use AWS SAM, an open-source framework for building serverless applications. AWS SAM provides a single deployment configuration, extensions to CloudFormation templates, built-in best practices, and local debugging and testing. You can use AWS SAM with a suite of AWS tools for building serverless applications. For more information, see What Is the AWS Serverless Application Model (AWS SAM)?

Initiating the application

Navigate to the path where you want to download the files and initiate the AWS SAM application. I want to run the code from my local machine and have created the following location:

~/Documents/projects/blog/steppipeline/automation-ml-step-data-pipeline

From this directory, I initialize the application using sam init. This connects to the repository and downloads the files for creation of the ML pipeline. See the following code:

sam init -l https://github.com/aws-samples/automation-ml-step-data-pipeline.git

Creating the S3 bucket and moving dependencies

For this post, I orchestrate an existing process from the post Anomaly Detection Using PySpark, Hive, and Hue on Amazon EMR, which runs on Amazon EMR. The pipeline reads code artifacts from Amazon S3, where the EMR cluster has read permission. There are two programs: kmeansandey.py and kmeanswsssey.py.

First, create the bucket from the command line using the aws s3 mb command and upload the code. Your bucket name must be globally unique:

aws s3 mb s3://<your bucket name>

Move the artifacts to your bucket, replacing <your bucket name> with your bucket name:

aws s3 cp sample_ml_code/kmeansandey.py s3://<your bucket name>/testcode/kmeansandey.py
aws s3 cp sample_ml_code/kmeanswsssey.py s3://<your bucket name>/testcode/kmeanswsssey.py
aws s3 cp emr/bootstrapactions.sh s3://<your bucket name>/emr-bootstrap-scripts/bootstrapactions.sh
aws s3 cp emr/emr-cluster-config.json s3://<your bucket name>/emr-cluster-config.json
aws s3 cp emr/emr-cluster-sample.yaml s3://<your bucket name>/emr-cluster-sample.yaml

Deploying the application

Deploy the build artifacts to the AWS Cloud using the following code:

sam deploy --guided

AWS SAM prompts you for the parameters that you need to build and deploy the application. I have provided some default values where possible.

The final output of your deployment process should indicate that all stacks were built:

Successfully created/updated stack - step-pipeline in us-east-1

After deployment, you receive an email to confirm your subscription. Choose the confirmation link in the email to receive pipeline notifications.

Submitting a workload to your Step Functions state machine

To create a cluster and submit EMR jobs, the outer state machine needs a JSON payload. This contains the location of the programs in Amazon S3, the Amazon EMR CloudFormation template, and the parameter files used to launch the EMR cluster.

Creating an Amazon EC2 key pair

To use the same sample programs and EMR cluster template that you used to test your pipeline, you need an Amazon EC2 key pair for SSH credentials. When you create a cluster, you can specify the Amazon Elastic Compute Cloud (Amazon EC2) key pair to use for SSH connections to all cluster instances. The name of the key pair for this cluster is referenced in the emr-cluster-config.json file. See the following code:

<snip>
  {
    "ParameterKey": "Keyname",
    "ParameterValue": "emrcluster-launch"
  },
</snip>
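
If you would rather point the template at a key pair you already own, one option is to rewrite that entry before uploading the file. The following sketch assumes the parameter file is a JSON array of ParameterKey/ParameterValue entries (the excerpt shows only one entry, so treat that as an assumption). set_parameter is a hypothetical helper, not part of the repo, and my-existing-key is a placeholder.

```python
import json
from typing import Dict, List


def set_parameter(
    parameters: List[Dict[str, str]], key: str, value: str
) -> List[Dict[str, str]]:
    """Return a copy of the parameter list with `key` set to `value`."""
    updated = [dict(entry) for entry in parameters]
    for entry in updated:
        if entry["ParameterKey"] == key:
            entry["ParameterValue"] = value
            return updated
    # The key was not present, so add a new entry for it.
    updated.append({"ParameterKey": key, "ParameterValue": value})
    return updated


# Assumed file shape: a JSON array of parameter entries.
sample = json.loads(
    '[{"ParameterKey": "Keyname", "ParameterValue": "emrcluster-launch"}]'
)
print(json.dumps(set_parameter(sample, "Keyname", "my-existing-key"), indent=2))
```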

To use the example as-is with the parameters unchanged, create an Amazon EC2 key pair on the AWS Management Console or AWS Command Line Interface (AWS CLI).

  1. On the Amazon EC2 console, under Network & Security, choose Key Pairs.
  2. On the Key Pairs page, choose Create Key Pair.
  3. For Key pair name, enter emrcluster-launch.
  4. Choose Create.
  5. When the console prompts you to save the private key file, save it in a safe place.

This is the only chance for you to save the private key file.
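
If you prefer to script this instead of using the console, the following is a minimal sketch using the boto3 EC2 client's create_key_pair call, which returns the private key in the KeyMaterial field. The helper name and the .pem path are assumptions for illustration. The client is passed in, so you would create it yourself with boto3.client("ec2").

```python
import os
from typing import Any


def create_key_pair(ec2_client: Any, key_name: str, pem_path: str) -> str:
    """Create an EC2 key pair and save the private key with owner-only access."""
    response = ec2_client.create_key_pair(KeyName=key_name)
    # O_EXCL refuses to overwrite an existing file; 0o600 restricts access
    # to the file owner on POSIX systems.
    fd = os.open(pem_path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
    with os.fdopen(fd, "w") as handle:
        handle.write(response["KeyMaterial"])
    return pem_path
```

For example, create_key_pair(boto3.client("ec2"), "emrcluster-launch", "emrcluster-launch.pem") would match the key name used in the parameter file.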

Inputting JSON for launching the pipeline

The simplest way for you to run the pipeline is to use the Start execution feature on the Step Functions console. The console gives you full functionality to initiate the function and submit a payload. In the example test_input.json, update the bucket values, security group, and subnet with the information for your account:

{
    "ModelName": "PipelineTest_01",
    "ModelProgram": "s3://<your bucket name>/testcode/kmeansandey.py",
    "PreProcessingProgram": "s3://<your bucket name>/testcode/kmeanswsssey.py",
    "EMRCloudFormation": "https://s3.amazonaws.com/<your bucket name>/emr-cluster-sample.yaml",
    "EMRParameters": "https://s3.amazonaws.com/<your bucket name>/emr-cluster-config.json",
    "JobInput": "s3://aws-bigdata-blog/artifacts/anomaly-detection-using-pyspark/sensorinputsmall/",  
    "SecurityGroup": "<your security group>",  
    "SubNet": "<your subnet>",  
    "ClusterSize": "4",  
    "ProcessingMode": ["TRAINING"]
}

The payload includes the following information:

  • ModelName – A short descriptive identifier used to identify the transient EMR cluster created during this process. This name shows on the Amazon EMR console for easy identification.
  • ModelProgram – The Amazon S3 URL location of the program that runs when the model initiates on the EMR cluster (step 3).
  • PreProcessingProgram – The Amazon S3 URL location of the program that runs when preprocessing initiates on the EMR cluster (step 2).
  • EMRCloudFormation – The Amazon S3 HTTPS location of the CloudFormation template for launching the transient EMR cluster.
  • EMRParameters – The Amazon S3 HTTPS location of the parameter file supporting the Amazon EMR CloudFormation template.
  • JobInput – The Amazon S3 URL location of the input data for the preprocessing program.
  • SecurityGroup – The security group with ingress and egress rules for the launched EMR cluster.
  • SubNet – The subnet identifier where you place your EMR cluster.
  • ClusterSize – Denotes the number of EMR cluster nodes to run the job and can be changed based on the compute need. I use 4 nodes as the input value for the sample program.
  • ProcessingMode – This is an array of values. The pipeline runs steps 2 and 3 for each value in the array. The value is passed into the program unchanged and can be used to internally control how the program runs. For this use case, it runs a single time on the small dataset.
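
If you generate payloads often, it can help to build them from a few inputs instead of editing JSON by hand. The following is a minimal sketch: build_pipeline_input is a hypothetical helper, the object keys mirror the upload paths used earlier in this post, and the HTTPS values assume path-style URLs on s3.amazonaws.com.

```python
import json
from typing import Dict, Sequence


def build_pipeline_input(
    bucket: str,
    security_group: str,
    subnet: str,
    model_name: str = "PipelineTest_01",
    cluster_size: int = 4,
    processing_modes: Sequence[str] = ("TRAINING",),
) -> Dict:
    """Assemble the JSON payload expected by the outer state machine."""
    if not processing_modes:
        raise ValueError("ProcessingMode needs at least one value")
    return {
        "ModelName": model_name,
        "ModelProgram": f"s3://{bucket}/testcode/kmeansandey.py",
        "PreProcessingProgram": f"s3://{bucket}/testcode/kmeanswsssey.py",
        "EMRCloudFormation": f"https://s3.amazonaws.com/{bucket}/emr-cluster-sample.yaml",
        "EMRParameters": f"https://s3.amazonaws.com/{bucket}/emr-cluster-config.json",
        "JobInput": "s3://aws-bigdata-blog/artifacts/anomaly-detection-using-pyspark/sensorinputsmall/",
        "SecurityGroup": security_group,
        "SubNet": subnet,
        # The sample payload passes the cluster size as a string.
        "ClusterSize": str(cluster_size),
        "ProcessingMode": list(processing_modes),
    }


example = build_pipeline_input("my-bucket", "sg-0123456789", "subnet-0123456789")
print(json.dumps(example, indent=4))
```

Passing processing_modes=("TRAINING", "TRAINING") would make the pipeline run the preprocessing and model steps twice on the same cluster.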

Opening the Step Functions Executions page

On the Step Functions console, choose MLStateMachine. This is the outer state machine. On the detail page for the outer state machine, choose Start execution.

Entering your payload

On the New execution page, enter the JSON for your pipeline based on the example test_input.json. Choose Start execution.
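
The same submission can be scripted. The following sketch uses the boto3 Step Functions client's start_execution call, which takes the state machine ARN, an execution name, and the input as a JSON string. start_pipeline is a hypothetical wrapper, and the ARN and execution name are values you supply from your own account.

```python
import json
from typing import Any, Dict


def start_pipeline(
    sfn_client: Any, state_machine_arn: str, payload: Dict, execution_name: str
) -> str:
    """Start one execution of the outer state machine and return its ARN."""
    response = sfn_client.start_execution(
        stateMachineArn=state_machine_arn,
        name=execution_name,
        # Step Functions expects the input as a JSON-encoded string.
        input=json.dumps(payload),
    )
    return response["executionArn"]
```

You would create the client with boto3.client("stepfunctions") and pass in the payload loaded from your edited copy of test_input.json.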

Reviewing the workflow as it runs

You can see the pipeline running in the visual workflow and review the event history on the detail page. The following diagram shows the state machine definition used:

Diving into the pipeline details

There are four processes that run in the outer state machine pipeline:

  1. Step 1 launches an EMR cluster using the CloudFormation template. The AWS Lambda function downloads the template and parameter file from the specified Amazon S3 location and initiates the stack build.
  2. When the EMR cluster is ready, step 2 initiates the first set of code against the newly created EMR cluster, passing in the remaining parameters to the inner state machine. It adds the stack ID, EMR cluster ID, and status to the payload. These values are obtained from the output of the CloudFormation stack. See the following code:
    "ModelName": "PipelineTest_01",  
    "PreProcessingProgram": "s3://<your bucket name>/testcode/kmeanswsssey.py",  
    "JobInput": "s3://aws-bigdata-blog/artifacts/anomaly-detection-using-pyspark/sensorinputsmall/",  
    "ClusterSize": "4",  
    "ProcessingMode": ["TRAINING"],
    "StackId": "arn:aws:cloudformation:us-east-1:575444587305:stack/PipelineTest01-auto-emr-02142020041612/bc5fd7a0-4ee0-11ea-a395-0e4c53e0aefd",
    "Status": "CREATE_COMPLETE",
    "ClusterId": "j-MF6LWBLJZ88K"

The code contains the following information:

  • ModelName is used in the EMR cluster name to make it easier to identify in the console and AWS CLI output.
  • PreProcessingProgram in our use case points to the first code step (kmeanswsssey.py). The code is passed through the first state machine and submitted to the second state machine and Amazon EMR.
  • JobInput, ClusterSize, ClusterId, StackId, and ProcessingMode are passthrough values that the program needs to run.

The step initiates the Lambda function awsblog-testproject-inner-sm-AddStepLambda-x45123xxxxxx, which engages the inner state machine asynchronously to run a high-level process of checking the cluster, adding a step, checking to see if the step is complete, and exiting back to the outer state machine when complete.
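
To make the add-a-step and check-for-completion cycle more concrete, here is a sketch built on the boto3 EMR client calls add_job_flow_steps and describe_step. It is not the code of the Lambda functions in the repo. The helper names, the spark-submit arguments, and the choice to pass JobInput and the processing mode as program arguments are all assumptions.

```python
from typing import Any, Dict, List

# Terminal step states that mean the step will never complete.
FAILED_STATES = {"FAILED", "CANCELLED", "INTERRUPTED"}


def build_spark_step(name: str, program: str, args: List[str]) -> Dict[str, Any]:
    """Describe a spark-submit step that runs a PySpark program from Amazon S3."""
    return {
        "Name": name,
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["spark-submit", "--deploy-mode", "cluster", program, *args],
        },
    }


def submit_step(emr_client: Any, cluster_id: str, step: Dict[str, Any]) -> str:
    """Add one step to a running cluster and return the new step ID."""
    response = emr_client.add_job_flow_steps(JobFlowId=cluster_id, Steps=[step])
    return response["StepIds"][0]


def step_is_complete(emr_client: Any, cluster_id: str, step_id: str) -> bool:
    """Return True when the step completed; raise if it ended unsuccessfully."""
    step = emr_client.describe_step(ClusterId=cluster_id, StepId=step_id)
    state = step["Step"]["Status"]["State"]
    if state in FAILED_STATES:
        raise RuntimeError(f"Step {step_id} ended in state {state}")
    return state == "COMPLETED"
```

In a state machine, the completion check would typically sit behind a Wait state and a Choice state, so the workflow polls without keeping a Lambda function running.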

  3. Next, the outer state machine runs the Model program code (step 3) by submitting it to the Lambda function awsblog-testproject-inner-sm-AddStepLambda-x45123xxxxxx to engage the inner state machine for the second set of code (kmeansandey.py). The process is the same as step 2, but the code it runs is from a different file, and the output from the preprocessing step becomes the input for this step. See the following code:
    "ModelProgram": "s3://<your bucket name>/testcode/kmeansandey.py",

When the inner state machine is complete, it moves to a step that removes the first value from the ProcessingMode array. For this use case, there is only one value (TRAINING), which is removed, leaving the array empty. The next step in the state machine looks for remaining values; if there are none, it marks all steps as complete and moves to Delete EMR cluster.
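
The array handling in that paragraph amounts to "drop the first value, then check whether anything is left." A small Python sketch of that logic follows. advance_processing_mode and the AllStepsComplete flag are hypothetical names used for illustration, and the state machine in the repo may record completion differently.

```python
from typing import Any, Dict


def advance_processing_mode(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Drop the mode that just finished and flag whether any remain."""
    remaining = list(payload.get("ProcessingMode", []))[1:]
    updated = dict(payload, ProcessingMode=remaining)
    # Hypothetical flag a Choice state could branch on.
    updated["AllStepsComplete"] = not remaining
    return updated


print(advance_processing_mode({"ProcessingMode": ["TRAINING"]}))
```

With two values in the array, the first call leaves one value and AllStepsComplete stays False, so the pipeline would loop back to the preprocessing step on the same cluster.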

  4. The final step in the outer state machine is to remove the EMR cluster. The Delete EMR cluster step passes the CloudFormation stack ID into lambda/delete_cfn_stack.py, initiating the deletion of the stack and cleaning up all the resources.
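
For reference, deleting a stack by ID takes a single call with the boto3 CloudFormation client. The following sketch is not the contents of lambda/delete_cfn_stack.py. delete_emr_stack is a hypothetical helper, and the optional wait uses the client's stack_delete_complete waiter.

```python
from typing import Any


def delete_emr_stack(cfn_client: Any, stack_id: str, wait: bool = False) -> None:
    """Request deletion of the stack that contains the EMR cluster."""
    cfn_client.delete_stack(StackName=stack_id)
    if wait:
        # Blocks until CloudFormation reports the stack as deleted.
        cfn_client.get_waiter("stack_delete_complete").wait(StackName=stack_id)
```

Inside a Lambda function you would usually leave wait as False, because a full stack deletion can outlast the function timeout.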

The output of the test programs is stored in Amazon S3 in two folders under the pipeline artifacts. The preprocessing folder contains data that is used to drive the output in the model folder. The following screenshot shows the folders in Amazon S3.

Conclusion

The Step Functions workflow in this post is a repeatable two-step pipeline. It starts an EMR cluster, runs a program that outputs data, and initiates a second program that depends on the previous job finishing. It then deletes all resources automatically.

You can adapt the workflow to respond to Amazon S3 events, a message received in a queue, a file checked into a code repository, or a schedule. Any event that can invoke Lambda can initiate the pipeline. For more information, see Invoking AWS Lambda functions.
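
As one example of such an adaptation, the following sketch turns an Amazon S3 event notification into a pipeline payload by pointing JobInput at the folder of the newly arrived object. The function names and the idea of keeping a base payload to copy from are assumptions. The event shape is the standard S3 notification format, in which object keys arrive URL-encoded.

```python
from typing import Any, Dict
from urllib.parse import unquote_plus


def job_input_from_s3_event(event: Dict[str, Any]) -> str:
    """Return the s3:// folder that contains the object named in the event."""
    record = event["Records"][0]["s3"]
    bucket = record["bucket"]["name"]
    # S3 URL-encodes keys in notifications (spaces arrive as '+').
    key = unquote_plus(record["object"]["key"])
    prefix = key.rsplit("/", 1)[0] + "/" if "/" in key else ""
    return f"s3://{bucket}/{prefix}"


def pipeline_input_for_event(
    base_payload: Dict[str, Any], event: Dict[str, Any]
) -> Dict[str, Any]:
    """Copy the base payload and point JobInput at the new data."""
    return dict(base_payload, JobInput=job_input_from_s3_event(event))
```

A Lambda function subscribed to the bucket could pass the resulting payload to the Step Functions start_execution API to launch the outer state machine.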

You can download the example code from the GitHub repo and adapt it for your use case. Let me know in the comments what you built for your environment.


About the Authors

Mohammed “Afsar” Jahangir Ali has been a Senior Big Data Consultant with Amazon since January 2018. He is a data enthusiast helping customers shape their data lakes and analytic journeys on AWS. In his spare time, he enjoys taking pictures, listening to music, and spending time with family.

Wendy Neu has worked as a Data Architect with Amazon since January 2015. Prior to joining Amazon, she worked as a consultant in Cincinnati, OH helping customers integrate and manage their data from different unrelated data sources.