AWS Database Blog

Continuously replicate Amazon DynamoDB changes to Amazon Aurora PostgreSQL using AWS Lambda

Amazon DynamoDB is a fully managed, serverless, key-value NoSQL database designed to run high-performance applications at any scale. Amazon Aurora is a MySQL and PostgreSQL-compatible relational database built for the cloud. Aurora combines the performance and availability of traditional enterprise databases with the simplicity and cost-effectiveness of open-source databases. Serverless technologies eliminate infrastructure management tasks like capacity provisioning and patching, and increase agility throughout your application stack.

In this post, we present a solution that handles real-time data changes at scale by replicating data from DynamoDB to Amazon Aurora PostgreSQL-Compatible Edition using Amazon DynamoDB Streams and AWS Lambda.

Use case

In our use case, a customer had legacy reporting applications, business intelligence (BI) tools, and a data warehouse running in their on-premises environment. As a long-term plan, they wanted to modernize their data warehouse to Amazon Redshift. In the meantime, to support the downstream legacy reporting environment, they replicated data from DynamoDB to Amazon Aurora PostgreSQL-Compatible Edition, enabling users to run one-time and aggregation queries. Although replicating data from DynamoDB to Amazon Aurora PostgreSQL is not a common pattern, they preferred bringing the data into Amazon Aurora PostgreSQL, which temporarily allowed them to continue running their existing legacy applications while beginning their Amazon Redshift migration journey.

Solution overview

With DynamoDB, you only need to define a partition key, and optionally a sort key, to work with your data. In contrast, with a relational database such as Amazon Aurora, you need to define a table schema for every attribute you plan to work with. To replicate changes from a DynamoDB table, you use DynamoDB Streams to capture a time-ordered sequence of item-level modifications and store this information in a log for up to 24 hours. Replication starts only after DynamoDB Streams is enabled. This means that if you have existing data in the DynamoDB table that needs to be replicated to Aurora, you must handle it with a one-time load, such as a DynamoDB data export to Amazon S3 or exporting and importing DynamoDB data using AWS Data Pipeline. Because DynamoDB is schemaless, you are responsible for keeping the relational database structure up to date when you add a new attribute to DynamoDB, to avoid breaking the replication. Although Aurora can handle hundreds of thousands of transactions per second (TPS), if the incoming TPS in DynamoDB exceeds that, there could be latency on Aurora. It is important to understand the TPS requirement and align it with the latency SLA before implementing the solution.
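
For example, the one-time load can be bootstrapped with a DynamoDB export to Amazon S3. The following is a minimal sketch using Boto3; the table ARN, account ID, and bucket name are placeholders, and the exported data still needs to be transformed and loaded into Aurora PostgreSQL:

import boto3

dynamodb = boto3.client("dynamodb")

# Export to Amazon S3 requires point-in-time recovery (PITR) on the table
dynamodb.update_continuous_backups(
    TableName="Employees",
    PointInTimeRecoverySpecification={"PointInTimeRecoveryEnabled": True},
)

# Export the current table state to an S3 bucket (table ARN, account ID, and bucket are placeholders)
dynamodb.export_table_to_point_in_time(
    TableArn="arn:aws:dynamodb:us-east-1:111122223333:table/Employees",
    S3Bucket="my-ddb-export-bucket",
    ExportFormat="DYNAMODB_JSON",
)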

While working with the customer, we also discussed the option of streaming data from DynamoDB to Aurora using Amazon Data Firehose. However, the customer preferred DynamoDB Streams because it's an out-of-the-box capability with no additional cost, and its 24-hour data retention window met the customer's service level agreement (SLA).

The following diagram illustrates the solution architecture and workflow.

To enable the data replication between databases, you can complete the following high-level steps:

  1. Create a DynamoDB table.
  2. Configure DynamoDB to SQL table mapping.
  3. Enable DynamoDB Streams in the DynamoDB table.
  4. Create a Lambda function that uses Powertools for AWS Lambda for Amazon CloudWatch Logs logging and AWS Secrets Manager parameter retrieval.
  5. Configure a Lambda trigger for the DynamoDB stream.
  6. Validate DynamoDB changes in Amazon RDS.

Prerequisites

To follow this post, the following prerequisites are required:

Configure DynamoDB to SQL table mapping

The following table shows the mapping between the DynamoDB table and the SQL database. In our use case, we mapped one DynamoDB table to one Aurora PostgreSQL table.

DynamoDB Table (Employees)        SQL Table (Employees)
Id (Primary Key), Type: Number    Id (Primary Key), Type: Number
empName, Type: String             Name, Type: Varchar(20)
empDepartment, Type: String       Department, Type: Varchar(10)

We use Id as the primary key on both tables.
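
Based on this mapping, the target table in Aurora PostgreSQL might be created as follows. This is a minimal sketch using psycopg2; the connection parameters are placeholders:

import psycopg2

# Connection parameters are placeholders; in practice, read them from AWS Secrets Manager
conn = psycopg2.connect(
    host="<aurora-cluster-endpoint>",
    dbname="postgres",
    user="postgres",
    password="<your-password>",
    port=5432,
)
cur = conn.cursor()

# Create the target table that matches the column mapping above
cur.execute("""
    CREATE TABLE IF NOT EXISTS public."Employees" (
        "Id"         integer PRIMARY KEY,
        "Name"       varchar(20),
        "Department" varchar(10)
    )
""")
conn.commit()
cur.close()
conn.close()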

The following table shows how each DynamoDB stream event type maps to a SQL operation:

DynamoDB    SQL
INSERT      INSERT
MODIFY      UPDATE
REMOVE      DELETE

Create a DynamoDB table

The following AWS CLI command creates a DynamoDB table called Employees with the partition key Id as a number:

aws dynamodb create-table \
    --table-name Employees \
    --attribute-definitions \
        AttributeName=Id,AttributeType=N \
    --key-schema \
        AttributeName=Id,KeyType=HASH \
    --provisioned-throughput \
        ReadCapacityUnits=5,WriteCapacityUnits=5 \
    --table-class STANDARD

The table is configured with provisioned throughput of 5 read capacity units (RCUs) and 5 write capacity units (WCUs). For more details on the pricing of provisioned capacity, refer to Pricing for Provisioned Capacity.

Enable DynamoDB Streams on the DynamoDB table

To enable DynamoDB Streams, complete the following steps:

  1. On the DynamoDB console, navigate to the table you created and choose the Exports and streams tab.
  2. For DynamoDB stream details, choose Turn on.

When DynamoDB Streams is enabled, all Data Manipulation Language (DML) actions in the DynamoDB table are captured as an item in the stream.

  3. For View type, select New image to capture the new values after each change; we use the new values to update the destination.
  4. Choose Turn on stream.

You can do the same with the AWS CLI. The following command enables the stream on the Employees table with the NEW_IMAGE view type:

aws dynamodb update-table \
    --table-name Employees \
    --stream-specification StreamEnabled=true,StreamViewType=NEW_IMAGE
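
You will need the stream ARN later when you configure the Lambda trigger. The following is a minimal sketch using Boto3 to look it up; you can also find it on the DynamoDB console or with aws dynamodb describe-table:

import boto3

dynamodb = boto3.client("dynamodb")

# The stream ARN is available from DescribeTable once the stream is enabled
response = dynamodb.describe_table(TableName="Employees")
print(response["Table"]["LatestStreamArn"])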

Create a Lambda function for CloudWatch Logs and Secrets Manager parameters

In this use case, we use a Lambda function called SQL Replicator, which is invoked by DynamoDB Streams when there are data modifications in your DynamoDB table. This function replicates changes to Aurora PostgreSQL Serverless, and its logs are captured in CloudWatch Logs using Powertools for AWS Lambda. The Lambda code is written in Python. We use the Psycopg database adapter for the PostgreSQL connection and Powertools for AWS Lambda (Python) for the logger and secrets retrieval.

Lambda role policy

The Lambda role is constructed with the following AWS managed policies:

You can create the destination RDS database secret in Secrets Manager for consumption by the Lambda function. For direct integration, refer to Improve security of Amazon RDS master database credentials using AWS Secrets Manager.
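
The following is a minimal Boto3 sketch for creating such a secret; the secret name dev/rds matches the function code that follows, and the credential values are placeholders:

import boto3
import json

secretsmanager = boto3.client("secretsmanager")

# Store the Aurora PostgreSQL connection details under the name the Lambda function reads
secretsmanager.create_secret(
    Name="dev/rds",
    SecretString=json.dumps({
        "username": "postgres",
        "password": "<your-password>",
        "engine": "postgres",
        "host": "<aurora-cluster-endpoint>",
        "port": 5432,
    }),
)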

The following Python code is for the Lambda function that syncs data from DynamoDB to PostgreSQL. It includes the following actions:

  • Import necessary libraries like json, psycopg2, and aws_lambda_powertools
  • Initialize a logger from aws_lambda_powertools to log information
  • Retrieve RDS database credentials from Secrets Manager
  • Connect to the PostgreSQL database using psycopg2
  • For each record in the DynamoDB event, perform CRUD operations on PostgreSQL based on the event type (INSERT, MODIFY, REMOVE).
  • Run SQL queries using psycopg2 cursor to insert, update, and delete records in the PostgreSQL database
  • Log relevant information using the logger at each step
  • Verify the data synced by selecting records from PostgreSQL
  • Commit transactions at the end
# Library imports
import json
import psycopg2
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities import parameters

#Initialize Powertools Logger 
logger = Logger()

#instruct Logger to log the incoming event
@logger.inject_lambda_context(log_event=True)
#Lambda Handler 
def lambda_handler(event, context):
    try:
        # Retrieve the secret value from AWS Secrets Manager
        secret_value = json.loads(parameters.get_secret(name="dev/rds"))

        #DB Connect 
        mydb = psycopg2.connect(
        user=secret_value["username"], 
        password=secret_value["password"], 
        host=secret_value["host"], 
        dbname=secret_value["engine"], 
        port= secret_value["port"]
        )

        mycursor = mydb.cursor()

        # For every record in the event
        for record in event["Records"]:
            event_name = record["eventName"]
            #based on the record event 
            if event_name == "INSERT":
                logger.info("Inserting record", extra = { "record": record})
                sql_script = "INSERT INTO public.\"Employees\" VALUES (%s,%s,%s)"
                sql_value = (
                    int(record.get("dynamodb",{}).get("Keys",{}).get("Id",{}).get("N")),
                    record.get("dynamodb",{}).get("NewImage",{}).get("empName",{}).get("S","NA"),
                    record.get("dynamodb",{}).get("NewImage",{}).get("empDepartment",{}).get("S","NA"),
                )
                mycursor.execute(sql_script,sql_value)
                logger.info("Record inserted successfully into Employees table")

            elif event_name == "MODIFY":
                logger.info("Modifying record", extra = { "record": record})
                sql_script = "UPDATE public.\"Employees\" SET \"Name\" = %s, \"Department\" = %s WHERE \"Id\" = %s"
                sql_value = (
                    record.get("dynamodb",{}).get("NewImage",{}).get("empName",{}).get("S","NA"),
                    record.get("dynamodb",{}).get("NewImage",{}).get("empDepartment",{}).get("S","NA"),
                    int(record.get("dynamodb",{}).get("Keys",{}).get("Id",{}).get("N")),
                )
                mycursor.execute(sql_script,sql_value)
                logger.info("Record modified successfully into Employees table")
            
            elif event_name == "REMOVE":
                logger.info("Removing record", extra = { "record": record})
                sql_script = "DELETE FROM public.\"Employees\" WHERE \"Id\" = %s"
                sql_value = (
                    int(record.get("dynamodb",{}).get("Keys",{}).get("Id",{}).get("N")),
                )
                mycursor.execute(sql_script,sql_value)
                logger.info("Record removed successfully from Employees table")
        
        #Verify the sync with a select statement
        mycursor.execute('SELECT * FROM public."Employees"')
        records = mycursor.fetchall()

        mydb.commit()
        mycursor.close()
        mydb.close()

        logger.info("Employees table contents after sync", extra = { "records": records})

    except (Exception, psycopg2.Error) as error:
        logger.exception("Error while replicating data to PostgreSQL", extra = {"error": error})
        #Re-raise so the DynamoDB stream trigger can retry the batch
        raise

In the function code, replace “dev/rds” with the name of your secret so that the Lambda function can use it for database authentication. For more information about creating the RDS database secret, refer to Create an AWS Secrets Manager database secret.

Secrets Manager secret value

The following is the secret value for your reference:

{"username":"admin","password":"xxxxxx","engine":"postgres","host":"database.cluster-abcdefghjklmn.us-east-1.rds.amazonaws.com","port":5432,"dbClusterIdentifier":"database-3"}

Because Amazon Aurora is deployed in a VPC, we attached the Lambda function to the same VPC. We configured security group rules on both the Lambda function and Amazon Aurora to allow connectivity between them. For more information, refer to Using a Lambda function to access an Amazon RDS database. You can also refer to Troubleshooting for Amazon RDS for additional help.
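
You can attach the function to the VPC on the Lambda console, or programmatically as in the following minimal Boto3 sketch; the function name, subnet IDs, and security group ID are placeholders:

import boto3

lambda_client = boto3.client("lambda")

# Place the function in the Aurora VPC subnets with a security group that can reach the database
lambda_client.update_function_configuration(
    FunctionName="SQLReplicator",
    VpcConfig={
        "SubnetIds": ["subnet-0123456789abcdef0", "subnet-0fedcba9876543210"],
        "SecurityGroupIds": ["sg-0123456789abcdef0"],
    },
)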

Configure a Lambda trigger for the DynamoDB stream

With triggers, you can build applications that react to data modifications in DynamoDB tables. For more information about DynamoDB integration with Lambda, refer to DynamoDB Streams and AWS Lambda triggers.

The trigger runs the Lambda function, and if the function returns an error, the trigger retries the batch until it processes successfully or the data expires. You can also configure the trigger to retry with a smaller batch, limit the number of retries, and set other options. For more information on batching, refer to Polling and batching streams.

The function trigger is configured with a batch size of 100 based on the DynamoDB DML volume.
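
You can create the trigger on the Lambda or DynamoDB console, or programmatically as in the following minimal Boto3 sketch; the function name and stream ARN are placeholders, and the retry settings illustrate the options described above:

import boto3

lambda_client = boto3.client("lambda")

# Connect the DynamoDB stream to the function with a batch size of 100
lambda_client.create_event_source_mapping(
    EventSourceArn="arn:aws:dynamodb:us-east-1:111122223333:table/Employees/stream/2024-01-01T00:00:00.000",
    FunctionName="SQLReplicator",
    StartingPosition="LATEST",
    BatchSize=100,
    BisectBatchOnFunctionError=True,  # retry failed batches with smaller batches
    MaximumRetryAttempts=3,           # limit the number of retries
)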

Validate DynamoDB changes in Amazon RDS

With the DynamoDB stream and Lambda function set up, you can now validate the data delivery. You can use the AWS CLI or console to perform DynamoDB inserts, updates, and deletes. In this post, we provide AWS CLI sample code. You can use the PostgreSQL client to connect (for this post, we use pgAdmin) and validate the data.

Insert

Use the following code in the AWS CLI to perform an insert:

aws dynamodb put-item \
--table-name Employees --item "{\"Id\": {\"N\": \"2001\"},\"empDepartment\": {\"S\": \"AnyDept\"},\"empName\": {\"S\": \"Akua\"}}"

The following screenshot shows the inserted value in the table.
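
If you prefer to validate from a script rather than pgAdmin, a minimal psycopg2 check such as the following confirms the replicated row; the connection parameters are placeholders:

import psycopg2

# Connection parameters are placeholders; reuse the values stored in Secrets Manager
conn = psycopg2.connect(
    host="<aurora-cluster-endpoint>",
    dbname="postgres",
    user="postgres",
    password="<your-password>",
    port=5432,
)
cur = conn.cursor()
cur.execute('SELECT "Id", "Name", "Department" FROM public."Employees" WHERE "Id" = %s', (2001,))
print(cur.fetchone())
cur.close()
conn.close()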

Update

Use the following code in the AWS CLI to perform an update:

aws dynamodb update-item --table-name Employees \
--key "{\"Id\": {\"N\": \"2001\"}}" \
--update-expression "SET  empDepartment = :newval" \
--expression-attribute-values "{\":newval\": {\"S\": \"ExDept\"}}"

The following screenshot shows the updated value in the table.

Delete

Use the following code in the AWS CLI to perform a delete:

aws dynamodb delete-item --table-name Employees --key "{\"Id\": {\"N\": \"2001\"}}"

The following screenshot shows that the record has been deleted.

Clean up

To avoid incurring charges, delete the resources you created as part of this post:

  1. Delete the Lambda function.
  2. Delete the DynamoDB table.
  3. Delete the Aurora PostgreSQL Serverless database.
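
If you prefer to clean up programmatically, the following minimal Boto3 sketch covers the same steps; the resource names are placeholders based on the examples in this post:

import boto3

# Delete the Lambda function
boto3.client("lambda").delete_function(FunctionName="SQLReplicator")

# Delete the DynamoDB table
boto3.client("dynamodb").delete_table(TableName="Employees")

# Delete the Aurora PostgreSQL cluster (delete any DB instances in the cluster first)
boto3.client("rds").delete_db_cluster(
    DBClusterIdentifier="database-3",
    SkipFinalSnapshot=True,
)

# Optionally, schedule deletion of the database secret
boto3.client("secretsmanager").delete_secret(SecretId="dev/rds", RecoveryWindowInDays=7)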

Summary

In this post, you built an event-driven solution to continuously replicate DynamoDB changes to Aurora using Lambda. This solution solved a customer problem by supporting downstream legacy reporting workloads and enabling them to perform one-time and aggregation queries.

Try out the solution and if you have any comments or questions, leave them in the comments section.


About the Authors

Aravind Hariharaputran is a Database Consultant with the Professional Services team at Amazon Web Services. He is passionate about databases in general, with Microsoft SQL Server as his specialty. He helps build technical solutions that assist customers in migrating and optimizing their on-premises database workloads to the AWS Cloud. He enjoys spending time with family and playing cricket.


Sakthivel Chellapparimanam is a Cloud Application Architect with AWS Professional Services at Amazon Web Services. He helps customers build cloud applications and migrate applications to the cloud.