Amazon Web Services ブログ

Amazon Kinesis Data Analytics for SQL アプリケーションから Amazon Managed Service for Apache Flink Studio への移行

本記事は、AWS の Worldwide Public Sector におけるパートナーソリューションアーキテクトである Nicholas Tunney によって作成されたものの日本語版です。原文はこちらよりご確認いただけます。

2024 年 10 月 17 日: Amazon Kinesis Data Analytics for SQL の提供終了が発表されました。詳細は AWS News Blog をご覧ください。

2024 年 2 月 9 日: Amazon Kinesis Data Firehose は Amazon Data Firehose に名称変更されました。詳細は AWS What’s New post をご覧ください。

2023 年 8 月 30 日: Amazon Kinesis Data Analytics は Amazon Managed Service for Apache Flink に名称変更されました。詳細は AWS News Blog をご覧ください。

この記事では、Apache Flink の高度なストリーミング機能を活用するために、Kinesis Data Analytics for SQL アプリケーションから Amazon Managed Service for Apache Flink への移行を AWS が推奨する理由について説明します。また、Amazon Managed Service for Apache Flink Studio を使用して、移行したアプリケーションをデプロイする前に分析アプリケーションをテスト・チューニングする方法も紹介します。Kinesis Data Analytics for SQL アプリケーションを利用されていないお客様に対しても、この記事はデータ分析の過程で遭遇する多くのユースケースと、Amazon Managed Service for Apache Flink がどのように目標達成を支援できるかについて、背景となる情報を提供します。

Amazon Managed Service for Apache Flink は、フルマネージド型の Apache Flink サービスです。アプリケーション JAR または実行可能ファイルをアップロードするだけで、AWS がインフラストラクチャと Flink ジョブのオーケストレーションを管理します。また、Apache Flink を使用するノートブック環境である Amazon Managed Service for Apache Flink Studio を活用することで、データストリームのクエリや SQL クエリの開発、または概念実証ワークロードの開発を行うことを容易とし、アプリケーションの本番環境への展開を数分で行うこともできます。

Kinesis Data Analytics for SQL よりも Amazon Managed Service for Apache Flink または Amazon Managed Service for Apache Flink Studio の使用をお勧めします。Amazon Managed Service for Apache Flink と Amazon Managed Service for Apache Flink Studio が、exactly-once 処理セマンティクス、イベント時間ウィンドウ、ユーザー定義関数 (UDF) とカスタム統合を使用した拡張性、命令型言語のサポート、永続的なアプリケーション状態、水平スケーリング、複数のデータソースのサポートなど、高度なデータストリーム処理機能を提供するためです。Kinesis Data Analytics for SQL にはないこれらの機能は、データストリーム処理の正確性、完全性、一貫性、信頼性を保証する上で重要なものです。

ソリューション概要

今回のユースケースは、いくつかの AWS サービスを使用して、サンプルの自動車センサーデータをストリーミング、取り込み、変換し、Amazon Managed Service for Apache Flink Studio を使用してリアルタイムで分析するというものです。Amazon Managed Service for Apache Flink Studio を使用すると、Web ベースの開発環境であるノートブックを作成できます。ノートブックを使用すると、Apache Flink が提供する高度な機能と組み合わせた、シンプルでインタラクティブな開発エクスペリエンスを得ることができます。Amazon Managed Service for Apache Flink Studio は Apache Zeppelin をノートブックとして使用し、Apache Flink をストリーム処理エンジンとして使用します。Amazon Managed Service for Apache Flink Studio のノートブックは、これらのテクノロジーをシームレスに組み合わせて、あらゆるスキルレベルの開発者がデータストリームの高度な分析にアクセスできるようにします。ノートブックはすぐにプロビジョニングされ、ストリーミングデータを即座に表示および分析する手段を提供します。Apache Zeppelin は、Studio ノートブックに以下を含む完全な分析ツールスイートを提供します。

  • データの可視化
  • ファイルへのデータのエクスポート
  • より簡単な分析のための出力フォーマットの制御
  • ノートブックをスケーラブルな本番アプリケーションに変換する機能

Kinesis Data Analytics for SQL アプリケーションとは異なり、Amazon Managed Service for Apache Flink は以下の SQL を追加でサポートします。

  • 複数の Kinesis データストリーム間、または Kinesis データストリームと Amazon Managed Streaming for Apache Kafka (Amazon MSK) トピック間でのストリームデータの結合
  • データストリーム内の変換されたデータのリアルタイム可視化
  • 同じアプリケーション内での Python スクリプトまたは Scala プログラムの使用
  • ストリーミングレイヤーのオフセットの変更

Amazon Managed Service for Apache Flink のもう一つの利点は、デプロイ後にソリューションのスケーラビリティが向上することです。需要に応じて基盤となるリソースをスケールできるためです。Kinesis Data Analytics for SQL アプリケーションをスケーリングするためには、ポンプを追加してアプリケーションにより多くのリソースを使用するよう促す必要があります。

このソリューションでは、自動車センサーデータにアクセスし、データのエンリッチを行い、結果を Amazon Data Firehose ストリーム経由で、Amazon Simple Storage Service (Amazon S3) データレイクに配信する Amazon Managed Service for Apache Flink Studio ノートブックを作成します。このパイプラインは、さらなる処理や可視化のために Amazon OpenSearch Service やその他のターゲットにデータを送信する際にも使用できます。

Kinesis Data Analytics for SQL アプリケーション vs. Amazon Managed Service for Apache Flink

この例では、ストリーミングデータに対して以下のアクションを実行します。

  1. Amazon Kinesis Data Streams データストリームに接続する。
  2. ストリームデータを表示する。
  3. データを変換および充実させる。
  4. Python でデータを操作する。
  5. データを Firehose ストリームに再ストリーミングする。

Kinesis Data Analytics for SQL アプリケーションと Amazon Managed Service for Apache Flink を比較するために、まず Kinesis Data Analytics for SQL アプリケーションがどのように機能するかを説明しましょう。

Kinesis Data Analytics for SQL アプリケーションの根幹には、アプリケーション内ストリームの概念があります。アプリケーション内ストリームは、ストリーミングデータを保持し、データに対してアクションを実行できるテーブルと考えることができます。アプリケーション内ストリームは、Kinesis Data Streams などのストリーミングソースにマッピングされます。アプリケーション内ストリームにデータを取り込むには、まず Kinesis Data Analytics for SQL アプリケーションの管理コンソールでソースをセットアップします。次に、ソースストリームからデータを読み取り、テーブルに配置するポンプを作成します。ポンプクエリは継続的に実行され、ソースデータをアプリケーション内ストリームに供給します。複数のソースから複数のポンプを作成して、アプリケーション内ストリームにデータを供給できます。その後、アプリケーション内ストリームに対してクエリが実行され、結果を解釈したり、さらに処理や保存のために他の宛先に送信したりできます。

以下の SQL は、アプリケーション内ストリームとポンプのセットアップを示しています。

CREATE OR REPLACE STREAM "TEMPSTREAM" ( 
   "column1" BIGINT NOT NULL, 
   "column2" INTEGER, 
   "column3" VARCHAR(64));

CREATE OR REPLACE PUMP "SAMPLEPUMP" AS 
INSERT INTO "TEMPSTREAM" ("column1", 
                          "column2", 
                          "column3") 
SELECT STREAM inputcolumn1, 
      inputcolumn2, 
      inputcolumn3
FROM "INPUTSTREAM";

アプリケーション内ストリームからデータを読み取るには、SQL SELECT クエリを使用します。

SELECT *
FROM "TEMPSTREAM"

Amazon Managed Service for Apache Flink Studio で同様のセットアップを行う場合、基盤となる Apache Flink 環境を使用してストリーミングソースに接続し、コネクタを使用して 1 つのクエリでデータストリームを作成します。以下の例は、以前と同じソースに接続していますが、Apache Flink を使用しています。

CREATE TABLE `MY_TABLE` ( 
   "column1" BIGINT NOT NULL, 
   "column2" INTEGER, 
   "column3" VARCHAR(64)
) WITH (
   'connector' = 'kinesis',
   'stream' = sample-kinesis-stream',
   'aws.region' = 'aws-kinesis-region',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json'
 );

MY_TABLE は、サンプルの Kinesis Data Streams からデータを継続的に受信するデータストリームです。SQL SELECT クエリを使用して問い合わせが可能です。

SELECT column1, 
       column2, 
       column3
FROM MY_TABLE;

Kinesis Data Analytics for SQL アプリケーションはストリーミングデータ上での操作を可能にする拡張機能を持つ SQL:2008 標準のサブセットを使用していますが、Apache Flink の SQL サポートは SQL 標準を実装する Apache Calcite を基にしています。

また、Amazon Managed Service for Apache Flink Studio が同じノートブック内で、 SQL の他に PyFlink と Scala の実行をサポートしていることも重要なポイントです。これにより、SQL だけでは不可能な、プログラミングによる複雑なストリーミングデータ処理を行うことができます。

前提条件

この演習では、さまざまな AWS リソースをセットアップし、分析クエリを実行します。移行の作業を進めるには、管理者アクセス権を持つ AWS アカウントが必要です。まだ管理者アクセス権を持つ AWS アカウントをお持ちでない場合は、作成してから以降の作業を進めてください。この記事で説明するサービスは、AWS アカウントに課金される可能性があります。不要な課金を停止するために、この記事の最後にあるクリーンアップ手順に従ってください。

ストリーミングデータの構成

ストリーミングの領域における典型的なタスクは、IoT センサーからのデータの取得、変換、エンリッチです。本演習では、リアルタイムのセンサーデータを生成するために、AWS IoT Device Simulator を使用します。シミュレータは AWS アカウント内で実行され、Web インターフェイスを提供します。ユーザーは、ユーザー定義のテンプレートから仮想的に接続されたデバイスのフリートを起動し、それらをシミュレートして定期的に AWS IoT Core にデータを配信できます。本演習用のサンプルデータを生成するための仮想デバイスフリートを構築できるということです。

以下の Amazon CloudFormation テンプレートを使用して IoT Device Simulator をデプロイします。これにより、必要なすべてのリソースがアカウント内に作成されます。

  1. 「スタックの詳細を指定」ページで、ソリューションスタックに名前を割り当てます。
  2. 「パラメータ」で、このソリューションテンプレートのパラメータを確認し、必要に応じて変更します。
  3. 「User email」に、IoT Device Simulator UI にログインするためのリンクとパスワードを受け取るための有効なメールアドレスを入力します。
  4. 「次へ」を選択します。
  5. 「スタックオプションの設定」ページで、「次へ」を選択します。
  6. 「確認」ページで、設定を確認して確認します。テンプレートが AWS Identity and Access Management (IAM) リソースを作成することを認識するチェックボックスを選択します。
  7. 「スタックの作成」を選択します。

スタックのインストールには約 10 分かかります。

  1. 招待メールを受け取ったら、CloudFront リンクを選択し、メールに記載されている認証情報を使用して IoT Device Simulator にログインします。 このソリューションには、AWS ですぐにセンサーデータの配信を開始できるようにするための、事前構築された自動車デモが含まれています。
  2. 「Device Type」ページで、「Create Device Type」を選択します。
  3. 「Automotive Demo」を選択します。
  4. ペイロードは自動的に入力されます。デバイスの名前を入力し、トピックとして “automotive-topic” を入力します。
  5. 「Save」を選択します。

次に、シミュレーションを作成します。

  1. 「Simulations」ページで、「Create Simulation」を選択します。
  2. 「Simulation type」で、「Automotive Demo」を選択します。
  3. 「Select a device type」で、作成したデモデバイスを選択します。
  4. 「Data transmission interval」と「Data transmission duration」に希望の値を入力します。 好みの値を入力できますが、少なくとも 10 秒ごとに送信する 10 台のデバイスを使用してください。データ送信期間は数分に設定してください。そうしないと、ラボ中に何度もシミュレーションを再起動する必要があります。
  5. 「Save」を選択します。

これでシミュレーションを実行できます。

  1. 「Simulations」ページで、目的のシミュレーションを選択し、「Start simulations」を選択します。 または、実行したいシミュレーションの横にある「View」を選択し、「Start」を選択してシミュレーションを実行します。
  2. シミュレーションを表示するには、表示したいシミュレーションの横にある「View」を選択します。 シミュレーションが実行中の場合、デバイスの位置を示すマップと、IoT トピックに送信された最新の 100 件までのメッセージを表示できます。

次に、シミュレータが AWS IoT Core にセンサーデータを送信していることを確認できます。

  1. AWS IoT Core コンソールに移動します。

IoT Device Simulator をデプロイしたのと同じリージョンにいることを確認してください。

  1. ナビゲーションペインで、「MQTT Test Client」を選択します。
  2. トピックフィルターとして “automotive-topic” を入力し、「Subscribe」を選択します。

シミュレーションを実行している限り、IoT トピックに送信されているメッセージが表示されます。

最後に、IoT メッセージを Kinesis Data Streams にルーティングするルールを設定できます。このストリームは、Amazon Managed Service for Apache Flink Studio ノートブックのソースデータを提供します。

  1. AWS IoT Core コンソールで、「Message Routing」と「Rules」を選択します。
  2. ルールの名前を入力します(例: “automotive_route_kinesis”)、そして「Next」を選択します。
  3. 以下の SQL クエリを実行します。この SQL は、IoT Device Simulator が公開している “automotive-topic” からすべてのメッセージ列を選択します。
SELECT timestamp, trip_id, VIN, brake, steeringWheelAngle, torqueAtTransmission, engineSpeed, vehicleSpeed, acceleration, parkingBrakeStatus, brakePedalStatus, transmissionGearPosition, gearLeverPosition, odometer, ignitionStatus, fuelLevel, fuelConsumedSinceRestart, oilTemp, location 
FROM 'automotive-topic' WHERE 1=1
  1. 「Next」を選択します。
  2. 「Rule Actions」で、ソースとして「Kinesis Stream」を選択します。
  3. 「Create New Kinesis Stream」を選択します。

これにより新しいウィンドウが開きます。

  1. 「Data stream name」に “automotive-data” と入力します。

この演習では CloudFormation により事前に作成されたストリームを使用します。

  1. 「Create Data Stream」を選択します。

このウィンドウを閉じて AWS IoT Core コンソールに戻ることができます。

  1. 「Stream name」の横にある更新ボタンを選択し、”automotive-data” ストリームを選択します。
  2. 「Create new role」を選択し、ロールに “automotive-role” という名前を付けます。
  3. 「Next」を選択します。
  4. ルールのプロパティを確認し、「Create」を選択します。

ルールはすぐにデータのルーティングを開始します。

Amazon Managed Service for Apache Flink Studio のセットアップ

データが AWS IoT Core を通じてストリーミングされ、Kinesis Data Streams に入力されるようになったので、Amazon Managed Service for Apache Flink Studio ノートブックを作成できます。

  1. Amazon Kinesis コンソールで、ナビゲーションペインの「Analytics applications」を選択します。
  2. 「Studio」タブで、「Create Studio notebook」を選択します。
  3. 「Quick create with sample code」を選択したままにします。
  4. ノートブックに “automotive-data-notebook” という名前を付けます。
  5. 新しいウィンドウで「Create」を選択して、AWS Glue の Data Catalog 内に新規にデータベースを作成します。
  6. 「Add database」を選択します。
  7. データベースに “automotive-notebook-glue” という名前を付けます。
  8. 「Create」を選択します。
  9. 「Create Studio notebook」セクションに戻ります。
  10. 更新を選択し、新しい AWS Glue データベースを選択します。
  11. 「Create Studio notebook」を選択します。.
  12. Studio ノートブックを開始するには、「Run」を選択して確認します。
  13. ノートブックが実行されたら、ノートブックを選択し、「Open in Apache Zeppelin」を選択します。
  14. 「Import note」を選択します。
  15. 「Add from URL」を選択します。
  16. 以下の URL を入力します: https://aws-blogs-artifacts-public.s3.amazonaws.com/artifacts/BDB-2461/auto-notebook.ipynb
  17. 「Import Note」を選択します。
  18. 新しいノートを開きます。

ストリーム分析の実行

Kinesis Data Analytics for SQL アプリケーションでは、管理コンソールを通じてストリーミングソースを追加し、アプリケーション内ストリームとポンプを定義して、Kinesis Data Streams からデータをストリーミングします。アプリケーション内ストリームは、データを保持し、クエリに利用できるようにするテーブルとして機能します。ポンプは、ソースからデータを取り込み、アプリケーション内ストリームにストリーミングします。その後、任意の SQL テーブルをクエリするのと同じように、SQL を使用してアプリケーション内ストリームに対してクエリを実行できます。以下のコードをご覧ください:

CREATE OR REPLACE STREAM "AUTOSTREAM" ( 
    `trip_id` CHAR(36),
    `VIN` CHAR(17),
    `brake` FLOAT,
    `steeringWheelAngle` FLOAT,
    `torqueAtTransmission` FLOAT,
    `engineSpeed` FLOAT,
    `vehicleSpeed` FLOAT,
    `acceleration` FLOAT,
    `parkingBrakeStatus` BOOLEAN,
    `brakePedalStatus` BOOLEAN,
    `transmissionGearPosition` VARCHAR(10),
    `gearLeverPosition` VARCHAR(10),
    `odometer` FLOAT,
    `ignitionStatus` VARCHAR(4),
    `fuelLevel` FLOAT,
    `fuelConsumedSinceRestart` FLOAT,
    `oilTemp` FLOAT,
    `location` VARCHAR(100),
    `timestamp` TIMESTAMP(3));

CREATE OR REPLACE PUMP "MYPUMP" AS 
INSERT INTO "AUTOSTREAM" ("trip_id",
    "VIN",
    "brake",
    "steeringWheelAngle",
    "torqueAtTransmission",
    "engineSpeed",
    "vehicleSpeed",
    "acceleration",
    "parkingBrakeStatus",
    "brakePedalStatus",
    "transmissionGearPosition",
    "gearLeverPosition",
    "odometer",
    "ignitionStatus",
    "fuelLevel",
    "fuelConsumedSinceRestart",
    "oilTemp",
    "location",
    "timestamp")
SELECT VIN,
    brake,
    steeringWheelAngle,
    torqueAtTransmission,
    engineSpeed,
    vehicleSpeed,
    acceleration,
    parkingBrakeStatus,
    brakePedalStatus,
    transmissionGearPosition,
    gearLeverPosition,
    odometer,
    ignitionStatus,
    fuelLevel,
    fuelConsumedSinceRestart,
    oilTemp,
    location,
    timestamp
FROM "INPUT_STREAM"

Kinesis Data Analytics for SQL アプリケーションからのアプリケーション内ストリームとポンプを Amazon Managed Service for Apache Flink Studio に移行するには、ポンプ定義を削除し、kinesis コネクタを定義することで、これを単一の CREATE クエリに変換します。Zeppelin ノートブックの最初の段落では、テーブルとして提示されるコネクタをセットアップします。受信メッセージのすべての項目、またはそのサブセットに対して列を定義できます。

クエリを実行すると、ノートブックに成功結果が出力されます。これで SQL を使用してこのテーブルにクエリを実行したり、PyFlink や Scala を使用してこのデータにプログラムによる操作を実行したりできます。

ストリーミングデータにリアルタイム分析を実行する前に、データが現在どのようにフォーマットされているかを見てみましょう。これを行うには、作成したテーブルに対して簡単な Flink SQL クエリを実行します。ストリーミングアプリケーションで使用される SQL は、SQL アプリケーションで使用されるものと同じです。

数秒後にレコードが表示されない場合は、IoT Device Simulator がまだ実行中であることを確認してください。

Kinesis Data Analytics for SQL コードも実行している場合、結果セットが若干異なる場合があります。これは Amazon Managed Service for Apache Flink のもう一つの重要な違いです。後者には exactly once 配信の概念があるためです。このアプリケーションが本番環境にデプロイされ、再起動されたり、スケーリングアクションが発生したりした場合、Amazon Managed Service for Apache Flink は各メッセージを一度だけ受信することを保証します。一方、Kinesis Data Analytics for SQL アプリケーションでは、結果に影響を与える可能性のある重複メッセージを無視するために、受信ストリームをさらに処理する必要があります。

一時停止アイコンを選択して、現在の段落を停止できます。クエリを停止すると、ノートブックにエラーが表示される場合がありますが、無視して構いません。プロセスがキャンセルされたことを知らせているだけです。

Flink SQL は SQL 標準を実装しており、データベーステーブルをクエリする場合と同じように、ストリームデータに対して簡単に計算を実行する方法を提供します。データのエンリッチにおける一般的なタスクとしては、計算または変換 (華氏から摂氏への変換など)結果を保存するための新規フィールドの作成、下流のクエリの簡素化、可視化を改善するための新規データ作成等が挙げられます。次の段落を実行して、センサーの読み取り時に自動車が加速中であったかどうかを簡単に知ることができる、accelerating という名前の新しい Boolean 値を追加する方法を見てみましょう。このプロセスは、Kinesis Data Analytics for SQL と Amazon Managed Service for Apache Flink の間で違いはありません。

新しい列を検査し、新しい Boolean 値を FLOAT acceleration 列と比較したら、段落の実行を停止できます。

センサーから送信されるデータは通常、レイテンシーとパフォーマンスを向上させるためにコンパクトです。外部データでデータストリームをエンリッチすること、例えば追加の車両情報や現在の気象データなどでストリームをエンリッチすることは非常に有用です。この例では、現在 Amazon S3 に保存されている CSV からデータを取り込み、現在のエンジン速度帯を反映する color という名前の列を追加したいと仮定しましょう。

Apache Flink SQL は、AWS サービスやその他のソース用のいくつかのソースコネクタを提供しています。最初の段落で行ったように新しいテーブルを作成し、代わりに filesystem コネクタを使用することで、Flink が Amazon S3 に直接接続してソースデータを読み取ることができます。以前の Kinesis Data Analytics for SQL アプリケーションでは、新しい参照をインラインで追加することはできませんでした。代わりに、S3 参照データを定義し、アプリケーション設定に追加して、SQL JOIN で参照として使用できました。

注意: us-east-1 リージョンを使用していない場合は、csv をダウンロードして独自の S3 バケットにオブジェクトを配置できます。csv ファイルを s3a:/// として参照してください。

最後のクエリを基に、次の段落では現在のデータと新しく作成したルックアップソーステーブルに対して SQL JOIN を実行します。

エンリッチされたデータを再度ストリーミングします。実際のシナリオでは、データの扱い方に多くの選択肢があります。例えば、S3 データレイクにデータを送信したり、さらなる分析のために別の Kinesis データストリームに送信したり、可視化のために OpenSearch Service にデータを保存したりすることができます。簡単にするために、データを Amazon Data Firehose に送信し、データレイクとして機能する S3 バケットにデータをストリーミングします。

Amazon Data Firehose は、わずか数クリックで Amazon S3、OpenSearch Service、Amazon Redshift データウェアハウス、および Splunk にデータをストリーミングできます。

Amazon Data Firehose ストリームの作成

Firehose ストリームを作成するには、以下の手順を実行します:

  1. Amazon Data Firehose コンソールで、「Create delivery stream」を選択します。
  2. ストリームソースとして「Direct PUT」を選択し、ターゲットとして「Amazon S3」を選択します。
  3. 配信ストリームに “automotive-firehose” という名前を付けます。
  4. 「Destination settings」で、新しいバケットを作成するか、既存のバケットを使用します。
  5. S3 バケットの URL をメモしておきます。
  6. 「Create delivery stream」を選択します。 ストリームの作成には数秒かかります。
  7. Amazon Managed Service for Apache Flink コンソールに戻り、「Streaming applications」を選択します。
  8. 「Studio」タブで、Studio ノートブックを選択します。
  9. 「IAM role」の下にあるリンクを選択します。
  10. IAM ウィンドウで、「Add permissions」を選択し、「Attach policies」を選択します。
  11. 「AmazonKinesisFullAccess」と「CloudWatchFullAccess」を検索して選択し、「Attach policy」を選択します。
  12. Zeppelin ノートブックに戻ることができます。

Amazon Data Firehose へのデータのストリーミング

Apache Flink v1.15 以降、Firehose ストリームへのコネクタの作成は、任意の Kinesis Data Streams へのコネクタの作成と同様に機能します。2つの違いがあることに注意してください。コネクタは firehose で、stream 属性は delivery-stream になります。

コネクタ作成後は、SQL テーブルのようにデータを書き込むことができます。

Firehose ストリームを通じてデータが取得されていることを確認するには、Amazon S3 コンソールを開き、ファイルが作成されていることを確認します。ファイルを開いて新しいデータを検査します。

Kinesis Data Analytics for SQL アプリケーションでは、SQL アプリケーションダッシュボードで新しい宛先を作成していました。既存の宛先を移行するには、新しい宛先を定義する SQL クエリをノートブックに追加します。新しいテーブル名を参照しながら、INSERT を使用して新しい宛先に書き込み続けることができます。

時系列データ

Amazon Managed Service for Apache Flink Studio ノートブックで実行できるもう一つの一般的な操作は、タイムウィンドウ(一定の期間)にわたる集計です。このようなデータは、異常を特定したり、アラートを送信したり、さらに処理するために保存したりするために、別の Kinesis Data Streams に送信できます。次の段落には、タンブリングウィンドウを使用し、30 秒間隔で自動車フリートの総燃料消費量を集計する SQL クエリが含まれています。最後の例と同様に、別のデータストリームに接続してこのデータを挿入し、さらに分析することができます。

Scala と PyFlink

データストリームに対して実行する関数は、単純さとメンテナンス性の両方の観点から、SQL よりもプログラミング言語で書く方が簡単な場合があります。例として、SQL 関数がネイティブにサポートしていない複雑な計算、特定の文字列操作、データの複数のストリームへの分割、他の AWS サービス(テキスト翻訳や感情分析など)との連携などが挙げられます。Amazon Managed Service for Apache Flink は、Zeppelin ノートブック内で複数の Flink インタープリターを使用する機能を持っています。これは Kinesis Data Analytics for SQL アプリケーションにはない機能です。

データに注意深く目を通していれば、location フィールドが JSON 文字列であることに気付いたでしょう。Kinesis Data Analytics for SQL では、文字列関数を使用して SQL 関数を定義し、JSON 文字列を分解することができました。これは、メッセージデータの安定性に依存する脆弱なアプローチですが、いくつかの SQL 関数を使用して改善することができます。Kinesis Data Analytics for SQL で関数を作成する構文は以下のパターンに従います。

CREATE FUNCTION ''<function_name>'' ( ''<parameter_list>'' )
    RETURNS ''<data type>''
    LANGUAGE SQL
    [ SPECIFIC ''<specific_function_name>''  | [NOT] DETERMINISTIC ]
    CONTAINS SQL
    [ READS SQL DATA ]
    [ MODIFIES SQL DATA ]
    [ RETURNS NULL ON NULL INPUT | CALLED ON NULL INPUT ]  
  RETURN ''<SQL-defined function body>''

Amazon Managed Service for Apache Flink でも最近利用可能となった Apache Flink v1.15 では、Apache Flink SQL のテーブル SQL に JSON Path 構文に似た JSON 関数を追加しました。これにより、SQL 内で直接 JSON 文字列にクエリを実行できます。以下のコードをご覧ください。

%flink.ssql(type=update)
SELECT JSON_STRING(location, '$.latitude) AS latitude,
JSON_STRING(location, '$.longitude) AS longitude
FROM my_table

あるいは、Apache Flink v1.15 以前の方法である、ノートブック内で Scala または PyFlink を使用してフィールドを変換し、データを再ストリーミングすることもできます。両言語とも、堅牢な JSON 文字列処理を提供します。

以下の PyFlink コードは、メッセージの location フィールドから緯度と経度を抽出する 2 つのユーザー定義関数を定義します。これらの UDF は、その後 Flink SQL から呼び出すことができます。環境変数 st_env を参照しています。PyFlink は Zeppelin ノートブック内で 6 つの変数を作成します。Zeppelin は、変数 z としてコンテキストも公開しています。

メッセージに予期しないデータが含まれている場合、エラーが発生する可能性もあります。Kinesis Data Analytics for SQL アプリケーションは、アプリケーション内エラーストリームを提供します。これらのエラーは別途処理され、再ストリーミングされるか削除されます。Kinesis Data Analytics ストリーミングアプリケーションの PyFlink では、複雑なエラー処理戦略を書き、即座に回復してデータの処理を続けることができます。JSON 文字列が UDF に渡される際、それは不正な形式であったり、不完全であったり、空であったりする可能性があります。UDF 内でエラーをキャッチすることで、エラーが発生した場合でも Python は処理を継続し、値を返すことができます。

以下のサンプルコードは、2 つのフィールドに対して除算計算を実行する別の PyFlink スニペットを示しています。ゼロ除算エラーが発生した場合、デフォルト値を提供してストリームがメッセージの処理を続行できるようにします。

%flink.pyflink
@udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
def DivideByZero(price):    
    try:        
        price / 0        
    except:        
        return -1
st_env.register_function("DivideByZero", DivideByZero)

次のステップ

この記事で行ったようなパイプラインの構築は、AWS の追加サービスをテストするための基礎を提供します。ストリームを削除する前に、ストリーミング分析の学習を続けることをお勧めします。以下を検討してください。

クリーンアップ

この演習で作成したサービスをクリーンアップするには、以下の手順を実行します。

  1. CloudFormation コンソールに移動し、IoT Device Simulator スタックを削除します。
  2. AWS IoT Core コンソールで、「Message Routing」と「Rules」を選択し、ルール “automotive_route_kinesis” を削除します。
  3. Kinesis Data Stream コンソールで Kinesis データストリーム “automotive-data” を削除します。
  4. IAM コンソールで IAM ロール “automotive-role” を削除します。
  5. AWS Glue コンソールで、”automotive-notebook-glue” データベースを削除します。
  6. Amazon Managed Service for Apache Flink Studio ノートブック “automotive-data-notebook” を削除します。
  7. Firehose 配信ストリーム “automotive-firehose” を削除します。

まとめ

Amazon Managed Service for Apache Flink Studio に関するこのチュートリアルをご覧いただきありがとうございます。現在、レガシーの Amazon Managed Service for Apache Flink Studio SQL アプリケーションを使用している場合は、AWS テクニカルアカウントマネージャーまたはソリューションアーキテクトに連絡し、Amazon Managed Service for Apache Flink Studio への移行について相談することをお勧めします。Amazon Kinesis Data Streams Developer Guide で学習を続け、GitHub でコードサンプルにアクセスできます。


著者について

Nicholas Tunney は、AWS のワールドワイドパブリックセクターパートナーソリューションアーキテクトです。彼はグローバル SI パートナーと協力して、政府、非営利の医療、公益事業、教育部門のクライアント向けに AWS におけるアーキテクチャの開発を行っています。