AWS Big Data Blog
Build a real-time streaming generative AI application using Amazon Bedrock, Amazon Managed Service for Apache Flink, and Amazon Kinesis Data Streams
Generative artificial intelligence (AI) has gained a lot of traction in 2024, especially around large language models (LLMs) that enable intelligent chatbot solutions. Amazon Bedrock is a fully managed service that offers a choice of high-performing foundation models (FMs) from leading AI companies such as AI21 Labs, Anthropic, Cohere, Meta, Mistral AI, Stability AI, and Amazon through a single API, along with a broad set of capabilities to help you build generative AI applications with security, privacy, and responsible AI. Use cases for generative AI are vast and go well beyond chatbot applications; for instance, generative AI can be used to analyze input data, such as performing sentiment analysis on customer reviews.
Most businesses generate data continuously in real time. Internet of Things (IoT) sensor data, log data from your applications, and clickstream data generated by users of your website are only some examples of continuously generated data. In many situations, the ability to process this data quickly (in real time or near real time) helps businesses increase the value of the insights they get from their data.
One option to process data in real-time is using stream processing frameworks such as Apache Flink. Flink is a framework and distributed processing engine for processing data streams. AWS provides a fully managed service for Apache Flink through Amazon Managed Service for Apache Flink, which enables you to build and deploy sophisticated streaming applications without setting up infrastructure and managing resources.
Data streaming enables generative AI to take advantage of real-time data and provide businesses with rapid insights. This post looks at how to integrate generative AI capabilities when implementing a streaming architecture on AWS using managed services such as Managed Service for Apache Flink and Amazon Kinesis Data Streams for processing streaming data and Amazon Bedrock to utilize generative AI capabilities. We focus on the use case of deriving review sentiment in real time from customer reviews in online shops. We include a reference architecture and a step-by-step guide on infrastructure setup, as well as sample code for implementing the solution with the AWS Cloud Development Kit (AWS CDK). You can find the code to try it out yourself in the GitHub repo.
Solution overview
The following diagram illustrates the solution architecture. The architecture diagram depicts the real-time streaming pipeline in the upper half and the details on how you gain access to the Amazon OpenSearch Service dashboard in the lower half.
The real-time streaming pipeline consists of a producer, simulated by a Python script running locally, that sends reviews to a Kinesis data stream. The reviews are from the Large Movie Review Dataset and contain positive or negative sentiment. The next step is the ingestion into the Managed Service for Apache Flink application. From within Flink, we asynchronously call Amazon Bedrock (using Anthropic Claude 3 Haiku) to process the review data. The results are then ingested into an OpenSearch Service cluster for visualization with OpenSearch Dashboards. We call the PutRecords API of Kinesis Data Streams directly within the Python script for the sake of simplicity and to run this example cost-effectively. When using a similar architecture in production, you should consider placing an Amazon API Gateway REST API as a proxy in front of Kinesis Data Streams, as described in Streaming Data Solution for Amazon Kinesis.
To gain access to the OpenSearch dashboard, we use a bastion host that is deployed in the same private subnet within your virtual private cloud (VPC) as your OpenSearch Service cluster. To connect to the bastion host, we use Session Manager, a capability of AWS Systems Manager, which allows us to connect securely without having to open inbound ports. We then use Session Manager to port forward the OpenSearch dashboard to localhost.
The walkthrough consists of the following high-level steps:
- Create the Flink application by building the JAR file.
- Deploy the AWS CDK stack.
- Set up and connect to OpenSearch Dashboards.
- Set up the streaming producer.
Prerequisites
For this walkthrough, you should have the following prerequisites:
- An AWS account.
- Java 11 or later.
- Apache Maven 3.9.6 or later.
- The AWS Command Line Interface (AWS CLI) installed. For instructions, refer to Get started with the AWS CLI.
- The AWS CDK installed. For instructions, refer to Install the AWS CDK.
- Python 3.9 or later.
- The Session Manager plugin installed. The plugin is required for access to OpenSearch Dashboards using Session Manager. For instructions, refer to Install the Session Manager plugin for the AWS CLI.
- Model access to Anthropic’s Claude model on Amazon Bedrock. For instructions, refer to Add model access.
- The dataset used is the Large Movie Review Dataset from the following paper: Andrew L. Maas, Raymond E. Daly, Peter T. Pham, Dan Huang, Andrew Y. Ng, and Christopher Potts. (2011). Learning Word Vectors for Sentiment Analysis. The 49th Annual Meeting of the Association for Computational Linguistics (ACL 2011).
Implementation details
This section focuses on the Flink application code of this solution. You can find the code on GitHub. The StreamingJob.java file inside the flink-async-bedrock directory serves as the entry point to the application. The application uses the FlinkKinesisConsumer, which is a connector for reading streaming data from a Kinesis data stream. It applies a map transformation to convert each input string into an instance of the Review class, resulting in a DataStream<Review> to ease further processing.
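The exact source is in the repository; the following is a minimal sketch of that ingestion step, where the stream name, AWS Region, start position, and the Review.fromJson helper are illustrative assumptions rather than the repository's exact code:

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Consumer configuration for the Kinesis source (Region and start position are placeholders)
Properties consumerConfig = new Properties();
consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

// Read raw JSON review records from the Kinesis data stream ("review-stream" is a placeholder name)
DataStream<String> inputStream = env.addSource(
    new FlinkKinesisConsumer<>("review-stream", new SimpleStringSchema(), consumerConfig));

// Map each JSON string to a Review POJO; Review.fromJson is a hypothetical helper
DataStream<Review> inputReviewStream = inputStream
    .map(json -> Review.fromJson(json))
    .returns(Review.class);
```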
The Flink application uses Flink's AsyncDataStream helper class in the StreamingJob.java file to incorporate an asynchronous, external operation into the stream. More specifically, the following code creates an asynchronous data stream by applying the AsyncBedrockRequest function to each element in the inputReviewStream. The application uses unorderedWait to increase throughput and reduce idle time because event ordering is not required. The timeout is set to 25,000 milliseconds to give the Amazon Bedrock API enough time to process long reviews. The maximum concurrency, or capacity, is limited to 1,000 requests at a time. See the following code:
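The exact code is in StreamingJob.java in the repository; the following is a minimal sketch of the unorderedWait call described above, where the output type ProcessedReview and the no-argument AsyncBedrockRequest constructor are illustrative assumptions:

```java
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;

// Apply the asynchronous Bedrock request to each review without preserving event order
DataStream<ProcessedReview> processedReviewStream = AsyncDataStream.unorderedWait(
    inputReviewStream,           // upstream DataStream<Review>
    new AsyncBedrockRequest(),   // AsyncFunction that calls the Amazon Bedrock API
    25000,                       // timeout for a single request
    TimeUnit.MILLISECONDS,
    1000);                       // capacity: maximum number of in-flight requests
```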
The Flink application initiates asynchronous calls to the Amazon Bedrock API, invoking the Anthropic Claude 3 Haiku foundation model for each incoming event. We use Anthropic Claude 3 Haiku on Amazon Bedrock because it is Anthropic's fastest and most compact model, designed for near-instant responsiveness. The following code snippet is part of the AsyncBedrockRequest.java file and illustrates how we set up the required configuration to call Anthropic's Claude Messages API to invoke the model:
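The full request setup lives in AsyncBedrockRequest.java in the repository; the following sketch shows the general shape of such a call with the AWS SDK for Java 2.x Bedrock Runtime client. The Region, max_tokens value, and placeholder prompt are assumptions, and the sketch uses a Java text block (Java 15 or later) for readability:

```java
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.bedrockruntime.BedrockRuntimeAsyncClient;
import software.amazon.awssdk.services.bedrockruntime.model.InvokeModelRequest;

// Asynchronous Bedrock Runtime client, created once and reused across requests
BedrockRuntimeAsyncClient bedrockClient = BedrockRuntimeAsyncClient.builder()
        .region(Region.US_EAST_1)
        .build();

// Request body in the Anthropic Claude Messages API format; the Assistant turn is
// pre-filled with "{" so the model completes a JSON object (see the next section)
String requestBody = """
        {
          "anthropic_version": "bedrock-2023-05-31",
          "max_tokens": 1000,
          "messages": [
            { "role": "user", "content": "<prompt including the review text>" },
            { "role": "assistant", "content": "{" }
          ]
        }
        """;

InvokeModelRequest request = InvokeModelRequest.builder()
        .modelId("anthropic.claude-3-haiku-20240307-v1:0")
        .contentType("application/json")
        .accept("application/json")
        .body(SdkBytes.fromUtf8String(requestBody))
        .build();

// invokeModel returns a CompletableFuture, which fits Flink's asynchronous I/O model
bedrockClient.invokeModel(request)
        .thenAccept(response -> {
            String responseJson = response.body().asUtf8String();
            // parse the sentiment and summary from responseJson and complete the ResultFuture
        });
```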
Prompt engineering
The application uses advanced prompt engineering techniques to guide the generative AI model and produce consistent responses. The following prompt is designed to extract both a summary and a sentiment from a single review:
The prompt instructs the Anthropic Claude model to return the extracted sentiment and summary in JSON format. To keep the generative AI model's output consistent and well structured, the prompt applies several prompt engineering techniques. For example, it uses XML tags to provide a clearer structure for Anthropic Claude, and it contains an example to guide the model toward the desired output. In addition, the prompt pre-fills the Assistant message, which helps enforce a consistent output format. See the following code:
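The exact prompt is defined in the repository; the following is an illustrative sketch of a prompt in that style (the wording, tag names, and example are assumptions, not the repository's exact prompt; the sketch uses a Java text block, which requires Java 15 or later):

```java
// Illustrative user prompt; the review text is substituted where %s appears
String userPrompt = """
        Summarize the movie review inside the <review> tags and classify its sentiment
        as either "positive" or "negative". Return only a JSON object in the format
        shown inside the <example> tags.

        <example>
        {"summary": "The reviewer praises the acting but finds the plot predictable.", "sentiment": "positive"}
        </example>

        <review>
        %s
        </review>
        """;

// Pre-filling the Assistant message nudges the model to complete the JSON object
String assistantPrefill = "{";
```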
Build the Flink application
The first step is to download the repository and build the JAR file of the Flink application. Complete the following steps:
- Clone the repository to your desired workspace:
- Move to the correct directory inside the downloaded repository and build the Flink application:
Maven will compile the Java source code and package it in a distributable JAR format named flink-async-bedrock-0.1.jar in the directory flink-async-bedrock/target/. After you deploy your AWS CDK stack, the JAR file will be uploaded to Amazon Simple Storage Service (Amazon S3) to create your Managed Service for Apache Flink application.
Deploy the AWS CDK stack
After you build the Flink application, you can deploy your AWS CDK stack and create the required resources:
- Move to the cdk directory and deploy the stack:
This will create the required resources in your AWS account, including the Managed Service for Apache Flink application, Kinesis data stream, OpenSearch Service cluster, and a bastion host, deployed in a private subnet within your VPC, that lets you quickly connect to OpenSearch Dashboards.
- Take note of the output values. The output will look similar to the following:
Set up and connect to OpenSearch Dashboards
Next, you can set up and connect to OpenSearch Dashboards. This is where the Flink application will write the extracted sentiment as well as the summary from the processed review stream. Complete the following steps:
- Run the following command in a separate terminal window to establish a connection to OpenSearch from your local workspace. The command can be found in the stack output named accessOpenSearchClusterOutput.
  - For Mac/Linux, use the following command:
  - For Windows, use the following command:
It should look similar to the following output:
- Create the required index in OpenSearch by issuing the following command:
  - For Mac/Linux, use the following command:
  - For Windows, use the following command:
- After the session is established, you can open your browser and navigate to https://localhost:8157/_dashboards. Your browser might consider the URL not secure. You can ignore this warning.
- Choose Dashboards Management under Management in the navigation pane.
- Choose Saved objects in the sidebar.
- Import export.ndjson, which can be found in the resources folder within the downloaded repository.
- After you import the saved objects, you can navigate to Dashboards under My Dashboard in the navigation pane.
At the moment, the dashboard appears blank because you haven’t uploaded any review data to OpenSearch yet.
Set up the streaming producer
Finally, you can set up the producer that will stream review data to the Kinesis data stream and ultimately to OpenSearch Dashboards. The Large Movie Review Dataset was originally published in 2011 in the paper “Learning Word Vectors for Sentiment Analysis” by Andrew L. Maas, Raymond E. Daly, Peter T. Pham, Dan Huang, Andrew Y. Ng, and Christopher Potts. Complete the following steps:
- Download the Large Movie Review Dataset here.
- After the download is complete, extract the .tar.gz file to retrieve the folder containing the review data, named aclImdb 3 or similar. Rename the review data folder to aclImdb.
- Move the extracted dataset to data/ inside the repository that you previously downloaded. Your repository should look like the following screenshot.
- Modify the DATA_DIR path in producer/producer.py if the review data is named differently.
- Move to the producer directory using the following command:
- Install the required dependencies and start generating the data:
The OpenSearch dashboard should be populated after you start generating streaming data and writing it to the Kinesis Data Stream. Refresh the dashboard to view the latest data. The dashboard shows the total number of processed reviews, the sentiment distribution of the processed reviews in a pie chart, and the summary and sentiment for the latest reviews that have been processed.
If you take a closer look at the Flink application, you will notice that it marks the sentiment field with the value error whenever the asynchronous call to the Amazon Bedrock API fails. The Flink application then filters for correctly processed reviews and writes only those to the OpenSearch dashboard.
For robust error handling, you should write any incorrectly processed reviews to a separate output stream and not discard them completely. This separation allows you to handle failed reviews differently than successful ones for simpler reprocessing, analysis, and troubleshooting.
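A minimal sketch of that pattern using a Flink side output follows; the tag name, the ProcessedReview type with its getSentiment accessor, and the processedReviewStream variable are illustrative assumptions, not the repository's code:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Side output for reviews whose Bedrock call failed (sentiment == "error")
final OutputTag<ProcessedReview> failedReviewsTag = new OutputTag<ProcessedReview>("failed-reviews") {};

SingleOutputStreamOperator<ProcessedReview> successfulReviews = processedReviewStream
        .process(new ProcessFunction<ProcessedReview, ProcessedReview>() {
            @Override
            public void processElement(ProcessedReview review, Context ctx, Collector<ProcessedReview> out) {
                if ("error".equals(review.getSentiment())) {
                    ctx.output(failedReviewsTag, review);  // keep failed reviews for reprocessing
                } else {
                    out.collect(review);                   // forward successful reviews to OpenSearch
                }
            }
        });

// The failed reviews can then be written to a separate sink, for example another Kinesis stream
DataStream<ProcessedReview> failedReviews = successfulReviews.getSideOutput(failedReviewsTag);
```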
Clean up
When you’re done with the resources you created, complete the following steps:
- Stop the Python producer by pressing Ctrl/Command + C.
- Destroy your AWS CDK stack by returning to the root folder and running the following command in your terminal:
- When asked to confirm the deletion of the stack, enter yes.
Conclusion
In this post, you learned how to incorporate generative AI capabilities in your streaming architecture using Amazon Bedrock and Managed Service for Apache Flink with asynchronous requests. We also gave guidance on prompt engineering to derive the sentiment from text data using generative AI. You can build this architecture by deploying the sample code from the GitHub repository.
For more information on how to get started with Managed Service for Apache Flink, refer to Getting started with Amazon Managed Service for Apache Flink (DataStream API). For details on how to set up Amazon Bedrock, refer to Set up Amazon Bedrock. For other posts on Managed Service for Apache Flink, browse through the AWS Big Data Blog.
About the Authors
Felix John is a Solutions Architect and data streaming expert at AWS, based in Germany. He focuses on supporting small and medium businesses on their cloud journey. Outside of his professional life, Felix enjoys playing Floorball and hiking in the mountains.
Michelle Mei-Li Pfister is a Solutions Architect at AWS. She is supporting customers in retail and consumer packaged goods (CPG) industry on their cloud journey. She is passionate about topics around data and machine learning.