AWS Big Data Blog
Build a data lakehouse in a hybrid Environment using Amazon EMR Serverless, Apache DolphinScheduler, and TiDB
While helping our customers build systems on AWS, we have found that a large number of enterprise customers who pay great attention to data security and compliance, such as B2C FinTech enterprises, build data-sensitive applications on premises and use other applications on AWS to take advantage of AWS managed services. Using AWS managed services can greatly simplify daily operations and maintenance, and help you achieve optimized resource utilization and performance.
This post discusses a decoupled approach to building a serverless data lakehouse using AWS Cloud-centered services, including Amazon EMR Serverless, Amazon Athena, Amazon Simple Storage Service (Amazon S3), and Apache DolphinScheduler (an open source data job scheduler), as well as PingCAP TiDB, a third-party data warehouse product that can be deployed on premises, in the cloud, or as software as a service (SaaS).
Solution overview
For our use case, an enterprise data warehouse with business data is hosted on an on-premises TiDB platform. TiDB is developed by PingCAP, an AWS Global Partner, and is also available on AWS through AWS Marketplace.
The data is then processed by an Amazon EMR Serverless job to implement the data lakehouse tiering logic. Data in different tiers is stored in separate S3 buckets, or under separate S3 prefixes in the same S3 bucket. A typical data warehouse design has four layers:
- Operational data store layer (ODS) – This layer stores the raw data of the data warehouse
- Data warehouse stage layer (DWS) – This layer is a temporary staging area within the data warehousing architecture where data from various sources is loaded, cleaned, transformed, and prepared before being loaded into the data warehouse database layer
- Data warehouse database layer (DWD) – This layer is the central repository in a data warehousing environment where data from various sources is integrated, transformed, and stored in a structured format for analytical purposes
- Analytical data store (ADS) – This layer is a subset of the data warehouse that is specifically designed and optimized for a particular business function, department, or analytical purpose
For this post, we use only the ODS and ADS layers to demonstrate the technical feasibility.
The schema of this data is managed through the AWS Glue Data Catalog, and the data can be queried using Athena. The EMR Serverless jobs are orchestrated using Apache DolphinScheduler deployed in cluster mode on Amazon Elastic Compute Cloud (Amazon EC2) instances, with metadata stored in an Amazon Relational Database Service (Amazon RDS) for MySQL instance.
Using DolphinScheduler as the data lakehouse job orchestrator offers the following advantages:
- Its distributed architecture allows for better scalability, and the visual DAG designer makes workflow creation more intuitive for team members with varying technical expertise
- It provides more granular task-level controls and supports a wider range of task types out of the box, including Spark, Flink, and machine learning (ML) workflows, without requiring additional plugin installations
- Its multi-tenancy feature enables better resource isolation and access control across different teams within an organization.
However, DolphinScheduler requires more initial setup and maintenance effort, making it more suitable for organizations with strong DevOps capabilities and a desire for complete control over their workflow infrastructure.
The following diagram illustrates the solution architecture.
Prerequisites
You need to create an AWS account and set up an AWS Identity and Access Management (IAM) user as a prerequisite for the following implementation. Complete the following steps:
To sign up for an AWS account, follow the guided steps on each linked page.
- Create an AWS account.
- Sign in to the account using the root user for the first time.
- On the IAM console, create an IAM user with the AdministratorAccess policy attached.
- Use this IAM user, rather than the root user, to log in to the AWS Management Console.
- On the IAM console, choose Users in the navigation pane.
- Navigate to your user, and on the Security credentials tab, create an access key.
- Store the access key and secret key in a secure place and use them for further API access of the resources of this AWS account.
Set up DolphinScheduler, IAM configuration, and the TiDB Cloud table
In this section, we walk through the steps to install DolphinScheduler, complete additional IAM configurations to enable the EMR Serverless job, and provision the TiDB Cloud table.
Install DolphinScheduler on an EC2 instance with an RDS for MySQL instance storing the DolphinScheduler metadata. The production deployment mode of DolphinScheduler is cluster mode. In this post, we use pseudo-cluster mode, which has the same installation steps as cluster mode while consuming fewer resources. We name the EC2 instance ds-pseudo.
Make sure the inbound rule of the security group attached to the EC2 instance allows TCP traffic on port 12345. Then complete the following steps:
- Log in to the EC2 instance as the root user, and install a JVM:
- Switch to the directory /usr/local/src:
- Install Apache ZooKeeper:
- Check the Python version:
The version should be 3.9 or above. It is recommended that you use Amazon Linux 2023 or later as the Amazon EC2 operating system (OS); its Python version 3.9 meets the requirement. For detailed information, refer to Python in AL2023.
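The following is a minimal sketch of these preparation steps, assuming Amazon Linux 2023; the JVM package name and the ZooKeeper version are assumptions, so substitute the versions you standardize on:

```bash
# Sketch only: package names and versions are assumptions, adjust for your environment
sudo dnf install -y java-1.8.0-amazon-corretto   # JVM required by DolphinScheduler

cd /usr/local/src

# Apache ZooKeeper (3.8.4 is an assumed version)
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.8.4/apache-zookeeper-3.8.4-bin.tar.gz
tar -zxvf apache-zookeeper-3.8.4-bin.tar.gz
cp apache-zookeeper-3.8.4-bin/conf/zoo_sample.cfg apache-zookeeper-3.8.4-bin/conf/zoo.cfg
apache-zookeeper-3.8.4-bin/bin/zkServer.sh start

# Confirm the Python version is 3.9 or above
python3 --version
```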
- Install DolphinScheduler:
  - Download the dolphinscheduler package:
  - Download the mysql connector package:
  - Copy the specific mysql connector JAR file to the following destinations:
  - Add the user dolphinscheduler, and make sure the directory apache-dolphinscheduler and the files under it are owned by the user dolphinscheduler:
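A minimal sketch of these download and user-setup commands follows; the DolphinScheduler and MySQL connector versions, as well as the directory layout, are assumptions based on the DolphinScheduler 3.x binary distribution:

```bash
# Sketch only: versions and paths are assumptions
cd /usr/local/src

# DolphinScheduler binary package (3.1.9 is an assumed version)
wget https://archive.apache.org/dist/dolphinscheduler/3.1.9/apache-dolphinscheduler-3.1.9-bin.tar.gz
tar -zxvf apache-dolphinscheduler-3.1.9-bin.tar.gz
mv apache-dolphinscheduler-3.1.9-bin apache-dolphinscheduler

# MySQL connector (8.0.33 is an assumed version)
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar

# Copy the connector JAR into each service's libs directory
for svc in api-server master-server worker-server alert-server tools; do
  cp mysql-connector-j-8.0.33.jar apache-dolphinscheduler/${svc}/libs/
done

# Create the dolphinscheduler user and hand over ownership
useradd dolphinscheduler
echo "dolphinscheduler ALL=(ALL) NOPASSWD: ALL" > /etc/sudoers.d/dolphinscheduler
chown -R dolphinscheduler:dolphinscheduler apache-dolphinscheduler
```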
- Install the mysql client:
- On the Amazon RDS console, provision an RDS for MySQL instance with the following configurations:
  - For Database creation method, select Standard create.
  - For Engine options, choose MySQL.
  - For Edition, choose MySQL 8.0.35.
  - For Templates, select Dev/Test.
  - For Availability and durability, select Single DB instance.
  - For Credentials management, select Self-managed.
  - For Connectivity, select Connect to an EC2 compute resource, and choose the EC2 instance created earlier.
  - For Database authentication, choose Password authentication.
- Navigate to the ds-mysql database details page, and under Connectivity & security, copy the RDS for MySQL endpoint.
- Configure the instance (a consolidated sketch of this and the following configuration steps appears after this list):
- Configure the dolphinscheduler configuration file:
  - Revise dolphinscheduler_env.sh:
  - On the Amazon EC2 console, navigate to the instance details page and copy the private IP address.
  - Revise install_env.sh:
- Install DolphinScheduler:
- Start DolphinScheduler after installation:
- Open the DolphinScheduler console:
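The following consolidated sketch covers these configuration, installation, and startup steps; the RDS endpoint, password, and IP values are placeholders, and the file and script names follow the DolphinScheduler 3.x binary layout (verify them against your release):

```bash
# Sketch only: endpoints, passwords, and IPs are placeholders
cd /usr/local/src/apache-dolphinscheduler

# 1) Create the metadata database on the RDS for MySQL instance
mysql -h <rds-mysql-endpoint> -u admin -p -e "
  CREATE DATABASE dolphinscheduler DEFAULT CHARACTER SET utf8mb4 DEFAULT COLLATE utf8mb4_general_ci;
  CREATE USER 'dolphinscheduler'@'%' IDENTIFIED BY '<password>';
  GRANT ALL PRIVILEGES ON dolphinscheduler.* TO 'dolphinscheduler'@'%';
  FLUSH PRIVILEGES;"

# 2) In bin/env/dolphinscheduler_env.sh, point DolphinScheduler at that database
export DATABASE=mysql
export SPRING_DATASOURCE_URL="jdbc:mysql://<rds-mysql-endpoint>:3306/dolphinscheduler"
export SPRING_DATASOURCE_USERNAME=dolphinscheduler
export SPRING_DATASOURCE_PASSWORD=<password>

# 3) In bin/env/install_env.sh, set ips/masters/workers to the EC2 private IP copied earlier,
#    then initialize the schema and run the installer as the dolphinscheduler user
bash ./tools/bin/upgrade-schema.sh
su dolphinscheduler -c "bash ./bin/install.sh"

# 4) Stop or start all services later as needed, then open the console
su dolphinscheduler -c "bash ./bin/stop-all.sh"
su dolphinscheduler -c "bash ./bin/start-all.sh"
echo "http://<ec2-public-ip>:12345/dolphinscheduler/ui"   # default login: admin / dolphinscheduler123
```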
After entering the initial username and password, choose Login to enter the dashboard, as shown below.
Configure an IAM role to enable the EMR Serverless job
The EMR Serverless job role needs permission to access a specific S3 bucket to read job scripts and potentially write results, and permission to access AWS Glue to read the Data Catalog, which stores the tables’ metadata. For detailed guidance, refer to Grant permission to use EMR Serverless or EMR Serverless Samples.
The following screenshot shows the IAM role configured with the trust policy attached.
The IAM role should have the following permissions policies attached, as shown in the following screenshot.
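For reference, the trust policy mentioned above typically allows the emr-serverless.amazonaws.com service principal to assume the role; the following is a minimal sketch (the Amazon S3 and AWS Glue permissions policies are attached separately, as shown in the screenshots):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": { "Service": "emr-serverless.amazonaws.com" },
      "Action": "sts:AssumeRole"
    }
  ]
}
```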
Provision the TiDB Cloud table
To provision the TiDB Cloud table, complete the following steps:
- Register for TiDB Cloud.
- Create a serverless cluster, as shown in the following screenshot. For this post, we name the cluster Cluster0.
- Choose Cluster0, then choose SQL Editor to create a database named test:
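For example, the following SQL, run in the TiDB Cloud SQL Editor, creates the test database and a testtable table with a monotonically increasing id primary key; the columns beyond id are assumptions for illustration:

```sql
CREATE DATABASE IF NOT EXISTS test;
USE test;

-- Assumed illustrative schema; the id primary key supports checkpoint-based incremental export later
CREATE TABLE IF NOT EXISTS testtable (
  id BIGINT AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(64),
  create_time DATETIME DEFAULT CURRENT_TIMESTAMP
);
```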
Synchronize data between on-premises TiDB and AWS
In this section, we discuss how to synchronize historical data as well as incremental data between TiDB and AWS.
Use TiDB Dumpling to sync historical data from TiDB to Amazon S3
Use the commands in this section to dump data stored in TiDB as CSV files into an S3 bucket. For full details on how to achieve a data sync from on-premises TiDB to Amazon S3, see Export data to Amazon S3 cloud storage. For this post, we use the TiDB Dumpling tool. Complete the following steps:
- Log in to the EC2 instance created earlier as root.
- Run the following command to install TiUP:
- Run the following command to install Dumpling:
- Run the following command to dump the target database table to the specific S3 bucket (see the consolidated sketch after these steps).
- To acquire the TiDB serverless connection information, navigate to the TiDB Cloud console and choose Connect.
You can collect the specific connection information of the test database from the following screenshot.
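With that connection information, the TiUP and Dumpling commands from the preceding steps typically look like the following consolidated sketch; the host, credentials, region, and S3 bucket name are placeholders:

```bash
# Install TiUP
curl --proto '=https' --tlsv1.2 -sSf https://tiup-mirrors.pingcap.com/install.sh | sh
source ~/.bash_profile

# Install the Dumpling component
tiup install dumpling

# Dump the test.testtable table as CSV files into the target S3 bucket/prefix
tiup dumpling \
  -u <tidb-user> -p '<tidb-password>' \
  -h <tidb-serverless-host> -P 4000 \
  --filetype csv \
  -T test.testtable \
  -o "s3://<your-bucket>/ods/testtable/" \
  --s3.region "<aws-region>"
```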
You can view the data stored in the S3 bucket on the Amazon S3 console.
You can use Amazon S3 Select to query the data and get results similar to the following screenshot, confirming that the data has been ingested into testtable.
Use TiDB Dumpling with a self-managed checkpoint to sync incremental data from TiDB to Amazon S3
To achieve incremental data synchronization using TiDB Dumpling, you need to self-manage the checkpoint of the target synchronized data. One recommended way is to store the ID of the final ingested record in a data store (such as Amazon ElastiCache for Redis or Amazon DynamoDB) so that the shell or Python job that triggers TiDB Dumpling maintains its own checkpoint. The prerequisite for implementing this is that the target table has a monotonically increasing id field as its primary key.
You can use the following TiDB Dumpling command to filter the exported data:
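For example, assuming the checkpoint (the last exported id) has been read back from your checkpoint store into a shell variable, the export can be filtered with the --where option; connection values and paths are placeholders:

```bash
# Read the last exported id from the self-managed checkpoint (file shown here for simplicity;
# this could equally come from DynamoDB or ElastiCache for Redis)
LAST_ID=$(cat /tmp/testtable_checkpoint)

tiup dumpling \
  -u <tidb-user> -p '<tidb-password>' \
  -h <tidb-serverless-host> -P 4000 \
  --filetype csv \
  -T test.testtable \
  --where "id > ${LAST_ID}" \
  -o "s3://<your-bucket>/ods/testtable_incremental/" \
  --s3.region "<aws-region>"
```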
Use the TiDB CDC connector to sync incremental data from TiDB to Amazon S3
The advantage of using the TiDB CDC connector to achieve incremental data synchronization from TiDB to Amazon S3 is that it has a built-in change data capture (CDC) mechanism, and because the backend engine is Flink, performance is fast. However, there is one trade-off: you need to create several Flink tables to map the ODS tables on AWS.
For instructions to implement the TiDB CDC connector, refer to TiDB CDC.
Use an EMR Serverless job to sync historical and incremental data from a Data Catalog table to the TiDB table
Data usually flows from on premises to the AWS Cloud. However, in some cases, the data might flow from the AWS Cloud to your on-premises database.
After landing on AWS, the data is wrapped up and managed by the Data Catalog through Athena tables created with the specific tables’ schema. The table DDL script is as follows:
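A minimal sketch of such a DDL statement is shown below; the column list mirrors the assumed testtable schema and the LOCATION points at the S3 prefix that Dumpling wrote to (both are placeholders):

```sql
CREATE EXTERNAL TABLE IF NOT EXISTS testtable (
  id BIGINT,
  name STRING,
  create_time STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION 's3://<your-bucket>/ods/testtable/'
TBLPROPERTIES ('skip.header.line.count' = '1');
```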
The following screenshot shows the result of running the DDL on the Athena console.
The data stored in the testtable table is queried using the select * from testtable SQL statement. The query result is as follows:
In this case, an EMR Serverless Spark job can accomplish the work of synchronizing data from an AWS Glue table to your on-premises table.
If the Spark job is written in Scala, the sample code is as follows:
You can acquire the TiDB serverless endpoint connection information on the TiDB console by choosing Connect, as shown earlier in this post.
After you have wrapped the Scala code as JAR file using SBT, you can submit the job to EMR Serverless with the following AWS Command Line Interface (AWS CLI) command:
If the Spark job is written in PySpark, the sample code is as follows:
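A minimal PySpark sketch is shown below; the Glue database name, TiDB connection values, and table names are placeholders, and the command-line argument is interpolated into the SQL statement to mirror the parameter passing described after the submission command:

```python
import sys
from pyspark.sql import SparkSession

# Table name passed in as a job argument (for example, "testspark")
table_name = sys.argv[1]

spark = (
    SparkSession.builder
    .appName("glue-to-tidb-sync")
    .enableHiveSupport()  # resolve tables through the AWS Glue Data Catalog
    .getOrCreate()
)

# The job argument is injected into the SQL statement that reads the Data Catalog table
df = spark.sql(f"SELECT * FROM default.{table_name}")

# Write the result to the TiDB table over JDBC (connection values are placeholders)
(
    df.write.format("jdbc")
    .option("url", "jdbc:mysql://<tidb-endpoint>:4000/test")
    .option("driver", "com.mysql.cj.jdbc.Driver")
    .option("dbtable", table_name)
    .option("user", "<tidb-user>")
    .option("password", "<tidb-password>")
    .mode("append")
    .save()
)

spark.stop()
```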
You can submit the job to EMR Serverless using the following AWS CLI command:
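A sketch of the submission command, assuming the PySpark script and the MySQL connector JAR have been uploaded to Amazon S3 (all IDs, ARNs, and paths are placeholders):

```bash
aws emr-serverless start-job-run \
  --application-id <application-id> \
  --execution-role-arn arn:aws:iam::<account-id>:role/<emr-serverless-job-role> \
  --name sync-glue-to-tidb \
  --job-driver '{
    "sparkSubmit": {
      "entryPoint": "s3://<your-bucket>/scripts/glue_to_tidb.py",
      "entryPointArguments": ["testspark"],
      "sparkSubmitParameters": "--jars s3://<your-bucket>/jars/mysql-connector-j-8.0.33.jar --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
    }
  }'
```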
The preceding PySpark code and AWS CLI command demonstrate parameter passing as well: the table name (specifically testspark) is injected into the SQL statement when the job is submitted.
EMR Serverless job operation essentials
An EMR Serverless application is a resource pool concept. An application holds a certain capacity of compute, memory, and storage resources for jobs running on it to use. You can configure the resource capacity using AWS CLI or the console. Because it’s a resource pool, EMR Serverless application creation is usually a one-time action with the initial capacity and maximum capacity being configured.
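For example, a one-time application creation with initial and maximum capacity might look like the following sketch; the release label and capacity figures are assumptions:

```bash
aws emr-serverless create-application \
  --name datalake-spark-app \
  --type SPARK \
  --release-label emr-6.15.0 \
  --initial-capacity '{
    "DRIVER":   {"workerCount": 1, "workerConfiguration": {"cpu": "2vCPU", "memory": "4GB"}},
    "EXECUTOR": {"workerCount": 2, "workerConfiguration": {"cpu": "4vCPU", "memory": "8GB"}}
  }' \
  --maximum-capacity '{"cpu": "32vCPU", "memory": "128GB"}'
```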
An EMR Serverless job is a working unit that actually processes the compute task. In order for a job to work, you need to set the EMR Serverless application ID, the execution IAM role (discussed previously), and the specific application configuration (the resources the job is planning to use). Although you can create the EMR Serverless job on the console, it’s recommended to create the EMR Serverless job using the AWS CLI for further integration with the scheduler and scripts.
For more details on EMR Serverless application creation and EMR Serverless job provisioning, refer to EMR Serverless Hive query or EMR Serverless PySpark job.
DolphinScheduler integration and job orchestration
DolphinScheduler is a modern data orchestration platform. It enables agile creation of high-performance workflows with low code. It also provides a powerful UI, dedicated to solving complex task dependencies in the data pipeline and providing various types of jobs out of the box.
DolphinScheduler is developed and maintained by WhaleOps, and available in AWS Marketplace as WhaleStudio.
DolphinScheduler has been natively integrated with Hadoop: DolphinScheduler cluster mode is by default recommended to be deployed on a Hadoop cluster (usually on HDFS data nodes), the HQL scripts uploaded to the DolphinScheduler Resource Manager are stored by default on HDFS, and they can be orchestrated using the following native Hive shell command:
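The command typically looks like the following, where the HQL file name is a placeholder:

```bash
hive -f ods_to_dwd_job.sql
```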
Moreover, in some cases the orchestration DAGs are quite complicated: each DAG consists of many jobs (for example, more than 300), and almost all the jobs are HQL scripts stored in the DolphinScheduler Resource Manager.
Complete the steps listed in this section to achieve a seamless integration between DolphinScheduler and EMR Serverless.
Switch the storage layer of DolphinScheduler Resource Center from HDFS to Amazon S3
Edit the common.properties files under the directory /usr/local/src/apache-dolphinscheduler/api-server/ and the directory /usr/local/src/apache-dolphinscheduler/worker-server/conf. The following code snippet shows the part of the file that needs to be revised:
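The relevant keys follow the DolphinScheduler 3.x common.properties layout; a sketch with placeholder values is shown below (verify the exact key names against your DolphinScheduler version):

```properties
# Switch the Resource Center storage from HDFS to Amazon S3
resource.storage.type=S3
resource.storage.upload.base.path=/dolphinscheduler

# S3 access settings (placeholders)
resource.aws.access.key.id=<access-key-id>
resource.aws.secret.access.key=<secret-access-key>
resource.aws.region=<aws-region>
resource.aws.s3.bucket.name=<your-dolphinscheduler-bucket>
resource.aws.s3.endpoint=https://s3.<aws-region>.amazonaws.com
```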
After editing and saving the two files, restart the api-server and worker-server by running the following commands under the folder path /usr/local/src/apache-dolphinscheduler/:
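The restart typically uses the bundled daemon script, for example:

```bash
cd /usr/local/src/apache-dolphinscheduler/
bash ./bin/dolphinscheduler-daemon.sh stop api-server
bash ./bin/dolphinscheduler-daemon.sh start api-server
bash ./bin/dolphinscheduler-daemon.sh stop worker-server
bash ./bin/dolphinscheduler-daemon.sh start worker-server
```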
You can validate whether switching the storage layer to Amazon S3 was successful by uploading a script using the DolphinScheduler Resource Center console and checking whether the file appears in the relevant S3 bucket folder.
Before verifying that Amazon S3 is now the storage location of DolphinScheduler, you need to create a tenant on the DolphinScheduler console and bind the admin user to the tenant, as illustrated in the following screenshots:
After that, you can create a folder on the DolphinScheduler console, and check whether the folder is visible on the Amazon S3 console.
Make sure the job scripts uploaded from Amazon S3 are available in the DolphinScheduler Resource Center
After accomplishing the first task, you can upload scripts from the DolphinScheduler Resource Center console and confirm that the scripts are stored in Amazon S3. However, in practice, you need to migrate all scripts directly to Amazon S3. To make the scripts stored in Amazon S3 visible and editable in the DolphinScheduler Resource Center console, you can revise the metadata table t_ds_resources by inserting all of the scripts’ metadata. The schema of the t_ds_resources table is shown in the following screenshot.
The insert command is as follows:
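A sketch of such an insert is shown below; the column names and values are assumptions based on a typical t_ds_resources layout and a hypothetical script name, so align them with the schema shown in the preceding screenshot:

```sql
-- Assumed column set and values; adjust to match your t_ds_resources schema
INSERT INTO t_ds_resources
  (alias, file_name, description, user_id, type, size, create_time, update_time, pid, full_name, is_directory)
VALUES
  ('job_ads_demo.sh', 'job_ads_demo.sh', 'ADS layer job script', 1, 0, 1024, NOW(), NOW(), -1, '/job_ads_demo.sh', 0);
```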
Now there are two records in the table t_ds_resources.
You can access relevant records on the DolphinScheduler console.
The following screenshot shows the files on the Amazon S3 console.
Make the DolphinScheduler DAG orchestrator aware of the jobs’ status so the DAG can move forward or take relevant actions
As mentioned earlier, DolphinScheduler is natively integrated with the Hadoop ecosystem, and HQL scripts can be orchestrated by the DolphinScheduler DAG orchestrator via the hive -f xxx.sql command. As a result, when the scripts change to shell scripts or Python scripts (EMR Serverless jobs need to be orchestrated via shell scripts or Python scripts rather than the simple Hive command), the DAG orchestrator can start the job, but can’t get the real-time status of the job, and therefore can’t continue the workflow to further steps. Because the DAGs in this case are very complicated, it’s not feasible to amend the DAGs; instead we follow a lift-and-shift strategy.
We use the following scripts to capture jobs’ status and take appropriate actions.
Persist the application ID list with the following code:
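For example, a small helper can cache the EMR Serverless application IDs (and, after each submission, the job run IDs) in a local file for later status checks; the file paths and variable names here are placeholders:

```bash
# Persist the application name -> application ID mapping for later lookups
aws emr-serverless list-applications \
  --query 'applications[].[name,id]' \
  --output text > /tmp/emr_serverless_applications.txt

# After each job submission, append the returned job run ID for status polling
# (APPLICATION_ID and JOB_RUN_ID would be parsed from the start-job-run response)
echo "${APPLICATION_ID},${JOB_RUN_ID}" >> /tmp/emr_serverless_job_runs.txt
```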
Enable the DolphinScheduler step status auto-check using a Linux shell:
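A sketch of such a status check follows: the shell submits the job, then polls get-job-run until the run reaches a terminal state, and exits non-zero on failure so the DolphinScheduler task (and therefore the DAG) reflects the real job status. IDs, ARNs, and script paths are placeholders:

```bash
#!/bin/bash
set -euo pipefail

APPLICATION_ID="<application-id>"
JOB_RUN_ID=$(aws emr-serverless start-job-run \
  --application-id "${APPLICATION_ID}" \
  --execution-role-arn "arn:aws:iam::<account-id>:role/<emr-serverless-job-role>" \
  --job-driver '{"sparkSubmit": {"entryPoint": "s3://<your-bucket>/scripts/ods_to_ads.py"}}' \
  --query 'jobRunId' --output text)

# Poll until the job reaches a terminal state
while true; do
  STATE=$(aws emr-serverless get-job-run \
    --application-id "${APPLICATION_ID}" \
    --job-run-id "${JOB_RUN_ID}" \
    --query 'jobRun.state' --output text)
  echo "Job ${JOB_RUN_ID} state: ${STATE}"
  case "${STATE}" in
    SUCCESS) exit 0 ;;
    FAILED|CANCELLED) exit 1 ;;
    *) sleep 30 ;;
  esac
done
```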
Clean up
To clean up your resources, we recommend using API calls, as shown in the following steps (a consolidated sketch of these commands follows the list):
- Delete the EC2 instance:
- Find the instance using the following command:
- Delete the instance using the following command:
- Delete the RDS instance:
- Find the instance using the following command:
- Delete the instance using the following command:
- Delete the EMR Serverless application:
- Find the EMR Serverless application using the following command:
- Delete the EMR Serverless application using the following command:
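A consolidated sketch of these cleanup commands is shown below; instance identifiers and application IDs are placeholders taken from the describe/list output:

```bash
# EC2: find and terminate the DolphinScheduler instance
aws ec2 describe-instances --filters "Name=tag:Name,Values=ds-pseudo" \
  --query 'Reservations[].Instances[].InstanceId' --output text
aws ec2 terminate-instances --instance-ids <instance-id>

# RDS: find and delete the MySQL instance
aws rds describe-db-instances --query 'DBInstances[].DBInstanceIdentifier' --output text
aws rds delete-db-instance --db-instance-identifier <db-instance-identifier> --skip-final-snapshot

# EMR Serverless: find, stop, and delete the application
aws emr-serverless list-applications --query 'applications[].[name,id]' --output text
aws emr-serverless stop-application --application-id <application-id>
aws emr-serverless delete-application --application-id <application-id>
```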
Conclusion
In this post, we discussed how EMR Serverless, as an AWS managed serverless big data compute engine, integrates with popular OSS products like TiDB and DolphinScheduler. We discussed how to achieve data synchronization between TiDB and the AWS Cloud, and how to use DolphinScheduler to orchestrate EMR Serverless jobs.
Try out the solution with your own use case, and share your feedback in the comments.
About the Author
Shiyang Wei is a Senior Solutions Architect at Amazon Web Services. He specializes in cloud system architecture and solution design for the financial industry. In particular, he focuses on big data and machine learning applications in finance, as well as the impact of regulatory compliance on cloud architecture design in the financial sector. He has over 10 years of experience in data domain development and architectural design.