AWS Big Data Blog
Persist Streaming Data to Amazon S3 using Amazon Data Firehose and AWS Lambda
February 9, 2024: Amazon Kinesis Data Firehose has been renamed to Amazon Data Firehose. Read the AWS What’s New post to learn more.
Streaming data analytics is becoming main-stream (pun intended) in large enterprises as the technology stacks have become easier to implement. For example, Spark Streaming connected to an Amazon Kinesis stream is a typical model for real-time analytics.
But one area that cannot and should not be overlooked is the need to persist streaming data (unchanged) in a reliable and durable fashion, and to do it with ease. This blog post walks you through a simple and effective way to persist data to Amazon S3 from Amazon Kinesis Streams (now Amazon Kinesis Data Streams) using AWS Lambda and Amazon Data Firehose, a managed service from AWS.
Here’s a real use case: Hearst Publishing is a global media company behind well-known brands such as Cosmopolitan, Elle, Esquire, Seventeen, and Car and Driver, as well as television and cable entities such as A&E Networks and Esquire Network.
Hearst has embarked on the big data journey and needs to collect pertinent data from more than 200 digital sites in real time. This data gives invaluable insight into the usage of their sites and indicates the most relevant trending topics based on content. Using these data points, both historical and real-time, Hearst can give content owners key analytical data and become much more agile in managing the content available to site users.
Hearst chose to use a well-respected cast of characters for an ETL process of streaming data: Streams, Spark on Amazon EMR, and S3. They also realized the need to store the unchanged data right from Streams in parallel to EMR-Spark. In line with the important big data ethic "never throw data away", all data pulled from Streams was persisted to S3 for historical reasons and so that it could be reprocessed, either by a different consuming team or with a modified processing scheme in Spark. The Amazon Kinesis Client Library (KCL) and Amazon Kinesis Connector codebase provided a consistent and highly configurable way to get data from Streams to S3:
- The KCL has built-in checkpointing for Streams, whether the application starts reading from TRIM_HORIZON or LATEST.
- The KCL integrates very easily with the Amazon Kinesis connectors.
- The Connectors framework provided a way to transform, buffer, filter, and emit Amazon Kinesis records to S3 (and to other supported AWS services) with ease. We can buffer data and write it to S3 based on thresholds for the number of records, the time since the last flush, or the size of the data buffer.
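To make the connector's threshold-based buffering concrete, here is a minimal JDK-only sketch of a buffer that flushes when any one of three limits is reached: record count, buffered bytes, or time since the last flush. ThresholdBuffer is a hypothetical class written for illustration, not the Amazon Kinesis Connectors API, and timestamps are passed in explicitly so the logic is easy to test.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of count/size/age buffering; not the Kinesis Connectors class. */
class ThresholdBuffer {
    private final int maxRecords;
    private final long maxBytes;
    private final long maxAgeMillis;

    private final List<String> records = new ArrayList<>();
    private long bufferedBytes = 0;
    private long lastFlushMillis;

    ThresholdBuffer(int maxRecords, long maxBytes, long maxAgeMillis, long nowMillis) {
        this.maxRecords = maxRecords;
        this.maxBytes = maxBytes;
        this.maxAgeMillis = maxAgeMillis;
        this.lastFlushMillis = nowMillis;
    }

    /** Adds a record; returns the flushed batch if a threshold is reached, else an empty list. */
    public List<String> add(String record, long nowMillis) {
        records.add(record);
        bufferedBytes += record.getBytes(StandardCharsets.UTF_8).length;
        return shouldFlush(nowMillis) ? flush(nowMillis) : new ArrayList<>();
    }

    /** True when any one of the three thresholds has been reached. */
    public boolean shouldFlush(long nowMillis) {
        return records.size() >= maxRecords
                || bufferedBytes >= maxBytes
                || nowMillis - lastFlushMillis >= maxAgeMillis;
    }

    /** Returns the buffered records (an emitter would write these to S3) and resets the state. */
    public List<String> flush(long nowMillis) {
        List<String> batch = new ArrayList<>(records);
        records.clear();
        bufferedBytes = 0;
        lastFlushMillis = nowMillis;
        return batch;
    }
}
```

Because the age check only runs when add or shouldFlush is called, a real emitter would also poll shouldFlush on a timer so that a quiet stream still gets flushed.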
These features make the KCL–Connector (KCL-C) very powerful and useful; it’s a very popular implementation. The KCL-C setup runs on an EC2 instance or fleet of instances and is easily managed with AWS CloudFormation and Auto Scaling. The KCL has become the proven way to manage getting data off Streams. The figure below shows a sample architecture with KCL.
Hearst, evaluating their AWS ecosystem, wanted to move as much as possible to AWS-provided services. With a lean development team and a focus on data science, there was an interest in not having to monitor EC2 instances. Thus the question was raised “How can we keep the reliability of KCL-C for our data intact but not have to keep tabs on the EC2 instance? Can’t AWS provide a service to do this so we can focus on data science?”
In short, a perfect use case for Firehose and Lambda unfolded. Looking at the needs of the process, reliability was critical along with the ability to buffer (aggregate) data into larger file sizes and persist to S3. The figure below illustrates a sample architecture with Firehose.
For this post, Java is the codebase, but this can also be done in JavaScript (Node.js). The code is available in its entirety in the AWS Big Data Blog repository on GitHub. Assume all services are set up in the same region. For more information, see the Amazon Data Firehose Getting Started Guide.
Set up the S3 and Streams services
You need to set up a stream (representing the raw data coming in) and an S3 bucket where the data should reside. For more information, see Step 1: Create a Stream and Create a Bucket.
Review the Lambda function with Firehose
This is where the fun happens. Take a look at the code. If you cloned the GitHub repository, it is located in the Java class com.amazonaws.proserv.lambda.KinesisToFirehose.
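import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
import com.amazonaws.services.kinesisfirehose.model.PutRecordRequest;
import com.amazonaws.services.kinesisfirehose.model.Record;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;

public class KinesisToFirehose {
    // Configure these to match your delivery stream, IAM role, and S3 bucket.
    private String firehoseEndpointURL = "https://firehose.us-east-1.amazonaws.com";
    private String deliveryStreamName = "blogfirehose";
    private String deliveryStreamRoleARN = "arn:aws:iam::<AWS Acct Id>:role/firehose_blog_role";
    private String targetBucketARN = "arn:aws:s3:::dgraeberaws-blogs";
    private String targetPrefix = "blogoutput/";
    private int intervalInSec = 60;
    private int buffSizeInMB = 2;
    private AmazonKinesisFirehoseClient firehoseClient = new AmazonKinesisFirehoseClient();
    private LambdaLogger logger;

    public void kinesisHandler(KinesisEvent event, Context context) {
        logger = context.getLogger();
        setup(); // body elided below; creates the delivery stream if it does not exist
        for (KinesisEvent.KinesisEventRecord rec : event.getRecords()) {
            logger.log("Got message ");
            // Copy the Kinesis payload and append a newline so records stay separable in S3.
            ByteBuffer data = rec.getKinesis().getData();
            byte[] bytes = new byte[data.remaining()];
            data.get(bytes);
            String msg = new String(bytes, StandardCharsets.UTF_8) + "\n";
            Record deliveryStreamRecord = new Record()
                    .withData(ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8)));
            PutRecordRequest putRecordRequest = new PutRecordRequest()
                    .withDeliveryStreamName(deliveryStreamName)
                    .withRecord(deliveryStreamRecord);
            logger.log("Putting message");
            firehoseClient.putRecord(putRecordRequest);
            logger.log("Successful Put");
        }
    }
    ...
}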
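The handler above calls putRecord once per Kinesis record. Firehose also offers PutRecordBatch, which, per the Firehose documentation at the time of writing, accepts up to 500 records and 4 MiB per call, with each record up to 1,000 KiB. The JDK-only sketch below uses a hypothetical FirehoseBatcher class to terminate each record with a newline (Firehose concatenates records without adding a delimiter) and to group the framed records so each batch stays within those limits. The limit constants are assumptions to check against current quotas.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that frames and groups records for PutRecordBatch-sized calls. */
class FirehoseBatcher {
    // Assumed PutRecordBatch quotas; verify against the current Firehose documentation.
    static final int MAX_RECORDS_PER_BATCH = 500;
    static final long MAX_BYTES_PER_BATCH = 4L * 1024 * 1024;
    static final int MAX_BYTES_PER_RECORD = 1000 * 1024;

    /** UTF-8 encodes a message with a trailing newline so records can be split apart later. */
    static byte[] frame(String message) {
        String line = message.endsWith("\n") ? message : message + "\n";
        return line.getBytes(StandardCharsets.UTF_8);
    }

    /** Groups framed records into batches that respect both the count and byte limits. */
    static List<List<byte[]>> batch(List<String> messages) {
        List<List<byte[]>> batches = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        long currentBytes = 0;
        for (String message : messages) {
            byte[] data = frame(message);
            if (data.length > MAX_BYTES_PER_RECORD) {
                throw new IllegalArgumentException("Record too large: " + data.length + " bytes");
            }
            boolean full = current.size() == MAX_RECORDS_PER_BATCH
                    || currentBytes + data.length > MAX_BYTES_PER_BATCH;
            if (full) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(data);
            currentBytes += data.length;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

Each inner list could then be wrapped in Record objects and sent with putRecordBatch. Because a batch can partially fail, check getFailedPutCount() on the PutRecordBatchResult and resend the entries whose response carries an error code.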
The following private instance variables should be configured with your particular naming conventions:
- firehoseEndpointURL – The AWS endpoint where the Firehose delivery stream is hosted. Typically, you keep the Lambda function and delivery stream in the same region.
- deliveryStreamName – The actual name of the Firehose delivery stream that you are using.
- deliveryStreamRoleARN – The AWS ARN of the role which you want the Firehose delivery stream to use when writing to S3. You will create this role via the console later in this post.
- targetBucketARN – The AWS ARN of the bucket to which you want Firehose to write.
- targetPrefix – When writing to the S3 bucket and segmenting the object key with a prefix, add the segment in this variable. (At the time of this post, if you want a ‘/’ separator, you need to add it in this variable, for example, ‘somesegment/’.)
- intervalInSec – The buffer interval, in seconds. Firehose writes to S3 once this much time has passed since the last write.
- buffSizeInMB – The buffer size, in MB, for the aggregated payload. Firehose writes to S3 once this much data has accumulated since the last write.
This Lambda function is configured to create the Firehose delivery stream if it does not already exist. In this post, however, you create the delivery stream manually from the console, so make sure that the private instance variables above match the delivery stream you create.
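The setup() method is elided in the listing. As a hedged sketch of the create-if-absent behavior described above (not the repository's actual setup() code), the following hides the Firehose control-plane calls behind a hypothetical DeliveryStreamApi interface so the logic can run without AWS credentials. FakeDeliveryStreamApi is an in-memory stand-in for testing.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical minimal view of the control-plane calls a setup step needs. */
interface DeliveryStreamApi {
    boolean exists(String name);
    void create(String name, String bucketArn, String prefix, int intervalInSec, int sizeInMB);
}

class DeliveryStreamSetup {
    /** Creates the delivery stream only if it is absent; returns true when a create call was made. */
    static boolean ensureDeliveryStream(DeliveryStreamApi api, String name, String bucketArn,
                                        String prefix, int intervalInSec, int sizeInMB) {
        if (api.exists(name)) {
            return false;
        }
        api.create(name, bucketArn, prefix, intervalInSec, sizeInMB);
        return true;
    }
}

/** In-memory fake used to exercise the logic locally. */
class FakeDeliveryStreamApi implements DeliveryStreamApi {
    final Set<String> streams = new HashSet<>();
    int createCalls = 0;

    public boolean exists(String name) {
        return streams.contains(name);
    }

    public void create(String name, String bucketArn, String prefix, int intervalInSec, int sizeInMB) {
        createCalls++;
        streams.add(name);
    }
}
```

With the AWS SDK for Java (v1), exists could wrap describeDeliveryStream and treat ResourceNotFoundException as "absent", and create could call createDeliveryStream with an S3DestinationConfiguration whose BufferingHints carry intervalInSec and buffSizeInMB. A new stream starts in the CREATING state, so wait for ACTIVE before calling putRecord. Two concurrent invocations can also both see the stream as absent, so a ResourceInUseException from the create call is reasonable to treat as success.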
Create the Firehose delivery stream
Now, you can create the Firehose delivery stream using the console. For more information, see Amazon Data Firehose Getting Started Guide.
In this post, I assume that you do not yet have a role that gives Firehose access to your resources, so you create one now. In the IAM role list, choose Create new Firehose delivery IAM role.
For reference, the policy associated with the role is similar to the one below:
Permission policy (Firehose role)
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "StmtDemo1",
      "Effect": "Allow",
      "Action": [
        "s3:AbortMultipartUpload",
        "s3:GetBucketLocation",
        "s3:GetObject",
        "s3:ListBucket",
        "s3:ListBucketMultipartUploads",
        "s3:PutObject"
      ],
      "Resource": ["arn:aws:s3:::*"]
    },
    {
      "Sid": "StmtDemo2",
      "Effect": "Allow",
      "Action": ["kms:Decrypt", "kms:Encrypt"],
      "Resource": ["*"]
    }
  ]
}
Trust policy (Firehose role)
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "StmtDemo3",
      "Effect": "Allow",
      "Principal": {"Service": "firehose.amazonaws.com"},
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "YOURACCTID"
        }
      }
    }
  ]
}
Finish configuring the Firehose delivery stream. For this post, set the buffer size to 2 MB and the buffer interval to 60 seconds, matching buffSizeInMB and intervalInSec in the Lambda function.
NOTE: For this post, you will not be compressing or encrypting the data when writing to S3 from Firehose. Your actual implementation may vary.
To summarize the configuration, you are:
- Defining a name for the Firehose delivery stream
- Defining the targeted S3 bucket for output
- Adding an S3 prefix to the bucket
- Defining the buffer thresholds – in this case, they are 60 seconds and 2 MB (whichever comes first)
- Not compressing or encrypting the output data
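Which threshold fires first depends on how fast data arrives. The following minimal sketch, using a hypothetical BufferEstimate class, estimates how often Firehose writes an object and how large each object is under a steady ingest rate. It assumes the buffer size is counted in MiB (1,048,576 bytes), and Firehose treats these values as hints, so real object sizes and timings will vary.

```java
/** Hypothetical estimator for "whichever comes first" buffering at a steady ingest rate. */
class BufferEstimate {
    // Assumption: the buffer size hint is counted in MiB.
    static final long BYTES_PER_MB = 1024L * 1024L;

    /** Approximate seconds between S3 objects: the time or size threshold, whichever is reached first. */
    static double secondsPerObject(double bytesPerSecond, int sizeInMB, int intervalInSec) {
        requirePositive(bytesPerSecond);
        double secondsToFill = (sizeInMB * BYTES_PER_MB) / bytesPerSecond;
        return Math.min(secondsToFill, intervalInSec);
    }

    /** Approximate size in bytes of each S3 object. */
    static double bytesPerObject(double bytesPerSecond, int sizeInMB, int intervalInSec) {
        requirePositive(bytesPerSecond);
        return Math.min(bytesPerSecond * intervalInSec, (double) (sizeInMB * BYTES_PER_MB));
    }

    private static void requirePositive(double bytesPerSecond) {
        if (bytesPerSecond <= 0) {
            throw new IllegalArgumentException("Ingest rate must be positive");
        }
    }
}
```

With the 2 MB / 60 second settings, a stream averaging 10 KiB/s fills only about 600 KB per minute, so the interval wins and objects appear roughly once a minute. Above about 34 KiB/s (2 MiB divided by 60 seconds), the size threshold is reached first and objects appear more often.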
Create the Lambda JAR distribution
Verify that the instance variables in your Lambda function match your newly created Firehose delivery stream. Then create the JAR file that Lambda needs: because this is a Java project with Maven, run mvn clean package from your project root directory. When this post was written, Lambda's Java runtime was Java 8, so the project compiles against the Java 8 JDK; if you choose a newer Lambda Java runtime, compile for that version instead.
Create the Lambda function
Now that you have the Lambda code ready to run, create the function itself. You can do this via CLI or console. For more information, see Getting Started: Authoring AWS Lambda Code in Java.
When you create the Lambda execution role (whose trust relationship allows Lambda to assume it), make sure that its policy grants access to both Firehose and Streams. Here is an example:
Permissions policy (Lambda role)
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["logs:*"],
      "Resource": "arn:aws:logs:*:*:*"
    },
    {
      "Effect": "Allow",
      "Action": ["kinesis:*", "firehose:*"],
      "Resource": ["arn:aws:kinesis:*:*:*", "arn:aws:firehose:*:*:*"]
    }
  ]
}
Trust policy (Lambda role)
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "StmtDemo4",
      "Effect": "Allow",
      "Principal": {"Service": "lambda.amazonaws.com"},
      "Action": "sts:AssumeRole"
    }
  ]
}
I won’t cover creating a Lambda function in depth in this post, but here are the highlights:
- Upload the newly created .jar file. The handler value should be com.amazonaws.proserv.lambda.KinesisToFirehose::kinesisHandler.
- Use the role you just created with the policy access to Firehose and Streams and the Lambda trust relationship (directly above).
- Use the defaults for Memory and Timeout.
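A typo in the handler string typically surfaces only when Lambda first invokes the function. One option is a quick local check before uploading: the hypothetical HandlerCheck helper below parses a handler in the Class::method form used in this post and uses reflection to confirm that the class loads and has a public method with that name. It does not check the method's parameter types and does not handle the class-only handler form.

```java
import java.lang.reflect.Method;

/** Hypothetical pre-upload sanity check for a "fully.qualified.Class::method" handler string. */
class HandlerCheck {
    static boolean resolves(String handler) {
        int sep = handler.indexOf("::");
        if (sep <= 0 || sep + 2 >= handler.length()) {
            return false; // not in Class::method form
        }
        String className = handler.substring(0, sep);
        String methodName = handler.substring(sep + 2);
        try {
            // Load without running static initializers.
            Class<?> clazz = Class.forName(className, false, HandlerCheck.class.getClassLoader());
            for (Method m : clazz.getMethods()) {
                if (m.getName().equals(methodName)) {
                    return true;
                }
            }
            return false;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }
}
```

Run it with the built JAR (and its dependencies) on the classpath, passing com.amazonaws.proserv.lambda.KinesisToFirehose::kinesisHandler.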
After the upload, the Lambda function is in place; all that remains is to connect it to the stream as an event source.
On the Event Sources tab under your new Lambda function, add an event source that is the Amazon Kinesis stream you created earlier. Select a Streams input, add your stream name, and leave the defaults. You are now connected.
Populate streams and verify results
The only thing left to do is add data to the stream and watch the S3 bucket fill up. In the Java project from GitHub, a helper class (com.amazonaws.proserv.PopulateKinesisData) pumps dummy messages to Streams. If you are running it from your local repository, add your access key information to the resources/AwsCredentials.properties file. If you are running it from EC2, make sure the role on the instance has Streams permissions.
After you start adding messages to the stream and a threshold is hit (2 MB or 60 seconds, whichever comes first), you will see your target S3 bucket begin to populate under the prefix that you designated. Each object key includes the year, month, day, and hour (in UTC) in which Firehose wrote the output file (prefix/yyyy/MM/dd/HH/*).
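To know where to look for a given hour's output, you can compute the expected key prefix. This minimal sketch, using a hypothetical FirehoseKeyPrefix helper, assumes Firehose's default UTC-based yyyy/MM/dd/HH/ time prefix appended to the targetPrefix value; a custom prefix configured on the delivery stream would change the layout.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper that predicts the S3 key prefix for objects written at a given time. */
class FirehoseKeyPrefix {
    // Default Firehose time prefix, evaluated in UTC.
    private static final DateTimeFormatter HOUR_PATH =
            DateTimeFormatter.ofPattern("yyyy/MM/dd/HH/").withZone(ZoneOffset.UTC);

    /** For example, "blogoutput/" at 09:30 UTC on 15 Jan 2016 gives "blogoutput/2016/01/15/09/". */
    static String expectedPrefix(String targetPrefix, Instant deliveryTime) {
        return targetPrefix + HOUR_PATH.format(deliveryTime);
    }
}
```

Listing the bucket under the returned prefix (for instance with aws s3 ls) should then show the objects Firehose wrote during that hour.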
Conclusion
In this post, I have shown you how to create a reliable way to persist data from Streams to Amazon S3 using the new managed service Firehose. Firehose removes the need to manage compute servers and builds on some of the most-used tenets of streaming data persistence:
- Aggregate data based on thresholds.
- Persist data to a durable repository (in this case, S3).
The Hearst Publishing use case provided a way to reliably persist data from Streams to S3 with an aggregated output that modeled their current scheme – all with a native AWS service. As the data source was Streams, the Firehose service could run in parallel to the existing real-time data processing scheme with no impact.
If you have questions or suggestions, please leave a comment below.