AWS Big Data Blog

Orchestrate Apache Spark applications using AWS Step Functions and Apache Livy

The adoption of Apache Spark has increased significantly over the past few years, and running Spark-based application pipelines is the new normal. Spark jobs that are in an ETL (extract, transform, and load) pipeline have different requirements—you must handle dependencies in the jobs, maintain order during executions, and run multiple jobs in parallel. In most of these cases, you can use workflow scheduler tools like Apache Oozie, Apache Airflow, and even Cron to fulfill these requirements.

Apache Oozie is a widely used workflow scheduler system for Hadoop-based jobs. However, its limited UI capabilities, lack of integration with other services, and heavy reliance on XML might make it unsuitable for some users. On the other hand, Apache Airflow comes with a lot of neat features, along with powerful UI and monitoring capabilities and integration with several AWS and third-party services. However, with Airflow, you do need to provision and manage the Airflow server. The Cron utility is a powerful job scheduler. But it doesn’t give you much visibility into the job details, and creating a workflow using Cron jobs can be challenging.

What if you have a simple use case, in which you want to run a few Spark jobs in a specific order, but you don’t want to spend time orchestrating those jobs or maintaining a separate application? You can do that today in a serverless fashion using AWS Step Functions. You can create the entire workflow in AWS Step Functions and interact with Spark on Amazon EMR through Apache Livy.

In this post, I walk you through a list of steps to orchestrate a serverless Spark-based ETL pipeline using AWS Step Functions and Apache Livy.

Input data

For the source data for this post, I use the New York City Taxi and Limousine Commission (TLC) trip record data. For a description of the data, see this detailed dictionary of the taxi data. In this example, we’ll work mainly with the following three columns for the Spark jobs.

Column name Column description
RateCodeID Represents the rate code in effect at the end of the trip (for example, 1 for standard rate, 2 for JFK airport, 3 for Newark airport, and so on).
FareAmount Represents the time-and-distance fare calculated by the meter.
TripDistance Represents the elapsed trip distance in miles reported by the taxi meter.

The trip data is in comma-separated values (CSV) format with the first row as a header. To shorten the Spark execution time, I trimmed the large input data to only 20,000 rows. During the deployment phase, the input file tripdata.csv is stored in Amazon S3 in the <<your-bucket>>/emr-step-functions/input/ folder.

The following image shows a sample of the trip data:

Solution overview

The next few sections describe how Spark jobs are created for this solution, how you can interact with Spark using Apache Livy, and how you can use AWS Step Functions to create orchestrations for these Spark applications.

At a high level, the solution includes the following steps:

  1. Trigger the AWS Step Functions state machine by passing the input file path.
  2. The first stage in the state machine triggers an AWS Lambda function.
  3. The Lambda function interacts with Apache Spark running on Amazon EMR using Apache Livy, and submits a Spark job.
  4. The state machine waits a few seconds before checking the Spark job status.
  5. Based on the job status, the state machine moves to the success or failure state.
  6. Subsequent Spark jobs are submitted using the same approach.
  7. The state machine waits a few seconds for the job to finish.
  8. The job finishes, and the state machine updates with its final status.

Let’s take a look at the Spark application that is used for this solution.

Spark jobs

For this example, I built a Spark jar named spark-taxi.jar. It has two different Spark applications:

  1. MilesPerRateCode – The first job that runs on the Amazon EMR cluster. This job reads the trip data from an input source and computes the total trip distance for each rate code. The output of this job consists of two columns and is stored in Apache Parquet format in the output path.

The following are the expected output columns:

  • rate_code – Represents the rate code for the trip.
  • total_distance – Represents the total trip distance for that rate code (that is, sum(trip_distance)).
  2. RateCodeStatus – The second job that runs on the EMR cluster, but only if the first job finishes successfully. This job depends on two different input sets:
  • csv – The same trip data that is used for the first Spark job.
  • miles-per-rate – The output of the first job.

This job first reads the tripdata.csv file and aggregates the fare_amount by the rate_code. At this point, you have two different datasets, both aggregated by rate_code. Finally, the job uses the rate_code field to join the two datasets and outputs the entire rate code status in a single CSV file.

The output columns are as follows:

  • rate_code_id – Represents the rate code type.
  • total_distance – Derived from the first Spark job; represents the total trip distance.
  • total_fare_amount – A new field that is generated during the second Spark application, representing the total fare amount by the rate code type.

Note that in this case, you don’t need to run two different Spark jobs to generate that output. The goal of setting up the jobs in this way is just to create a dependency between the two jobs and use them within AWS Step Functions.
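
To make the dependency between the two jobs more concrete, here is a minimal PySpark sketch of roughly what they do. The real applications are packaged as a jar (spark-taxi.jar), so this is illustrative only; the CSV column names (RateCodeID, TripDistance, FareAmount) are assumptions based on the dataset description above.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("taxi-etl-sketch").getOrCreate()
root_path = "s3://<<your-bucket>>"  # same rootPath argument that the real jobs receive

trips = (spark.read
         .option("header", "true")
         .option("inferSchema", "true")
         .csv(root_path + "/emr-step-functions/input/tripdata.csv"))

# Job 1 (MilesPerRateCode): total trip distance per rate code, written as Parquet
miles_per_rate = (trips
                  .groupBy(F.col("RateCodeID").alias("rate_code"))
                  .agg(F.sum("TripDistance").alias("total_distance")))
miles_per_rate.write.parquet(root_path + "/emr-step-functions/miles-per-rate")

# Job 2 (RateCodeStatus): total fare per rate code, joined with the output of Job 1
fares = (trips
         .groupBy(F.col("RateCodeID").alias("rate_code_id"))
         .agg(F.sum("FareAmount").alias("total_fare_amount")))
rate_code_status = (fares
                    .join(miles_per_rate, fares.rate_code_id == miles_per_rate.rate_code)
                    .select("rate_code_id", "total_distance", "total_fare_amount"))
(rate_code_status.coalesce(1)
 .write.option("header", "true")
 .csv(root_path + "/emr-step-functions/rate-code-status"))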

Both Spark applications take one input argument called rootPath. It’s the S3 location where the Spark job is stored along with input and output data. Here is a sample of the final output:

The next section discusses how you can use Apache Livy to interact with Spark applications that are running on Amazon EMR.

Using Apache Livy to interact with Apache Spark

Apache Livy provides a REST interface to interact with Spark running on an EMR cluster. Livy is included in Amazon EMR release version 5.9.0 and later. In this post, I use Livy to submit Spark jobs and retrieve job status. When Amazon EMR is launched with Livy installed, the EMR master node becomes the endpoint for Livy, and it starts listening on port 8998 by default. Livy provides APIs to interact with Spark.

Let’s look at a couple of examples of how you can interact with Spark running on Amazon EMR using Livy.

To list active running jobs, you can execute the following from the EMR master node:

curl localhost:8998/sessions

If you want to do the same from a remote instance, just change localhost to the EMR hostname, as in the following (port 8998 must be open to that remote instance through the security group):

curl http://ec2-xx-xx-xx-xx.compute-1.amazonaws.com:8998/sessions

To submit a Spark job through Livy pointing to the application jar in S3, you can do the following:

curl -X POST --data '{"file": "s3://<<bucket-location>>/spark.jar", "className": "com.example.SparkApp"}' -H "Content-Type: application/json" http://ec2-xx-xx-xx-xx.compute-1.amazonaws.com:8998/batches

Spark submit through Livy returns a session ID that starts from 0. Using that session ID, you can retrieve the status of the job as shown following:

curl http://ec2-xx-xx-xx-xx.compute-1.amazonaws.com:8998/batches/0 | python -m json.tool

Through Spark submit, you can pass multiple arguments for the Spark job and Spark configuration settings. You can also do that using Livy, by passing the S3 path through the args parameter, as shown following:

curl -X POST --data '{"file": "s3://<<bucket-location>>/spark.jar", "className": "com.example.SparkApp", "args": ["s3://bucket-path"]}' -H "Content-Type: application/json" http://ec2-xx-xx-xx-xx.compute-1.amazonaws.com:8998/batches

All Apache Livy REST calls return a response as JSON, as shown in the following image:

If you want to pretty-print that JSON response, you can pipe the command output to Python’s JSON tool as follows:

curl http://ec2-xx-xx-xx-xx.compute-1.amazonaws.com:8998/batches/3 | python -m json.tool

For a detailed list of Livy APIs, see the Apache Livy REST API page. This post uses GET /batches and POST /batches.
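
These curl calls translate directly to any HTTP client. As a rough sketch (not part of the original solution), the following standalone Python script submits a batch through POST /batches and then polls GET /batches/{id} until Livy reports a terminal state; the endpoint, jar location, and class name are placeholders:

import json
import time
import requests

livy_url = 'http://ec2-xx-xx-xx-xx.compute-1.amazonaws.com:8998'  # EMR master node (placeholder)
headers = {'Content-Type': 'application/json'}

# POST /batches: submit the Spark application
payload = {
    'file': 's3://<<bucket-location>>/spark.jar',
    'className': 'com.example.SparkApp',
    'args': ['s3://bucket-path']
}
batch = requests.post(livy_url + '/batches', data=json.dumps(payload), headers=headers).json()
batch_id = batch['id']

# GET /batches/{id}: poll until the batch reaches a terminal state
while True:
    state = requests.get(livy_url + '/batches/' + str(batch_id), headers=headers).json()['state']
    print('Batch {} is {}'.format(batch_id, state))
    if state in ('success', 'dead', 'killed'):  # this post branches on success and dead
        break
    time.sleep(10)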

In the next section, you create a state machine and orchestrate Spark applications using AWS Step Functions.

Using AWS Step Functions to create a Spark job workflow

AWS Step Functions automatically triggers and tracks each step and retries when it encounters errors, so your application executes in order and as expected every time. To create a Spark job workflow using AWS Step Functions, you build a state machine that calls Lambda functions and uses different types of states to define the entire workflow.

First, you use the Task state—a simple state in AWS Step Functions that performs a single unit of work. You also use the Wait state to delay the state machine from continuing for a specified time. Later, you use the Choice state to add branching logic to a state machine.

The following is a quick summary of how to use different states in the state machine to create the Spark ETL pipeline:

  • Task state – Invokes a Lambda function. The first Task state submits the Spark job on Amazon EMR, and the next Task state is used to retrieve the previous Spark job status.
  • Wait state – Pauses the state machine for a specified number of seconds before the job status is checked again.
  • Choice state – Each Spark job execution can return a failure, an error, or a success state. So, in the state machine, you use the Choice state to create a rule that specifies the next step based on the success or failure of the previous step.

Here is one of my Task states, MilesPerRate Job, which simply submits the MilesPerRateCode Spark job:

"MilesPerRate Job": {
      "Type": "Task",
      "Resource":"arn:aws:lambda:us-east-1:xxxxxx:function:blog-miles-per-rate-job-submit-function",
      "ResultPath": "$.jobId",
      "Next": "Wait for MilesPerRate job to complete"
 }

This Task state configuration specifies the Lambda function to execute. The Lambda function submits a Spark job through Livy using Livy’s POST API. The ResultPath field tells the state machine where to place the result of the executing task. As discussed in the previous section, Spark submit through Livy returns the session ID, which is captured with $.jobId and used in a later state. For example, if the execution input is {"rootPath": "s3://tm-app-demos"} and Livy returns batch ID 0, the next state receives {"rootPath": "s3://tm-app-demos", "jobId": 0}. The Next field tells the state machine which state to go to next.

The following code section shows the Lambda function that is used to submit the MilesPerRateCode job. It uses the Python requests library to send a POST request to the Livy endpoint hosted on Amazon EMR, passing the required parameters as a JSON payload. It then parses the response, grabs the id field, and returns it.

from botocore.vendored import requests  # requests library vendored with botocore in the Lambda runtime
import json

def lambda_handler(event, context):
    headers = { "content-type": "application/json" }
    # Livy endpoint on the EMR master node
    url = 'http://xxxxxx.compute-1.amazonaws.com:8998/batches'
    # Batch definition: application jar, main class, and the rootPath argument
    payload = {
        'file' : 's3://<<s3-location>>/emr-step-functions/spark-taxi.jar',
        'className' : 'com.example.MilesPerRateCode',
        'args' : [event.get('rootPath')]
    }
    # Submit the Spark job through Livy's POST /batches API
    res = requests.post(url, data = json.dumps(payload), headers = headers, verify = False)
    json_data = json.loads(res.text)
    # Return the Livy batch ID; the state machine stores it under $.jobId
    return json_data.get('id')

Just like in the MilesPerRate job, another state submits the RateCodeStatus job, but it executes only when all previous jobs have completed successfully.

Here is the Task state in the state machine that checks the Spark job status:

"Query RateCodeStatus job status": {
   "Type": "Task",
   "Resource": "arn:aws:lambda:us-east-1:xxxxx:function:blog-spark-job-status-function",
   "Next": "RateCodeStatus job complete?",
   "ResultPath": "$.jobStatus"
}

Just like other states, the preceding Task executes a Lambda function, captures the result (represented by jobStatus), and passes it to the next state. The following is the Lambda function that checks the Spark job status based on a given session ID:

from botocore.vendored import requests
import json

def lambda_handler(event, context):
    # The Livy batch ID captured by the previous Task state ($.jobId)
    jobid = event.get('jobId')
    url = 'http://xxxx.compute-1.amazonaws.com:8998/batches/' + str(jobid)
    # GET /batches/{id} returns the batch details, including its state
    res = requests.get(url)
    json_data = json.loads(res.text)
    # Return the job state (for example, running, success, or dead)
    return json_data.get('state')

The Choice state checks the Spark job status value, compares it with a predefined value, and transitions based on the result. For example, if the status is success, the state machine moves to the next state (RateCodeStatus Job), and if it is dead, it moves to the MilesPerRate job failed state.

"MilesPerRate job complete?": {
   "Type": "Choice",
   "Choices": [{
       "Variable": "$.jobStatus",
       "StringEquals": "success",
       "Next": "RateCodeStatus Job"
   },{
       "Variable": "$.jobStatus",
       "StringEquals": "dead",
       "Next": "MilesPerRate job failed"
   }],
   "Default": "Wait for MilesPerRate job to complete"
}

Walkthrough using AWS CloudFormation

To set up this entire solution, you need to create a few AWS resources. To make it easier, I have created an AWS CloudFormation template. This template creates and configures all the AWS resources that are needed to build a Spark-based ETL pipeline on AWS Step Functions.

This CloudFormation template requires you to pass the following four parameters when you launch it.

Parameter Description
ClusterSubnetID The subnet where the Amazon EMR cluster is deployed; Lambda is configured to talk to this subnet.
KeyName The name of the existing EC2 key pair to access the Amazon EMR cluster.
VPCID The ID of the virtual private cloud (VPC) where the EMR cluster is deployed; Lambda is configured to talk to this VPC.
S3RootPath The Amazon S3 path where all required files (input file, Spark job, and so on) are stored and the resulting data is written.

IMPORTANT: These templates are designed only to show how you can create a Spark-based ETL pipeline on AWS Step Functions using Apache Livy. They are not intended for production use without modification. If you try this solution outside of the us-east-1 Region, download the necessary files from the CloudFormation template location, upload them to a bucket in your Region, edit the script as appropriate, and then run it.

To launch the CloudFormation stack, choose Launch Stack:
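
If you prefer to script the deployment instead of using the Launch Stack button, you can create the same stack with the AWS CLI or an SDK. Here is a rough boto3 sketch; the template URL and parameter values are placeholders that you would replace with your own:

import boto3

cfn = boto3.client('cloudformation', region_name='us-east-1')

cfn.create_stack(
    StackName='Blog-Spark-ETL-Step-Functions',
    TemplateURL='https://s3.amazonaws.com/<<template-bucket>>/spark-etl-state-machine.yaml',  # placeholder
    Parameters=[
        {'ParameterKey': 'ClusterSubnetID', 'ParameterValue': 'subnet-xxxxxxxx'},
        {'ParameterKey': 'KeyName', 'ParameterValue': 'my-ec2-key-pair'},
        {'ParameterKey': 'VPCID', 'ParameterValue': 'vpc-xxxxxxxx'},
        {'ParameterKey': 'S3RootPath', 'ParameterValue': 's3://tm-app-demos'},
    ],
    Capabilities=['CAPABILITY_IAM'],  # the template creates IAM roles
)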

Launching this stack creates the following list of AWS resources.

Logical ID Resource Type Description
StepFunctionsStateExecutionRole IAM role IAM role used to execute the state machine, with a trust relationship with the states service.
SparkETLStateMachine AWS Step Functions state machine State machine in AWS Step Functions for the Spark ETL workflow.
LambdaSecurityGroup Amazon EC2 security group Security group that the Lambda functions use to call the Livy API.
RateCodeStatusJobSubmitFunction AWS Lambda function Lambda function to submit the RateCodeStatus job.
MilesPerRateJobSubmitFunction AWS Lambda function Lambda function to submit the MilesPerRate job.
SparkJobStatusFunction AWS Lambda function Lambda function to check the Spark job status.
LambdaStateMachineRole IAM role IAM role used by all Lambda functions, with a trust relationship with the Lambda service.
EMRCluster Amazon EMR cluster EMR cluster where Livy is running and where the Spark jobs run.

During the AWS CloudFormation deployment phase, the template sets up the S3 paths for input and output. Input files are stored in the <<s3-root-path>>/emr-step-functions/input/ path, whereas spark-taxi.jar is copied under <<s3-root-path>>/emr-step-functions/.

The following screenshot shows how the S3 paths are configured after deployment. In this example, I passed a bucket that I created in my AWS account, s3://tm-app-demos, as the S3 root path. Please change this to your own bucket name when you run this solution in your account.

If the CloudFormation template completed successfully, you will see Spark-ETL-State-Machine in the AWS Step Functions dashboard, as follows:

Choose the Spark-ETL-State-Machine state machine to take a look at this implementation. The AWS CloudFormation template built the entire state machine along with its dependent Lambda functions, which are now ready to be executed.

On the dashboard, choose the newly created state machine, and then choose New execution to initiate the state machine. It asks you to pass input in JSON format. This input goes to the first state MilesPerRate Job, which eventually executes the Lambda function blog-miles-per-rate-job-submit-function.

Pass the S3 root path as input:

{
  "rootPath": "s3://tm-app-demos"
}

Then choose Start Execution:

The rootPath value is the same value that was passed when creating the CloudFormation stack. It can be an S3 bucket location or a bucket with prefixes, but it should be the same value that is used for AWS CloudFormation. This value tells the state machine where it can find the Spark jar and input file, and where it will write output files. After the state machine starts, each state/task is executed based on its definition in the state machine.
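
If you would rather start the execution from a script than from the console, a minimal boto3 sketch looks like the following; the state machine ARN is a placeholder that you can copy from the Step Functions dashboard:

import json
import boto3

sfn = boto3.client('stepfunctions', region_name='us-east-1')

response = sfn.start_execution(
    stateMachineArn='arn:aws:states:us-east-1:xxxxxx:stateMachine:Spark-ETL-State-Machine',  # placeholder
    input=json.dumps({'rootPath': 's3://tm-app-demos'})
)
print(response['executionArn'])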

At a high level, the following represents the flow of events:

  1. Execute the first Spark job, MilesPerRate.
  2. The Spark job reads the input file from the location <<rootPath>>/emr-step-functions/input/tripdata.csv. If the job finishes successfully, it writes the output data to <<rootPath>>/emr-step-functions/miles-per-rate.
  3. If the Spark job fails, it transitions to the error state MilesPerRate job failed, and the state machine stops. If the Spark job finishes successfully, it transitions to the RateCodeStatus Job state, and the second Spark job is executed.
  4. If the second Spark job fails, it transitions to the error state RateCodeStatus job failed, and the state machine stops with the Failed status.
  5. If this Spark job completes successfully, it writes the final output data to the <<rootPath>>/emr-step-functions/rate-code-status/ path. It also transitions to the RateCodeStatus job finished state, and the state machine ends its execution with the Success status.
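
After a successful run, one quick way to verify the results outside of the console is to list the two output prefixes. The following boto3 sketch assumes the example bucket used in this walkthrough:

import boto3

s3 = boto3.client('s3')

# List the files written by the two Spark jobs
for prefix in ('emr-step-functions/miles-per-rate', 'emr-step-functions/rate-code-status'):
    resp = s3.list_objects_v2(Bucket='tm-app-demos', Prefix=prefix)
    for obj in resp.get('Contents', []):
        print(obj['Key'], obj['Size'])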

The following screenshot shows a successfully completed Spark ETL state machine:

The right side of the state machine diagram shows the details of individual states with their input and output.

When you execute the state machine for the second time, it fails because the S3 path already exists. The state machine turns red and stops at MilesPerRate job failed. The following image represents that failed execution of the state machine:

You can also check your Spark application status and logs by going to the Amazon EMR console and viewing the Application history tab:

I hope this walkthrough paints a picture of how you can create a serverless solution for orchestrating Spark jobs on Amazon EMR using AWS Step Functions and Apache Livy. In the next section, I share some ideas for making this solution even more elegant.

Next steps

The goal of this post is to show a simple example that uses AWS Step Functions to create an orchestration for Spark-based jobs in a serverless fashion. To make this solution robust and production ready, you can explore the following options:

  • In this example, I manually initiated the state machine by passing the rootPath as input. You can instead trigger the state machine automatically. To run the ETL pipeline as soon as the files arrive in your S3 bucket, you can pass the new file path to the state machine. Because CloudWatch Events supports AWS Step Functions as a target, you can create a CloudWatch Events rule for an S3 event, set AWS Step Functions as the target, and pass the new file path to your state machine (see the sketch after this list). You’re all set!
  • You can also improve this solution by adding an alerting mechanism in case of failures. To do this, create a Lambda function that sends an alert email and assign that Lambda function to a Fail state. That way, when any part of your state machine fails, it triggers an email and notifies the user.
  • If you want to submit multiple Spark jobs in parallel, you can use the Parallel state type in AWS Step Functions. The Parallel state is used to create parallel branches of execution in your state machine.
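
For the first idea, the wiring could look roughly like the following boto3 sketch. It creates a CloudWatch Events rule that matches S3 PutObject calls (recorded through CloudTrail) on the input bucket and targets the state machine. The ARNs are placeholders, and in practice you would use an input transformer to pass the key of the newly arrived file instead of a static rootPath.

import json
import boto3

events = boto3.client('events')

# Rule that fires when a new object is put into the input bucket
# (requires CloudTrail to record S3 data events for that bucket).
events.put_rule(
    Name='start-spark-etl-on-new-tripdata',
    EventPattern=json.dumps({
        'source': ['aws.s3'],
        'detail-type': ['AWS API Call via CloudTrail'],
        'detail': {
            'eventSource': ['s3.amazonaws.com'],
            'eventName': ['PutObject'],
            'requestParameters': {'bucketName': ['tm-app-demos']}
        }
    })
)

# Target the Spark ETL state machine; the role must allow states:StartExecution.
events.put_targets(
    Rule='start-spark-etl-on-new-tripdata',
    Targets=[{
        'Id': 'spark-etl-state-machine',
        'Arn': 'arn:aws:states:us-east-1:xxxxxx:stateMachine:Spark-ETL-State-Machine',  # placeholder
        'RoleArn': 'arn:aws:iam::xxxxxx:role/cwe-start-spark-etl',  # placeholder
        'Input': json.dumps({'rootPath': 's3://tm-app-demos'})
    }]
)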

With Lambda and AWS Step Functions, you can create a very robust serverless orchestration for your big data workload.

Cleaning up

When you’ve finished testing this solution, remember to clean up all those AWS resources that you created using AWS CloudFormation. Use the AWS CloudFormation console or AWS CLI to delete the stack named Blog-Spark-ETL-Step-Functions.

Summary

In this post, I showed you how to use AWS Step Functions to orchestrate your Spark jobs that are running on Amazon EMR. You used Apache Livy to submit jobs to Spark from a Lambda function and created a workflow for your Spark jobs, maintaining a specific order for job execution and triggering different AWS events based on your job’s outcome.
Go ahead—give this solution a try, and share your experience with us!

 


Additional Reading

If you found this post useful, be sure to check out Building a Real World Evidence Platform on AWS and Building High-Throughput Genomics Batch Workflows on AWS: Workflow Layer (Part 4 of 4).


About the Author

Tanzir Musabbir is an EMR Specialist Solutions Architect with AWS. He is an early adopter of open source Big Data technologies. At AWS, he works with our customers to provide them architectural guidance for running analytics solutions on Amazon EMR, Amazon Athena & AWS Glue. Tanzir is a big Real Madrid fan and he loves to travel in his free time.