AWS Database Blog

Run complex queries on massive amounts of data stored on your Amazon DocumentDB clusters using Apache Spark running on Amazon EMR

In this post, we demonstrate how to set up Amazon EMR to run complex queries on massive amounts of data stored in your Amazon DocumentDB (with MongoDB compatibility) clusters using Apache Spark.

Amazon DocumentDB (with MongoDB compatibility) is a fully managed native JSON document database that makes it easy and cost effective to operate critical document workloads at virtually any scale without managing infrastructure. You can use the same application code written using MongoDB API (versions 3.6, 4.0, and 5.0) compatible drivers, and tools to run, manage, and scale workloads on Amazon DocumentDB without worrying about managing the underlying infrastructure. As a document database, Amazon DocumentDB makes it straightforward to store, query, and index JSON data.

Apache Spark is an open source, distributed processing system used for big data workloads. It uses in-memory caching and optimized query runs for fast analytic queries against data of any size. Spark provides an interface for programming clusters with implicit data parallelism and fault tolerance.

Amazon EMR is the industry-leading cloud big data solution for petabyte-scale data processing, interactive analytics, and machine learning (ML) using open source frameworks such as Apache Spark, Apache Hive, and Presto.

Depending on your usage, the storage of an Amazon DocumentDB instance-based cluster can grow up to 128 TiB. With Amazon DocumentDB elastic clusters, storage can grow up to 4 PiB. In some use cases, you may need to run an analytics workload on this massive amount of data stored in one or more Amazon DocumentDB clusters, often with ad hoc query patterns or the need to scan large volumes of data. Apache Spark is a great choice for running ad hoc queries on such massive data, especially when they require memory-intensive filtering on non-indexed fields, multiple stages of processing, fault tolerance between stages, and joins across multiple data sources. Because Amazon DocumentDB is MongoDB API compatible, you can run these data jobs on Amazon DocumentDB using the MongoDB Spark connector.

The following are some examples of data jobs that you can run against Amazon DocumentDB using Apache Spark:

  1. Data analytics teams want to augment ad hoc reports with unstructured, unindexed data stored in Amazon DocumentDB without replicating the data to another storage system.
  2. Data science teams want to run complex DS/ML pipelines using Apache Spark and subsequently write the results to downstream storage.
  3. Data analytics teams want to combine multiple sources of data, for example live data from Amazon DocumentDB and archived data from Amazon Simple Storage Service (Amazon S3), to generate unified reports.

All of these jobs have some common characteristics:

  1. They require computation over large volumes of data that don’t fit into the memory of a single machine.
  2. They need ad hoc access patterns that aren’t always supported by the indexes created in Amazon DocumentDB for application usage.
  3. To complete within an acceptable runtime, they must run in a distributed environment with ample compute capacity and a failure recovery mechanism in place.

Solution overview

In the following sections, we show how to create an Amazon DocumentDB cluster and load data, and create and configure an EMR cluster with Apache Spark. After the solution is in place, you can run a sample Spark application and run a sample query.

architecture diagram

Prerequisites

We assume that you have the following prerequisites:

  • Basic service knowledge regarding operating and developing applications with Amazon DocumentDB and Amazon EMR
  • The appropriate permissions to create, modify, and delete the services and resources used in this post in your AWS account
  • An Amazon EC2 key pair

This solution involves setting up and using AWS resources, so it will incur costs in your account. Refer to AWS Pricing for more information. We strongly recommend that you set this up in a non-production instance and run end-to-end validations before you implement this solution in a production environment.

Create an Amazon DocumentDB cluster

You can use an existing instance-based cluster or create a new Amazon DocumentDB instance-based cluster. You can also use Amazon DocumentDB elastic clusters that elastically scale to handle millions of reads and writes per second with petabytes of storage.

Load data into the Amazon DocumentDB cluster

To load data into the Amazon DocumentDB cluster, complete the following steps from an Amazon Elastic Compute Cloud (Amazon EC2) instance that can connect to Amazon DocumentDB and has the mongorestore utility installed (this data was generated using mongodump version 100.9.4):

  1. Download the Amazon DocumentDB Certificate Authority (CA) certificate required to authenticate to your cluster
    wget https://truststore.pki.rds.amazonaws.com/global/global-bundle.pem
  2. Download the data.gz file from the GitHub location. This file contains mock data generated using Steps to run NoSQLBench against Amazon DocumentDB.
  3. Run the following mongorestore command to load the data into your Amazon DocumentDB cluster. This loads 1 million documents.
    mongorestore --gzip --uri="<<documentdb_uri>>" --nsInclude="test_emr_db.test_emr_coll" --drop --numInsertionWorkersPerCollection=10 --archive=data.gz

You should see the following message if this command runs successfully.

data load completed

The following is a sample document snippet:

{
        "_id" : "18566647",
        "user_id" : "3dcc6cbb-4864-493b-a8cb-f35b06c09a0c",
        "created_on" : 1414491227,
        "gender" : "F",
        "full_name" : "Anthony Gammon",
        "married" : true,
        "address" : {
                "primary" : {
                        "city" : "Port Gamble",
                        "cc" : "VC"
                },
                "secondary" : {
                }
        } - - - - - -

Create IAM Roles for EMR

If this is the first time you’re launching an EMR cluster, create the default IAM service roles, which are required to launch the EMR cluster, using the following AWS CLI command:

aws emr create-default-roles

The command creates the following two IAM roles:

  1. EMR_DefaultRole
  2. EMR_EC2_DefaultRole
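
To confirm that both roles exist before you create the cluster, you could run a check along the following lines. This is a minimal sketch rather than part of the original walkthrough: the function name check_emr_default_roles is hypothetical, and it assumes the AWS CLI is configured with permission to call iam get-role.

```bash
# Hypothetical helper: report whether each default EMR role exists.
# Assumes the AWS CLI is installed and configured for your account.
check_emr_default_roles() {
  for role in EMR_DefaultRole EMR_EC2_DefaultRole; do
    # get-role exits with a non-zero status when the role cannot be retrieved
    if aws iam get-role --role-name "$role" >/dev/null 2>&1; then
      echo "$role: found"
    else
      echo "$role: missing"
    fi
  done
}
```

Call check_emr_default_roles after running the create-default-roles command; if a role is reported as missing, check your IAM permissions and run the command again.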

Create an EMR cluster with Apache Spark

In this section, we walk through the steps to create and configure an EMR cluster with Apache Spark.

Choose the MongoDB Spark connector

The MongoDB Spark connector version 10.2.x uses the $collStats operator to create default read partitions. Because Amazon DocumentDB doesn’t support this operator as of this writing, you need to use the MongoDB Spark connector version 3.0.2. Note that you can also write your own partitioner class with a higher version of the Spark connector, but that is beyond the scope of this post.

Choose the Apache Spark version

The MongoDB Spark connector version 3.0.2 supports Spark versions 3.1.x. Therefore, you need to choose an Amazon EMR version that includes this version of Spark. Amazon EMR version 6.5.0 is the latest Amazon EMR release that includes Spark 3.1.x, as per the chart provided in Application versions in Amazon EMR 6.x releases.

Prepare and upload the bootstrap script to load CA certs to the default Java trust store

When connecting to a TLS-enabled Amazon DocumentDB cluster from a Java Spark application on an EMR cluster, the Spark driver node as well as each of the Spark worker nodes must have the certificates uploaded to the default Java trust store, as described in Connecting Programmatically to Amazon DocumentDB. To achieve that, you need to create a script file and upload it to an Amazon Simple Storage Service (Amazon S3) bucket. You provide the S3 URI of this file as a bootstrap script while creating the EMR cluster.

Complete the following steps:

  1. Create an S3 bucket using the instructions in Store and Retrieve a File with Amazon S3.
  2. Create a file on your local machine and name it docdbcerts.sh. Copy the following content into the file and save it.
    #!/bin/bash
    # Import every certificate in the Amazon RDS CA bundle into the default Java trust store
    mydir=/tmp
    truststore=/etc/pki/java/cacerts
    storepassword=changeit
    # Download the CA bundle
    curl -sS "https://truststore.pki.rds.amazonaws.com/global/global-bundle.pem" > "${mydir}/global-bundle.pem"
    # Split the bundle into one rds-ca-<n>.pem file per certificate in the working directory
    awk 'split_after == 1 {n++;split_after=0} /-----END CERTIFICATE-----/ {split_after=1}{print > "rds-ca-" n ".pem"}' < "${mydir}/global-bundle.pem"
    # Import each certificate, using the CN of its subject as the alias
    for CERT in rds-ca-*; do
      alias=$(openssl x509 -noout -text -in "$CERT" | perl -ne 'next unless /Subject:/; s/.*(CN=|CN = )//; print')
      echo "Importing $alias"
      sudo keytool -import -file "${CERT}" -alias "${alias}" -storepass "${storepassword}" -keystore "${truststore}" -noprompt
      rm "$CERT"
    done
    rm "${mydir}/global-bundle.pem"
    # List each alias in the trust store with its expiry date
    echo "Trust store content is: "
    keytool -list -v -keystore "$truststore" -storepass "${storepassword}" | grep Alias | cut -d " " -f3- | while read alias
    do
       expiry=`keytool -list -v -keystore "$truststore" -storepass "${storepassword}" -alias "${alias}" | grep Valid | perl -ne 'if(/until: (.*?)\n/) { print "$1\n"; }'`
       echo " Certificate ${alias} expires on '$expiry'"
    done
  3. Upload the docdbcerts.sh file to the S3 bucket using the instructions in Store and Retrieve a File with Amazon S3.
  4. Note the S3 URI of the file that you uploaded.
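
If you prefer the AWS CLI to the console for the upload, the step could look roughly like the following. This is a sketch under assumptions: the function name upload_bootstrap_script and the default object key bootstrap/docdbcerts.sh are illustrative choices, and the bucket name is a placeholder for the bucket you created.

```bash
# Hypothetical helper: upload docdbcerts.sh from the current directory
# and print the S3 URI to use as the bootstrap script location.
upload_bootstrap_script() {
  bucket="$1"                           # name of your S3 bucket (placeholder)
  key="${2:-bootstrap/docdbcerts.sh}"   # object key; the default is an arbitrary choice
  # Send the CLI progress output to stderr so that stdout carries only the URI
  aws s3 cp docdbcerts.sh "s3://${bucket}/${key}" >&2 \
    && echo "s3://${bucket}/${key}"
}
```

For example, upload_bootstrap_script my-bucket prints s3://my-bucket/bootstrap/docdbcerts.sh after a successful upload, where my-bucket stands in for your own bucket name.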

Create an EMR cluster

Complete the following steps to create your EMR cluster:

  1. On the Amazon EMR console, choose Create cluster.
    amazon emr landing page
  2. For Name, enter docdb-spark-emr-cluster.
  3. For Amazon EMR release, choose emr-6.5.0.
  4. For Application bundle, keep the default Spark software.
    amazon emr application bundle
  5. In the Node configuration section, change the value for EBS root volume to 50 GB. This is just an example – depending on your job local storage requirement, you will need to adjust the value.
    set instance storage
  6. In the Cluster scaling and provisioning section, set the instance size (the number of instances) for the core and task node groups to 3.
    scaling and provisioning rules
  7. In the Networking section, for Virtual private cloud (VPC), choose the VPC of your Amazon DocumentDB cluster. Choose an existing public subnet or create one.
    network settings
  8. Choose Add to add a bootstrap action.
  9. For Name, enter a name.
  10. For Script location, enter the S3 URI of the bootstrap script you uploaded earlier.
  11. Choose Add bootstrap action.
    bootstrap actions
  12. For Amazon EC2 key pair for SSH into the cluster, choose the key pair that you will use to log in to the primary node of the EMR cluster.
    ssh key selection
  13. In the Identity and Access Management (IAM) roles section, select EMR_DefaultRole for Amazon EMR service role and EMR_EC2_DefaultRole for EC2 instance profile for Amazon EMR.
    IAM roles
  14. Leave the remaining settings as default and choose Create cluster.
    creating cluster
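
The console steps above could also be approximated with the AWS CLI. The following is a sketch, not a verified equivalent of the console configuration: the function name create_docdb_spark_cluster, the m5.xlarge instance type, the single primary node, and the bootstrap action name docdb-certs are assumptions that the walkthrough doesn't specify. The subnet ID, key pair name, and S3 URI are values you supply.

```bash
# Hypothetical helper approximating the console steps with the AWS CLI.
# Assumptions not taken from the walkthrough: instance type m5.xlarge,
# one primary node, and the bootstrap action name "docdb-certs".
create_docdb_spark_cluster() {
  subnet_id="$1"       # public subnet in the Amazon DocumentDB cluster's VPC
  key_name="$2"        # name of your Amazon EC2 key pair
  bootstrap_uri="$3"   # S3 URI of docdbcerts.sh
  # --use-default-roles selects EMR_DefaultRole and EMR_EC2_DefaultRole
  aws emr create-cluster \
    --name "docdb-spark-emr-cluster" \
    --release-label emr-6.5.0 \
    --applications Name=Spark \
    --use-default-roles \
    --ebs-root-volume-size 50 \
    --ec2-attributes "KeyName=${key_name},SubnetId=${subnet_id}" \
    --instance-groups \
      InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m5.xlarge \
      InstanceGroupType=CORE,InstanceCount=3,InstanceType=m5.xlarge \
      InstanceGroupType=TASK,InstanceCount=3,InstanceType=m5.xlarge \
    --bootstrap-actions "Path=${bootstrap_uri},Name=docdb-certs"
}
```

You would call it with your own values, for example create_docdb_spark_cluster <<subnet_id>> <<key_pair_name>> <<bootstrap_script_s3_uri>>. Adjust the instance types and counts to match your job's compute and storage requirements.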

You can monitor the status of your cluster on the Amazon EMR console. When the cluster is running, complete the following steps to find its security groups:

  1. Choose the cluster ID, scroll down to Network and security, and expand EC2 security groups (firewall).
  2. Note down the EMR managed security groups for the primary node and core and task nodes.
    cluster created

Modify security groups

Complete the following steps to modify your security groups:

  1. Add inbound rules to the security group of your Amazon DocumentDB cluster to allow incoming traffic on port 27017 from the security groups you noted earlier.
  2. Add an inbound rule to the primary node security group to allow SSH from the IP address of your local machine.
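
The two rules above could be added with the AWS CLI along these lines. This is a sketch under assumptions: the function names are hypothetical, all security group IDs and the IP address are values you supply, and it presumes that the Amazon DocumentDB cluster listens on port 27017, as in this post.

```bash
# Hypothetical helper: allow each EMR security group to reach the
# Amazon DocumentDB security group on port 27017.
allow_emr_to_docdb() {
  docdb_sg="$1"; shift   # security group ID of the Amazon DocumentDB cluster
  for emr_sg in "$@"; do # remaining arguments: EMR managed security group IDs
    aws ec2 authorize-security-group-ingress \
      --group-id "$docdb_sg" \
      --ip-permissions "IpProtocol=tcp,FromPort=27017,ToPort=27017,UserIdGroupPairs=[{GroupId=${emr_sg}}]"
  done
}

# Hypothetical helper: allow SSH to the primary node from a single IP address.
allow_ssh_to_primary() {
  # $1: primary node security group ID, $2: your local machine's public IP address
  aws ec2 authorize-security-group-ingress \
    --group-id "$1" --protocol tcp --port 22 --cidr "$2/32"
}
```

For example, allow_emr_to_docdb <<docdb_sg_id>> <<primary_sg_id>> <<core_task_sg_id>> adds one inbound rule per EMR security group. Restricting SSH to a single /32 address keeps the primary node closed to other sources.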

Run a sample Spark application

Complete the following steps to run a sample Spark application:

  1. SSH to the primary node of the EMR cluster.
    get ssh details
    successful ssh
  2. Launch the Spark shell with the MongoDB Spark connector:
    spark-shell --conf "spark.mongodb.output.uri=<<docdb_uri>>" --conf "spark.mongodb.input.uri=<<docdb_uri>>" --packages org.mongodb.spark:mongo-spark-connector_2.12:3.0.2

The Amazon DocumentDB URI that you provide to Spark should contain the default database and collection name. Also, remove the tlsCAFile option from the URI, because Java looks up TLS certificates in the trust store that the bootstrap action populated. In our case, it looks like the following:

mongodb://<<documentdb_user>>:<<documentdb_password>>@<<documentdb_cluster_endpoint>>:27017/test_emr_db.test_emr_coll?tls=true&replicaSet=rs0&readPreference=secondaryPreferred&retryWrites=false
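
If you start from a connection string that still contains the tlsCAFile option, a small helper like the following could remove it before you pass the URI to Spark. This is a minimal sketch: the function name strip_tls_ca_file is hypothetical, and it assumes that any special characters in the user name and password are percent-encoded, so the first ? in the URI starts the query string.

```bash
# Hypothetical helper: remove the tlsCAFile option from a connection URI.
# Option names are matched case-insensitively; all other options are kept in order.
strip_tls_ca_file() {
  printf '%s\n' "$1" | awk '{
    q = index($0, "?")
    if (q == 0) { print; next }           # no query string, nothing to remove
    base = substr($0, 1, q - 1)
    n = split(substr($0, q + 1), params, "&")
    out = ""
    for (i = 1; i <= n; i++) {
      if (tolower(params[i]) ~ /^tlscafile=/) continue   # skip the CA file option
      out = out (out == "" ? "" : "&") params[i]
    }
    print base (out == "" ? "" : "?" out)
  }'
}

# Example usage (left commented out because it launches the Spark shell):
# DOCDB_URI=$(strip_tls_ca_file "<<docdb_uri>>")
# spark-shell --conf "spark.mongodb.output.uri=${DOCDB_URI}" \
#   --conf "spark.mongodb.input.uri=${DOCDB_URI}" \
#   --packages org.mongodb.spark:mongo-spark-connector_2.12:3.0.2
```

Keeping the URI in a shell variable also avoids typing it twice when you launch the Spark shell.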

Run a query

Let’s find the distribution of the data over the cities identified by the nested field address.primary.city. This is a straightforward query for demonstration purposes; in real-world applications, you would use Spark to run much more complex and compute-intensive scenarios. Spark is an effective tool when a query requires distributed computing on large volumes of data and needs more resources (CPU and memory) than a single machine provides.

  1. Import the MongoDB Spark connector library:
    import com.mongodb.spark._
  2. Create a Spark data frame abstracting the database and collection provided while launching the Spark shell (in the Amazon DocumentDB URI):
    val df = MongoSpark.load(spark)
  3. Run the following code to view the structure of data as interpreted by Spark:
    df.printSchema()

    data frame schema

  4. Before you run SQL queries on your dataset, you must register a temporary view for the dataset:
    df.createOrReplaceTempView("data")
  5. Run a query on the view:
    val city_data = spark.sql("SELECT address.primary.city, count(*) FROM data GROUP BY address.primary.city ORDER BY count(*)")
    
    city_data.show()

After the task is complete, the results are displayed. By default, show() displays only the first 20 rows; to display more, pass a row count, for example city_data.show(50).

results

In this section, we showed you how to use the distributed query processing capabilities of Apache Spark running on an EMR cluster to run a query on data stored in Amazon DocumentDB. As a next step, identify the analytics jobs you run on your large Amazon DocumentDB datasets and run them using the approach we described. This exercise will help you understand the potential benefits of running these jobs using Spark.

Clean up

Complete the following steps to clean up your resources:

  1. Terminate the EMR cluster.
  2. Remove the inbound rules from the Amazon DocumentDB cluster security group that allow incoming traffic from the EMR cluster.
  3. Optionally, delete the Amazon DocumentDB test data. Use the following commands from the mongo shell:
    use test_emr_db
    
    db.dropDatabase()
  4. Optionally, delete the Amazon DocumentDB cluster.
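
The first two cleanup steps could be scripted with the AWS CLI roughly as follows. This is a sketch under assumptions: the function name cleanup_emr_demo is hypothetical, the cluster ID and security group IDs are values you supply, and the revoke call presumes the inbound rules were created for TCP port 27017 with the EMR security groups as the source.

```bash
# Hypothetical helper: terminate the EMR cluster, then remove the inbound
# rules that let its security groups reach Amazon DocumentDB on port 27017.
cleanup_emr_demo() {
  cluster_id="$1"   # EMR cluster ID, for example j-XXXXXXXXXXXXX
  docdb_sg="$2"     # security group ID of the Amazon DocumentDB cluster
  shift 2           # remaining arguments: EMR managed security group IDs
  aws emr terminate-clusters --cluster-ids "$cluster_id"
  for emr_sg in "$@"; do
    aws ec2 revoke-security-group-ingress \
      --group-id "$docdb_sg" \
      --ip-permissions "IpProtocol=tcp,FromPort=27017,ToPort=27017,UserIdGroupPairs=[{GroupId=${emr_sg}}]"
  done
}
```

For example, cleanup_emr_demo <<cluster_id>> <<docdb_sg_id>> <<primary_sg_id>> <<core_task_sg_id>> terminates the cluster and revokes one rule per EMR security group.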

Conclusion

In this post, we showed you how to set up an EMR cluster with the appropriate software versions and connect to a TLS-enabled Amazon DocumentDB cluster using a simple Spark application. When you build a production-grade application, apply the best practices for Amazon EMR, Amazon DocumentDB, Apache Spark, and the MongoDB Spark connector.

Leave your thoughts and questions in the comments.


About the author

Sourav Biswas is a Senior DocumentDB Specialist Solutions Architect at Amazon Web Services (AWS). He has been helping Amazon DocumentDB customers successfully adopt the service and implement best practices around it. Before joining AWS, he worked extensively as an application developer and solutions architect for various NoSQL vendors.