Amazon Web Services ブログ

Amazon Kinesis Data Streams および AWS Lambda を使用して、Amazon RDS for PostgreSQL の変更をストリーミングする

この記事では、Amazon Relational Database Service (Amazon RDS) for PostgreSQL 中央データベースを、Amazon Kinesis Data Streams にその変更をストリーミングすることで、他のシステムと統合する方法について説明します。以前の記事、「Amazon Kinesis を使用したデータベースの変更をストリーミングする」では、変更を Kinesis へストリーミングすることによって、MySQL データベース用の中央 RDS を他のシステムに統合する方法についてお話ししました。この記事では、さらに進んで、AWS Lambda 関数を使用して Amazon RDS for PostgreSQL の変更をキャプチャし、その変更を Kinesis Data Streams にストリーミングする方法を説明します。

次の図は、分散システムにおける一般的なアーキテクチャ設計を表しています。これには、信頼できる唯一の情報源と呼ばれる中央ストレージと、この中央ストレージを使用するいくつかの派生「サテライト」システムが含まれます。

この設計アーキテクチャーを用いて、データの整合性を維持するためにこのシステムのトランザクション機能を活用しながら、リレーショナルデータベースを中央データストアとして使用することができます。このコンテクストにおいての派生システムとは、全文検索システムであり、この信頼できる唯一の情報源の変更を観察し、それらの変更を変換かつフィルタリングし、最終的にその内部インデックスを更新するものです。もう 1 つの例は、OLAP クエリにより適したカラムナストレージです。一般に、中央リレーショナルシステムの個々の行が変更された時にアクションを実行する必要があるシステムはどれも、派生データストアにするのに適しているとい言えます。

この種のアーキテクチャの単純な実装では、変更された行を検索するために定期的にクエリを発行する派生システムがあります。これは基本的に SELECT ベースのクエリ (バッチ処理システムとも呼ばれます) で中央データベースをポーリングします。一方で、このアーキテクチャにより適した実装は、非同期の更新ストリームを使用するアーキテクチャです。

データベースには通常、行のすべての変更が格納されるトランザクションログがあります。ですので、この変更のストリームが外部のオブザーバシステムに公開されている場合、このシステムはこれらのストリームに接続し、行の変更を処理およびフィルタリングできる可能性があります。 この記事では、PostgreSQL を中央データベースとして、また Kinesis Data Stream をメッセージバスとして使用して、この基本的な実装を紹介します。

通常、PostgreSQL Write-Ahead Logging (WAL) ファイルは、マスター上のすべての変更を読み込んだ後、ローカルに適用するリードレプリカに公開されます。リードレプリカからデータを読み取る代わりに、wal2json 出力プラグインを用いた論理デコードを使用して、WAL の内容を直接デコードします。プラグインはこの GitHub リポジトリからダウンロードできます。論理デコードとは、データベースのテーブルに対するすべての持続的な変更を、一貫性がありかつ分かりやすい形式に抽出するプロセスのことです。この形式だと、データベースの内部状態を詳細に把握していなくても解釈することが可能です。

PostgreSQL WAL ログの RDS からの変更を継続的に読み取って、変更を Kinesis Data Streams にプッシュするには、AWS Lambda 関数を使用します。上位レベルでは、エンドツーエンドプロセスは次のようになります。

このメソッドの重要な点の 1 つは、コンシューマーが SQL クエリを受け取らないことです。それらは公開される可能性もありますが、一般に、SQL 互換のデータのレプリカを維持しない限り、オブザーバは SQL にあまり関心がありません。代わりに、変更された複数のエンティティ (複数行) をそれぞれ受け取ります。

このアプローチの利点は、コンシューマーは SQL を理解する必要がなく、信頼できる唯一の情報源は誰が変更を使用するかを知る必要はないということです。つまり、異なるチームが、チーム間で連携して必要なデータ形式に対処することなく、作業できることを意味します。さらに良いのは、Kinesis Data Streams クライアントが特定の時点から読み取る機能を備えているため、各コンシューマーは独自のペースでメッセージを処理できることです。ですので、メッセージバスは、システムを統合するのにあまり利用されていない方法なのです。

この記事で使用されている例では、行フェッチャーは中央データベースに接続する通常の Python プロセスであり、リードレプリカをシミュレートします。データベースは、Amazon RDS か、PostgreSQL の任意のインストールのいずれかです。Amazon RDS の場合、RDS インスタンスホストにカスタムソフトウェアをインストールすることができないため、フェッチャープロセスを別のホスト (例えば、Amazon EC2、または Lambda 関数) にインストールする必要があります。外部インストールの場合、フェッチャープロセスをデータベースと同じホストにインストールできます。

マスター PostgreSQL インスタンスを準備する

PostgreSQL マスター (信頼できる唯一の情報源) を、通常のレプリケーションのマスターであるように設定する必要があります。WAL ログを有効にする必要があります。つまり、RDS for PostgreSQL の場合、バックアップを有効にするだけです。また、Amazon RDS の PostgreSQL 論理レプリケーションは、新しいパラメータ、新しいレプリケーション接続タイプ、および新しいセキュリティロールによって有効になります。レプリケーションのクライアントは、PostgreSQL DB インスタンス上のデータベースへのレプリケーション接続を確立できる任意のクライアントです。詳細については、「Amazon RDSのPostgreSQL の論理レプリケーション」を参照してください。

論理レプリケーションスロットは、ストリームのレシーバーについて何も分かっていません。よって、論理レプリケーションスロットを設定してスロットから読み取らないと、データが DB インスタンスのストレージに書き込まれ、ストレージがすぐにいっぱいとなる可能性があります。PostgreSQL の論理レプリケーションの最も一般的なクライアントは、AWS Database Migration Service (AWS DMS) または Amazon EC2 インスタンス上のカスタム管理ホストです。例えば、AWS DMS タスクを構成していて、変更を積極的に使用しない場合、DB サーバー上のストレージがいっぱいとなる可能性があります。

Amazon RDS PostgreSQL DB インスタンスの論理レプリケーションを有効にするには、次の操作を行う必要があります。

  1. Amazon RDS 上にある PostgreSQL データベースの論理レプリケーションを開始する AWS ユーザーアカウントに、rds_superuserrds_replication があることを確認します。 Rds_replication ロールは、論理スロットを管理し、論理スロットを使用してデータをストリーミングする権限を与えます。
  2. Rds.logical_replication パラメータを 1 に設定します。これは静的パラメータで、実行には再起動が必要です。このパラメータを適用するには、wal_levelmax_wal_sendersmax_replication_slots、および max_connections パラメータを設定します。これらのパラメータ変更は WAL 生成を増加させる可能性があるので、rds.logical_replication パラメータは論理スロットを使用している時にのみ設定する必要があります。注 :
    • デフォルトの wal_level は REPLICA です。
    • Rds.logical_replication を 1 に設定すると、wal_level が LOGICAL に設定されます。
  3. 論理レプリケーションスロットを作成し、デコードプラグインを選択します。この例では、wal2json を使用して、次の手順に示すように、Python を用いてレプリケーションスロットを作成します。

DB インスタンスを変更する

新しいパラメータグループを使用するようにインスタンスを変更した後、インスタンスには「pending reboot」と表示されます。再起動して、新しいパラメータグループを有効にする必要があります。インスタンスを再起動したら、次のように新しいパラメータを確認できるはずです。

postgres> show wal_level;
+-------------+
| wal_level   |
|-------------|
| logical     |
+-------------+
SHOW
Time: 0.002s

postgres> show max_replication_slots;
+-------------------------+
| max_replication_slots   |
|-------------------------|
| 5                       |
+-------------------------+
SHOW
Time: 0.002s

RDS DB インスタンスの変更についての詳細は、「PostgreSQL データベースエンジンを実行している DB インスタンスの変更」を参照してください。

データベースユーザーに権限を追加する

Amazon RDS によって作成されたデフォルトの master ユーザーを使用している場合は、すでに必要な権限を持っている可能性があります。そうでない場合は、次の例のように、REPLICATION 権限を持つユーザーを作成する必要があります。

postgres> create user repluser password 'replpass';
CREATE ROLE
Time: 0.003s

postgres> grant rds_replication to repluser;
GRANT ROLE
Time: 0.002s

Kinesis データストリームを作成する

Kinesis データストリームと boto3 クライアントの認証情報が必要です。クライアントの認証情報については、Boto 3 ドキュメントを参照してください。AWS CLI または Amazon Kinesis コンソールのどちらかを使用して、データストリームを作成できます。

次の AWS CLI コマンドを使用して、Kinesis データストリームを作成します。

aws kinesis create-stream --stream-name Foo --shard-count 1

または、Amazon Kinesis コンソールを開き、ナビゲーションペインで [データストリーム] を選択します。次に、[Kinesis ストリームの作成] を選択します。

ストリームの名前とシャードの数を入力します。この例では、シャードが 1 つあります。

数分後には、ストリームは行の変更を受け入れる準備ができているはずです!

AWS CLI ユーザーに権限を割り当てる

AWS Identity and Access Management (IAM) を使用して、このストリームにアクセスする CLI ユーザーに権限を付与できます。

この例では、権限のあるユーザーは kinesis-rds-user です。新しいユーザーを作成したり、既存のユーザーを使用することはできますが、Kinesis データストリームへの書き込み権限を追加する必要があります。

ストリームに固有のポリシーを作成できます。この例では、Kinesis データストリームに完全にアクセスできる標準ポリシーを使用しています。

メッセージを Kinesis データストリームに公開する

WAL ログからの変更を読み込んで、Amazon Kinesis に公開するには、次のいずれかを使用します。

  • アクセスキー ID とシークレットアクセスキーを持つ Amazon EC2 マシン
  • IAM ロールのある Lambda 関数

ここでは、シンプルな Python プログラムを使って、上記 2 つの方法を紹介します。ただし、この例はテストコードであり、本番に対応していないことに注意してください。

オプション 1: Amazon EC2 マシンを使用する

EC2 マシンを kinesis-rds-user の AWS アクセスキー ID と、以前の投稿で作成した AWS シークレットアクセスキーを用いて使用するには、次のように認証情報を設定します。

[ec2-user@ip-172-xx-yy-zzz ~]$ cat ~/.aws/credentials 
[default]
aws_access_key_id = XXXXXXXXXXXXXXX
aws_secret_access_key = tSQYYYYYYXXX+TrDYYYYYYYY

この Python コードを使用する :

import boto3
import json
import random
import calendar
import time
from datetime import datetime
import psycopg2
from psycopg2.extras import LogicalReplicationConnection

my_stream_name = 'Foo'
kinesis_client = boto3.client('kinesis', region_name='us-east-1')
my_connection  = psycopg2.connect(
                   "dbname='postgres' host='mypgdb.abcdefghijk.us-east-1.rds.amazonaws.com' user='repluser' password='replpass'" ,
                   connection_factory = LogicalReplicationConnection)
cur = my_connection.cursor()
cur.drop_replication_slot('wal2json_test_slot')
cur.create_replication_slot('wal2json_test_slot', output_plugin = 'wal2json')
cur.start_replication(slot_name = 'wal2json_test_slot', options = {'pretty-print' : 1}, decode= True)

def consume(msg):
    kinesis_client.put_record(StreamName=my_stream_name, Data=json.dumps(msg.payload), PartitionKey="default")
    print (msg.payload)

cur.consume_stream(consume)

注 : オプションの print(msg.payload) を使用しましたが、これは次のプリント出力を示しています。例えば、RDS for PostgreSQL では、このサンプル DML を実行します。

postgres> \d+ employee
+----------+---------+-------------+-----------+----------------+---------------+
| Column   | Type    | Modifiers   | Storage   | Stats target   | Description   |
|----------+---------+-------------+-----------+----------------+---------------|
| empno    | integer |  not null   | plain     | <null>         | <null>        |
| deptid   | integer |             | plain     | <null>         | <null>        |
+----------+---------+-------------+-----------+----------------+---------------+
Indexes:
    "employee_pkey" PRIMARY KEY, btree (empno)
Foreign-key constraints:
    "employee_deptid_fkey" FOREIGN KEY (deptid) REFERENCES dept(deptid)
Has OIDs: no

Time: 0.032s

postgres> insert into employee values (1057,100);
INSERT 0 1
Time: 0.002s
postgres> insert into employee values (1058,100);
INSERT 0 1
Time: 0.002s
postgres> update employee set empno = 1059 where empno = 1057;
UPDATE 1
Time: 0.002s

 

Python コードは、次のようになります。

[ec2-user@ip-172-31-63-237 ~]$ python kinesis_producer_from_pg.py
{
    "change": [
        {
            "kind": "insert",
            "schema": "jatin",
            "table": "employee",
            "columnnames": ["empno", "deptid"],
            "columntypes": ["integer", "integer"],
            "columnvalues": [1057, 100]
        }
    ]
}
{
    "change": [
        {
            "kind": "insert",
            "schema": "jatin",
            "table": "employee",
            "columnnames": ["empno", "deptid"],
            "columntypes": ["integer", "integer"],
            "columnvalues": [1058, 100]
        }
    ]
}
{
    "change": [
        {
            "kind": "update",
            "schema": "jatin",
            "table": "employee",
            "columnnames": ["empno", "deptid"],
            "columntypes": ["integer", "integer"],
            "columnvalues": [1059, 100],
            "oldkeys": {
                "keynames": ["empno"],
                "keytypes": ["integer"],
                "keyvalues": [1057]
            }
    }
    ]
}

オプション 2AWS Lambda 関数を使用する

AWS Lambda 関数を使用するには、前の Python コードを再利用し、デプロイパッケージでパッケージ化することができます。詳細については、「デプロイパッケージ (Python) の作成」を参照してください。これは「AWS Lambda 開発者ガイド」の中にあります。

この例では、psycopg2 を使用しました。これは、Python プログラミング言語に対応した一般的な PostgreSQL アダプタです。パッケージは次のようになります。

ec2-user@ip-172-xx-yy-zzz $ ls -ltr
drwxrwxr-x 2 ec2-user ec2-user   4096 May 14 00:53 psycopg2
-rw-rw-r-- 1 ec2-user ec2-user 472722 May 19 20:13 mypackage.zip
-rw-rw-r-- 1 ec2-user ec2-user   1350 May 19 20:36 app.py

With app.py, it looks like the following:

import sys
import logging
import psycopg2
import boto3
import json
import random
import calendar
import time
from datetime import datetime
from psycopg2.extras import LogicalReplicationConnection

my_stream_name = 'Foo'
kinesis_client = boto3.client('kinesis', region_name='us-east-1')

logger = logging.getLogger()
logger.setLevel(logging.INFO)

try:
    my_connection  = psycopg2.connect(
                      "dbname='postgres' host='mypgdb.xxxxxxxxxxxxx.us-east-1.rds.amazonaws.com' user='repluser' password='replpass'" ,
                      connection_factory = LogicalReplicationConnection)
except:
    logger.error("ERROR: Unexpected error: Could not connect to RDS for PostgreSQL instance.")
    sys.exit()

logger.info("SUCCESS: Connection to RDS for PostgreSQL instance succeeded")

def handler(event, context):
    """
    この関数は、コンテンツを RDS for PostgreSQL から Kinesis へストリーミングする
    """

    cur = my_connection.cursor()
    cur.create_replication_slot('wal2json_test_slot', output_plugin = 'wal2json')
    cur.start_replication(slot_name = 'wal2json_test_slot', options = {'pretty-print' : 1}, decode= True)

    cur.consume_stream(consume)

def consume(msg):
    kinesis_client.put_record(StreamName=my_stream_name, Data=json.dumps(msg.payload), PartitionKey="default")
    print (msg.payload)

IAM ロールを作成する

AWS Lambda 関数を作成する前に、Kinesis と Amazon RDS データベースにアクセスする権限を持つ、適切な IAM ロールがあることを確認する必要があります。また、Amazon RDS および Kinesis データストリームにアクセスできるように、適切なセキュリティグループを添付して、適切な仮想プライベートクラウド (VPC) に Lambda 関数を作成する必要があります。

IAM コンソールでは、以下に示すように、Lambda 関数に割り当てる IAM ロールを作成できます。

AWS Lambda 関数を作成する

AWS CLI を使用して、次のように Lambda 関数を作成できます。

aws lambda create-function \
--region us-east-1 \
--function-name postgres-kinesis \
--zip-file fileb://mypackage.zip \
--role arn:aws:iam::123456789111:role/lambda-role-mysql \
--handler app.handler \
--runtime python2.7 \
--timeout 15 \
--memory-size 512

テストする

Lambda 関数を作成したら、それを起動してテストすることができます。

ちなみに、別のセッションでは、次のように、シェルスクリプトを使用してレコードが Kinesis データストリームに公開されているかどうかをテストすることもできます。

streamname=Foo; aws kinesis describe-stream --stream-name $streamname --output text | grep SHARDS | awk '{print $2}' | while read shard; do aws kinesis get-shard-iterator --stream-name $streamname --shard-id $shard --shard-iterator-type LATEST --output text | while read iterator; do while output=`aws kinesis get-records --shard-iterator $iterator --output text`; do iterator=`echo "$output" | head -n1 | awk '{print $2}'`; echo "$output" | awk 'NR > 1' | grep RECORDS | while read record; do echo $record | awk '{print $3}' | base64 -id; done; done; done; done

この例では、自分のレコードが Kinesis データストリームに公開されていることを示しています。

"{\n\t\"change\": [\n\t\t{\n\t\t\t\"kind\": \"insert\",\n\t\t\t\"schema\": \"<>\",\n\t\t\t\"table\": \"employee\",\n\t\t\t\"columnnames\": [\"empno\", \"deptid\"],\n\t\t\t\"columntypes\": [\"integer\", \"integer\"],\n\t\t\t\"columnvalues\": [1234, 100]\n\t\t}\n\t]\n}""{\n\t\"change\": [\n\t\t{\n\t\t\t\"kind\": \"insert\",\n\t\t\t\"schema\": \"<>\",\n\t\t\t\"table\": \"employee\",\n\t\t\t\"columnnames\": [\"empno\", \"deptid\"],\n\t\t\t\"columntypes\": [\"integer\", \"integer\"],\n\t\t\t\"columnvalues\": [2222, 101]\n\t\t}\n\t]\n}""{\n\t\"change\": [\n\t\t{\n\t\t\t\"kind\": \"update\",\n\t\t\t\"schema\": \"<>\",\n\t\t\t\"table\": \"employee\",\n\t\t\t\"columnnames\": [\"empno\", \"deptid\"],\n\t\t\t\"columntypes\": [\"integer\", \"integer\"],\n\t\t\t\"columnvalues\": [3333, 101],\n\t\t\t\"oldkeys\": {\n\t\t\t\t\"keynames\": [\"empno\"],\n\t\t\t\t\"keytypes\": [\"integer\"],\n\t\t\t\t\"keyvalues\": [2222]\n\t\t\t}\n\t\t}\n\t]\n}""{\n\t\"change\": [\n\t\t{\n\t\t\t\"kind\": \"delete\",\n\t\t\t\"schema\": \"<>\",\n\t\t\t\"table\": \"employee\",\n\t\t\t\"oldkeys\": {\n\t\t\t\t\"keynames\": [\"empno\"],\n\t\t\t\t\"keytypes\": [\"integer\"],\n\t\t\t\t\"keyvalues\": [1234]\n\t\t\t}\n\t\t}\n\t]\n}"

メッセージを使用する

これで、変更されたレコードを使用する準備が整いました。すべてのコンシューマーコードが動作します。この投稿のコードを使用すると、前述のように JSON 形式のメッセージが表示されます。

まとめ

この記事では、Amazon Kinesis Data Streams を使用して、変更ストリームをデータベースのレコードに公開する方法を説明しました。データ指向の企業の多くは、これに類似したアーキテクチャを使用しています。この記事で示す例は、実際の本番環境では使用できませんが、この統合スタイルを試して、エンタープライズアーキテクチャの拡張機能を改善することが可能です。最も複雑な部分はおそらく、Amazon Kinesis Data Streams が水面下で既に解決しているでしょう。

その他のリソース

詳細については、以下のソースを参照してください。

チェンジセット抽出用 JSON 出力プラグイン (GitHub)

レプリケーション接続とカーソルクラス」 (Psycopg 2.7.5 ドキュメント)

Adding replication protocol」 (PDF ファイル)


著者について

Jatin Singh はアマゾン ウェブ サービスのパートナーソリューションアーキテクトです。 彼は、AWS を使用している場合にソリューションの価値を向上させる手助けとなるために、AWS の顧客と協力してデータベースプロジェクト上の指導や技術支援を行っています。