AWS Big Data Blog
Handle UPSERT data operations using open-source Delta Lake and AWS Glue
September 2024: This post was reviewed and updated for accuracy.
Many customers need an ACID transaction (atomic, consistent, isolated, durable) data lake that can log change data capture (CDC) from operational data sources. There is also demand for merging real-time data into batch data. The Delta Lake framework provides these two capabilities. In this post, we discuss how to handle UPSERTs (updates and inserts) of the operational data using natively integrated Delta Lake with AWS Glue, and query the Delta Lake using Amazon Athena.
We examine a hypothetical insurance organization that issues commercial policies to small- and medium-sized businesses. The insurance prices vary based on several criteria, such as where the business is located, business type, earthquake or flood coverage, and so on. This organization is planning to build a data analytics platform, and the insurance policy data is one of the inputs to this platform. Because the business is growing, thousands of new insurance policies are enrolled and renewed every month. Therefore, all this operational data needs to be sent to Delta Lake in near-real time so that the organization can perform various analytics and build machine learning (ML) models to serve their customers in a more efficient and cost-effective way.
Solution overview
The data can originate from any source, but typically customers want to bring operational data to data lakes to perform data analytics. One solution is to bring in the relational data by using AWS Database Migration Service (AWS DMS). AWS DMS tasks can be configured to copy the full load as well as ongoing changes (CDC). The full load and CDC load can be brought into the raw and curated (Delta Lake) storage layers in the data lake. To keep it simple, this post omits the data sources and ingestion layer; the assumption is that the data is already copied to the raw bucket in the form of CSV files. An AWS Glue ETL job does the necessary transformation and copies the data to the Delta Lake layer. The Delta Lake layer ensures ACID compliance of the source data.
The following diagram illustrates the solution architecture.
The use case we use in this post is about a commercial insurance company. We use a simple dataset that contains the following columns:
- Policy – Policy number, entered as text
- Expiry – Date that the policy expires
- Location – Location type (Urban or Rural)
- State – Name of the state where the property is located
- Region – Geographic region where the property is located
- Insured Value – Property value
- Business Type – Business use type for the property, such as Farming or Retail
- Earthquake – Is earthquake coverage included (Y or N)
- Flood – Is flood coverage included (Y or N)
The dataset contains a sample of 100 insurance policies; a production dataset may contain millions of records.
In the following sections, we walk through the steps to perform the Delta Lake UPSERT operations. We use the AWS Management Console to perform all the steps. However, you can also automate these steps using tools like AWS CloudFormation, the AWS Cloud Development Kit (AWS CDK), Terraform, and so on.
Prerequisites
This post is aimed at architects, engineers, developers, and data scientists who design and build analytical solutions on AWS. We expect a basic understanding of the console, AWS Glue, Amazon Simple Storage Service (Amazon S3), and Athena. Additionally, the persona is able to create AWS Identity and Access Management (IAM) policies and roles, create and run AWS Glue jobs and crawlers, and work with the Athena query editor.
Set up an S3 bucket for full and CDC load data feeds
To set up your S3 bucket, complete the following steps:
- Log in to your AWS account and choose a Region nearest to you.
- On the Amazon S3 console, create a new bucket. Make sure the name is unique (for example, delta-lake-cdc-blog-<some random number>).
- Create the following folders:
- $bucket_name/fullload – This folder is used for a one-time full load from the upstream data source
- $bucket_name/cdcload – This folder is used for copying the upstream data changes
- $bucket_name/delta – This folder holds the Delta Lake data files
- Copy the sample dataset and save it in a file called full-load.csv on your local machine. (An illustrative fragment of the file format follows this list.)
- Upload the file using the Amazon S3 console into the folder $bucket_name/fullload.
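The sample dataset itself isn't reproduced here. For illustration, the following fragment shows the expected shape of full-load.csv, with snake_case headers matching the columns described earlier; the policy numbers correspond to policies referenced later in this post, but the remaining values are made up:

```csv
policy,expiry,location,state,region,insured_value,business_type,earthquake,flood
1000,2023-02-12,Urban,NY,East,1617630,Retail,N,N
1001,2023-01-30,Rural,WI,Midwest,8148971,Farming,N,Y
1003,2023-03-21,Urban,CA,West,2405372,Construction,Y,N
```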
Set up an IAM policy and role
In this section, we create an IAM policy for S3 bucket access and a role for the AWS Glue jobs to run with; we also use the same role to query the Delta Lake table using Athena.
- On the IAM console, choose Policies in the navigation pane.
- Choose Create policy.
- Select the JSON tab and paste the following policy code, replacing {bucket_name} with the name of the bucket you created in the earlier step.
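The policy code isn't included inline here; the following is a minimal sketch that grants the AWS Glue jobs read/write access to the bucket. The exact policy from the original post may be broader, so adjust the actions to your needs:

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "DeltaLakeCdcBlogS3Access",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:DeleteObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::{bucket_name}",
                "arn:aws:s3:::{bucket_name}/*"
            ]
        }
    ]
}
```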
- Name the policy delta-lake-cdc-blog-policy and choose Create policy.
- On the IAM console, choose Roles in the navigation pane.
- Choose Create role.
- Select AWS Glue as your trusted entity and choose Next.
- Attach the policy you just created (delta-lake-cdc-blog-policy) along with two AWS managed policies:
  - AWSGlueServiceRole
  - CloudWatchFullAccess
- Choose Next.
- Give the role a name (for example, delta-lake-cdc-blog-role).
Set up AWS Glue jobs
In this section, we set up two AWS Glue jobs: one for full load and one for the CDC load. Let’s start with the full load job.
- On the AWS Glue console, choose ETL Jobs in the navigation pane. AWS Glue Studio opens in the right side panel.
- Select Script editor. In the popup, choose Start fresh and choose Create script.
- In the script editor, replace the code with the following code snippet:
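The original snippet isn't reproduced here; the following is a minimal sketch of the full load logic, assuming the snake_case column headers shown earlier and the folder layout from the S3 setup (fullload as the source, delta/insurance as the Delta Lake target):

```python
import sys

from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

# Resolve the --s3_bucket parameter set on the Job details tab
args = getResolvedOptions(sys.argv, ["s3_bucket"])
bucket = args["s3_bucket"]

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Read the one-time full load feed from the raw layer
full_df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(f"s3://{bucket}/fullload/")
)

# Write the feed to the curated layer as a Delta Lake table
# (the --datalake-formats=delta job parameter makes the delta format available)
(
    full_df.write.format("delta")
    .mode("overwrite")
    .save(f"s3://{bucket}/delta/insurance/")
)
```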
- Navigate to the Job details tab.
- Provide a name for the job (for example, Full-Load-Job).
- For IAM Role, choose the role delta-lake-cdc-blog-role that you created earlier.
- For Worker type, choose G.2X.
- For Job bookmark, choose Disable.
- Set Number of retries to 0.
- Under Advanced properties, keep the default values.
- Under Job parameters:
  - Add the key --s3_bucket with the bucket name you created earlier as the value.
  - Add the key --datalake-formats with the value delta.
- Keep the remaining default values and choose Save.
Now let’s create the CDC load job.
- Create a second job called CDC-Load-Job.
- Follow the same steps on the Job details tab as for the previous job. Alternatively, you can choose Clone job on Full-Load-Job, which carries over all the job details from the full load job.
- In the script editor, enter the following code snippet for the CDC logic:
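The original CDC snippet isn't reproduced here; the following is a minimal sketch of the Delta Lake MERGE logic, assuming the snake_case columns shown earlier, a leading op column in the CDC feed, and policy as the merge key:

```python
import sys

from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from delta.tables import DeltaTable
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ["s3_bucket"])
bucket = args["s3_bucket"]

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Read the CDC feed; the first column (op) flags the operation: U, I, or D
cdc_df = (
    spark.read.format("csv")
    .option("header", "true")
    .load(f"s3://{bucket}/cdcload/")
)

# Open the Delta table written by the full load job
delta_table = DeltaTable.forPath(spark, f"s3://{bucket}/delta/insurance/")

# Target columns (the CDC feed carries an extra op column that we don't store)
cols = ["policy", "expiry", "location", "state", "region",
        "insured_value", "business_type", "earthquake", "flood"]
col_map = {c: f"s.{c}" for c in cols}

# MERGE the feed into the Delta table keyed on the policy number:
# delete on D, update on U, insert on I
(
    delta_table.alias("t")
    .merge(cdc_df.alias("s"), "t.policy = s.policy")
    .whenMatchedDelete(condition="s.op = 'D'")
    .whenMatchedUpdate(condition="s.op = 'U'", set=col_map)
    .whenNotMatchedInsert(condition="s.op = 'I'", values=col_map)
    .execute()
)
```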
Run the full load job
On the AWS Glue console, open Full-Load-Job and choose Run. The job takes about 2 minutes to complete, and the job run status changes to Succeeded. Go to $bucket_name and open the delta folder, which contains the insurance folder. You can see the Delta Lake data files in it.
Create delta table using Amazon Athena
Amazon Athena now supports querying Delta Lake tables directly, offering improved performance and seamless integration with your data lake architecture. To create a Delta Lake table in Athena, you can use a simplified CREATE EXTERNAL TABLE statement that specifies only the table location and the Delta Lake table type. Athena will automatically infer the schema and other metadata from the Delta Lake transaction log, eliminating the need for manual schema definition.
- On the Athena console, open the query editor.
- Run the following query to create the insurance_policies Delta table:
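The statement below follows the simplified Athena DDL for Delta Lake tables described above; replace the bucket name with your own:

```sql
CREATE EXTERNAL TABLE insurance_policies
LOCATION 's3://<your-s3-bucket>/delta/insurance/'
TBLPROPERTIES ('table_type' = 'DELTA');
```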
This statement creates an external table named insurance_policies that points to a Delta Lake dataset stored in the specified S3 location. The table_type property is set to DELTA to indicate that this is a Delta Lake table. Once created, you can query the table using standard SQL syntax in Athena, taking advantage of Delta Lake's performance optimizations and ACID transaction support.
Query the delta table using Athena query editor
In this section, we query the insurance_policies table using Athena. Note that if you're using Athena for the first time, set the query output folder to store the Athena query results (for example, s3://<your-s3-bucket>/query-output/).
- On the Athena console, open the query editor.
- Keep the default selections for Data source and Database.
- Run the query SELECT * FROM insurance_policies. This query returns the same rows that were in the full load data feed.
- For the CDC comparison, run the following query and store the results in a location where you can compare them later:
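A minimal version of the comparison query, assuming the snake_case column names used throughout this post; sorting by policy number makes the before and after result sets easy to compare:

```sql
SELECT *
FROM insurance_policies
ORDER BY policy;
```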
The following screenshot shows the Athena query result.
Upload the CDC data feed and run the CDC job
In this section, we insert five new insurance policies, update three existing policies, and delete two policies.
- Copy the following insurance policy data and save it locally as cdc-load.csv:
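The CDC data isn't reproduced in full here. The following fragment illustrates the expected format: the operation codes and policy numbers match the change details listed after the upload steps, but the other field values are made up:

```csv
op,policy,expiry,location,state,region,insured_value,business_type,earthquake,flood
U,1000,2024-02-12,Urban,NY,East,1617630,Retail,N,N
D,1001,2023-01-30,Rural,WI,Midwest,8148971,Farming,N,Y
I,1101,2024-06-30,Urban,CA,West,3945620,Construction,Y,N
```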
The first column in the CDC feed describes the UPSERT operation: U updates an existing record, I inserts a new record, and D deletes a record. In this CDC feed, there are five new policies, three updates to existing policies, and two deletes.
- Upload the cdc-load.csv file to the $bucket_name/cdcload/ folder.
- On the AWS Glue console, run CDC-Load-Job. This job takes care of updating the Delta Lake table accordingly.
The change details are as follows:
- 1101, 1102, 1103, 1104, and 110002 – New policies added to the table
- 1000, 1003, and 1007 – Existing policies updated
- 1001 and 1005 – Policies deleted
Run the SELECT * FROM insurance_policies query again:
As shown in the following screenshot, the changes in the CDC data feed are reflected in the Athena query results.
Advanced Analytics on Insurance Policies
To gain deeper insights into our insurance portfolio and better understand our risk exposure, we’ll perform some advanced analytics using Amazon Athena. These analyses will help us make data-driven decisions and develop more targeted strategies for risk management and business growth. We’ll focus on two key areas:
1. Risk Exposure Analysis by Region
Understanding our risk exposure across different regions is crucial for effective risk management and pricing strategies. This analysis aggregates the total insured value for each region, breaking it down by earthquake and flood risk. By examining these metrics, we can:
- Identify regions with high concentration of insured value
- Assess our exposure to specific natural disasters in different areas
- Adjust our underwriting policies or reinsurance strategies based on regional risk profiles
Run the following query in the Athena query editor and observe the results.
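A sketch of such a query, assuming the snake_case column names used earlier; it totals insured value per region and breaks out the portion with earthquake and flood coverage:

```sql
SELECT region,
       SUM(insured_value) AS total_insured_value,
       SUM(CASE WHEN earthquake = 'Y' THEN insured_value ELSE 0 END) AS earthquake_exposure,
       SUM(CASE WHEN flood = 'Y' THEN insured_value ELSE 0 END) AS flood_exposure
FROM insurance_policies
GROUP BY region
ORDER BY total_insured_value DESC;
```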
The results look as follows:
2. Business Type Distribution and Average Policy Value
Analyzing the distribution of policies across different business types and their average insured values provides valuable insights into our customer base and potential market opportunities. This analysis will help us:
- Understand which business types are most prevalent in our portfolio
- Identify sectors with higher average policy values
- Tailor our marketing and product development efforts to target specific business segments
- Assess the balance of our portfolio across various industries
These advanced analytics queries demonstrate the power of Athena to extract meaningful insights from our Delta Lake tables. By leveraging these insights, we can make more informed decisions about risk management, pricing, and business strategy.
Run the following query in the Athena query editor and observe the results.
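A sketch of this analysis, again assuming the snake_case column names; it counts policies and averages the insured value per business type:

```sql
SELECT business_type,
       COUNT(*) AS policy_count,
       ROUND(AVG(insured_value), 2) AS avg_insured_value
FROM insurance_policies
GROUP BY business_type
ORDER BY policy_count DESC;
```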
The results look as follows:
Clean up
In this solution, we used all managed services, and there is no cost when the AWS Glue jobs aren't running. However, if you want to clean up, you can delete the two AWS Glue jobs, the AWS Glue Data Catalog table, and the S3 bucket.
Conclusion
Organizations are continuously looking for high-performance, cost-effective, and scalable analytical solutions to extract the value of their operational data sources in near-real time. The analytical platform should be ready to receive changes in the operational data as soon as they occur. Typical data lake solutions face challenges handling changes in source data; the Delta Lake framework can close this gap. This post demonstrated how to build data lakes for UPSERT operations using AWS Glue and native Delta Lake tables, and how to query Delta Lake tables from Athena. You can implement large-scale UPSERT data operations using AWS Glue and Delta Lake, and perform analytics using Amazon Athena.
References
- Introducing native Delta Lake table support with AWS Glue crawlers
- Build a high-performance, transactional data lake using open-source Delta Lake on Amazon EMR
- Build, Test and Deploy ETL solutions using AWS Glue and AWS CDK based CI/CD pipelines
- What is Delta Lake?
About the Authors
Praveen Allam is a Solutions Architect at Amazon Web Services (AWS), specializing in designing scalable, high-performance, and cost-effective enterprise-grade applications. With a passion for building innovative AWS solutions, he focuses on serverless architecture, analytics, and generative AI, helping organizations harness the power of data-driven decision-making.
Vivek Singh is a Senior Solutions Architect with the AWS Data Lab team. He helps customers unblock their data journey on the AWS ecosystem. His areas of interest are data pipeline automation, data quality and data governance, data lakes, and lake house architectures.
Audit History
Last reviewed and updated in September 2024 by Praveen Allam | Sr. Solutions Architect