AWS Big Data Blog

Build Write-Audit-Publish pattern with Apache Iceberg branching and AWS Glue Data Quality

Given the importance of data in the world today, organizations face the dual challenges of managing large-scale, continuously incoming data while vetting its quality and reliability. The importance of publishing only high-quality data can’t be overstated—it’s the foundation for accurate analytics, reliable machine learning (ML) models, and sound decision-making. Equally crucial is the ability to segregate and audit problematic data, not just for maintaining data integrity, but also for regulatory compliance, error analysis, and potential data recovery.

AWS Glue is a serverless data integration service that you can use to effectively monitor and manage data quality through AWS Glue Data Quality. Today, many customers build data quality validation pipelines with its Data Quality Definition Language (DQDL) because its static rules, dynamic rules, and anomaly detection capabilities make such pipelines straightforward to implement.

Apache Iceberg is an open table format that brings atomicity, consistency, isolation, and durability (ACID) transactions to data lakes, streamlining data management. One of its key features is the ability to manage data using branches. Each branch has its own lifecycle, allowing for flexible and efficient data management strategies.

This post explores robust strategies for maintaining data quality when ingesting data into Apache Iceberg tables using AWS Glue Data Quality and Iceberg branches. We discuss two common strategies to verify the quality of published data. We dive deep into the Write-Audit-Publish (WAP) pattern, demonstrating how it works with Apache Iceberg.

Strategy for managing data quality

When it comes to vetting data quality in streaming environments, two prominent strategies emerge: the dead-letter queue (DLQ) approach and the WAP pattern. Each strategy offers unique advantages and considerations.

  • The DLQ approach – Segregate problematic entries from high-quality data so that only clean data makes it into your primary dataset.
  • The WAP pattern – Using branches, segregate problematic entries from high-quality data so that only clean data is published in the main branch.

The DLQ approach

The DLQ strategy focuses on efficiently segregating high-quality data from problematic entries so that only clean data makes it into your primary dataset. Here’s how it works:

  1. As data streams in, it passes through a validation process
  2. Valid data is written directly to the table referred to by downstream users
  3. Invalid or problematic data is redirected to a separate DLQ for later analysis and potential recovery

The following diagram shows this flow.

bdb4341_0_1_dlq
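
As an illustration (not part of the original walkthrough), a minimal PySpark sketch of this flow might look like the following. The paths, table name, and validity rule are hypothetical.

from pyspark.sql import functions as F

# Hypothetical validity rule: readings must fall within the supported sensor range
incoming = spark.read.parquet("s3://<bucket>/incoming/")
is_valid = F.col("current_temperature").between(-10, 50)

# Valid data is written directly to the Iceberg table used by downstream consumers
incoming.filter(is_valid).writeTo("glue_catalog.db.room_data").append()

# Invalid data is redirected to a separate dead-letter location for later analysis
incoming.filter(~is_valid).write.mode("append").parquet("s3://<bucket>/dlq/room_data/")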

Here are its advantages:

  • Simplicity – The DLQ approach is straightforward to implement, especially when there is only one writer
  • Low latency – Valid data is instantly available in the main branch for downstream consumers
  • Separate processing for invalid data – You can have dedicated jobs to process the DLQ for auditing and recovery purposes.

The DLQ strategy can present significant challenges in complex data environments. With multiple concurrent writers to the same Iceberg table, maintaining consistent DLQ implementation becomes difficult. This issue is compounded when different engines (for example, Spark, Trino, or Python) are used for writes because the DLQ logic may vary between them, making system maintenance more complex. Additionally, storing invalid data separately can lead to management overhead.

Moreover, when latency requirements are tight, the validation step itself may introduce additional delays. This creates a challenge in balancing data quality with speed of delivery.

To address these challenges, we introduce the WAP pattern in the next section.

The WAP pattern

The WAP pattern implements a three-stage process:

  1. Write – Data is initially written to a staging branch
  2. Audit – Quality checks are performed on the staging branch
  3. Publish – Validated data is merged into the main branch for consumption

The following diagram shows this flow.

bdb4341_0_2_wap

Here are its advantages:

  • Flexible data latency management – In the WAP pattern, raw data is ingested into the staging branch without validation, and only validated, high-quality data is published to the main branch. This gives you the flexibility to serve urgent, low-latency workloads from the staging branch while keeping the main branch restricted to high-quality data.
  • Unified data quality management – The WAP pattern separates the audit and publish logic from the writer applications. It provides a unified approach to quality management, even with multiple writers or varying data sources. The audit phase can be customized and evolved without affecting the write or publish stages.

The primary challenge of the WAP pattern is the increased latency it introduces. The multistep process inevitably delays data availability for downstream consumers, which may be problematic for near real-time use cases. Furthermore, implementing this pattern requires more sophisticated orchestration compared to the DLQ approach, potentially increasing development time and complexity.

How the WAP pattern works with Iceberg

The following sections explore how the WAP pattern works with Iceberg.

Iceberg’s branching feature

Iceberg offers a branching feature for data lifecycle management, which is particularly useful for efficiently implementing the WAP pattern. The metadata of an Iceberg table stores a history of snapshots. These snapshots, created for each change to the table, are fundamental to concurrent access control and table versioning. Branches are independent histories of snapshots branched from another branch, and each branch can be referred to and updated separately.

When a table is created, it starts with only a main branch, and all transactions are initially written to it. You can create additional branches, such as an audit branch, and configure engines to write to them. Changes on one branch can be fast-forwarded to another branch using the Iceberg fast_forward Spark procedure, as shown in the following diagram.

bdb4341_0_3_iceberg-branch

How to manage Iceberg branches

In this section, we cover the essential operations for managing Iceberg branches using SparkSQL. We demonstrate how to create a new branch, write to and read from a specific branch, and set a default branch for a Spark session. These operations form the foundation for implementing the WAP pattern with Iceberg.

To create a branch, run the following SparkSQL query:

ALTER TABLE glue_catalog.db.tbl CREATE BRANCH audit

To specify a branch to be updated, use the glue_catalog.<database_name>.<table_name>.branch_<branch_name> syntax:

INSERT INTO glue_catalog.db.tbl.branch_audit VALUES (1, 'a'), (2, 'b');

To specify a branch to be queried, use the glue_catalog.<database_name>.<table_name>.branch_<branch_name> syntax:

SELECT * FROM glue_catalog.db.tbl.branch_audit;

To specify a branch for the entire Spark session scope, set the branch name in the Spark parameter spark.wap.branch. After this parameter is set, all queries refer to the specified branch without it being specified explicitly:

SET spark.wap.branch = audit

-- audit branch will be updated
INSERT INTO glue_catalog.db.tbl VALUES (3, 'c');

How to implement the WAP pattern with Iceberg branches

Using Iceberg’s branching feature, we can efficiently implement the WAP pattern with a single Iceberg table. Additionally, Iceberg characteristics such as ACID transactions and schema evolution are useful for handling multiple concurrent writers and varying data.

  1. Write – The data ingestion process commits updates to the audit branch instead of the main branch. At this point, these updates aren’t accessible to downstream users, who can only access the main branch.
  2. Audit – The audit process runs data quality checks on the data in the audit branch. It identifies which data is clean and ready to be published.
  3. Publish – The audit process publishes validated data to the main branch with the Iceberg fast_forward procedure, making it available for downstream users.

This flow is shown in the following diagram.

bdb4341_0_4_wap-w-iceberg-branch

By implementing the WAP pattern with Iceberg, we can obtain several advantages:

  • Simplicity – Iceberg branches can express multiple states of a table, such as audit and main, within a single table. This keeps data management unified even when different data contexts are handled separately.
  • Handling concurrent writers – Iceberg tables are ACID compliant, so consistent reads and writes are guaranteed even when multiple reader and writer processes run concurrently.
  • Schema evolution – If there are issues with the data being ingested, its schema may differ from the table definition. Spark supports dynamic schema merging for Iceberg tables, so the table schema can evolve flexibly to accommodate data with inconsistent schemas. With the following configuration, when schema changes occur, new columns from the source are added to the target table with NULL values for existing rows, and columns present only in the target have their values set to NULL for new insertions or left unchanged during updates.
-- Spark SQL: allow writes whose column order differs from the table definition
SET spark.sql.iceberg.check-ordering = false

-- Spark SQL: allow the table to accept writes with a different schema
ALTER TABLE glue_catalog.db.tbl SET TBLPROPERTIES (
    'write.spark.accept-any-schema'='true'
)

# PySpark: merge new columns from the source DataFrame into the table schema on write
df.writeTo("glue_catalog.db.tbl").option("merge-schema", "true").append()

As an intermediate wrap-up, the WAP pattern offers a robust approach to balancing data quality and latency. With Iceberg branches, we can implement the WAP pattern on a single Iceberg table while handling concurrent writers and schema evolution.

Example use case

Suppose that a home monitoring system tracks room temperature and humidity. The system captures the data and sends it to an Iceberg-based data lake built on top of Amazon Simple Storage Service (Amazon S3). The data is visualized using matplotlib for interactive data analysis. In such a system, issues such as device malfunctions or network problems can lead to partial or erroneous data being written, resulting in incorrect insights. In many cases, these issues are detected only after the data reaches the data lake, and verifying the correctness of such data is generally complicated.

To address these issues, this post applies the WAP pattern with Iceberg branches to the system. With this approach, the room data arriving in the data lake is evaluated for quality before being visualized, so that only qualified room data is used for further analysis. Using the WAP pattern with branches, you can achieve effective data management and promote data quality in downstream processes. The solution is demonstrated using an AWS Glue Studio notebook, a managed Jupyter notebook environment for interacting with Apache Spark.

Prerequisites

The following prerequisites are necessary for this use case:

Set up resources with AWS CloudFormation

First, you use a provided AWS CloudFormation template to set up resources to build Iceberg environments. The template creates the following resources:

  • An S3 bucket for metadata and data files of an Iceberg table
  • A database for the Iceberg table in AWS Glue Data Catalog
  • An AWS Identity and Access Management (IAM) role for an AWS Glue job

Complete the following steps to deploy the resources.

  1. Choose Launch stack.

  2. For the Parameters, IcebergDatabaseName is set by default. You can also change the default value. Then, choose Next.
  3. Choose Next.
  4. Choose I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  5. Choose Submit.
  6. After the stack creation is complete, check the Outputs tab. The resource values are used in the following sections.

Next, add the Iceberg JAR files to the session so you can use the Iceberg branch feature. Complete the following steps:

  1. Download the following JAR files from the Iceberg releases page to your local machine:
    1. The Iceberg 1.6.1 Spark 3.3 (Scala 2.12) runtime JAR
    2. The Iceberg 1.6.1 aws-bundle JAR
  2. Open the Amazon S3 console and select the S3 bucket you created through the CloudFormation stack. The S3 bucket name can be found on the CloudFormation Outputs tab.
  3. Choose Create folder and create the jars path in the S3 bucket.
  4. Upload the two downloaded JAR files to s3://<IcebergS3Bucket>/jars/ from the S3 console.

Upload a Jupyter Notebook on AWS Glue Studio

After launching the CloudFormation stack, you create an AWS Glue Studio notebook to use Iceberg with AWS Glue. Complete the following steps.

  1. Download wap.ipynb.
  2. Open the AWS Glue Studio console.
  3. Under Create job, select Notebook.
  4. Select Upload Notebook, choose Choose file, and upload the notebook you downloaded.
  5. Select the IAM role name, such as IcebergWAPGlueJobRole, that you created through the CloudFormation stack. Then, choose Create notebook.
  6. For Job name at the top left of the page, enter iceberg_wap.
  7. Choose Save.

Configure Iceberg branches

Start by creating an Iceberg table that contains a room temperature and humidity dataset. After creating the Iceberg table, create the branches used to perform the WAP pattern. Complete the following steps:

  1. In the Jupyter notebook that you created in Upload a Jupyter Notebook on AWS Glue Studio, run the following cell to use Iceberg with AWS Glue. %additional_python_modules pandas==2.2 is used to visualize the temperature and humidity data in the notebook with pandas. Before running the cell, replace <IcebergS3Bucket> with the S3 bucket name where you uploaded the Iceberg JAR files.

bdb4341_1_session-config
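
The configuration cell appears as a screenshot in the original post. As an illustrative sketch only, a Glue Studio notebook configuration cell along these lines would load the JAR files and pandas; the Glue version, worker settings, and exact JAR file names are assumptions.

%idle_timeout 60
%glue_version 4.0
%worker_type G.1X
%number_of_workers 2
%additional_python_modules pandas==2.2
%extra_jars s3://<IcebergS3Bucket>/jars/iceberg-spark-runtime-3.3_2.12-1.6.1.jar,s3://<IcebergS3Bucket>/jars/iceberg-aws-bundle-1.6.1.jar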

  2. Initialize the SparkSession by running the following cell. The first three settings, starting with spark.sql, are required to use Iceberg with Glue. The default catalog name is set to glue_catalog using spark.sql.defaultCatalog. The configuration spark.sql.execution.arrow.pyspark.enabled is set to true and is used for data visualization with pandas.

bdb4341_2_sparksession-init
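
The initialization cell is shown as a screenshot in the original post. The following is a sketch of the kind of configuration it describes; the exact property list and the warehouse path are assumptions.

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # Enable Iceberg SQL extensions and register the AWS Glue Data Catalog as "glue_catalog"
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .config("spark.sql.catalog.glue_catalog.warehouse", "s3://<IcebergS3Bucket>/warehouse/")
    # Make glue_catalog the default catalog so tables can be referenced as <database>.<table>
    .config("spark.sql.defaultCatalog", "glue_catalog")
    # Arrow-based conversion speeds up toPandas() for the visualizations
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .getOrCreate()
)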

  3. After the session is created (the notification Session <Session Id> has been created. will be displayed in the notebook), run the following commands to copy the temperature and humidity dataset to the S3 bucket you created through the CloudFormation stack. Before running the cell, replace <IcebergS3Bucket> with the name of the S3 bucket for Iceberg, which you can find on the CloudFormation Outputs tab.
!aws s3 cp s3://aws-blogs-artifacts-public/artifacts/BDB-4341/data/part-00000-fa08487a-43c2-4398-bae9-9cb912f8843c-c000.snappy.parquet s3://<IcebergS3Bucket>/src-data/current/ 
!aws s3 cp s3://aws-blogs-artifacts-public/artifacts/BDB-4341/data/new-part-00000-e8a06ab0-f33d-4b3b-bd0a-f04d366f067e-c000.snappy.parquet s3://<IcebergS3Bucket>/src-data/new/
  4. Configure the data source bucket name and path (DATA_SRC), Iceberg data warehouse path (ICEBERG_LOC), and database and table names for an Iceberg table (DB_TBL). Replace <IcebergS3Bucket> with the S3 bucket from the CloudFormation Outputs tab.
  5. Read the dataset and create the Iceberg table with the dataset using the Create Table As Select (CTAS) query.

bdb4341_3_ctas
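
These two cells appear as screenshots in the original post. A sketch along the following lines covers both steps; the variable values and the table location are assumptions.

# Assumed values: replace <IcebergS3Bucket> with the bucket from the CloudFormation Outputs tab
DATA_SRC = "s3://<IcebergS3Bucket>/src-data/current/"
ICEBERG_LOC = "s3://<IcebergS3Bucket>/warehouse/"
DB_TBL = "iceberg_wap_db.room_data"

# Read the source Parquet data and create the Iceberg table with a CTAS query
spark.read.parquet(DATA_SRC).createOrReplaceTempView("src_room_data")
spark.sql(f"""
    CREATE TABLE {DB_TBL}
    USING iceberg
    LOCATION '{ICEBERG_LOC}room_data'
    AS SELECT * FROM src_room_data
""")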

  6. Run the following code to display the temperature and humidity data for each room in the Iceberg table. Pandas and matplotlib are used for the visualization. The data from 10:05 to 10:30 is displayed in the notebook, as shown in the following screenshot, with each room showing approximately 25°C for temperature (displayed as the blue line) and 52% for humidity (displayed as the orange line).
import matplotlib.pyplot as plt
import pandas as pd

CONF = [
    {'room_type': 'myroom', 'cols':['current_temperature', 'current_humidity']},
    {'room_type': 'living', 'cols':['current_temperature', 'current_humidity']},
    {'room_type': 'kitchen', 'cols':['current_temperature', 'current_humidity']}
]

fig, axes = plt.subplots(nrows=3, ncols=1, sharex=True, sharey=True)
for ax, conf in zip(axes.ravel(), CONF):
    df_room = spark.sql(f"""
        SELECT current_time, current_temperature, current_humidity, room_type
        FROM {DB_TBL} WHERE room_type = '{conf['room_type']}'
        ORDER BY current_time ASC
        """)
    pdf = df_room.toPandas()
    pdf.set_index(pdf['current_time'], inplace=True)
    plt.xlabel('time')
    plt.ylabel('temperature/humidity')
    plt.ylim(10, 60)
    plt.yticks([tick for tick in range(10, 60, 10)])
    pdf[conf['cols']].plot.line(ax=ax, grid=True, figsize=(8, 6), title=conf['room_type'], legend=False, marker=".", markersize=2, linewidth=0)

plt.legend(['temperature', 'humidity'], loc='center', bbox_to_anchor=(0, 1, 1, 5.5), ncol=2)

%matplot plt

bdb4341_4_vis-1

  7. Create the Iceberg branches by running the following queries before writing data into the Iceberg table. You can create an Iceberg branch with the ALTER TABLE db.table CREATE BRANCH <branch_name> query.
ALTER TABLE iceberg_wap_db.room_data CREATE BRANCH stg
ALTER TABLE iceberg_wap_db.room_data CREATE BRANCH audit

Now, you’re ready to build the WAP pattern with Iceberg.

Build WAP pattern with Iceberg

Use the Iceberg branches created earlier to implement the WAP pattern. You start by writing the newly incoming temperature and humidity data, including erroneous values, to the stg branch in the Iceberg table.

Write phase: Write incoming data into the Iceberg stg branch

To write the incoming data into the stg branch in the Iceberg table, complete the following steps:

  1. Run the following cell to write the data into the Iceberg table.

bdb4341_5_write
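
The write cell is shown as a screenshot in the original post. A minimal sketch of the idea, assuming the new data was copied to the src-data/new/ path earlier, looks like this:

# Read the newly arrived readings (including erroneous values) and append them to the stg branch
df_new = spark.read.parquet("s3://<IcebergS3Bucket>/src-data/new/")
df_new.writeTo(f"{DB_TBL}.branch_stg").append()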

  2. After the records are written, run the following code to visualize the current temperature and humidity data in the stg branch. In the following screenshot, notice that new data was added after 10:30. The output shows incorrect readings, such as around 100°C for temperature between 10:35 and 10:52 in the living room.
fig, axes = plt.subplots(nrows=3, ncols=1, sharex=True, sharey=True)
for ax, conf in zip(axes.ravel(), CONF):
    df_room_stg = spark.sql(f"""
        SELECT current_time, current_temperature, current_humidity, room_type
        FROM {DB_TBL}.branch_stg WHERE room_type = '{conf['room_type']}'
        ORDER BY current_time ASC
        """)
    pdf = df_room_stg.toPandas()
    pdf.set_index(pdf['current_time'], inplace=True)
    plt.xlabel('time')
    plt.ylabel('temperature/humidity')
    plt.ylim(10, 110)
    plt.yticks([tick for tick in range(10, 110, 30)])
    pdf[conf['cols']].plot.line(ax=ax, grid=True, figsize=(8, 6), title=conf['room_type'], legend=False, marker=".", markersize=2, linewidth=0)

plt.legend(['temperature', 'humidity'], loc='center', bbox_to_anchor=(0, 1, 1, 5.5), ncol=2)

%matplot plt

bdb4341_6_vis-2

The new temperature data including erroneous records was written to the stg branch. This data isn’t visible to the downstream side because it hasn’t been published to the main branch. Next, you evaluate the data quality in the stg branch.

Audit phase: Evaluate the data quality in the stg branch

In this phase, you evaluate the quality of the temperature and humidity data in the stg branch using AWS Glue Data Quality. Then, the data that doesn’t meet the criteria is filtered out based on the data quality rules, and the qualified data is used to update the latest snapshot in the audit branch. Start with the data quality evaluation:

  1. Run the following code to evaluate the current data quality using AWS Glue Data Quality. The evaluation rule is defined in DQ_RULESET, where the normal temperature range is set between −10 and 50°C based on the device specifications. Any values out of this range are considered erroneous in this scenario.
from awsglue.context import GlueContext
from awsglue.transforms import SelectFromCollection
from awsglue.dynamicframe import DynamicFrame
from awsgluedq.transforms import EvaluateDataQuality
# DQDL ruleset: temperature readings must fall within the valid device range
DQ_RULESET = """Rules = [ ColumnValues "current_temperature" between -10 and 50 ]"""

# Convert the stg branch data into a DynamicFrame for AWS Glue Data Quality
dyf = DynamicFrame.fromDF(
    dataframe=spark.sql(f"SELECT * FROM {DB_TBL}.branch_stg"),
    glue_ctx=GlueContext(spark.sparkContext),
    name='dyf')

# Evaluate the stg branch against the ruleset
dyfc_eval_dq = EvaluateDataQuality().process_rows(
    frame=dyf,
    ruleset=DQ_RULESET,
    publishing_options={
        "dataQualityEvaluationContext": "dyfc_eval_dq",
        "enableDataQualityCloudWatchMetrics": False,
        "enableDataQualityResultsPublishing": False,
    },
    additional_options={"performanceTuning.caching": "CACHE_NOTHING"},
)

# Show DQ results
dyfc_rule_outcomes = SelectFromCollection.apply(
    dfc=dyfc_eval_dq,
    key="ruleOutcomes")
dyfc_rule_outcomes.toDF().select('Outcome', 'FailureReason').show(truncate=False)
  2. The output shows the result of the evaluation. It displays Failed because some temperature data, such as 105°C, is out of the normal temperature range of −10 to 50°C.
+-------+------------------------------------------------------+
|Outcome|FailureReason                                         |
+-------+------------------------------------------------------+
|Failed |Value: 105.0 does not meet the constraint requirement!|
+-------+------------------------------------------------------+
  3. After the evaluation, filter out the incorrect temperature data in the stg branch, then update the latest snapshot in the audit branch with the valid temperature data.

bdb4341_7_write-to-audit
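
The original cell is a screenshot. One possible sketch, assuming the whole audit branch can simply be replaced with the rows from the stg branch that satisfy the temperature rule, is the following:

# Replace the audit branch contents with only the readings that pass the quality rule
spark.sql(f"""
    INSERT OVERWRITE {DB_TBL}.branch_audit
    SELECT * FROM {DB_TBL}.branch_stg
    WHERE current_temperature BETWEEN -10 AND 50
""")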

Through the data quality evaluation, the audit branch in the Iceberg table now contains the valid data, which is ready for downstream use.

Publish phase: Publish the valid data to the downstream side

To publish the valid data in the audit branch to main, complete the following steps:

  1. Run the fast_forward Iceberg procedure to publish the valid data in the audit branch to the downstream side.

bdb4341_8_publish
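
The procedure call appears as a screenshot in the original post. It invokes the Iceberg fast_forward Spark procedure, which in this setup would look roughly like the following:

# Fast-forward main to the head of the audit branch so downstream users see the validated data
spark.sql(f"CALL glue_catalog.system.fast_forward('{DB_TBL}', 'main', 'audit')")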

  2. After the procedure is complete, review the published data by querying the main branch in the Iceberg table to simulate the query from the downstream side.
fig, axes = plt.subplots(nrows=3, ncols=1, sharex=True, sharey=True)
for ax, conf in zip(axes.ravel(), CONF):
    df_room_main = spark.sql(f"""
        SELECT current_time, current_temperature, current_humidity, room_type
        FROM {DB_TBL} WHERE room_type = '{conf['room_type']}'
        ORDER BY current_time ASC
        """)
    pdf = df_room_main.toPandas()
    pdf.set_index(pdf['current_time'], inplace=True)
    plt.xlabel('time')
    plt.ylabel('temperature/humidity')
    plt.ylim(10, 60)
    plt.yticks([tick for tick in range(10, 60, 10)])
    pdf[conf['cols']].plot.line(ax=ax, grid=True, figsize=(8, 6), title=conf['room_type'], legend=False, marker=".", markersize=2, linewidth=0)

plt.legend(['temperature', 'humidity'], loc='center', bbox_to_anchor=(0, 1, 1, 5.5), ncol=2)

%matplot plt

The query result shows only the valid temperature and humidity data that has passed the data quality evaluation.

bdb4341_9_vis-3

In this scenario, you successfully managed data quality by applying the WAP pattern with Iceberg branches. The room temperature and humidity data, including any erroneous records, was first written to the staging branch for quality evaluation. This approach prevented erroneous data from being visualized and leading to incorrect insights. After the data was validated by AWS Glue Data Quality, only valid data was published to the main branch and visualized in the notebook. Using the WAP pattern with Iceberg branches, you can make sure that only validated data is passed to the downstream side for further analysis.

Clean up resources

To clean up the resources, complete the following steps:

  1. On the Amazon S3 console, select the S3 bucket aws-glue-assets-<ACCOUNT_ID>-<REGION> where the Notebook file (iceberg_wap.ipynb) is stored. Delete the Notebook file located in the notebook path.
  2. Select the S3 bucket you created through the CloudFormation template. You can obtain the bucket name from IcebergS3Bucket key on the CloudFormation Outputs tab. After selecting the bucket, choose Empty to delete all objects.
  3. After you confirm the bucket is empty, delete the CloudFormation stack iceberg-wap-baseline-resources.

Conclusion

In this post, we explored common strategies for maintaining data quality when ingesting data into Apache Iceberg tables. The step-by-step instructions demonstrated how to implement the WAP pattern with Iceberg branches. For use cases requiring data quality validation, the WAP pattern provides the flexibility to manage data latency even with concurrent writer applications without impacting downstream applications.


About the Authors

Tomohiro Tanaka is a Senior Cloud Support Engineer at Amazon Web Services. He’s passionate about helping customers use Apache Iceberg for their data lakes on AWS. In his free time, he enjoys a coffee break with his colleagues and making coffee at home.

Sotaro Hikita is a Solutions Architect. He supports customers in a wide range of industries, especially the financial industry, to build better solutions. He is particularly passionate about big data technologies and open source software.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is based in Tokyo, Japan. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.