AWS Database Blog

Capture changes from Amazon DocumentDB via AWS Lambda and publish them to Amazon MSK

When you use a document data store as your service’s source of truth, you may need to share changes to that data with downstream systems. The data events that occur within the data store can be converted to business events, which can then be sourced into multiple microservices that implement different business functionalities. Capturing changes from a data source is called change data capture (CDC); how you implement it depends on the data technology. In the case of Amazon DocumentDB (with MongoDB compatibility), you can implement CDC via the change streams functionality. This feature makes it easier to listen for committed changes to documents in a set of collections in real time. The events are also time-ordered within a stream, which makes the stream a reliable mechanism for state replication scenarios.

In this post, I show how you can capture changes from Amazon DocumentDB by using an AWS Lambda function implemented in Node.js. After the Lambda function captures the change events, it publishes them to Amazon Managed Streaming for Apache Kafka (Amazon MSK).

Architecture

By completing the steps in this post, you can create a system that uses the architecture illustrated in the following image.

The flow of events starts when we make changes within a collection residing in the Amazon DocumentDB database. As the changes arrive, Amazon DocumentDB copies them into a change stream dedicated to that collection. A Lambda function connects to this change stream and polls these events. After the function filters out events other than insert, update, and delete, it publishes them to a Kafka topic in an MSK cluster.
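
The filtering step can be sketched as follows. This is a minimal illustration rather than the deployed function's source: the names FORWARDED_OPERATIONS and isForwardedEvent and the sample events are hypothetical, and the only assumption about the input is that each change event carries an operationType field.

```javascript
// Operation types that the post says are forwarded; everything else is dropped.
const FORWARDED_OPERATIONS = new Set(["insert", "update", "delete"]);

// Returns true when a change event should be published to Kafka.
function isForwardedEvent(event) {
  return Boolean(event) && FORWARDED_OPERATIONS.has(event.operationType);
}

// Hypothetical sample events, reduced to the fields this sketch needs.
const sampleEvents = [
  { operationType: "insert", documentKey: { _id: 1 }, fullDocument: { _id: 1, title: "Blog Title 1" } },
  { operationType: "drop" },
  { operationType: "delete", documentKey: { _id: 1 } },
];

const forwarded = sampleEvents.filter(isForwardedEvent);
console.log(forwarded.map((e) => e.operationType)); // prints [ 'insert', 'delete' ]
```

Keeping the allowed operation types in one set makes it straightforward to forward additional event types later without touching the rest of the function.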

A Lambda function is a stateless component, and it has a limited lifespan. Because the polling activity should be continuous, we need to run the Lambda function on a schedule. This architecture uses Amazon EventBridge to schedule the function to run every minute.

In this sample architecture, each Lambda function triggered by the EventBridge engine connects to Amazon DocumentDB and watches for changes for a predefined time period (15 seconds in this case). At the end of each poll cycle, the function writes the last polled resume token to another collection in the same Amazon DocumentDB database. This checkpoint mechanism allows Lambda functions to resume the polling activity without needing to replay all the events from the beginning of the stream.
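
One way to picture a single poll cycle is the following sketch. It is not the function's actual code: runPollCycle and its injected collaborators (loadToken, saveToken, openStream, publish, now) are hypothetical names, and the stream object is only assumed to expose tryNext() and close() in the style of the MongoDB Node.js driver's ChangeStream. Injecting the collaborators lets the control flow run without a database or a Kafka cluster.

```javascript
// Sketch of one poll cycle with injected (hypothetical) collaborators.
async function runPollCycle({ loadToken, saveToken, openStream, publish, windowMs = 15000, now = Date.now }) {
  const startToken = await loadToken();        // null means no checkpoint has been saved yet
  const stream = await openStream(startToken); // for example, collection.watch([], { resumeAfter })
  const deadline = now() + windowMs;
  let lastToken = startToken;
  let published = 0;
  try {
    while (now() < deadline) {
      const event = await stream.tryNext();    // assumed to resolve to null when nothing is pending
      if (event === null) continue;
      await publish(event);                    // publish first ...
      lastToken = event._id;                   // ... then advance the resume token
      published += 1;
    }
  } finally {
    await stream.close();
    // Persist the checkpoint only if at least one event was published.
    if (lastToken !== startToken) await saveToken(lastToken);
  }
  return { published, lastToken };
}

// Demo with in-memory fakes and a fake clock that advances 5 seconds per reading.
async function demo() {
  const pending = [
    { _id: "t1", operationType: "insert" },
    { _id: "t2", operationType: "update" },
  ];
  let clock = 0;
  const saved = [];
  const sent = [];
  const result = await runPollCycle({
    loadToken: async () => null,
    saveToken: async (token) => { saved.push(token); },
    openStream: async () => ({
      tryNext: async () => pending.shift() ?? null,
      close: async () => {},
    }),
    publish: async (event) => { sent.push(event.operationType); },
    now: () => (clock += 5000),
  });
  return { result, saved, sent };
}

demo().then((out) => console.log(out.result)); // prints { published: 2, lastToken: 't2' }
```

Advancing the token only after a successful publish means a failed cycle replays the unpublished event on the next run, so downstream consumers should tolerate occasional duplicates.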

This checkpointing mechanism should be in place even if we choose to use a long-running application using a virtual machine or container-based compute infrastructure. This is because if the underlying compute instance is restarted or scaled out, the new instance needs to have a starting point rather than process the whole history. A change stream can hold up to 7 days of information (determined by the change_stream_log_retention_duration parameter), which can translate to a significant number of change events for active applications.

For this post, we use Amazon DocumentDB version 4.0.

Deploy the stack

To deploy the sample architecture into your AWS environment, we use an AWS Serverless Application Model (AWS SAM) template.

The template creates the following resources in your account:

  • An Amazon DocumentDB cluster (version 4.0)
  • An MSK cluster
  • A Lambda function (function-documentdb-stream-processor) that polls change stream events from the Amazon DocumentDB cluster and publishes them to the MSK cluster
  • An AWS Cloud9 environment, which allows you to configure source and destination systems and run your tests
  • A VPC and subnets
  • A NAT gateway and internet gateway
  • Other supporting resources such as security groups and AWS Identity and Access Management (IAM) roles

You will incur some costs after creating this environment.

  1. To start your deployment, clone the GitHub repository to your local machine and install and configure AWS SAM with a test IAM user. AWS SAM requires you to specify an Amazon Simple Storage Service (Amazon S3) bucket to hold the deployment artifacts. If you haven’t already created a bucket for this purpose, create one now. The bucket should be reachable by the IAM user you use for deploying AWS SAM packages.
  2. At the command line, navigate to the cloned GitHub repository’s folder and enter the following command to package the application:
    sam package --template template.yaml --output-template-file output_template.yaml --s3-bucket BUCKET_NAME_HERE

    Replace BUCKET_NAME_HERE with the name of the S3 bucket that holds the deployment artifacts.

    AWS SAM packages the application and copies it into the S3 bucket.

  3. When the AWS SAM package command finishes running, enter the following command to deploy the package:
    sam deploy --template-file output_template.yaml --stack-name Blogstack --capabilities CAPABILITY_IAM --parameter-overrides docDBUser=masterUsername docDBPass=masterPass docDBClusterName=docDBCluster mskClusterName=blog-msk-clstr

    In the preceding command, you can supply your own stack name by changing the stack-name parameter’s value. This template also allows you to provide the following input parameters and override their default values:

    • docDBUser
    • docDBPass
    • docDBClusterName
    • mskClusterName

When you run this command, AWS SAM shows the progress of the deployment. The deployment takes around 15 minutes and creates a main stack and a dependent stack for the AWS Cloud9 environment in AWS CloudFormation. You can also track the overall deployment status on the AWS CloudFormation console.

When the deployment is complete, AWS SAM outputs the following parameters, which you need while doing additional system configurations.

These parameters are also available on the AWS CloudFormation console, on the Outputs tab of the deployed stack named Blogstack.

Connecting to your AWS Cloud9 environment

An AWS Cloud9 environment is created for you automatically when you deploy the AWS SAM package. You need to further provision this environment with MongoDB and Kafka command line tools. To start provisioning your AWS Cloud9 environment, follow the URL that was provided by the Cloud9URL output parameter of the deployed CloudFormation stack.

When the environment starts, go to the terminal section.

Configure Amazon DocumentDB

You can now install mongo shell onto your AWS Cloud9 environment.

  1. Use the following commands in the terminal:
    echo -e "[mongodb-org-4.0]\nname=MongoDB Repository\nbaseurl=https://repo.mongodb.org/yum/amazon/2013.03/mongodb-org/4.0/x86_64/\ngpgcheck=1\nenabled=1\ngpgkey=https://www.mongodb.org/static/pgp/server-4.0.asc" | sudo tee /etc/yum.repos.d/mongodb-org-4.0.repo
    sudo yum install -y mongodb-org-shell

    You also need Amazon DocumentDB CA certificates to connect to your cluster. Use the following command to download the certificate to the current folder (~/environment):

    wget https://s3.amazonaws.com/rds-downloads/rds-combined-ca-bundle.pem
  2. Enter the following command to connect to your cluster:
    mongo --ssl --host DOCUMENTDB_CLUSTER_ENDPOINT_HERE:27017 --sslCAFile rds-combined-ca-bundle.pem --username DOCUMENTDB_USERNAME_HERE --password DOCUMENTDB_PASSWORD_HERE

    In the preceding command, provide the cluster endpoint of the Amazon DocumentDB cluster that was output from the AWS SAM installation. Also provide your username and password that you used during the sam deploy command.

  3. Create a database (blogdb) by switching to it:
    use blogdb

    We create two collections in this database. The first collection is named blogcollection; we use it as the data source for the change stream integration.

  4. Use the following command to create the empty blogcollection:
    db.createCollection("blogcollection")
  5. Enable change stream on this collection by running the following adminCommand command:
    db.adminCommand({modifyChangeStreams: 1,
        database: "blogdb",
        collection: "blogcollection", 
        enable: true});

    You also need to enable change streams in the cluster’s parameter group before they can be used.

    You can enable Amazon DocumentDB change streams for all collections within a given database, or only for selected collections.

  6. We use the second collection, checkpoints, to store the checkpoint document that holds the last processed resume token:
    db.checkpoints.insert({_id: 1, checkpoint: 0})	
  7. You can now issue the exit command to exit the mongo shell and continue with the next step:
    exit
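
To illustrate how the seeded checkpoint document could be used from Node.js, here is a hedged sketch. The helper names watchOptionsFromCheckpoint and checkpointUpdate are hypothetical, and it assumes that the seed value 0 means no resume token has been stored yet; the deployed function may interpret the document differently.

```javascript
// Builds the options for collection.watch() from the stored checkpoint document.
// Assumption: a checkpoint of 0 (the seed value) means no token has been saved yet.
function watchOptionsFromCheckpoint(checkpointDoc) {
  const token = checkpointDoc ? checkpointDoc.checkpoint : 0;
  if (!token) return {};           // first run: listen for new events only
  return { resumeAfter: token };   // later runs: continue after the saved token
}

// Builds the filter and update that store a new resume token in the checkpoint document.
function checkpointUpdate(resumeToken) {
  return { filter: { _id: 1 }, update: { $set: { checkpoint: resumeToken } } };
}

// The token value below is a made-up placeholder, not a real resume token.
console.log(watchOptionsFromCheckpoint({ _id: 1, checkpoint: 0 })); // prints {}
console.log(watchOptionsFromCheckpoint({ _id: 1, checkpoint: { _data: "0123abcd" } }));
```

With the MongoDB Node.js driver, the first result would be passed as the second argument of collection.watch(), and the second helper's filter and update would be passed to updateOne() on the checkpoints collection.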

Configure the MSK cluster

To configure the MSK cluster, you need to install Kafka into your AWS Cloud9 environment.

  1. Use the following commands in your AWS Cloud9 terminal to download Kafka from the source, extract it, and navigate to the bin folder:
    wget https://apache.mirror.colo-serv.net/kafka/2.7.0/kafka_2.13-2.7.0.tgz
    tar -xzf kafka_2.13-2.7.0.tgz
    cd kafka_2.13-2.7.0/bin

    The Kafka binaries used in this post require Java 8 or later. Check your environment’s Java version with the following command:

    java -version

    If you see a version earlier than 1.8, issue the following commands to upgrade to Java 8:

    sudo yum -y install java-1.8.0-openjdk-devel
    sudo alternatives --config java

    Select the 1.8 version from the list.

  2. Find the bootstrap servers of your MSK cluster: To find the bootstrap server hostnames for your MSK cluster, navigate to the Amazon MSK console and choose your cluster. In the Cluster summary pane on the Details tab, choose View client information and copy the bootstrap servers host/port pairs.
  3. Within the Kafka installation’s bin directory, issue the following command to create a topic to hold the events published by function-documentdb-stream-processor:
    sudo ./kafka-topics.sh --create --topic blog-events --replication-factor 1 --partitions 1 --bootstrap-server MSK_BOOTSTRAP_SERVERS_HERE

    Replace MSK_BOOTSTRAP_SERVERS_HERE with the value of the host/port pairs from the previous step.
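
On the publishing side, a Node.js function needs the same bootstrap servers value as a list of brokers, and each change event has to become a Kafka record. The following is a sketch under stated assumptions: parseBootstrapServers and toKafkaMessage are hypothetical helpers, the broker hostnames are placeholders, and the record shape ({ key, value }) follows what the kafkajs client accepts. The post does not state which Kafka client the function uses.

```javascript
// Splits the comma-separated bootstrap string from the MSK console into a broker list.
function parseBootstrapServers(bootstrapServers) {
  return bootstrapServers
    .split(",")
    .map((server) => server.trim())
    .filter((server) => server.length > 0);
}

// Maps a change event to a { key, value } record.
// Keying by document id keeps all events for one document in the same partition.
function toKafkaMessage(event) {
  return {
    key: String(event.documentKey._id),
    value: JSON.stringify(event),
  };
}

// Placeholder hostnames and a hypothetical change event.
const brokers = parseBootstrapServers("b-1.example.com:9092, b-2.example.com:9092");
const message = toKafkaMessage({
  operationType: "insert",
  documentKey: { _id: "abc123" },
  fullDocument: { _id: "abc123", title: "Blog Title 1" },
});

console.log(brokers);     // prints [ 'b-1.example.com:9092', 'b-2.example.com:9092' ]
console.log(message.key); // prints abc123
```

With kafkajs, the broker list would go into the Kafka client's brokers option, and the record would be sent with producer.send({ topic: "blog-events", messages: [message] }).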

Test the solution

To test the setup from end to end, you need to open a second terminal in your AWS Cloud9 environment.

  1. On the Window menu, choose New Terminal.
  2. In the first terminal, make sure you’re in the bin folder of the Kafka installation and issue the following command to start listening to the records in the Kafka topic:
    sudo ./kafka-console-consumer.sh --topic blog-events --from-beginning --bootstrap-server MSK_BOOTSTRAP_SERVERS_HERE

    As before, provide the value of the bootstrap server host/port pairs.

  3. In the second terminal, use mongo shell to connect to the Amazon DocumentDB cluster the same way you did earlier.
  4. Issue the following command to insert a document into blogdb.blogcollection:
    use blogdb;
    db.blogcollection.insert({"title" : "Blog Title 1"})
  5. Add another document with the following command:
    db.blogcollection.insert({"title" : "Blog Title 2"})
  6. In the first terminal, observe the changes on the Kafka topic as you add different documents to the collection.

Cleanup

To clean up the resources you used in your account, delete the stack from the AWS CloudFormation console.

You can also delete the bucket you used for packaging and deploying the AWS SAM application.

Conclusion

This architecture shows how to capture state changes from Amazon DocumentDB via its change streams functionality and send them to Amazon MSK. You can adapt similar architectures to apply to other use cases, such as query segregation, event sourcing, data duplication, and more.

For more information about the stream’s functionality and other integrations, see Run full text search queries on Amazon DocumentDB (with MongoDB compatibility) data with Amazon OpenSearch Service and Using Change Streams with Amazon DocumentDB.

If you have any questions or comments about this post, please share them in the comments. If you have any feature requests for Amazon DocumentDB, email us at documentdb-feature-request@amazon.com.


About the author

Murat Balkan is an AWS Solutions Architect based in Toronto. He helps customers across Canada to transform their businesses and build industry leading solutions on AWS.