Amazon Web Services ブログ

AWS Lambda を使用した Amazon DynamoDB の変更を Amazon Aurora PostgreSQL へ継続的に複製する方法

(本記事は 2024/05/14に投稿された Continuously replicate Amazon DynamoDB changes to Amazon Aurora PostgreSQL using AWS Lambda を翻訳した記事です。)

Amazon DynamoDB は、あらゆる規模で高性能アプリケーションを実行できるように設計された、フルマネージド型のサーバーレスなキーバリュー NoSQL データベースです。Amazon Aurora は、クラウド向けに構築された MySQL および PostgreSQL と互換性のあるリレーショナルデータベースです。Aurora は、従来のエンタープライズデータベースのパフォーマンスと可用性と、オープンソースデータベースのシンプルさとコスト効率を兼ね備えています。サーバーレステクノロジーにより、キャパシティのプロビジョニングやパッチ適用などのインフラストラクチャ管理のタスクが不要になり、アプリケーションスタック全体の俊敏性を向上させます。

この投稿では、Amazon DynamoDB StreamsAWS Lambda を使用して DynamoDB から Amazon Aurora PostgreSQL 互換エディションにデータをレプリケートすることで、大規模なリアルタイムのデータ変更を処理するソリューションを紹介します。

ユースケース

私たちのこのユースケースでは、お客様はオンプレミス環境でレガシーレポートアプリケーション、ビジネスインテリジェンス(BI)ツール、データウェアハウスを実行していました。長期計画として、データウェアハウスを Amazon Redshift にモダナイズしたいと考えていました。一方、ダウンストリームのレガシーレポート環境をサポートするために、DynamoDB から Amazon Aurora PostgreSQL 互換エディションにデータをレプリケートし、ユーザーがワンタイムクエリや集計クエリを実行できるようにしました。DynamoDB から Amazon Aurora PostgreSQL にデータをレプリケートするのは一般的なパターンではありませんが、お客様はデータを Amazon Aurora PostgreSQL に持ち込むことを希望しました。これにより、一時的に既存のレガシーアプリケーションを実行し続けることができ、同時に Amazon Redshift への移行を開始することができました。

ソリューション概要

DynamoDB では、パーティションキーと、オプションでソートキーを定義するだけでデータを操作できます。一方、Amazon Aurora などのリレーショナルデータベースでは、扱う属性ごとにテーブルスキーマを定義する必要があります。DynamoDB テーブルから変更をレプリケートするには、DynamoDB Streams を使用して、アイテムレベルの変更を時系列でキャプチャし、この情報を最大 24 時間ログに保存します。レプリケーションは DynamoDB Streams が有効になってから開始されます。つまり、DynamoDB テーブルに既存のデータがあり、それを Aurora にレプリケートする必要がある場合は、DynamoDB データを Amazon S3 にエクスポートするか、AWS Data Pipeline を使用して DynamoDB データをエクスポートおよびインポートしたりするなど、1 回限りのロードで対処する必要があります。DynamoDB はスキーマレスであるため、レプリケーションが中断されないように、DynamoDB に新しい属性を追加する場合は、リレーショナルデータベースの構造を最新の状態に保つ必要があります。Aurora は 1 秒あたり数十万件のトランザクション(TPS)を処理できますが、DynamoDB が受信する TPS がそれを超えると、Aurora でレイテンシーが発生する可能性があります。ソリューションを実装する前に、TPS の要件を理解し、レイテンシーの SLA に整合させることが重要です。

お客様と作業を進める中で、Amazon Data Firehose を使用して DynamoDB から Aurora にデータをストリーミングするオプションについても話し合いました。しかし、お客様は、追加コストなしですぐに使用できるソリューションであり、24 時間以内のデータ保持に関するお客様のサービスレベルアグリーメント(SLA)を満たしていることから、DynamoDB Streams の利用を希望しました。

次の図は、ソリューションのアーキテクチャとワークフローを示しています。

データベース間のデータレプリケーションを有効にするには、次の手順を実行します。

  1. DynamoDB テーブルを作成します。
  2. DynamoDB から SQL へのテーブルマッピングを構成します。
  3. DynamoDB テーブルの DynamoDB Streams を有効にします。
  4. Powertools for AWS Lambda を使用して Amazon CloudWatch Logs および AWS Secrets Manager パラメータ用の Lambda 関数を作成します。
  5. DynamoDB Streams の Lambda トリガーを設定します。
  6. DynamoDB の変更を Amazon RDS で検証します。

前提条件

この記事を読むには、次の前提条件が必要です。

DynamoDB から SQL へのテーブルマッピングの構成

次の表は、DynamoDB テーブルと SQL データベース間のマッピングを示しています。このユースケースでは、1 つの DynamoDB テーブルを 1 つの Aurora PostgreSQL テーブルにマッピングしています。

DynamoDB Table (Employees) SQL Table (Employees)
Id (PrimaryKey), Type: Number Id (PrimaryKey), Type: Number
empName, Type: String Name, Typre: Varchar(20)
empDepartment, Type: String Department, Type: Varchar(10)

両方のテーブルの Id を主キーとして使用します。

DynamoDB SQL
INSERT INSERT
MODIFY UPDATE
REMOVE DELETE

DynamoDB テーブルの作成

次の AWS CLI コマンドは、パーティションキー Id を数値として指定した Employees という名前の DynamoDB テーブルを作成しています。

aws dynamodb create-table \
    --table-name Employees \
    --attribute-definitions \
        AttributeName=Id,AttributeType=N \
    --key-schema \
        AttributeName=Id,KeyType=HASH \
    --provisioned-throughput \
        ReadCapacityUnits=5,WriteCapacityUnits=5 \
    --table-class STANDARD

このテーブルは、5 つの読み取りキャパシティーユニット(RCU)と 5 つの書き込みキャパシティーユニット(WCU)でプロビジョニングされたスループットで構成されています。プロビジョニングされたキャパシティ料金の詳細については、プロビジョニングされたキャパシティの料金を参照してください。

DynamoDB テーブルで DynamoDB Streamsの有効化

DynamoDB Streams を有効にするには、次の手順を実行します。

  1. DynamoDB コンソールで、作成したテーブルに移動し、エクスポートおよびストリームタブを選択します。
  2. DynamoDB ストリームの詳細について、オンにするを選択します。

DynamoDB Streams を有効にすると、DynamoDB テーブルのすべてのデータ操作言語(DML)アクションがストリーム内の項目としてキャプチャされます。

  1. 表示タイプでは新しく更新された値をキャプチャするために新しいイメージを選択し、新しい値を使って宛先を置き換えます。
  2. ストリームをオンにするを選択します。

同様のことは CLI でも実行できます。次のコマンドは、新しいイメージの表示タイプを使用して Employees テーブルでのストリーミングを有効にします。

aws dynamodb update-table \
    --table-name Employees \
    --stream-specification StreamEnabled=true,StreamViewType=NEW_IMAGE

CloudWatch Logs と Secrets Manager パラメータ用の Lambda 関数を作成

このユースケースでは、SQL Replicator と呼ばれる Lambda 関数を使用します。この関数は、DynamoDB テーブルでデータが変更されたときに DynamoDB Streams によって呼び出されます。この関数は Aurora PostgreSQL Serverless への変更をレプリケートし、そのログは Powertools for Lambda を使用して CloudWatch Logs にキャプチャされます。Lambda のコードは Python で記述されています。PostgreSQL の接続には Psycopg データベースアダプターを使用し、logger と シークレットストア には Powertools for Lambda (Python) を使用します。

Lambda ロールポリシー

Lambda ロールは、次の AWS 管理ポリシーを使用して構築されています。

Secrets Manager で送信先の RDS データベースシークレットを作成し、Lambda 関数から使用することができます。統合については、Improve security of Amazon RDS master database credentials using AWS Secrets Manager を参照してください。

次の Python コードは、DynamoDB から PostgreSQL にデータを同期する Lambda 関数用です。これには以下のアクションが含まれています。

  • jsonpsycopg2aws_lambda_powertools などの必要なライブラリをインポートします。
  • ログを記録するために、aws_lambda_powertools から logger を初期化します。
  • Secrets Manager から RDS データベースの認証情報を取得します。
  • psycopg2 を使用して PostgreSQL データベースに接続します。
  • DynamoDB イベントの各レコードについて、イベントタイプ(INSERT、 MODIFY、REMOVE)に基づいて PostgreSQL で CRUD 操作を実行します。
  • psycopg2 cursor を使用して SQL クエリを実行し、PostgreSQL データベース内のレコードを insert、update、deleteします。
  • 各ステップで logger を使用して関連情報を記録します。
  • PostgreSQL からレコードを選択して同期されたデータを検証します。
  • 最後にトランザクションをコミットします。
# Library imports
import json
import psycopg2
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities import parameters

#Initialize Powertools Logger 
logger = Logger()

#instruct Logger to log the incoming event
@logger.inject_lambda_context(log_event=True)
#Lambda Handler 
def lambda_handler(event, context):
    try:
        # Retrieve the secret value from AWS Secrets Manager
        secret_value = json.loads(parameters.get_secret(name="dev/rds"))

        #DB Connect 
        mydb = psycopg2.connect(
        user=secret_value["username"], 
        password=secret_value["password"], 
        host=secret_value["host"], 
        dbname=secret_value["engine"], 
        port= secret_value["port"]
        )

        mycursor = mydb.cursor()

        # For every record in the event
        for record in event["Records"]:
            event_name = record["eventName"]
            #based on the record event 
            if event_name == "INSERT":
                logger.info("Inserting record", extra = { "record": record})
                sql_script = "INSERT INTO public.\"Employees\" VALUES (%s,%s,%s)"
                sql_value = (
                    int(record.get("dynamodb",{}).get("Keys",{}).get("Id",{}).get("N")),
                    record.get("dynamodb",{}).get("NewImage",{}).get("empName",{}).get("S","NA"),
                    record.get("dynamodb",{}).get("NewImage",{}).get("empDepartment",{}).get("S","NA"),
                )
                mycursor.execute(sql_script,sql_value)
                logger.info("Record inserted successfully into Employees table")

            elif event_name == "MODIFY":
                logger.info("Modifying record", extra = { "record": record})
                sql_script = "UPDATE public.\"Employees\" SET \"Name\" = %s, \"Department\" = %s WHERE \"Id\" = %s"
                sql_value = (
                    record.get("dynamodb",{}).get("NewImage",{}).get("empName",{}).get("S", record.get("dynamodb",{}).get("NewImage",{}).get("empName",{}).get("S","NA")),
                    record.get("dynamodb",{}).get("NewImage",{}).get("empDepartment",{}).get("S", record.get("dynamodb",{}).get("NewImage",{}).get("empDepartment",{}).get("S","NA")),
                    int(record.get("dynamodb",{}).get("Keys",{}).get("Id",{}).get("N")),
                )
                mycursor.execute(sql_script,sql_value)
                logger.info("Record modified successfully into Employees table")
            
            elif event_name == "REMOVE":
                logger.info("Removing record", extra = { "record": record})
                sql_script = "DELETE FROM public.\"Employees\" WHERE \"Id\" = %s"
                sql_value = (
                    int(record.get("dynamodb",{}).get("Keys",{}).get("Id",{}).get("N")),
                )
                mycursor.execute(sql_script,sql_value)
                logger.info("Record removed successfully from Employees table")
        
        #Verifying with the select the select statement        
        mycursor.execute('SELECT * FROM public."Employees"')
        records = mycursor.fetchall()
        mycursor.close()
        mydb.commit()

        logger.info("Received event: ", extra = { "records": records})

    except (Exception, psycopg2.Error) as error:
        logger.exception("Error while fetching data from PostgreSQL", extra= {"error": error})

関数コードの「dev/rds」を、Lambda がデータベース認証に利用するシークレットの名前に置き換えます。RDS データベースのシークレットの作成については、AWS Secrets Manager データベースシークレットの作成するを参照してください。

Secrets Manager のシークレットの値

参照用のシークレットの値は次のとおりです。

{"username":"admin","password":"xxxxxx","engine":"postgres","host":"database.cluster-abcdefghjklmn.us-east-1.rds.amazonaws.com","port":5432,"dbClusterIdentifier":"database-3"}

Amazon Aurora は VPC にデプロイされているため、Lambda を同じ VPC にアタッチしました。Lambda と Amazon Aurora の両方にセキュリティグループルールを設定して、両者の接続を許可しています。詳細については、Lambda 関数を使用して Amazon RDS データベースにアクセスするを参照してください。追加のヘルプについては、Amazon RDS のトラブルシューティングを参照することもできます。

DynamoDB Streams の Lambda トリガーを設定

Lambda トリガーを使用すると、DynamoDB テーブルのデータ変更に対応するアプリケーションを構築することができます。DynamoDB と Lambda の統合の詳細については、DynamoDB Streams と AWS Lambda のトリガーを参照してください。

トリガーは Lambda 関数を実行し、エラーが返された場合には、正常に処理されるか、データの有効期限が切れになるまでバッチを再試行します。より小さなバッチで再試行したり、再試行回数を制限したり、その他のオプションを Lambda 関数に設定したりすることもできます。バッチ処理の詳細については、ポーリングストリームとバッチストリームを参照してください。

Lambda トリガーは、DynamoDB の DML ボリュームに基づいてバッチサイズが 100 に設定されています。

Amazon RDS で DynamoDB の変更を検証する

DynamoDB Streams と Lambda 関数をセットアップしたら、データ配信を検証することができます。AWS CLI またはコンソールを使用して DynamoDB の insert、update、delete を実行できます。この記事では、AWS CLI のサンプルコードを提供します。PostgreSQL クライアントを使用して接続し(この記事では pgAdmin を使用)、データを検証できます。

Insert

AWS CLI で次のコードを使用して insert を実行します。

aws dynamodb put-item \
--table-name Employees --item "{\"Id\": {\"N\": \"2001\"},\"empDepartment\": {\"S\": \"AnyDept\"},\"empName\": {\"S\": \"Akua\"}}"

Update

AWS CLI で次のコードを使用して update を実行します。

aws dynamodb update-item --table-name Employees \
--key "{\"Id\": {\"N\": \"2001\"}}" \
--update-expression "SET  empDepartment = :newval" \
--expression-attribute-values "{\":newval\": {\"S\": \"ExDept\"}}"

次のスクリーンショットは、テーブル内の update された値を示しています。

Delete

AWS CLI で次のコードを使用して delete を実行します。

aws dynamodb delete-item --table-name Employees --key "{\"Id\": {\"N\": \"2001\"}}"

次のスクリーンショットは、レコードが delete されたことを示しています。

クリーンアップ

  1. Lambda 関数を削除します。
  2. DynamoDB テーブルを削除します。
  3. Aurora PostgreSQL Serverless データベースを削除します。

まとめ

この記事では、Lambda を使用して DynamoDB の変更を Aurora に継続的にレプリケートするイベント駆動型ソリューションを構築しました。このソリューションは、ダウンストリームのレガシーレポートワークロードをサポートすることでお客様の問題を解決し、1 回限りのクエリと集計クエリを実行できるようにしました。

このソリューションを試してみて、コメントや質問がある場合はコメント欄に残してください。

著者について

Aravind Hariharaputran は、Amazon Web Services のプロフェッショナルサービスチームのデータベースコンサルタントです。彼は Microsoft SQL Server を専門とするデータベース全般に情熱を注いでいます。お客様がオンプレミスのデータベースワークロードを AWS クラウドに移行して最適化するのを支援する技術ソリューションの構築を支援しています。家族と過ごしたり、クリケットをしたりすることを楽しんでいます

Sakthivel Chellapparimanam は、Amazon Web Services の AWS プロフェッショナルサービスのクラウドアプリケーションアーキテクトです。お客様のクラウドアプリケーションの構築とクラウドへのアプリケーションの移行を支援しています。