AWS Big Data Blog
Analyze Realtime Data from Amazon Kinesis Streams Using Zeppelin and Spark Streaming
Manjeet Chayel is a Solutions Architect with AWS
Streaming data is everywhere. This includes clickstream data, data from sensors, data emitted from billions of IoT devices, and more. Not surprisingly, data scientists want to analyze and explore these data streams in real time. This post shows you how you can use Spark Streaming to process data coming from Amazon Kinesis streams, build some graphs using Zeppelin, and then store the Zeppelin notebook in Amazon S3.
Zeppelin overview
Apache Zeppelin is an open source, web-based notebook tool for interactive and collaborative data exploration using Spark. You can use Scala, Python, SQL (using Spark SQL), or HiveQL to manipulate data and quickly visualize results.
Zeppelin notebooks can be shared among several users, and visualizations can be published to external dashboards. Zeppelin uses the Spark settings on your cluster and can use Spark’s dynamic allocation of executors to let YARN estimate the optimal resource consumption.
With the latest Zeppelin release (0.5.6) included on Amazon EMR 4.7.0, you can now import notes using links to S3 JSON files, raw file URLs in GitHub, or local files. You can also download a note as a JSON file. This new functionality makes it easier to save and share Zeppelin notes, and it allows you to version your notes during development. The import feature is located on the Zeppelin home screen, and the export feature is located on the toolbar for each note.
Additionally, you can still configure Zeppelin to store its entire notebook file in S3 by adding a configuration for zeppelin-env when creating your cluster (just make sure you have already created the bucket in S3 before creating your cluster).
Streaming data walkthrough
To follow along with this walkthrough, you need an AWS account and the AWS CLI configured on your machine. The entire pattern can be implemented in a few simple steps:
- Create an Amazon Kinesis stream.
- Spin up an EMR cluster with Hadoop, Spark, and Zeppelin applications from advanced options.
- Use a simple Java producer to push random IoT event data into the Amazon Kinesis stream.
- Connect to the Zeppelin notebook.
- Import the Zeppelin notebook from GitHub.
- Analyze and visualize the streaming data.
We’ll look at each of these steps below.
Create an Amazon Kinesis stream
First, create a simple Amazon Kinesis stream, “spark-demo,” with two shards. For more information, see the AWS documentation for Creating a Stream.
Spin up an EMR cluster with Hadoop, Spark, and Zeppelin
Edit the software settings for Zeppelin by copying and pasting the configuration below. Replace the bucket name “demo-s3-bucket” with your S3 bucket name.
Notes:
- You do not have to include the S3:// prefix in the bucket name. This configuration sets S3 as the notebook storage location and adds the Amazon Kinesis Client Library (KCL) to the environment.
- You need the following folder structure on S3:
It takes a few minutes for the cluster to start and change to the “Waiting” state.
While this is happening, you can configure your machine to view web interfaces on the cluster. For more information, see View Web Interfaces Hosted on Amazon EMR Clusters.
Use a simple Java producer to push random IoT events into the Amazon Kinesis stream
I have implemented a simple Java producer application, using the Kinesis Producer Library, which ingests random IoT sensor data into the “spark-demo” Amazon Kinesis stream.
Download the JAR and run it from your laptop or an EC2 instance (this requires Java 8):
java -jar KinesisProducer.jar
Data is pushed in CSV format:
device_id,temperature,timestamp
Note: If you are using an EC2 instance, make sure that it has the required permissions to push the data into the Amazon Kinesis stream.
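To make the record format concrete, here is a minimal sketch of how a reading could be rendered as one CSV line before it is pushed to the stream. It is not the code inside KinesisProducer.jar: the SensorReading type, the device naming, the temperature range, and the numeric timestamp are all assumptions made for illustration. Only the column order comes from the format shown above.

```scala
import scala.util.Random

// Hypothetical reading type; only the column order is taken from the post.
case class SensorReading(deviceId: String, temperature: Int, timestamp: Long)

// Render a reading as one CSV line: device_id,temperature,timestamp
def toCsv(r: SensorReading): String =
  s"${r.deviceId},${r.temperature},${r.timestamp}"

// Generate a random reading. The device names and the temperature range
// are assumptions for this sketch, not values used by the producer JAR.
def randomReading(rng: Random, now: Long): SensorReading =
  SensorReading(
    deviceId = s"device-${rng.nextInt(10)}",
    temperature = 20 + rng.nextInt(60),
    timestamp = now
  )

// One fixed example record
val sample = toCsv(SensorReading("device-1", 72, 1465000000L))
```

Keeping the values free of commas matters here, because the notebook later splits each record on "," without any quoting rules.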
Connect to the Zeppelin notebook
There are several ways to connect to the UI on the master node. One method is to use a proxy extension to the browser. To learn how, see Option 2, Part 2: Configure Proxy Settings to View Websites Hosted on the Master Node.
To reach the web interfaces, you must establish an SSH tunnel with the master node using either dynamic or local port forwarding. If you establish an SSH tunnel using dynamic port forwarding, you must also configure a proxy server to view the web interface.
The following command opens dynamic port forwarding on port 8157 to the EMR master node. After running it, enable FoxyProxy on your browser using the steps in Configure FoxyProxy for Firefox.
ssh -i <<YOUR-KEY-PAIR>> -ND 8157 hadoop@<<EMR-MASTER-DNS>>
Import the Zeppelin notebook from GitHub
In Zeppelin, choose Import note and Add from URL to import the notebook from the AWS Big Data blog GitHub repository.
Analyze and visualize streaming data
After you import the notebook, you’ll see a few lines of code and some sample SQL as paragraphs. The code in the notebook reads the data from your “spark-demo” Amazon Kinesis stream in batches of 5 seconds (this period can be modified) and stores the data into a temporary Spark table.
After the streaming context has started, Spark starts reading data from streams and populates the temporary table. You can run your SQL queries on this table.
import …

val endpointUrl = "https://kinesis.us-east-1.amazonaws.com"
val credentials = new DefaultAWSCredentialsProviderChain().getCredentials()
require(credentials != null,
  "No AWS credentials found. Please specify credentials using one of the methods specified " +
  "in http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html")

val kinesisClient = new AmazonKinesisClient(credentials)
kinesisClient.setEndpoint(endpointUrl)

// Create one receiver per shard
val numShards = kinesisClient.describeStream("spark-demo").getStreamDescription().getShards().size
val numStreams = numShards

// Setting batch interval to 5 seconds
val batchInterval = Seconds(5)
val kinesisCheckpointInterval = batchInterval
val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()
val ssc = new StreamingContext(sc, batchInterval)

// Create the DStreams
val kinesisStreams = (0 until numStreams).map { i =>
  KinesisUtils.createStream(ssc, "app-spark-demo", "spark-demo", endpointUrl, regionName,
    InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)
}

// Union all the streams
val unionStreams = ssc.union(kinesisStreams)

// Schema of the incoming data on the stream
val schemaString = "device_id,temperature,timestamp"

// Build the table schema; every column is read as a string
val tableSchema = StructType(
  schemaString.split(",").map(fieldName => StructField(fieldName, StringType, true)))

// Processing each RDD and storing it in temporary table
unionStreams.foreachRDD ((rdd: RDD[Array[Byte]], time: Time) => {
  val rowRDD = rdd.map(w => Row.fromSeq(new String(w).split(",")))
  val wordsDF = sqlContext.createDataFrame(rowRDD, tableSchema)
  wordsDF.registerTempTable("realTimeTable")
})
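The foreachRDD block turns each Kinesis payload into a row by decoding the bytes and splitting on commas. The sketch below shows that parsing step on its own, in plain Scala without Spark, and adds a guard for records that do not have one value per column. The parsePayload helper and the guard are my own additions for illustration, and UTF-8 is an assumed encoding; the notebook itself does not validate records.

```scala
// Same field order as schemaString in the notebook.
val schemaFields: Seq[String] = "device_id,temperature,timestamp".split(",").toSeq

// Decode one payload and pair each value with its column name.
// Returns None when the record does not have exactly one value per column.
// parsePayload is a hypothetical helper, not part of the notebook.
def parsePayload(payload: Array[Byte]): Option[Map[String, String]] = {
  // The -1 limit keeps trailing empty fields so they are counted too.
  val values = new String(payload, "UTF-8").split(",", -1).map(_.trim)
  if (values.length == schemaFields.length) Some(schemaFields.zip(values).toMap)
  else None
}
```

A check like this could be applied inside the rdd.map step to drop malformed records before the DataFrame is created, if your producer can emit incomplete lines.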
Example SQL:
%sql
SELECT device_id, timestamp, avg(temperature) AS avg_temp
FROM realTimeTable
GROUP BY device_id, timestamp
ORDER BY timestamp
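If it helps to see what this query computes, here is a rough equivalent over an in-memory collection, written in plain Scala without Spark. The TableRow type and the avgTempByDeviceAndTime function are hypothetical names used only for this sketch. Because the notebook declares every column as a string, the query depends on Spark SQL converting temperature to a number; the sketch makes that conversion explicit.

```scala
// Rows as they sit in the temporary table: every column is a string.
case class TableRow(deviceId: String, temperature: String, timestamp: String)

// Average temperature per (device_id, timestamp), ordered by timestamp.
// The device id is used as a tie-breaker only to make the order deterministic;
// the SQL query orders by timestamp alone.
def avgTempByDeviceAndTime(rows: Seq[TableRow]): Seq[(String, String, Double)] =
  rows
    .groupBy(r => (r.deviceId, r.timestamp))
    .map { case ((device, ts), group) =>
      val temps = group.map(_.temperature.toDouble)
      (device, ts, temps.sum / temps.size)
    }
    .toSeq
    .sortBy { case (device, ts, _) => (ts, device) }
```

Note that the timestamps are compared as strings here, which matches ordering by a string column in the table.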
You can also display the query results as pie charts.
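To modify the processing logic in the foreachRDD block, gracefully stop the streaming context (for example, with ssc.stop(stopSparkContext = false, stopGracefully = true), which leaves the notebook's SparkContext running). Then re-run the paragraph that creates the streaming context and defines the foreachRDD logic, and start the streaming context again. A stopped streaming context cannot be restarted, so a new one has to be created.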
Summary
In this post, I’ve shown you how to use Spark Streaming from a Zeppelin notebook and directly analyze the incoming streaming data. After the analysis, you can terminate the cluster; the notebook remains available in the S3 bucket that you configured during cluster creation. I hope you’ve seen how easy it is to use Spark Streaming, Amazon Kinesis, and Zeppelin to uncover and share the business intelligence in your streaming data. Please give the process in this post a try and let us know in the comments what your results were!
If you have questions or suggestions, please leave a comment below.
Related
Querying Amazon Kinesis Streams Directly with SQL and Spark Streaming
Want to learn more about Big Data or Streaming Data? Check out our Big Data and Streaming data educational pages.