AWS Database Blog

Build a streaming ETL pipeline on Amazon RDS using Amazon MSK

Customers who host their transactional database on Amazon Relational Database Service (Amazon RDS) often seek architecture guidance on building streaming extract, transform, load (ETL) pipelines to destination targets such as Amazon Redshift. Building a streaming ETL pipeline on Amazon Web Services (AWS) typically involves using one or more AWS services to handle the data flow in real time. To implement this solution, you should clearly define your streaming ETL requirements, including the data sources, transformation logic, and the destination data store.

This post outlines the architecture pattern for creating a streaming data pipeline using Amazon Managed Streaming for Apache Kafka (Amazon MSK). Amazon MSK offers a fully managed Apache Kafka service, enabling you to ingest and process streaming data in real time. This makes Kafka flexible and effective for handling real-time data across a wide range of industry use cases. Furthermore, Amazon Redshift integrates with MSK, facilitating low-latency, high-speed ingestion of streaming data directly into an Amazon Redshift materialized view.

Both streaming ETL and batch ETL are approaches used for data integration, processing, and analysis. Organizations may use a combination of both streaming ETL and batch ETL approaches, depending on the nature of their data processing needs. We begin with an architectural overview to understand the various components involved in the streaming data pipeline.

Streaming data pipeline architecture overview

A streaming pipeline begins with unprocessed asynchronous event streams and finishes with a structured table of optimized insights, allowing data analysts to perform SQL queries, as shown in the following figure.

[Figure: Streaming data pipeline workflow]

The example architecture includes:

  1. Source: A transactional database running on Amazon RDS for SQL Server.
  2. Stream ingestion: A Debezium connector running on Amazon MSK Connect infrastructure to ingest the data stream.
  3. Stream storage: A fully managed Amazon MSK cluster as stream storage.
  4. Stream processing: AWS Glue to handle data transformation and processing between the MSK cluster and the analytics target.
  5. Destination: An Amazon Redshift cluster as the final data store for running analytics queries.

These components work together to create a scalable, reliable, and efficient streaming ETL architecture that can handle real-time data processing and analytics.

Solution overview

In this example architecture, we construct a pipeline that reads from an RDS for SQL Server instance as the data source and writes the transformed data into an Amazon Redshift cluster. We use the change data capture (CDC) feature within SQL Server to track changes made to the data. CDC is critical for streaming use cases because it captures only the changes made to the data in real time, allowing downstream systems to be updated efficiently without a full data refresh for each change. This ensures that the target system has access to the latest information with minimal delay. To learn more about CDC, visit Change data capture (CDC).

In RDS for SQL Server, we create a database named sampledb and a table named patientinfo.

The patientinfo table schema consists of:

  • ID: An integer auto-incrementing field, serving as the primary key.
  • FirstName: Stores up to 100 characters for a patient’s first name.
  • LastName: Stores up to 100 characters for a patient’s last name.
  • State: A two-character field for the state abbreviation.
  • SSN: Stores up to 10 characters for a patient’s Social Security Number.
  • DOB: A DATETIME field for storing the patient’s date of birth.

In Amazon Redshift, we create a materialized view to analyze the stream of patient records in a Kafka topic. By combining Kafka (through Amazon MSK) and Amazon Redshift materialized views, you can efficiently handle streaming patient data and generate analytics such as patient counts, patient verification, age distributions, and data quality checks in real time. Optionally, we also show you how to use AWS Glue to transform the incoming stream of data. The following figure shows the workflow of the solution.

[Figure: Solution architecture diagram]

The components of the workflow are:

  1. RDS for SQL Server uses CDC to capture and store the stream of changes.
  2. The Debezium connector running on MSK Connect uses a polling mechanism to pull the changes from the CDC tables.
  3. The Debezium connector serializes the data and writes it into Kafka topics in near real time (an abbreviated sample change event follows this list).
  4. In Amazon Redshift, you can configure a materialized view to capture changes from the Amazon MSK topic and refresh its data.
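
To make the downstream Amazon Redshift steps easier to follow, the following is an abbreviated example of the change event that the Debezium connector writes to the Kafka topic for an insert into the patientinfo table. This is a sketch that assumes the JSON converter with schemas disabled (as configured later in this post); the values are illustrative, the source block is trimmed, and DATETIME columns such as DOB are emitted as epoch milliseconds by default.

    {
      "before": null,
      "after": {
        "ID": 1,
        "FirstName": "Tom",
        "LastName": "davis",
        "State": "NJ",
        "SSN": "5641434802",
        "DOB": 1246406400000
      },
      "source": {
        "connector": "sqlserver",
        "db": "sampledb",
        "schema": "dbo",
        "table": "patientinfo"
      },
      "op": "c",
      "ts_ms": 1712044800000
    }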

Prerequisites

For the solution in this post, we provide an AWS CloudFormation template that creates all the necessary AWS resources. Before deploying this template, ensure that you have an Amazon Elastic Compute Cloud (Amazon EC2) key pair ready, because you will need one for the deployment process. If you don’t have one, you can follow the instructions in Create a key pair for your Amazon EC2 instance.

The CloudFormation template creates the following resources:

  1. An Amazon Virtual Private Cloud (Amazon VPC) with one public subnet and three private subnets.
  2. Networking infrastructure, including an internet gateway, NAT gateway, elastic IP address, route tables, and security groups.
  3. AWS Identity and Access Management (IAM) roles and instance profiles.
  4. An Amazon Simple Storage Service (Amazon S3) bucket to store SQL Server connector files.
  5. Two Amazon EC2 instances: a Windows instance that serves as a bastion host and a Linux instance that will be used to interact with the MSK cluster and the RDS for SQL Server instance.
  6. Two AWS Secrets Manager secrets: one for the bastion host and RDS for SQL Server sign-in credentials, and one for the Amazon Redshift cluster credentials.
  7. An Amazon MSK cluster.
  8. An Amazon RDS for SQL Server instance.
  9. An Amazon Redshift cluster using the provisioned deployment model.

Deploy the solution

To deploy the provided CloudFormation template:

  1. Choose Launch Stack.
  2. Enter a name for the stack in the Stack name field.
  3. Enter the value for the ClientIPCIDR parameter.
  4. For the EC2KeyPair parameter, select an appropriate EC2 key pair.
  5. Leave the remaining parameters as the default.
  6. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names, then choose Submit.

The CloudFormation template requires approximately 35 minutes to deploy. After the template has finished provisioning the specified resources, the Outputs section displays information about the newly created resources, as shown in the following figure.

[Figure: CloudFormation stack Outputs section]

Connect to the EC2 Windows instance using an RDP client

To access the EC2 Windows instance, follow the instructions in Connect to your Windows instance. The sign-in credentials for the user can be retrieved from the secret named <StackName>-bastion-rds-credentials.

Amazon RDS for SQL Server configuration

Amazon RDS supports CDC for your database instances running Microsoft SQL Server. To enable CDC on your database and specific tables, complete the following steps:

  1. Upon successful connection to the Windows instance, open SQL Server Management Studio (SSMS).
  2. Connect to the RDS for SQL Server database instance. The database credentials can be found in the same secret, <StackName>-bastion-rds-credentials.
  3. Choose New Query.
  4. Create a sample database.
    CREATE DATABASE sampledb
    GO
  5. Create a sample table.
    USE sampledb
    GO
    CREATE TABLE patientinfo
    (
    ID INT IDENTITY NOT NULL PRIMARY KEY,
    FirstName VARCHAR(100),
    LastName VARCHAR (100),
    State VARCHAR (2),
    SSN VARCHAR (10),
    DOB DATETIME)
    GO
  6. To enable CDC for an RDS database instance, run the msdb.dbo.rds_cdc_enable_db stored procedure.
    USE sampledb
    GO
    EXEC msdb.dbo.rds_cdc_enable_db 'sampledb'
    GO
  7. After CDC is enabled on the database, you can start tracking specific tables. You can choose the tables to track by running sp_cdc_enable_table.
    USE sampledb
    GO
    EXEC sys.sp_cdc_enable_table
     @source_schema = N'dbo',
     @source_name = N'patientinfo',
     @role_name = Null,
     @supports_net_changes = 1
    GO
  8. Configure and verify the capture job status using sp_cdc_help_change_data_capture.
    USE sampledb
    GO
    EXEC sys.sp_cdc_help_change_data_capture
    GO
  9. Adjust the polling interval to the requirements of your use case. In this example, we keep the default of 5 seconds using sp_cdc_change_job.
    USE sampledb
    GO
    EXEC sys.sp_cdc_change_job @job_type = 'capture' ,@pollinginterval = 5
    GO
  10. Validate the configuration using sp_cdc_help_jobs.
    EXEC sys.sp_cdc_help_jobs
    GO

Amazon MSK cluster configuration

We use MSK Connect, which uses the Apache Kafka Connect framework to connect Kafka clusters with external systems such as relational databases, search indexes, or file systems. In this example architecture, we download and deploy the SQL Server connector by Debezium, which is fully integrated with MSK Connect, to stream changes from SQL Server databases to the Kafka cluster without code changes.

Create a Debezium source connector

The CloudFormation template provides a sample ZIP file for Debezium 2.6. You can find this custom plugin ZIP file for the Debezium connector in the S3 bucket created by the template.

However, we suggest downloading the latest stable release instead.

  1. Download the SQL Server connector plugin for the latest stable release from the Debezium site.
  2. Download and extract the MSK config provider zip file.
  3. Create a single custom plugin ZIP file that contains the following:
    1. The debezium-connector-sqlserver folder
    2. The msk-config-providers-0.1.0-with-dependencies.zip file

    The following listing shows the files to be zipped.

    [ec2-user@ip-10-0-0-24 custom-plugin]$ ls -ltr
    drwxr-xr-x. 2 ec2-user ec2-user 16384 Mar 7 2023 lib
    -rw-r--r--. 1 ec2-user ec2-user 20747 Mar 7 2023 msk-config-providers-0.1.0.jar
    -rw-r--r--. 1 ec2-user ec2-user 15338921 Mar 7 2023 msk-config-providers-0.1.0-with-dependencies.zip
    -rw-r--r--. 1 ec2-user ec2-user 2898637 Apr 2 07:48 debezium-connector-sqlserver-2.6.0.Final-plugin.tar.gz
    drwxr-xr-x. 2 ec2-user ec2-user 16384 Sep 5 13:47 debezium-connector-sqlserver
  4. Upload the ZIP file into the sqlserver_connector_plugin folder within the S3 bucket provided in the output of the CloudFormation template. For information about how to upload files to Amazon S3, see Uploading objects in the Amazon S3 User Guide.

Create a secret for your database credentials

A secret containing the credentials for the RDS database is generated when the CloudFormation stack is created. To ensure that the MSK Connect service execution role has the required permissions to access and retrieve these secrets, the following Secrets Manager permissions have been added to the role’s policy.

{ 
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "secretsmanager:GetResourcePolicy",
                "secretsmanager:GetSecretValue",
                "secretsmanager:DescribeSecret",
                "secretsmanager:ListSecretVersionIds"
            ],
            "Resource": [
                "arn:aws:secretsmanager:<region>:<account>:secret:<secret-name>"
            ],
            "Effect": "Allow"
        }
    ]
}

Configure Amazon MSK

Use the following procedures to configure Amazon MSK.

Create a worker configuration using the Amazon MSK console

  1. In the navigation pane, under MSK Connect, choose Worker configurations.
  2. Choose Create worker configuration.
  3. For the worker configuration name, enter msk-secrets-manager-worker-config.
  4. Copy the Secrets Manager configuration code and paste it into the Worker configuration text box. Then, update the <Secrets Manager Region> placeholder with the AWS Region where your secret is stored.
    key.converter=org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable=false
    value.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable=false
    offset.storage.topic=offsets_my_debezium_source_connector
    
    config.providers=secretsmanager
    config.providers.secretsmanager.class=com.amazonaws.kafka.config.providers.SecretsManagerConfigProvider
    config.providers.secretsmanager.param.region=<Secrets Manager Region>
  5. Choose Create worker configuration.

Create a custom plugin for SQL Server using the Amazon MSK console

  1. In the navigation pane, under MSK Connect, choose Custom plugins.
  2. Choose Create custom plugin.
  3. Choose Browse S3.
  4. In the list of S3 buckets, choose the bucket that has the ZIP file for the plugin.
  5. In the list of objects, select the box to the left of the ZIP file for the plugin, then choose Choose.
  6. For the custom plugin name, enter sql-server-custom-plugin.
  7. Choose Create custom plugin.

Create a connector

  1. In the navigation pane, expand MSK Connect, then choose Connectors.
  2. Choose Create connector.
  3. In the list of plugins, choose sql-server-custom-plugin from the previous step, then choose Next.
  4. For the connector name, enter sql-server-connector.
  5. In the list of clusters, choose your MSK cluster (created by the CloudFormation template).
  6. Copy the following configuration settings.
    connector.class=io.debezium.connector.sqlserver.SqlServerConnector
    driver.encrypt=false
    tasks.max=1
    schema.history.internal.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
    include.schema.changes=true
    topic.prefix=sampledb
    schema.history.internal.kafka.topic=internal.dbhistory.sampledb
    schema.history.internal.producer.security.protocol=SASL_SSL
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter=org.apache.kafka.connect.storage.StringConverter
    schema.history.internal.producer.sasl.mechanism=AWS_MSK_IAM
    schema.history.internal.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
    database.user=${secretsmanager:<SecretName>:username}
    database.password=${secretsmanager:<SecretName>:password}
    database.hostname=<RDS for SQL Server Endpoint>
    schema.history.internal.kafka.bootstrap.servers=<Bootstrap brokers>
    database.names=sampledb
    schema.history.internal.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
    database.server.name=sampledb
    database.port=1433
    key.converter.schemas.enable=false
    driver.trustServerCertificate=true
    value.converter.schemas.enable=false
    schema.history.internal.consumer.sasl.mechanism=AWS_MSK_IAM
    schema.history.internal.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
    database.include.list=sampledb
    schema.history.internal.consumer.security.protocol=SASL_SSL
  7. Substitute the following placeholders with values from the Outputs section of the CloudFormation stack, then paste the configuration into the Connector configuration box.
    Parameter: database.user
    Replacement: database.user=${secretsmanager:<StackName>-bastion-rds-credentials:username}

    Parameter: database.password
    Replacement: database.password=${secretsmanager:<StackName>-bastion-rds-credentials:password}

    Parameter: database.hostname
    Replacement: database.hostname=<RDS for SQL Server endpoint>, for example dbname.xxxxxxxxxx.us-east-2.rds.amazonaws.com

    Parameter: schema.history.internal.kafka.bootstrap.servers
    Replacement: schema.history.internal.kafka.bootstrap.servers=<Bootstrap brokers>, for example:
    b-1.mskclusterxxxxxxx.xxxxxxx.c7.kafka.us-east-2.amazonaws.com:9098,
    b-2.mskclusterxxxxxxx.xxxxxxx.c7.kafka.us-east-2.amazonaws.com:9098,
    b-3.mskclusterxxxxxxx.xxxxxxx.c7.kafka.us-east-2.amazonaws.com:9098

    To get the list of brokers, see Getting the bootstrap brokers for an Amazon MSK cluster.

    For more information, see Debezium connector for SQL Server.

  8. Next, configure the connector capacity. You can choose between two capacity modes: Autoscaled and Provisioned. For information about these two options, see Connector capacity.
  9. Choose a custom worker configuration and select msk-secrets-manager-worker-config. For information about creating custom worker configurations, see Workers.
  10. Next, specify the service execution role. Choose <StackName>-MSKConnectServiceIAMRole, which is created by the CloudFormation template.
  11. Choose Next, review the security information, then choose Next.
  12. Specify the logging options that you want, then choose Next. For information about logging, see Logging for MSK Connect.
  13. Choose Create connector.

Amazon Redshift configuration

  1. Open the Amazon Redshift console.
  2. From the navigation pane, choose Editor, then Query editor v2. The query editor opens in a new browser tab.
  3. Create a connection to the Amazon Redshift database using the AWS Secrets Manager authentication method.
  4. Choose the secret <StackName>-redshift-credentials, created by the CloudFormation template, which contains the database and sign-in credentials.
  5. Create an external schema after updating the Amazon Redshift IAM role Amazon Resource Name (ARN) and the MSK cluster ARN. These ARNs can also be retrieved from the Outputs section of the CloudFormation stack.
    CREATE EXTERNAL SCHEMA myschema
    FROM MSK
    IAM_ROLE 'arn:aws:iam::<AWSAccountId>:role/<StackName>-RedshiftMSKRole'
    AUTHENTICATION IAM
    CLUSTER_ARN 'arn:aws:kafka:<Region>:<AWSAccountId>:cluster/<MSKClusterName>/xx-xxxxxxxxxxxx-xx';


  6. Run the following SQL statement to enable case-sensitive identifiers.
    SET enable_case_sensitive_identifier TO true;
  7. Switch back to SQL Server Management Studio (SSMS) and insert records into the CDC-enabled patientinfo table.
    INSERT INTO patientinfo (FirstName, LastName, State, SSN, DOB) VALUES('Tom','davis','NJ','5641434802','7-1-2009')
    INSERT INTO patientinfo (FirstName, LastName, State, SSN, DOB) VALUES('Shelly','wilson','NY','3701179724','6-30-1988')
    INSERT INTO patientinfo (FirstName, LastName, State, SSN, DOB) VALUES('steph','williams','CT','7927828402','10-29-1998')
    INSERT INTO patientinfo (FirstName, LastName, State, SSN, DOB) VALUES('steph','williams','OH','5699867456','11-3-1995')
    INSERT INTO patientinfo (FirstName, LastName, State, SSN, DOB) VALUES('kevin','brown','NH','4807085926','4-19-1991')
    GO
  8. Go back to the Redshift Query editor V2 and create a materialized view with AUTO REFRESH set to YES. See Autorefreshing a materialized view for more information.
    CREATE MATERIALIZED VIEW msk_topic_patientinfo AUTO REFRESH YES AS
    SELECT "kafka_partition",
    "kafka_offset",
    "kafka_timestamp_type",
    "kafka_timestamp",
    "kafka_key",
    JSON_PARSE("kafka_value") as Data,
    "kafka_headers"
    FROM myschema."sampledb.sampledb.dbo.patientinfo";

  9. Refresh the view manually.
    REFRESH MATERIALIZED VIEW msk_topic_patientinfo;
  10. Query the data in the materialized view.
    SELECT data FROM msk_topic_patientinfo;
    SELECT COUNT(data) FROM msk_topic_patientinfo;
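
With the materialized view populated, you can analyze the change events directly in Amazon Redshift. The following queries are a minimal sketch that assumes the default Debezium event envelope (before, after, and op fields, as shown in the sample event earlier in this post) stored in the data SUPER column of the view; because case-sensitive identifiers are enabled, the JSON field names are quoted to match the source column names.

    -- Patient count by state, based on the "after" image of each insert event
    SELECT data."after"."State"::VARCHAR(2) AS state,
           COUNT(*) AS patient_count
    FROM msk_topic_patientinfo
    WHERE data."op"::VARCHAR = 'c'
    GROUP BY 1
    ORDER BY patient_count DESC;

    -- Simple data quality check: records with a missing or short SSN
    SELECT data."after"."ID"::INT AS id,
           data."after"."SSN"::VARCHAR(10) AS ssn
    FROM msk_topic_patientinfo
    WHERE data."after"."SSN" IS NULL
       OR LEN(data."after"."SSN"::VARCHAR(10)) < 9;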

Using AWS Glue Streaming to transform (Optional)

You can also transform the incoming stream of records in real time using AWS Glue Streaming. You can create streaming ETL jobs that run continuously and consume data from streaming sources such as Amazon Kinesis Data Streams and Amazon MSK. To learn more about configuring streaming jobs within AWS Glue, see Using a streaming data source. The following figure shows how AWS Glue transforms and processes data ingested from an MSK cluster and writes it to various analytics targets.

[Figure: AWS Glue streaming ETL workflow]

Create connections in AWS Glue Studio for MSK and Amazon Redshift

  1. Open the AWS Glue Studio console.
  2. In the navigation pane, under Data Catalog, choose Connections, then choose Create connection.
  3. Select Apache Kafka as the data source and choose Next.
  4. On the next screen, enter the details of MSK cluster. See Creating a Kafka connection for additional details to complete the step.
  5. Connection details: Choose Amazon Managed Streaming for Apache Kafka (MSK) and select your MSK cluster.
  6. Authentication: Choose IAM Authentication.
  7. Encryption: Choose Require SSL Connection, if applicable.
  8. Network options: Choose the correct VPC, subnets, and security groups.
  9. Repeat the same steps for Amazon Redshift: choose Create connection, choose Amazon Redshift as the data source, and choose Next.
  10. On the next screen, select the provisioned Amazon Redshift cluster and its associated secret from the dropdown lists. See Redshift connections for additional details to complete the step.
  11. After completing the preceding steps, you should see both of the connections.

Now, you can create a visual ETL job in AWS Glue Studio from scratch.

  1. Open the AWS Glue Studio console.
  2. Choose ETL jobs from the navigation pane.
  3. Choose Visual ETL and configure the job, specifying the source, transforms, and target. This post doesn’t provide the specific details of Kafka topics or Amazon Redshift table mapping configurations. Instead, it presents a high-level approach to help you understand the general process of using AWS Glue for data ingestion and transformation.
  4. The following figure shows an example of a custom transformation using the AWS Glue managed transform Change Schema (Apply mapping) to add or remove columns.
    [Figure: Change Schema transform example]
  5. You can also use other AWS Glue managed transforms such as Join, SQL Query, or Detect Sensitive Data.
  6. Save and run the AWS Glue streaming job. For more information, see Managing ETL jobs with AWS Glue Studio.

Clean up

It’s a best practice to delete resources that you’re no longer using so you don’t incur unintended charges.

Complete the following steps to clean up the resources you created for this post:

  1. Open the Amazon S3 console and find the bucket that was created using the CloudFormation template.
  2. Delete all the folders and objects inside the bucket created by the CloudFormation template.
  3. Delete the S3 bucket.
  4. Delete the MSK Connector, Custom plugins, and Worker configuration.
  5. Go to the CloudFormation console and delete the stack that was used to create the resources.
  6. Delete the AWS Glue job.

Conclusion

In this post, we outlined the steps to configure real-time analytics for change data capture (CDC) from relational databases running on Amazon RDS for SQL Server. We demonstrated how to use a Debezium connector to stream the CDC data through Amazon MSK and refresh the data in an Amazon Redshift data warehouse. Implementing a streaming ETL approach reduces latency, enabling businesses to respond swiftly to changes with timely insights. To learn more about real-time streaming ETL, visit What is Real-Time Data Streaming? and What is Zero-ETL?


About the authors

Sudhir Amin is a Sr. Solutions Architect at Amazon Web Services based in New York. In his role, he provides architectural guidance and technical assistance to enterprise customers across different industry verticals, accelerating their cloud adoption. He’s a big fan of snooker, combat sports such as boxing and UFC, and loves traveling to countries with rich wildlife reserves, where he gets to see the world’s most majestic animals up close.

Vishal Srivastava is a Senior Technical Account Manager at AWS. He primarily collaborates with the financial services sector, assisting them through the process of adopting and leveraging cloud technologies. Apart from his professional commitments, he is an avid sports fan and enjoys traveling with his family.