AWS Developer Tools Blog

Using Amazon Kinesis Firehose

Amazon Kinesis Firehose, a new service announced at this year’s re:Invent conference, is the easiest way to load streaming data into AWS. Firehose manages all of the underlying resources and automatically scales to match the throughput of your data. It can capture and automatically load streaming data into Amazon S3 and Amazon Redshift.

An example use for Firehose is keeping track of traffic patterns in a web application. For each request to the application, we want to stream a record that contains the referring page and the page being requested. Let’s take a look.

Creating the Delivery Stream

First, we need to create our Firehose delivery stream. Although we can do this through the Firehose console, let’s take a look at how we can automate the creation of the delivery stream with PowerShell.

In our PowerShell script, we first set up the account ID and variables for the names of the resources we will create. The account ID is used as the external ID in our IAM role’s trust policy, so that only Firehose delivery streams in our own account can assume the role.

$accountId = '<account-id>'
$roleName = '<iam-role-name>'
$s3BucketName = '<s3-bucket-name>'
$firehoseDeliveryStreamName = '<delivery-stream-name>'

Because Firehose will push our streaming data to S3, our script will need to make sure the bucket exists.

$s3Bucket = Get-S3Bucket -BucketName $s3BucketName
if($s3Bucket -eq $null)
{
    New-S3Bucket -BucketName $s3BucketName
}

We also need to set up an IAM role that gives Firehose permission to push data to S3. The role will need access to the Firehose API and the S3 destination bucket. For the Firehose access, our script will use the AmazonKinesisFirehoseFullAccess managed policy. For the S3 access, our script will use an inline policy that restricts access to the destination bucket.

$role = (Get-IAMRoles | ? { $_.RoleName -eq $roleName })

if($role -eq $null)
{
    # Assume role policy allowing Firehose to assume a role
    $assumeRolePolicy = @"
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "Service": "firehose.amazonaws.com"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId":"$accountId"
        }
      }
    }
  ]
}
"@

    $role = New-IAMRole -RoleName $roleName -AssumeRolePolicyDocument $assumeRolePolicy

    # Add managed policy AmazonKinesisFirehoseFullAccess to role
    Register-IAMRolePolicy -RoleName $roleName -PolicyArn 'arn:aws:iam::aws:policy/AmazonKinesisFirehoseFullAccess'

    # Add policy giving access to S3
    $s3AccessPolicy = @"
{
"Version": "2012-10-17",  
    "Statement":
    [    
        {      
            "Sid": "",      
            "Effect": "Allow",      
            "Action":
            [        
                "s3:AbortMultipartUpload",        
                "s3:GetBucketLocation",        
                "s3:GetObject",        
                "s3:ListBucket",        
                "s3:ListBucketMultipartUploads",        
                "s3:PutObject"
            ],      
            "Resource":
            [        
                "arn:aws:s3:::$s3BucketName",
                "arn:aws:s3:::$s3BucketName/*"		    
            ]    
        } 
    ]
}
"@

    Write-IAMRolePolicy -RoleName $roleName -PolicyName "S3Access" -PolicyDocument $s3AccessPolicy

    # Sleep to wait for the eventual consistency of the role creation
    Start-Sleep -Seconds 2
}

Now that the S3 bucket and IAM role are set up, we will create the delivery stream. We just need to set up an S3DestinationConfiguration object and call the New-KINFDeliveryStream cmdlet.

$s3Destination = New-Object Amazon.KinesisFirehose.Model.S3DestinationConfiguration
$s3Destination.BucketARN = "arn:aws:s3:::" + $s3BucketName
$s3Destination.RoleARN = $role.Arn

New-KINFDeliveryStream -DeliveryStreamName $firehoseDeliveryStreamName -S3DestinationConfiguration $s3Destination 
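
Optionally, the S3 destination can be tuned before the delivery stream is created. The following is a minimal sketch based on the BufferingHints, CompressionFormat, and Prefix properties that S3DestinationConfiguration exposes in the AWS SDK for .NET; the values shown are only illustrative.

# Optional: tune how Firehose buffers and writes to S3 (set before calling New-KINFDeliveryStream).
$bufferingHints = New-Object Amazon.KinesisFirehose.Model.BufferingHints
$bufferingHints.SizeInMBs = 5            # flush after 5 MB of data...
$bufferingHints.IntervalInSeconds = 300  # ...or after 5 minutes, whichever comes first
$s3Destination.BufferingHints = $bufferingHints

# Compress delivered objects and group them under a key prefix.
$s3Destination.CompressionFormat = [Amazon.KinesisFirehose.CompressionFormat]::GZIP
$s3Destination.Prefix = "sitetraffic/"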

After the New-KINFDeliveryStream cmdlet is called, it will take a few minutes to create the delivery stream. We can use the Get-KINFDeliveryStream cmdlet to check its status.
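
For example, a minimal polling loop (a sketch based on the DeliveryStreamStatus property that Get-KINFDeliveryStream returns) can wait until the stream becomes active:

# Poll until the delivery stream reports an ACTIVE status.
do
{
    Start-Sleep -Seconds 20
    $status = [string](Get-KINFDeliveryStream -DeliveryStreamName $firehoseDeliveryStreamName).DeliveryStreamStatus
    Write-Host "Delivery stream status: $status"
} while ($status -ne 'ACTIVE')

Once the status is ACTIVE, we can run the following cmdlet to test our stream.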

Write-KINFRecord -DeliveryStreamName $firehoseDeliveryStreamName -Record_Text "test record"

This will send one record to our stream, which will be pushed to the S3 bucket. By default, a delivery stream buffers data until it reaches 5 MB or 5 minutes have elapsed, whichever comes first, so check the bucket after about 5 minutes.
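
To confirm that records are arriving, we can list what Firehose has written to the bucket. A quick check, reusing the $s3BucketName variable from the setup script:

# List the objects Firehose has delivered to the bucket so far.
Get-S3Object -BucketName $s3BucketName | Select-Object Key, Size, LastModified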

Writing to the Delivery Stream

In an ASP.NET application, we can write an IHttpModule so we see every request. With an IHttpModule, we can add an event handler to the BeginRequest event and inspect where each request is coming from and going to. Here is the code for our IHttpModule. The Init method adds the event handler, and the RecordRequest method grabs the referring URL and the requested URL and sends them to the delivery stream.

using System;
using System.IO;
using System.Text;
using System.Web;

using Amazon;
using Amazon.KinesisFirehose;
using Amazon.KinesisFirehose.Model;

namespace KinesisFirehoseDemo
{
    /// <summary>
    /// This HTTP module adds an event handler for incoming requests.
    /// For each request a record is sent to Kinesis Firehose. To keep the
    /// demo simple, a single record is sent at a time with the PutRecord
    /// operation. This can be optimized by batching records and using the
    /// PutRecordBatch operation.
    /// </summary>
    public class FirehoseSiteTracker : IHttpModule
    {
        IAmazonKinesisFirehose _client;

        // The delivery stream that was created using the setup.ps1 script.
        string _deliveryStreamName = "";

        public FirehoseSiteTracker()
        {
            this._client = new AmazonKinesisFirehoseClient(RegionEndpoint.USWest2);
        }

        public void Dispose() 
        {
            this._client.Dispose(); 
        }

        public bool IsReusable
        {
            get { return true; }
        }

        /// <summary>
        /// Set up the event handler for BeginRequest events.
        /// </summary>
        /// <param name="application">The application to attach the handler to.</param>
        public void Init(HttpApplication application)
        {
            application.BeginRequest +=
                (new EventHandler(this.RecordRequest));
        }

        /// <summary>
        /// Write a record to Firehose with the starting page and the page being requested.
        /// </summary>
        /// <param name="source">The HttpApplication raising the event.</param>
        /// <param name="e">The event arguments.</param>
        private void RecordRequest(Object source, EventArgs e)
        {
            // Cast the source to HttpApplication and get the HttpContext
            // to access the request properties.
            HttpApplication application = (HttpApplication)source;
            HttpContext context = application.Context;

            string startingRequest = string.Empty;
            if (context.Request.UrlReferrer != null)
                startingRequest = context.Request.UrlReferrer.PathAndQuery;

            // Format the record as tab-delimited text terminated by a newline.
            var record = new MemoryStream(UTF8Encoding.UTF8.GetBytes(string.Format("{0}\t{1}\n",
                startingRequest, context.Request.Path)));

            var request = new PutRecordRequest
            {
                DeliveryStreamName = this._deliveryStreamName,
                Record = new Record
                {
                    Data = record
                }
            };
            // Fire and forget for the demo; we don't wait for the record to be acknowledged.
            this._client.PutRecordAsync(request);
        }
    }
}

To enable the module, we register it in the web.config of our ASP.NET application.

<system.webServer>
  <modules>
    <add name="siterecorder" type="KinesisFirehoseDemo.FirehoseSiteTracker"/>
  </modules>
</system.webServer>

Now we can navigate through our ASP.NET application and watch data flow into our S3 bucket.
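
To spot-check the delivered data, we can download the most recent object and look at the tab-delimited records. A small sketch, again reusing $s3BucketName from the setup script and assuming the default uncompressed delivery format:

# Download the most recently delivered object and print its contents.
$latest = Get-S3Object -BucketName $s3BucketName | Sort-Object LastModified -Descending | Select-Object -First 1
Read-S3Object -BucketName $s3BucketName -Key $latest.Key -File "latest-records.txt" | Out-Null
Get-Content "latest-records.txt"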

What’s Next

Now that our data is flowing into S3, we have many options for what to do with it. Firehose has built-in support for pushing our S3 data straight into Amazon Redshift, giving us lots of power for running queries and doing analytics. We could also set up S3 event notifications to have AWS Lambda functions or SQS pollers process the data as it arrives in Amazon S3.