Amazon Web Services ブログ

Amazon EventBridge Pipes と Timestream for LiveAnalytics を使用した時系列アプリケーションの迅速な開発

Amazon Timestream for LiveAnalytics は、高速でスケーラブルなサーバーレス時系列データベースであり、1 日に数兆件のイベントを簡単かつコスト効率よく保存および分析する事ができます。Timestream for LiveAnalytics は、数億の IoT デバイス、産業機器の監視や、ゲームセッション、ストリーミングビデオセッション、金融、ログ分析などのユースケースに使用できます。Timestream for LiveAnalytics を使用すると、高可用性を実現しながら、1 分あたり数十ギガバイトの時系列データを取り込み、数秒でテラバイトの時系列データに対する SQL クエリを実行できます。

現在、様々な組織が、時間に敏感なデータのリアルタイムの洞察を提供することで、競争上の優位性を獲得しようとしていますが、データストリームを使用したリアルタイムデータの取り込みをサポートし、それを Timestream for LiveAnalytics で処理して保存するデータパイプラインが必要となります。時系列データをサポートしている AWS ソースから Timestream for LiveAnalytics に転送するには、Apache Flink アダプターなどのすぐに使えるアダプター、またはカスタマイズされた専用のコードが必要になる場合がありました。

時系列データの取り込みを簡素化する為、Amazon EventBridge Pipes と Timestream for LiveAnalytics の統合機能が今回使えるようになりました。EventBridge Pipes を使用して、Amazon DynamoDBAmazon Kinesis Data StreamsAmazon MQAmazon Simple Queue Service (Amazon SQS) などのさまざまなソースから Timestream for LiveAnalytics にデータを取り込めるようになりました。本リリースにより、Timestream for LiveAnalytics にデータを取り込むための、柔軟なローコード・ノーコード (LCNC) 構成ベースのソリューションが利用可能となります。

EventBridge Pipes は、サポートされているソースターゲット間のポイントツーポイント統合を目的としており、高度な変換エンリッチメントフィルター処理をサポートしています。EventBridge Pipes は、イベント駆動型アーキテクチャを開発する際の専門知識と統合コードの必要性を軽減し、企業のアプリケーション間の一貫性を促進します。

本投稿では、Timestream for LiveAnalytics が Kinesis Data Streams からデータを取り込めるように EventBridge Pipes を設定する方法を紹介します。

ソリューションの概要

以下の図は、EventBridge Pipes を利用して Kinesis Data Streams からデータを取り込む為のアーキテクチャを示しています。EventBridge Pipes がサポートしているその他の統合を調べるには、Amazon EventBridge Pipes sources を参照してください。

次の JSON でサンプル車両データを生成し、Amazon Kinesis Data Generator (KDG) を使用して Kinesis Data Streams にストリーミングします。 Kinesis Data Streams から Timestream for LiveAnalytics にデータを取り込むためのパイプを以下で設定していきます。

{
    "connectionTime": "{{date.now("YYYY-MM-DD HH:mm:ss.SSS")}}",
    "vehicle_id": "CAR_{{random.number({"min":1, "max":100})}}",
    "driver_id": "USER_{{random.number({"min":1, "max":1000})}}",
    "engine_status": "{{random.number({"min":0, "max":1})}}",
    "miles": "{{random.number({"min":0, "max":1000})}}.0"   
}
JSON

前提

本投稿では、データベース VehicleMetricsDBテーブル VehicleMetrics を作成しました。カーディナリティの高い列を持ち、テーブルのクエリで述語として頻繁に使用されるディメンションに基づいてパーティションキーを選択することをお勧めします。これにより、データがパーティション間で均等に分散され、パフォーマンスに関する問題を回避する事ができます。本投稿では、顧客定義のパーティションキーとして vehicle_id を持つ車両から生成されたデータを使います。またデータストリーム vehicle-metrics-stream についても作成しました。

注意: このソリューションでは、アカウントにコストが発生する AWS リソースが作成されます。本投稿の一部として作成したリソースは、完了したら必ず削除して下さい。

EventBridge pipe を作成する

  1. EventBridge Pipes のコンソール画面を開く
  2. Create pipe を選択
  3. Pipe name として作成するパイプ名を指定する
  4. Description として説明を入力する (オプション)
  5. Build pipe タブの SourceKinesis を選択し、適切なストリームを選択する
  6. Additional setting で要件に応じて、バッチサイズや多重度の値を設定する
    1. On partial batch item failureAUTOMATIC_BISECT を選択する事を推奨します。各バッチは自動的に半分に分割され、すべてのレコードが処理されるか、失敗したメッセージのみがバッチ内に残るまで、半分ずつ再試行されます。
    2. Batch size – デフォルトの 100 を選択しましょう。尚、Amazon SQS がソースの場合は最大のバッチサイズは 10 になります。

  1. Build pipe タブの Target を選択
  2. このデモでは、フィルタリングエンリッチメントのオプションは必要ないため、ターゲットを直接設定しますが、要件に基づいてフィルタリングとエンリッチメントの適用を選択できます。エンリッチメントレイヤーを追加すると、さまざまなサービスがさまざまなレベルのバッチ処理をサポートします。
  3. DetailsTarget service で、ターゲットとして Timestream for LiveAnalytics を選択し、対象のデータベースとテーブルを選択します。

データが有効な JSON の場合、入力テンプレートまたはターゲットパラメーターの JSON パスはコンテンツを直接参照できます。たとえば、<$.data.someKey> を使用して変数を参照できます。

  1. Time field type には、$.data.connectionTime をデフォルトの時間として TIMESTAMP_FORMAT (yyyy-MM-dd HH:mm:ss.SSS) を使用します。要件によっては、EPOCH 形式も選択できます。
  2. Version value を指定する事で、最新の値を使用してデータポイントを更新できます。各 Kinesis レコードには、ApproximateArrivalTimestamp という値が含まれており、ストリームがレコードを正常に受信して保存したときに設定されます。本投稿では、更新を処理するためにバージョン値を $.data.approximateArrivalTimestampに設定しました。そのため、最新の ApproximateArrivalTimestamp でメジャーが変更され、同じディメンションと時間の値が更新されます。

次のステップでは、Timestream for LiveAnalytics のデータモデル (ディメンション、メジャー、メジャー名、および必要な列のデータ型) を構成します。この構成では、ビジュアルビルダーまたは JSON エディターのいずれかを使用できます。データモデリングのベストプラクティスに従う事で、効果的なデータモデリングを実現します。

  1. JSON エディターを利用して以下の情報を提供します
    1. DimensionValue (ディメンジョン)として、vehicle_iddriver_id を使います。
    2. MeasureValue (メジャー)としては、マイル数とengine_statusを使います。
    3. MultiMeasureName (メジャー名) には、’metric’ という固定値を使用します。要件に基づいて特定のソース列をメジャー名として選択できますが、8,192 個の個別の値を超えないようにしてください。
{
  "DimensionMappings": [
    {
      "DimensionValue": "$.data.vehicle_id",
      "DimensionValueType": "VARCHAR",
      "DimensionName": "vehicle_id"
    },
    {
      "DimensionValue": "$.data.driver_id",
      "DimensionValueType": "VARCHAR",
      "DimensionName": "driver_id"
    }
  ],
  "MultiMeasureMappings": [
    {
      "MultiMeasureName": "metric",
      "MultiMeasureAttributeMappings": [
        {
          "MeasureValue": "$.data.miles",
          "MeasureValueType": "DOUBLE",
          "MultiMeasureAttributeName": "miles"
        },
        {
          "MeasureValue": "$.data.engine_status",
          "MeasureValueType": "BIGINT",
          "MultiMeasureAttributeName": "engine_status"
        }
      ]
    }
  ]
}
JSON

Visual Builder を使用してデータモデルをセットアップすることもできます。以下のスクリーンショットは、Visual Builder を使ったディメンションの設定例です。MULTI_MEASURE_NAMEMULTI_MEASURE_VALUE に対しても同様に設定することができます。

これで、パイプの詳細設定 (アクセス許可、再試行ポリシー、Amazon CloudWatch logs) ができるようになりました。

  1. Pipe settings タブの Permission セクションで、新しい AWS Identity and Access Management (IAM) ロールを定義するか、既存のロールを使用できます。初めてパイプを作成する場合は、 Create a new role for specific resource を選択します。

Kinesis データストリームまたは DynamoDB ストリームをパイプのソースとして指定した場合は、オプションで再試行ポリシーとデッドレターキュー (DLQ) を設定できます。指定した保存期間より古いレコードを破棄し、失敗した場合の再試行回数を指定できます。

  1. Retry policy では、イベントの最大経過時間として、少なくとも 30 分以上を使用することが推奨されます。このデモでは、30 分に設定したため、30 分より古いレコードは処理されず、DLQ に直接移動されます。
  2. Retry attempts は 10 回に設定します。これは、一時的な問題に対処するための推奨値です。永続的な問題の場合は、再試行後にレコードが DLQ に移動され、残りのストリームのブロックが解除されます。
  3. 設定ミスによるデータ損失を避けるために、パイプの DLQ を構成することを強くお勧めします。 DLQ で Kinesis イベントのシーケンス番号を参照してレコードを修正し、Timestream に再送信できます。 DLQ を有効にするには、以下の Dead-letter queue の箇所を ON とし、使用するキューまたはトピックを選択します。このデモでは、Amazon SQS を使用し、同じ AWS アカウントと AWS リージョン内のキューを選択します。

  1. Log destination で、ログレコードの配信先を選択します。ここでは、CloudWatch Logs を選択します。
  2. Log level では、EventBridge がログレコードに含める情報のレベルを選択します。デフォルトでは、ERROR ログレベルが選択されています。ここでは、詳細を確認できるように、Log LevelERROR から INFO に変更します。
  3. ターゲットが Timestream for LiveAnalytics の場合は、include execution data オプションを選択することをお勧めします。これにより、EventBridge はイベントのペイロード情報、サービスの要求/応答情報をログレコードに含めることができます。

EventBridge Pipes の実行データは、トラブルシューティングとデバッグに役立ちます。ペイロードフィールドには、バッチに含まれる各イベントの実際のコンテンツが含まれているため、個々のイベントを特定のパイプ実行に関連付けることができます。ただし、受信データには機密情報が含まれている可能性があり、このオプションを有効にすると、実際のイベントデータが選択したすべての宛先に記録されるため、データの機密性に基づいて正しい決定を下すようにしてください。

  1. Create pipe を選択してパイプを作成します。

EventBridge コンソールで作成した特定のパイプに移動し、パイプが Running 状態になるまで待ちます。

ソリューションの検証

サンプルアーキテクチャの箇所で説明したように、KDG を使用してデータをストリーミングしました。この取り込みを検証するには、Amazon Timestream for LiveAnalytics のクエリエディターを開き、次の SQL クエリを実行します。

SELECT * FROM "VehicleMetricsDB"."VehicleMetrics" limit 5
SQL

以下はクエリの結果を示しています。

要件に応じて、Timestream のクエリ言語を使用して様々なクエリを実行できます。

考慮事項

AWS Command Line Interface (CLI) (AWS CLI)、AWS CloudFormationAWS Cloud Development Kit (AWS CDK) を使用して、EventBridge Pipes のパイプを作成することもできます。

また、CloudWatch メトリクスを使用して、取り込みの進行状況の監視が可能で、EventBridge コンソールの Monitoring タブでは呼び出しと失敗を視覚化できます。トラブルシューティングについては、Log Amazon EventBridge Pipes を参照してください。

クリーンアップ

料金の発生を回避するには、AWS マネジメントコンソールを使用して、このデモの為に作成したリソースを削除します。

  1. Timestream のコンソールで Timestream のデータベースとテーブルを削除します。
  2. EventBrdige のコンソールで作成したパイプを選択して削除します。
  3. Kinesis Data Streams のコンソールで作成したストリームを削除します。
  4. CloudFormation コンソールで、KDG 用に作成した CloudFormation スタックを削除します。

結論

本投稿では、EventBridge Pipes を統合して Kinesis Data Streams からデータを Timstream for LiveAnalytics に取り込む方法を説明しました。この新しい統合は、時系列データの取り込みを簡素化するのに役立ちます。

詳細については、次のリソースを参照してください。

本投稿へのフィードバックについては、Amazon Timestream の AWS re:Post に送信して下さい。

翻訳はテクニカルアカウントマネージャーの西原が担当しました。原文はこちらをご覧下さい。