AWS Big Data Blog

Break data silos and stream your CDC data with Amazon Redshift streaming and Amazon MSK

Data loses value over time. We hear from our customers that they’d like to analyze business transactions in real time. Traditionally, customers used batch-based approaches to move data from operational systems to analytical systems. A batch load can run once or several times a day. A batch-based approach can introduce latency in data movement and reduce the value of data for analytics. The change data capture (CDC)-based approach has emerged as an alternative to batch-based approaches. A CDC-based approach captures data changes and makes them available in data warehouses for further analytics in near real time.

CDC tracks changes made in the source database, such as inserts, updates, and deletes, and continually applies those changes to the target database. When the source database changes rapidly, CDC events arrive at high frequency, and the target database (usually a data warehouse) needs to reflect those changes in near real time.

With the explosion of data, the number of data systems in organizations has grown. Data silos cause data to live in different sources, which makes it difficult to perform analytics.

To gain deeper and richer insights, you can bring all the changes from different data silos into one place, such as a data warehouse. This post showcases how to use streaming ingestion to bring data to Amazon Redshift.

Redshift streaming ingestion provides low-latency, high-throughput data ingestion, which enables customers to derive insights in seconds instead of minutes. It’s simple to set up, and it ingests streaming data directly into your data warehouse from Amazon Kinesis Data Streams and Amazon Managed Streaming for Apache Kafka (Amazon MSK) without the need to stage the data in Amazon Simple Storage Service (Amazon S3). You create materialized views using SQL statements. After that, each materialized view refresh can ingest hundreds of megabytes of data per second.

Solution overview

In this post, we create a low-latency data replication from Amazon Aurora MySQL to an Amazon Redshift data warehouse, using Redshift streaming ingestion from Amazon MSK. Using Amazon MSK, we securely stream data with a fully managed, highly available Apache Kafka service. Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. We store CDC events in Amazon MSK for a set duration of time, which makes it possible to deliver CDC events to additional destinations such as an Amazon S3 data lake.

We deploy the Debezium MySQL source Kafka connector on Amazon MSK Connect. Amazon MSK Connect makes it easy to deploy, monitor, and automatically scale connectors that move data between Apache Kafka clusters and external systems such as databases, file systems, and search indices. Amazon MSK Connect is fully compatible with Apache Kafka Connect, which enables you to lift and shift your Apache Kafka Connect applications with zero code changes.

This solution uses Amazon Aurora MySQL to host the example database salesdb. Users of the database can perform row-level INSERT, UPDATE, and DELETE operations to produce change events in the example salesdb database. The Debezium MySQL source Kafka connector reads these change events and emits them to Kafka topics in Amazon MSK. Amazon Redshift then reads the messages from the Kafka topics in Amazon MSK using the Amazon Redshift streaming ingestion feature. Amazon Redshift stores these messages in materialized views and processes them as they arrive.

You can see how CDC represents a create event by looking at this example here. In our solution, we use the op field for processing. It is a mandatory string that describes the type of operation that caused the connector to generate the event. In this example, c indicates that the operation created a row. Valid values for the op field are:

  • c = create
  • u = update
  • d = delete
  • r = read (applies to only snapshots)
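To make the op field concrete, the following Python sketch builds a simplified create event and maps each op code to its meaning. The top-level layout (before, after, op, ts_ms) mirrors the fields that the stored procedure later in this post reads from each message. The field values and the describe_operation helper are illustrative assumptions, not output captured from the example salesdb database.

```python
import json

# Meaning of each op code, as listed above.
OP_CODES = {
    "c": "create",
    "u": "update",
    "d": "delete",
    "r": "read (snapshot only)",
}

# Simplified create event; the values are made up for illustration.
SAMPLE_CREATE_EVENT = json.dumps({
    "before": None,  # a create event has no previous row image
    "after": {"CUST_ID": 1001, "NAME": "Customer 1001", "MKTSEGMENT": "Segment 7"},
    "op": "c",
    "ts_ms": 1669741859000,
})


def describe_operation(raw_event: str) -> str:
    """Return the operation name for a JSON-encoded change event."""
    event = json.loads(raw_event)
    op = event["op"]  # mandatory field in every change event
    if op not in OP_CODES:
        raise ValueError(f"unexpected op code: {op!r}")
    return OP_CODES[op]


if __name__ == "__main__":
    print(describe_operation(SAMPLE_CREATE_EVENT))
```

A delete event would carry the old row in before and a null after, which is why the processing logic later in this post reads the key from whichever of the two is present.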

The following diagram illustrates the solution architecture:

This image shows the architecture of the solution. We read from Amazon Aurora using the Debezium connector for MySQL, which is deployed on Amazon MSK Connect and writes the events to Amazon MSK. From there, the events are ingested into an Amazon Redshift materialized view.

The solution workflow consists of the following steps:

  • Amazon Aurora MySQL has a binary log (binlog) that records all operations (INSERT, UPDATE, DELETE) in the order in which they are committed to the database.
  • Amazon MSK Connect runs the source Kafka connector, called the Debezium connector for MySQL, which reads the binlog, produces change events for row-level INSERT, UPDATE, and DELETE operations, and emits the change events to Kafka topics in Amazon MSK.
  • An Amazon Redshift provisioned cluster is the stream consumer and reads messages from the Kafka topics in Amazon MSK.
  • A materialized view in Amazon Redshift is the landing area for data read from the stream, which is processed as it arrives.
  • When the materialized view is refreshed, Amazon Redshift compute nodes allocate a group of Kafka partitions to each compute slice.
  • Each slice consumes data from its allocated partitions until the view reaches parity with the last offset of the Kafka topic.
  • Subsequent materialized view refreshes read data from the last offset of the previous refresh until the view reaches parity with the topic data.
  • Inside Amazon Redshift, a stored procedure processes the CDC records and updates the target table.
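The offset-based refresh behavior in the steps above can be pictured with a small Python simulation. It is only a conceptual sketch: the round-robin partition assignment, the refresh function, and the in-memory topic are assumptions made for illustration and do not describe how Amazon Redshift is implemented internally.

```python
from typing import Dict, List


def assign_partitions(partitions: List[int], slice_count: int) -> Dict[int, List[int]]:
    """Spread partitions across slices round-robin (an assumed strategy)."""
    assignment: Dict[int, List[int]] = {s: [] for s in range(slice_count)}
    for index, partition in enumerate(sorted(partitions)):
        assignment[index % slice_count].append(partition)
    return assignment


def refresh(topic: Dict[int, List[str]], offsets: Dict[int, int]) -> List[str]:
    """Read every message after the stored offset, then advance the offsets."""
    new_rows: List[str] = []
    for partition, messages in topic.items():
        start = offsets.get(partition, 0)
        new_rows.extend(messages[start:])
        offsets[partition] = len(messages)  # parity with the end of the partition
    return new_rows


if __name__ == "__main__":
    topic = {0: ["e1", "e2"], 1: ["e3"]}
    offsets: Dict[int, int] = {}
    print(assign_partitions(list(topic), slice_count=2))
    print(refresh(topic, offsets))  # first refresh reads everything
    topic[0].append("e4")
    print(refresh(topic, offsets))  # next refresh reads only the new message
```

Because each refresh resumes from the stored offsets, no message is read twice, which is the property the stored procedure relies on when it filters by refresh_time.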

Prerequisites

This post assumes you have a running Amazon MSK Connect stack in your environment with the following components:

  • Aurora MySQL hosting a database. In this post, you use the example database salesdb.
  • The Debezium MySQL connector running on Amazon MSK Connect, which connects to Amazon MSK in your Amazon Virtual Private Cloud (Amazon VPC).
  • An Amazon MSK cluster.

If you don’t have an Amazon MSK Connect stack, then follow the instructions in the MSK Connect lab setup and verify that your source connector replicates data changes to the Amazon MSK topics.

You should provision the Amazon Redshift cluster in the same VPC as the Amazon MSK cluster. If you haven’t deployed one, then follow the steps here in the AWS Documentation.

We use AWS Identity and Access Management (AWS IAM) authentication for communication between Amazon MSK and the Amazon Redshift cluster. Make sure you have created an AWS IAM role with a trust policy that allows your Amazon Redshift cluster to assume the role. For information about how to configure the trust policy for the AWS IAM role, see Authorizing Amazon Redshift to access other AWS services on your behalf. After the role is created, it should have the following AWS IAM policy, which grants permission to communicate with the Amazon MSK cluster.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "MSKIAMpolicy",
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:ReadData",
                "kafka-cluster:DescribeTopic",
                "kafka-cluster:Connect"
            ],
            "Resource": [
                "arn:aws:kafka:*:0123456789:cluster/xxx/xxx",
                "arn:aws:kafka:*:0123456789:topic/*/*/*"
            ]
        },
        {
            "Sid": "MSKPolicy",
            "Effect": "Allow",
            "Action": [
                "kafka:GetBootstrapBrokers"
            ],
            "Resource": "arn:aws:kafka:*:0123456789:cluster/xxx/xxx"
        }
    ]
}

In the example policy above, replace each ARN that contains xxx with your Amazon MSK cluster’s ARN.
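If you prefer to generate the policy rather than edit it by hand, the following Python sketch fills in the ARNs from a single cluster ARN. The build_msk_policy helper and the sample ARN are hypothetical. Unlike the example policy, which allows topics under any cluster (topic/*/*/*), this sketch scopes the topic resource to the given cluster, which is an assumption you can relax if you need the broader wildcard.

```python
import json


def build_msk_policy(cluster_arn: str) -> dict:
    """Build the example policy for one Amazon MSK cluster ARN."""
    prefix, _, resource = cluster_arn.partition(":cluster/")
    if not resource or not prefix.startswith("arn:aws:kafka:"):
        raise ValueError(f"not an MSK cluster ARN: {cluster_arn}")
    # Topic ARNs reuse the cluster name and UUID, followed by the topic name.
    topic_arn = f"{prefix}:topic/{resource}/*"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "MSKIAMpolicy",
                "Effect": "Allow",
                "Action": [
                    "kafka-cluster:ReadData",
                    "kafka-cluster:DescribeTopic",
                    "kafka-cluster:Connect",
                ],
                "Resource": [cluster_arn, topic_arn],
            },
            {
                "Sid": "MSKPolicy",
                "Effect": "Allow",
                "Action": ["kafka:GetBootstrapBrokers"],
                "Resource": cluster_arn,
            },
        ],
    }


if __name__ == "__main__":
    # Hypothetical ARN used only to demonstrate the output shape.
    sample_arn = "arn:aws:kafka:us-east-1:111122223333:cluster/demo-cluster/abcd1234-1"
    print(json.dumps(build_msk_policy(sample_arn), indent=4))
```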

  • Also, verify that the Amazon Redshift cluster has access to the Amazon MSK cluster. In the Amazon Redshift cluster’s security group, add an inbound rule for the MSK security group that allows port 9098. To see how to manage the Amazon Redshift cluster security group, refer to Managing VPC security groups for a cluster.

Image shows how to add the inbound rule for the MSK security group allowing port 9098 in the Amazon Redshift cluster’s security group

  • In the Amazon MSK cluster’s security group, add an inbound rule that allows port 9098 for the leader node IP address of your Amazon Redshift cluster, as shown in the following diagram. You can find the IP address of your Amazon Redshift cluster’s leader node on the Properties tab of the Amazon Redshift cluster in the AWS Management Console.

Image shows how to add the inbound rule allowing port 9098 for the leader node IP address of your Amazon Redshift cluster in the Amazon MSK cluster’s security group

Walkthrough

Navigate to the Amazon Redshift service from the AWS Management Console, then set up Amazon Redshift streaming ingestion for Amazon MSK by performing the following steps:

  1. Set enable_case_sensitive_identifier to true. If you are using the default parameter group for your Amazon Redshift cluster, you can’t set enable_case_sensitive_identifier to true. Instead, create a new parameter group with enable_case_sensitive_identifier set to true and attach it to the Amazon Redshift cluster. After you modify parameter values, you must reboot any clusters that are associated with the modified parameter group. It may take a few minutes for the Amazon Redshift cluster to reboot.

This configuration value determines whether name identifiers of databases, tables, and columns are case sensitive. After the reboot, open a new Amazon Redshift Query Editor v2 session so that the configuration change takes effect, then continue with the next steps.

  2. Create an external schema that maps to the streaming data source.
-- Replace the CLUSTER_ARN value with your cluster ARN; the value shown is only an example.
CREATE EXTERNAL SCHEMA MySchema
FROM MSK
IAM_ROLE 'arn:aws:iam::YourRole:role/msk-redshift-streaming'
AUTHENTICATION IAM
CLUSTER_ARN 'arn:aws:kafka:us-east-1:2073196*****:cluster/MSKCluster-msk-connect-lab/849b47a0-65f2-439e-b181-1038ea9d4493-10';

After the schema is created, verify that you can see the following tables, which are created from the MSK topics:

Image shows tables created from MSK topics

  3. Create a materialized view that references the external schema.
-- Replace myschema with the name you gave to the external schema in the previous step.
CREATE MATERIALIZED VIEW customer_debezium AUTO REFRESH YES AS
SELECT
    *,
    json_parse(kafka_value) AS payload
FROM
    "dev"."myschema"."salesdb.salesdb.CUSTOMER";

Now you can query the newly created materialized view customer_debezium using the following command.

SELECT * FROM "dev"."public"."customer_debezium" order by refresh_time desc;

Check that the materialized view is populated with the CDC records.

  4. REFRESH MATERIALIZED VIEW (optional). This step is optional because we already specified AUTO REFRESH YES when creating the materialized view (MV).
REFRESH MATERIALIZED VIEW "dev"."public"."customer_debezium";

NOTE: The materialized view above is auto-refreshed, so if you don’t see the records immediately, wait a few seconds and rerun the SELECT statement. Amazon Redshift streaming ingestion also supports manual refresh: the REFRESH MATERIALIZED VIEW statement shown above pulls streaming data into the materialized view immediately. You can then rerun the following query to see the latest records.

SELECT * FROM "dev"."public"."customer_debezium" order by refresh_time desc;

Image shows records from the customer_debezium MV

Process CDC records in Amazon Redshift

In the following steps, we create the staging table that holds the CDC data, the target table that holds the latest snapshot, and a stored procedure that processes the CDC records and updates the target table.

  1. Create the staging table. The staging table temporarily holds all of the data that will be used to make changes to the target table, including both updates and inserts. In this example, we use LZO encoding because it works well for CHAR and VARCHAR columns that store very long character strings. You can use BYTEDICT instead if it matches your use case.
CREATE TABLE public.customer_stg (
    customer_id character varying(256) ENCODE raw DISTKEY,
    customer_name character varying(256) ENCODE lzo,
    market_segment character varying(256) ENCODE lzo,
    ts_ms bigint ENCODE az64,
    op character varying(2) ENCODE lzo,
    record_rank smallint ENCODE az64,
    refresh_time timestamp without time zone ENCODE az64
) DISTSTYLE KEY
SORTKEY (customer_id);
  2. Create the target table.

We use the customer_target table to load the processed CDC events.

CREATE TABLE public.customer_target (
    customer_id character varying(256) ENCODE raw DISTKEY,
    customer_name character varying(256) ENCODE lzo,
    market_segment character varying(256) ENCODE lzo,
    refresh_time timestamp without time zone ENCODE az64
) DISTSTYLE KEY
SORTKEY (customer_id);
  3. Create the debezium_last_extract table and insert a dummy value.

We need to store the timestamp of the last extracted CDC events. We use the debezium_last_extract table for this purpose. For the initial record, we insert a dummy value, which lets us compare the current CDC processing timestamp with the next one.

CREATE TABLE public.debezium_last_extract (
process_name character varying(256) ENCODE lzo,
latest_refresh_time timestamp without time zone ENCODE az64
) DISTSTYLE AUTO;

Insert into public.debezium_last_extract VALUES ('customer','1983-01-01 00:00:00');

SELECT * FROM "dev"."public"."debezium_last_extract";
  4. Create the stored procedure.

This stored procedure processes the CDC records and updates the target table with the latest changes.

CREATE OR REPLACE PROCEDURE public.incremental_sync_customer()
LANGUAGE plpgsql
AS $$
DECLARE
    sql VARCHAR(MAX) := '';
    max_refresh_time TIMESTAMP;
    staged_record_count BIGINT := 0;
BEGIN
    -- Get the refresh_time of the last processed batch from the control table
    sql := 'SELECT MAX(latest_refresh_time) FROM debezium_last_extract WHERE process_name = ''customer'';';
    EXECUTE sql INTO max_refresh_time;

    -- Truncate the staging table (replace customer_stg with your staging table name)
    EXECUTE 'TRUNCATE customer_stg;';

    -- Stage (and transform) the change records that arrived after the last processed
    -- refresh_time, ranking the events of each customer so the newest has record_rank = 1
    EXECUTE 'INSERT INTO customer_stg ('||
        'SELECT coalesce(payload.after."CUST_ID", payload.before."CUST_ID")::varchar AS customer_id, '||
        'payload.after."NAME"::varchar AS customer_name, '||
        'payload.after."MKTSEGMENT"::varchar AS market_segment, '||
        'payload.ts_ms::bigint, '||
        'payload."op"::varchar, '||
        'rank() OVER (PARTITION BY coalesce(payload.after."CUST_ID", payload.before."CUST_ID")::varchar '||
        'ORDER BY payload.ts_ms::bigint DESC) AS record_rank, '||
        'refresh_time '||
        'FROM customer_debezium WHERE refresh_time > '''||max_refresh_time||''');';

    sql := 'SELECT COUNT(*) FROM customer_stg;';
    EXECUTE sql INTO staged_record_count;
    RAISE INFO 'Staged customer records: %', staged_record_count;

    -- Delete records from the target table that also exist in the staging table
    -- (updated or deleted records); replace customer_target with your target table name
    EXECUTE 'DELETE FROM customer_target USING customer_stg WHERE customer_target.customer_id = customer_stg.customer_id';

    -- Insert the latest version of each staged record into the target table, skipping deletes
    EXECUTE 'INSERT INTO customer_target SELECT customer_id, customer_name, market_segment, refresh_time FROM customer_stg WHERE record_rank = 1 AND op <> ''d''';

    -- Record the latest refresh_time in the control table
    EXECUTE 'INSERT INTO debezium_last_extract SELECT ''customer'', max(refresh_time) FROM customer_target';
END;
$$;

Image shows the stored procedure named incremental_sync_customer created in the step above
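The merge logic in the stored procedure can be easier to follow outside SQL. The following Python sketch applies the same steps to in-memory data: stage the events that are newer than the last watermark, keep the most recent event per customer, delete matching keys from the target, and re-insert everything except deletes. The incremental_sync function, the dictionary-based target, and the integer refresh_time values are assumptions made for illustration. Where two events for a key share the same ts_ms, this sketch keeps one of them, whereas rank() in the SQL would keep both.

```python
from typing import Dict, List


def incremental_sync(events: List[dict], target: Dict[str, dict], last_refresh_time: int) -> int:
    """Apply events newer than last_refresh_time to target and return the new watermark."""
    # Stage only the events that arrived after the previous run.
    staged = [e for e in events if e["refresh_time"] > last_refresh_time]

    # Keep the most recent event per customer (record_rank = 1 in the SQL).
    latest: Dict[str, dict] = {}
    for event in staged:
        row = event["after"] or event["before"]  # deletes carry only "before"
        key = str(row["CUST_ID"])
        if key not in latest or event["ts_ms"] > latest[key]["ts_ms"]:
            latest[key] = event

    # Delete every staged key from the target, then re-insert non-deletes.
    for key, event in latest.items():
        target.pop(key, None)
        if event["op"] != "d":
            target[key] = {
                "customer_name": event["after"]["NAME"],
                "market_segment": event["after"]["MKTSEGMENT"],
                "refresh_time": event["refresh_time"],
            }

    # As in the procedure, the watermark comes from the target table contents.
    return max([last_refresh_time] + [r["refresh_time"] for r in target.values()])


if __name__ == "__main__":
    target: Dict[str, dict] = {}
    events = [
        {"op": "c", "ts_ms": 1, "refresh_time": 10, "before": None,
         "after": {"CUST_ID": 1, "NAME": "A", "MKTSEGMENT": "S1"}},
        {"op": "u", "ts_ms": 2, "refresh_time": 10,
         "before": {"CUST_ID": 1, "NAME": "A", "MKTSEGMENT": "S1"},
         "after": {"CUST_ID": 1, "NAME": "B", "MKTSEGMENT": "S1"}},
    ]
    print(incremental_sync(events, target, 0), target)
```

One consequence of taking the watermark from the target is that a batch containing only deletes does not advance it, so those events are staged again on the next run. The delete-then-insert pattern makes that reprocessing harmless.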

Test the solution

Update the example salesdb database hosted on Amazon Aurora

  1. The example database runs on Amazon Aurora, and we access it from the Amazon Elastic Compute Cloud (Amazon EC2) instance named KafkaClientInstance.
  2. Replace the Amazon Aurora endpoint in the following command with the value of your own Amazon Aurora endpoint, run the command, and then switch to the salesdb database (use salesdb).
mysql -f -u master -h mask-lab-salesdb.xxxx.us-east-1.rds.amazonaws.com --password=S3cretPwd99

Image shows the details of the RDS for MySQL
  3. Run an update, an insert, and a delete on any of the tables created. You can also run an update more than once to check the last updated record later in Amazon Redshift.

Image shows the insert, update, and delete operations performed on RDS for MySQL

  4. Invoke the stored procedure incremental_sync_customer, created in the steps above, from Amazon Redshift Query Editor v2. You can run the procedure manually using the following command, or schedule it.
call incremental_sync_customer();
  5. Check the target table for the latest changes. Because the results are ordered by refresh_time, the updates that you made in the source table appear at the top, and rows that you deleted no longer appear.
SELECT * FROM "dev"."public"."customer_target" order by refresh_time desc;

Image shows the records from the customer_target table in descending order

Extending the solution

In this solution, we showed CDC processing for the customer table. You can use the same approach to extend it to other tables in the example salesdb database, or add more databases to the MSK Connect configuration property database.include.list.

Our proposed approach can work with any MySQL source supported by the Debezium MySQL source Kafka connector. To extend this example to your own workloads and use cases, create the staging and target tables according to the schema of the source table. Then update the coalesce(payload.after."CUST_ID",payload.before."CUST_ID")::varchar as customer_id expression, and the other column expressions in the stored procedure, with the column names and types in your source and target tables. In the example in this post, we used LZO encoding, which works well for CHAR and VARCHAR columns that store very long character strings. You can use BYTEDICT as well if it matches your use case. Another consideration when creating the target and staging tables is choosing a distribution style and key based on the data in the source database. Here we chose the KEY distribution style with customer_id as the distribution key, based on the source data and schema, by following the best practices mentioned here.
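When you extend the solution to several tables, writing each projection by hand gets repetitive. As one possible aid, the following Python sketch generates the staging SELECT for a given materialized view and column mapping. The build_staging_select helper is hypothetical, it casts every column to varchar for simplicity, and it assumes the identifiers you pass in are trusted, so adjust the casts to match your own schema before using the output.

```python
from typing import Dict


def build_staging_select(view: str, key_column: str, columns: Dict[str, str]) -> str:
    """Build the SELECT that feeds a staging table from a streaming view.

    columns maps each source column name to its staging column name and
    must include key_column.
    """
    # Deletes have no "after" image, so fall back to "before" for the key.
    key_expr = (
        f'coalesce(payload.after."{key_column}", payload.before."{key_column}")::varchar'
    )
    projections = [f"{key_expr} AS {columns[key_column]}"]
    for source_name, target_name in columns.items():
        if source_name != key_column:
            projections.append(f'payload.after."{source_name}"::varchar AS {target_name}')
    projections += [
        "payload.ts_ms::bigint AS ts_ms",
        'payload."op"::varchar AS op',
        f"rank() OVER (PARTITION BY {key_expr} ORDER BY payload.ts_ms::bigint DESC) AS record_rank",
        "refresh_time",
    ]
    return "SELECT " + ", ".join(projections) + f" FROM {view}"


if __name__ == "__main__":
    print(build_staging_select(
        "customer_debezium",
        "CUST_ID",
        {"CUST_ID": "customer_id", "NAME": "customer_name", "MKTSEGMENT": "market_segment"},
    ))
```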

Cleaning up

  1. Delete all the Amazon Redshift clusters.
  2. Delete the Amazon MSK cluster and the MSK Connect connector.
  3. If you don’t want to delete the Amazon Redshift clusters, you can manually drop the MV and tables created during this post using the following commands:
drop MATERIALIZED VIEW customer_debezium;
drop TABLE public.customer_stg;
drop TABLE public.customer_target;
drop TABLE public.debezium_last_extract;

Also, please remove inbound security rules added to your Amazon Redshift and Amazon MSK Clusters, along with AWS IAM roles created in the Prerequisites section.

Conclusion

In this post, we showed you how Amazon Redshift streaming ingestion provided high-throughput, low-latency ingestion of streaming data from Amazon Kinesis Data Streams and Amazon MSK into an Amazon Redshift materialized view. We increased speed and reduced cost of streaming data into Amazon Redshift by eliminating the need to use any intermediary services.

Furthermore, we showed how CDC data can be processed rapidly after generation, using a simple SQL interface that enables customers to perform near real-time analytics on a variety of data sources (e.g., Internet-of-Things [IoT] devices, system telemetry data, or clickstream data) from a busy website or application.

As you explore the options to simplify and enable near real-time analytics for your CDC data, we hope this post provides you with valuable guidance. We welcome any thoughts or questions in the comments section.


About the Authors

Umesh Chaudhari is a Streaming Solutions Architect at AWS. He works with AWS customers to design and build real time data processing systems. He has 13 years of working experience in software engineering including architecting, designing, and developing data analytics systems.

Vishal Khatri is a Sr. Technical Account Manager and Analytics specialist at AWS. Vishal works with State and Local Government helping educate and share best practices with customers by leading and owning the development and delivery of technical content while designing end-to-end customer solutions.