AWS Big Data Blog

Building a Near Real-Time Discovery Platform with AWS

September 8, 2021: Amazon Elasticsearch Service has been renamed to Amazon OpenSearch Service. See details.

February 9, 2024: Amazon Kinesis Data Firehose has been renamed to Amazon Data Firehose. Read the AWS What’s New post to learn more.


Assaf Mentzer is a Senior Consultant for AWS Professional Services.

In the spirit of the U.S. presidential election of 2016, in this post I use Twitter public streams to analyze the candidates’ performance, both Republican and Democrat, in near real time. I show you how to integrate AWS managed services (Amazon Kinesis Firehose, AWS Lambda with a Python function, and Amazon OpenSearch Service) to create an end-to-end, near real-time discovery platform.

The following screenshot is an example of a Kibana dashboard on top of geo-tagged tweet data. This screenshot was taken during the fourth Republican presidential debate (November 10th, 2015).

Kibana dashboard on top of geotagged tweet data

The dashboard shows tweet data related to the presidential candidates (only tweets that contain a candidate’s name):

  • Top 10 Twitter mentions (@username) – you can see that Donald Trump is the most mentioned candidate
  • Sentiment analysis
  • Map visualization – Washington DC is the most active area

The dashboard has drill-down capabilities; choosing one of the sentiments in the pie chart or one of the @mentions in the bar chart changes the view of the entire dashboard accordingly. For example, you can see the sentiment analysis and geographic distribution for a specific candidate. The dashboard shows data from the last hour, and is configured to refresh the data every 30 seconds.

Because the platform built in this post collects all geo-tagged public Twitter data and filters data only in the dashboard layer, you can use the same solution for other use cases by just changing the filter search terms.


Architecture

This platform has the following architecture:

  • A producer device (in this case, the Twitter feed) puts data into Amazon Kinesis Firehose.
  • Firehose automatically buffers the data (in this case, up to 5 MB or 5 minutes, whichever comes first) and delivers the data to Amazon S3.
  • A Python Lambda function is triggered when a new file is created on S3 and indexes the S3 file content to Amazon OpenSearch Service.
  • The Kibana application runs on top of the Elasticsearch index to provide a visual display of the data.
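The "size or interval, whichever comes first" buffering rule can be illustrated with a small toy buffer. This is only a sketch of the rule, not how Firehose is implemented; the class name `SizeOrTimeBuffer` and its parameters are hypothetical. Unlike the real service, this toy only checks its limits when a new record arrives.

```python
import time


class SizeOrTimeBuffer:
    """Toy buffer that flushes when either a size or an age limit is reached."""

    def __init__(self, max_bytes=5 * 1024 * 1024, max_age_seconds=300,
                 clock=time.monotonic):
        self.max_bytes = max_bytes              # e.g. 5 MB
        self.max_age_seconds = max_age_seconds  # e.g. 5 minutes
        self.clock = clock                      # injectable for testing
        self.records = []
        self.size = 0
        self.started_at = None

    def add(self, record: bytes):
        """Add a record; return the flushed batch if a limit was hit, else None."""
        if self.started_at is None:
            self.started_at = self.clock()
        self.records.append(record)
        self.size += len(record)
        too_big = self.size >= self.max_bytes
        too_old = self.clock() - self.started_at >= self.max_age_seconds
        if too_big or too_old:
            return self.flush()
        return None

    def flush(self):
        """Return the buffered records and reset the buffer."""
        batch, self.records = self.records, []
        self.size = 0
        self.started_at = None
        return batch
```

With these defaults, a busy stream flushes on size and a quiet stream flushes on age, which is why files can take up to five minutes to appear in S3.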

Platform architecture

Important: Streaming data can be pushed directly to Amazon OpenSearch Service. The architecture described in this post is recommended when data has to be persisted on S3 for further batch or advanced analysis (lambda architecture, not related to AWS Lambda) in addition to the near real-time analysis on top of Amazon OpenSearch Service, which might retain only “hot data” (last x hours).

Prerequisites

To create this platform, you’ll need an AWS account and a Twitter application. Sign in with your Twitter account and create a new application at https://apps.twitter.com/. Make sure your application is set for ‘read-only’ access and then choose Create My Access Token at the bottom of the Keys and Access Tokens tab. By this point, you should have four Twitter application keys: consumer key (API key), consumer secret (API secret), access token, and access token secret. Write down these keys.

Create a Lambda function execution role

  1. Open the roles page in the IAM console.
  2. Choose Create role.
  3. Under Common use cases, choose Lambda.
  4. Choose Next: Permissions.
  5. Under Attach permissions policies, choose the AWSLambdaBasicExecutionRole and AmazonS3ReadOnlyAccess managed policies.
  6. Choose Next: Tags.
  7. Choose Next: Review.
  8. For Role name, enter s3-twitter-to-es-lambda-role.
  9. Choose Create role.

Create Amazon OpenSearch Service cluster

Start by creating an Amazon OpenSearch Service cluster that will hold your data for near real-time analysis. Amazon OpenSearch Service includes built-in support for Kibana, which is used for visualization on top of Amazon OpenSearch Service.

  1. Sign in to the Amazon OpenSearch Service console.
  2. Choose Create a new domain (or Get Started, if this is your first time in the console).
  3. Name your domain “es-twitter-demo” and choose Next.
  4. Keep the default selections and choose Next.
  5. Choose JSON-defined access policy and add the following policy:
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "Lambda function permissions",
          "Effect": "Allow",
          "Principal": {
            "AWS": [
              "arn:aws:iam::YOUR_ACCOUNT_ID:role/service-role/s3-twitter-to-es-lambda-role"
            ]
          },
          "Action": "es:*",
          "Resource": "arn:aws:es:us-east-1:YOUR_ACCOUNT_ID:domain/es-twitter-demo/*"
        },
        {
          "Sid": "Kibana permissions by IP address",
          "Effect": "Allow",
          "Principal": {
            "AWS": "*"
          },
          "Action": ["es:ESHttpHead", "es:ESHttpGet", "es:ESHttpPost"],
          "Resource": "arn:aws:es:us-east-1:YOUR_ACCOUNT_ID:domain/es-twitter-demo/*",
          "Condition": {
            "IpAddress": {
              "aws:SourceIp": "YOUR_IP_ADDRESS/32"
            }
          }
        }
      ]
    }

    Note: The policy above applies an IP-based access policy for Kibana. To implement username and password authentication for Kibana instead, refer to Amazon Cognito Authentication for Kibana.

  6. Choose Confirm and create.
  7. Within about 10 minutes, your domain is ready. When the creation process has reached a status of Active, the domain should be associated with both an Amazon OpenSearch Service endpoint and a Kibana URL, which you need to store for later steps.
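If you prefer to generate the access policy rather than edit the placeholders by hand, a minimal sketch follows. The function name `build_access_policy` is hypothetical; the statement contents mirror the policy shown in step 5, and the defaults (region, domain, and role name) are taken from this post.

```python
import ipaddress
import json


def build_access_policy(account_id, ip_address, region="us-east-1",
                        domain="es-twitter-demo",
                        role_name="s3-twitter-to-es-lambda-role"):
    """Return the domain access policy from step 5 as a Python dict."""
    ipaddress.ip_address(ip_address)  # raises ValueError on a malformed address
    resource = f"arn:aws:es:{region}:{account_id}:domain/{domain}/*"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "Lambda function permissions",
                "Effect": "Allow",
                "Principal": {
                    "AWS": [
                        f"arn:aws:iam::{account_id}:role/service-role/{role_name}"
                    ]
                },
                "Action": "es:*",
                "Resource": resource,
            },
            {
                "Sid": "Kibana permissions by IP address",
                "Effect": "Allow",
                "Principal": {"AWS": "*"},
                "Action": ["es:ESHttpHead", "es:ESHttpGet", "es:ESHttpPost"],
                "Resource": resource,
                "Condition": {
                    "IpAddress": {"aws:SourceIp": f"{ip_address}/32"}
                },
            },
        ],
    }


if __name__ == "__main__":
    # Example values only; substitute your own account ID and IP address.
    print(json.dumps(build_access_policy("123456789012", "203.0.113.10"), indent=2))
```

The printed JSON can be pasted into the access policy editor in the console.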

Create an IAM role for Firehose

Use a Firehose delivery stream to ingest the Twitter streaming data and put it to Amazon S3. Before you can ingest the data into Firehose, you need to set up an IAM role to allow Firehose to call AWS services on your behalf. In this example, the Twitter feed, which is your producer application, creates the Firehose delivery stream based on the IAM role.

Follow these steps to create an IAM role named “firehose_delivery_role” using the AWS CLI:

  1. Create the following files:

    firehose-policy.json:

    {
      "Version": "2012-10-17",
      "Statement": {
        "Effect": "Allow",
        "Principal": {"Service": "firehose.amazonaws.com"},
        "Action": "sts:AssumeRole"
      }
    }

    s3-rw-policy.json (please replace A_BUCKET_YOU_SETUP_FOR_THIS_DEMO with your S3 bucket):

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "",
                "Effect": "Allow",
                "Action": [
                    "s3:AbortMultipartUpload",
                    "s3:GetBucketLocation",
                    "s3:GetObject",
                    "s3:ListBucket",
                    "s3:ListBucketMultipartUploads",
                    "s3:PutObject"
                ],
                "Resource": [
                    "arn:aws:s3:::A_BUCKET_YOU_SETUP_FOR_THIS_DEMO",
                    "arn:aws:s3:::A_BUCKET_YOU_SETUP_FOR_THIS_DEMO/*"
                ]
            }
        ]
    }
  2. Run the following commands:

    aws iam create-role --role-name firehose_delivery_role --assume-role-policy-document file://firehose-policy.json
    aws iam put-role-policy --role-name firehose_delivery_role --policy-name firehose-s3-rw --policy-document file://s3-rw-policy.json

Create a Lambda function

For this example, use a Python function (lambda_function.py) that is triggered when a new file is created on S3. The function does the following:

  1. Reads the file content
  2. Parses the content to JSON format (Amazon OpenSearch Service stores documents in JSON format).
  3. Analyzes Twitter data (tweet_utils.py):
    • Extracts Twitter mentions (@username) from the tweet text.
    • Extracts sentiment based on emoticons. If there’s no emoticon in the text, the function falls back to TextBlob sentiment analysis.
  4. Loads the data to Amazon OpenSearch Service (twitter_to_es.py) using the elasticsearch-py library.
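The analysis step can be sketched roughly as follows. This is not the code from tweet_utils.py: the emoticon lists, the function names, and the rule that mixed emoticons yield "confused" are assumptions made for illustration. The TextBlob fallback is replaced by a pluggable callable that simply returns "neutral", so the sketch runs with the standard library only.

```python
import re

# Twitter usernames consist of letters, digits, and underscores.
MENTION_RE = re.compile(r"@([A-Za-z0-9_]+)")

# Illustrative emoticon lists (assumption, not the lists used in the post's code).
POSITIVE = (":)", ":-)", ":D", "=)")
NEGATIVE = (":(", ":-(", ":'(")


def extract_mentions(text):
    """Return the @usernames mentioned in a tweet, without the leading '@'."""
    return MENTION_RE.findall(text)


def emoticon_sentiment(text, fallback=lambda text: "neutral"):
    """Classify by emoticons; defer to `fallback` when none are present."""
    has_pos = any(e in text for e in POSITIVE)
    has_neg = any(e in text for e in NEGATIVE)
    if has_pos and has_neg:
        return "confused"   # assumption: mixed signals map to "confused"
    if has_pos:
        return "positive"
    if has_neg:
        return "negative"
    return fallback(text)   # the post uses TextBlob at this point
```

For example, `extract_mentions("Go @realDonaldTrump and @tedcruz!")` yields the two usernames, and a tweet containing only ":)" is classified as positive.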

The Python code is available in the aws-big-data-blog repository.
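To give a feel for the parse-and-load steps without the repository at hand, here is a minimal standard-library sketch that turns file content into a request body for the bulk API. It assumes the delivered file contains one JSON tweet per line, uses a hypothetical index name (`twitter`), and uses the tweet's `id_str` as the document ID when present. The actual function uses the elasticsearch-py library instead of building the body by hand.

```python
import json


def parse_json_lines(body):
    """Parse newline-delimited JSON, skipping blank and malformed lines."""
    docs = []
    for line in body.splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            docs.append(json.loads(line))
        except ValueError:
            continue  # ignore lines that are not valid JSON
    return docs


def to_bulk_body(docs, index="twitter"):
    """Build a bulk request body: one action line followed by one document line."""
    lines = []
    for doc in docs:
        action = {"index": {"_index": index}}
        if "id_str" in doc:
            action["index"]["_id"] = doc["id_str"]  # makes re-indexing idempotent
        lines.append(json.dumps(action))
        lines.append(json.dumps(doc))
    # The bulk API requires the body to end with a newline.
    return "\n".join(lines) + "\n" if lines else ""
```

Older Elasticsearch versions also expect a document type in the action line, so treat the action format here as a starting point rather than a drop-in.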

  1. Download the deployment package and unzip it to the s3-twitter-to-es-python folder.
  2. Modify the s3-twitter-to-es-python/config.py file by changing the value of es_host to the Amazon OpenSearch Service endpoint of your domain.
  3. Zip the folder content on your local environment as my-s3-twitter-to-es-python.zip (important: zip the folder content, not the folder itself).
  4. Sign in to the Lambda console.
  5. Choose Create a Lambda function (or Get started now if this is your first time using the service).
  6. Choose Skip in the blueprints screen.
  7. Name your function (e.g., s3-twitter-to-es-python).
  8. Choose the Python 2.7 runtime and upload the zip file my-s3-twitter-to-es-python.zip.
  9. Make sure the Handler field value is lambda_function.lambda_handler.
  10. Choose s3-twitter-to-es-lambda-role (the role you created earlier).
  11. Keep memory at 128 MB and choose a 2-minute timeout.
  12. Choose Next and Create function, then wait until the function is created.
  13. On the Event sources tab, choose Add event source.
  14. Choose the event source type S3, select the bucket, and choose the event type Object Created (All).
  15. Enter a value for S3 Prefix (e.g., twitter/raw-data/) to ensure the function doesn’t trigger when data is uploaded elsewhere in the bucket.
  16. Make sure that the event source is enabled and choose Submit.
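When the trigger fires, Lambda passes the function an event describing the new S3 objects. The sketch below shows one way to pull the bucket and key out of that event; the function name `s3_objects_from_event` is hypothetical and this is not the code from lambda_function.py. Object keys arrive URL-encoded in S3 events, so they are decoded before use. The prefix check is only a second guard, because the event source configuration already filters by prefix.

```python
from urllib.parse import unquote_plus


def s3_objects_from_event(event, prefix=""):
    """Return (bucket, key) pairs for the S3 records whose key starts with prefix."""
    objects = []
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # Keys in S3 event notifications are URL-encoded (spaces become '+').
        key = unquote_plus(record["s3"]["object"]["key"])
        if key.startswith(prefix):
            objects.append((bucket, key))
    return objects


def lambda_handler(event, context):
    """Entry point matching the handler name lambda_function.lambda_handler."""
    for bucket, key in s3_objects_from_event(event, prefix="twitter/raw-data/"):
        # Reading, analyzing, and indexing the file would happen here.
        print("New object: s3://{}/{}".format(bucket, key))
```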

Feed the producer with Twitter streaming data

Your producer is a Node.js application that connects to the Twitter feed via the Twitter stream API and puts the streaming data into Firehose. The code is available in the aws-big-data-blog repository.

To use the producer application, you have to install Node.js (go to https://nodejs.org to install it on your local machine). Alternatively, you can launch a t2.micro EC2 instance based on the Amazon Linux AMI and run the following command:

sudo yum -y install nodejs npm --enablerepo=epel

  1. Download the application, unzip the file, and run npm install from the twitter-streaming-firehose-nodejs folder.
  2. Modify the Config.js file with your settings (change <YOUR PARAMETERS>) as follows:
  • firehose
    • DeliveryStreamName – Name your stream. The app creates the delivery stream if it does not exist.
    • BucketARN: Use the ARN of the bucket that you selected as the Lambda function event source.
    • RoleARN: Get your account ID from the IAM dashboard users sign-in link https://Your_AWS_Account_ID.signin.aws.amazon.com/console/. Use the Firehose role you created earlier (“firehose_delivery_role”).
    • Prefix: Use the same S3 prefix that you used in your Lambda function event source (e.g., twitter/raw-data/).
  • twitter – Enter your Twitter application keys.
  • region – Enter your Firehose region (e.g., us-east-1, us-west-2, eu-west-1).
  • Make sure your AWS credentials are configured under <HOME FOLDER>/.aws/credentials as follows:

[default]
aws_access_key_id=
aws_secret_access_key=

Now that your Config.js file is modified, you can open a console window and initiate execution of your program by running the following command:

node twitter_stream_producer_app

Wait a few seconds until the delivery stream is active, and then you should see Twitter data on your screen. The app collects tweets from the U.S., but you can modify the locations in the Config.js file. For more information, go to Twitter geolocation.

Discover and analyze data

Wait a few minutes until Firehose has had time to deliver enough files to Amazon S3 to make the data interesting to review. The files should appear under the following path:

s3://<bucket>/<prefix>/<year>/<month>/<day>/<hour>/
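To locate the files for a particular hour, you can compute the expected path from a timestamp. The helper below is a small sketch (the function name is hypothetical). It relies on Firehose deriving the year/month/day/hour portion from UTC time, so the timestamp is converted to UTC first and must be timezone-aware.

```python
from datetime import datetime, timezone


def firehose_hour_prefix(bucket, prefix, when):
    """Return the S3 path under which files for the given hour are expected."""
    if when.tzinfo is None:
        raise ValueError("pass a timezone-aware datetime")
    when = when.astimezone(timezone.utc)
    return f"s3://{bucket}/{prefix.strip('/')}/{when:%Y/%m/%d/%H}/"


if __name__ == "__main__":
    # Example values only; the bucket name is a placeholder.
    debate_hour = datetime(2015, 11, 10, 21, 0, tzinfo=timezone.utc)
    print(firehose_hour_prefix("my-bucket", "twitter/raw-data/", debate_hour))
```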

Open Kibana in your browser using your Kibana URL. To start discovering the data stored in Amazon OpenSearch Service, you need to create an index pattern pointing to your Elasticsearch index, which is comparable to a database in a relational database system. For more information, go to What is an Elasticsearch Index?.

Create an index pattern as follows:

Create an index pattern

On the Discover tab, choose Add near the text field on the left sidebar. You should get the following result:

Start exploring the data by choosing any field in the left sidebar and filtering on its values. You can search for a specific term by replacing the asterisk (*) in the search field with your terms. You can also filter by time by choosing the Time Filter icon at the top right.

For example, you can search for the term “Trump” to discover and understand the data related to one of the candidates.


In this 2016 election discovery platform, you can analyze the performance of the presidential candidates: How many tweets they got, the sentiment of those tweets (positive/negative/neutral/confused), and how the tweets are geographically distributed (identifying politically active areas).

Because this is a near real-time discovery platform, you can measure the immediate impact of political events on the candidates’ popularity (for example, during a political debate).

Create a dashboard

To visualize candidates’ popularity in Twitter (in how many tweets a candidate was mentioned), create a top mentions bar chart.

  1. On the Discover tab, choose the mentions field on the left sidebar.
  2. Choose Visualize (ignore the warning).


  3. On the X-Axis tab, change the size from 20 to 10 and choose Apply.
  4. Choose the Save Visualization icon at the top right.
  5. Enter a name and choose Save.
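The bar chart is backed by a terms aggregation on the mentions field, limited to the top 10 values. If you want to run the same kind of query outside Kibana, the request body can be sketched as below. The helper name `top_terms_query` is hypothetical, and the body is a minimal example rather than the exact request Kibana sends.

```python
import json


def top_terms_query(field, size=10):
    """Build a search body that returns only the top `size` values of `field`."""
    return {
        "size": 0,  # no individual documents, only the aggregation result
        "aggs": {
            "top_terms": {
                "terms": {"field": field, "size": size}
            }
        },
    }


if __name__ == "__main__":
    print(json.dumps(top_terms_query("mentions"), indent=2))
```

The same helper with the sentiments field gives the counts behind a sentiment pie chart.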

To analyze how tweets related to the 2016 election are geographically distributed in order to identify politically active areas, create a tile map.

  1. On the Discover tab, choose the coordinates.coordinates field.
  2. Choose Visualize.

Note: By default, in the Node.js app, tweets are collected only from the U.S.

  3. To center the map, choose the crop icon.
  4. Choose Save Visualization.

To identify candidates’ popularity (or unpopularity), visualize the sentiments field. Because there are only 4 potential values (positive/negative/neutral/confused), you can use a pie chart visualization.

  1. On the Visualize tab, choose the New Visualization icon.
  2. Choose Pie chart.
  3. Choose new search, Split Slices, Terms aggregation, and the sentiments field.
  4. Choose Apply and Save Visualization.

Combine all the visualizations into a single dashboard.

  1. On the Dashboard tab, choose Add Visualization at the top right corner, and select a visualization.
  2. Repeat the previous step for all other visualizations.
  3. Choose Save Dashboard, enter a name for your dashboard, and choose Save.

Now you can search for the presidential candidates in the data. Put the following search terms in the search filter field:

realDonaldTrump,realBenCarson,JebBush,tedcruz,ChrisChristie,JohnKasich,
GovMikeHuckabee,RandPaul,MarcoRubio,CarlyFiorina,HillaryClinton,
MartinOMalley,BernieSanders

Search for candidates in the data

You’ve got yourself a dashboard! Select your preferred candidate in the bar chart to drill down to performance.

Conclusion

AWS managed services, like Amazon Kinesis Firehose, AWS Lambda, and Amazon OpenSearch Service, take care of provisioning and maintaining the infrastructure components when building near real-time applications and enable you to focus on your business logic.

You can quickly and easily tie these services together to create a near real-time discovery platform. For this post, we analyzed the performance of the 2016 presidential candidates, but this type of platform can be used for a variety of other use cases.

If you have questions or suggestions, please leave a comment below.


Related

Getting Started with Amazon OpenSearch Service and Kibana on Amazon EMR