Amazon Web Services ブログ

Amazon Athenaの新しいフェデレーテッド・クエリによる複数データソースの検索

現在、企業は構築するアプリケーションに最適にフィットした複数のデータストアを利用しています。例えば、ソーシャルネットワークアプリケーションを構築するような場合は、リレーショナルデータベースよりは Amazon Neptune のようなグラフデータベースの方が多くのケースで最適です。同様に頻繁な反復処理(fast iterations)のために柔軟なスキーマが求められるワークロードには、 Amazon DocumentDB (with MongoDB compatibility) がより適しているでしょう。Amazon.com のCTO/VPである Werner Vogels が述べているように、「複数の明確に異なる要件を満たすことが出来る単一のデータベースは存在しない (Seldom can one database fit the needs of multiple distinct use cases.)」ということです。よって開発者は多数の目的別に構築された(purpose-built)データベースを利用しながら、非常に多くの分散されたアプリケーションを構築しています。そういう意味では、今日の開発者はアプリケーションが最適になるように、ワークロードに最適なツールを選択できるように、複雑なアプリケーションをより小さい塊(ピース)に分割しています。しかし、データストアとアプリケーションの数が増えてくると、複数のデータソースをまたぐ分析処理というのはより難しくなってきています。

本日、Amazon Athena でサポートするフェデレーテッド・クエリについて発表することを嬉しく思います。

Amazon Athena のフェデレーテッド・クエリ

フェデレーテッド・クエリは、データアナリスト、エンジニア、データサイエンティストに対して、リレーショナルデータベース、非リレーショナルデータベース、オブジェクトデータ、カスタムデータソースに保存されているデータをまたがるSQLを実行することが可能な Amazon Athenaの新機能です。Athena のフェデレーテッド・クエリによって、お客様は単一のSQLクエリの発行でオンプレミスやクラウド上で稼働する複数のデータソースに対してデータ分析することが可能です。Athena は、AWS Lambda で実行するデータソースコネクタ(Data Source Connectors)を利用してフェデレーテッド・クエリを実行します。AWSは、Amazon DynamoDB, Apache HBase, Amazon DocumentDB, Amazon Redshift, Amazon CloudWatch Logs, Amazon CloudWatch Metrics, JDBC 対応したリレーショナルデータ(MySQL, PostgreSQLなど)にアクセスする Apache 2.0 ライセンスでオープンソース化された Athena データソースコネクタを提供します。Athenaは、このデータソースコネクタを利用し、複数データソースにまたがる SQL のフェデレーテッド・クエリを実行することが可能です。また Query Fedration SDKを利用して、お客様独自のデータソースに接続するコネクタを開発することで、該当のデータソースに対して Athena から SQLクエリを実行することが可能です。コネクタは Lambda上で動作するため、お客様はインフラストラクチャやワークロードのピーク時のリソース拡張の管理をすることなく、引き続き Athena のサーバーレスアーキテクチャの利点を享受することが可能です。

複数のアプリケーションにわたるデータの分析処理は複雑で時間のかかる作業です。アプリケーション開発者は通常アプリケーションの主要な機能をもとににデータソースの種類を選択します。その結果、多くの場合、分析に必要なデータはリレーショナル型、キーバリュー型、ドキュメント型、インメモリ型、サーチ(全文検索)型、グラフ型、時系列型、台帳型のデータベースなどに散在します。イベントやアプリケーションのログは、通常 Amazon S3 などのオブジェクトストアに保存されます。これらの複数データソースをまたぐデータ分析をするには、分析者は新しいプログラム言語や新しいデータアクセス概念を学習したり、複数データソースの検索を容易にするために、データウェアハウスを構築し、関連データのETL処理(取出し、変換、ロード)を行う複雑なパイプライン処理を構築したりする必要があります。データパイプライン処理によって基本的にデータのリアルタイム性は低くなり、複数システム間のデータの正確性や一貫性を検証するカスタムプロセスが必要になります。さらに連携元のアプリケーションが変更されるとデータパイプライン処理も変更する必要があったり、データの修正が必要になったりします。Athena のフェデレーテッド・クエリは、データがどこにあってもその場所で検索することにより、この複雑性を取り除きます。分析者はお馴染みのSQLを使って、複数のデータソース間のデータを結合(JOIN)して分析することが出来たり、その後の分析用に複数データを使った加工処理をスケジュールされたSQLで処理して、その結果を S3 上に保存することが可能になります。

Athena Query Federation SDKによって、AWSが提供するコネクタ以外に対しても、フェデレーテッド・クエリの利点を拡大享受することが出来ます。100行にも満たないプログラムコードで、お客様は独自のデータソース用のコネクタを開発し、共有することが可能です。コネクタは、Lambda関数としてデプロイされ、Athenaで利用するためにデータソースとして登録されます。コネクタが登録されると、Athenaから該当データソースの有効なデータベース、テーブル、列を取り出すことが可能です。1つのAthenaのクエリから複数のデータソースにまたがる検索が可能です。データソースに対してクエリが発行されると Athenaは該当するコネクタを起動して、読込対象の表の指定された部分を取出し、並列度を管理し、フィルタ条件のプッシュダウンを行います。クエリを発行したユーザーに合わせて、コネクタは個別のデータエレメントへのアクセスを付与、または制限を行います。コネクタは、クエリでリクエストされたデータを返すフォーマットとして、C, C++, Java, Python, Rustなどの言語で実装可能な Apache Arrow を使います。コネクタは Lambdaで実行され、クラウドやオンプレミス上の様々なデータソースをLambdaからアクセス可能にするために利用されます。

データソースコネクタ

Athena によってデータソースを新しく登録することで、そのデータに対してクエリを実行することが可能です。データソースを登録すると、そのデータソース用のデータコネクタが関連付けられます。皆さんは、AWS提供のオープンソースのコネクタやお客様が開発したコネクタを利用したり、既存のコネクタを配布したり、コミュニティやマーケットプレイスで提供されるコネクタを利用することが可能です。データソースのタイプによっては、コネクタはスキャン、リード、フィルタ処理に必要な表の固有のパーツを特定したり、並列度を管理するためのメタデータ情報を管理します。Athenaのデータソースコネクタはアカウント内の Lambda 関数として実行されます。

各コネクタは、それぞれデータソース固有の2つの Lambda 関数で構成されています。1つはメタデータ用、もう1つはレコード読込用です。コネクタのプログラムコードはオープンソースで、Lambda 関数としてデプロイされます。また Lambda 関数を AWS Serverless Application Repository へデプロイ可能で、それらは Athena で利用可能です。Lambda 関数をデプロイすると固有のAmazon Resource Name(ARN)が提供されるので、そのARNを Athena によって登録する必要があります。ARNを登録すると、 Athena はクエリ実行時にどの Lambda 関数を呼び出すか判断可能になります。この2つの関数のARNが登録されると登録されたデータソースに対して検索することが可能になります。

データソースに対してフェデレーテッド・クエリが実行されると、Athenaはメタデータとデータを並列で読み込む Lambda関数の起動を複数展開します。アカウント内のLambdaの同時起動制限によって起動する並列数は変わります。例えば、もし Lambdaの同時起動数制限が300の場合、Athena はレコード読込で300並列で実行可能です。並列で実行される2つのクエリの場合は、Athenaは2倍の同時実行数を起動出来ます。

図1は、Athena フェデレーテッド・クエリがどのように動作するか表しています。Athena にフェデレーテッド・クエリを発行した場合、Athena はデータソースに接続するために適切な Lambda ベースのコネクタを起動します。Athenaはメタデータとデータを並列で読み込む Lambda関数の起動を複数展開します。

図 1: Athena フェデレーテッド・クエリのアーキテクチャ

このブログ記事は、データ分析者がどのように1つのSQLでより迅速に複数のデータベース間のデータを検索することができるかを説明しています。便宜上、ここでは以下の複数の目的別専門データソースを利用するアーキテクチャの空想上の Eコマース企業を考えます:

  1. 支払い取引データをAWS上で稼働する Apache HBaseに保存
  2. 顧客注文でまだ配送されていないアクティブ発注を注文エンジンが高速に受注データを取り出すために Redis に保存
  3. Eメールアドレスや配送情報などの顧客データを DocumentDB に保存
  4. 製品カタログを Aurora に保存
  5. 発注処理のログイベントを Amazon CloudWatch Logsに保管
  6. 注文履歴と分析データを Redshift に保存
  7. 配送追跡データを DynamoDB に保存
  8. IoT 有効のタブレット利用したドライバー陣のラストマイル配送状況

この空想のEコマース企業の顧客は課題を持っています。彼らの注文が奇妙な状態で滞っていると苦情を言っています。いくつかの注文はすでに配送されているにもかかわらず「保留中」として表示されているのに、別の注文ではまだ出荷されていないにもかかわらず「配送済」と表示されます。

この会社の経営層は、カスタマーサービスのアナリストに全注文の実際の状態を特定することを指示しました。

Athena フェデレーテッド・クエリの利用

Athenaのフェデレーテッド・クエリを利用して、アナリストは異なるデータソースのデータを迅速に分析することが出来ます。加えて、これらのデータソースからデータを取出し、Amazon S3にそのデータを保存するパイプラインをセットアップして、Athenaで検索することが可能です。

図2は Athena が起動する1つのクエリでオンプレミスとクラウドにあるデータソースに接続する Lambdaベースのコネクタを図示しています。この図では、Athena が S3 からのデータをスキャンし、さらに EMR の HBase, DynamoDB, MySQL, Redshift, ElastiCashe (Redis), Amazon Aurora のデータを読込む Lambda ベースのコネクタを実行しています。

図2: フェデレーテッド・クエリの例

アナリストは、このリポジトリから 以下のコネクタを登録、利用して以下のクエリを実行することが可能です。

  1. Redis からの全手のアクティブな注文の取出し(athena-redis参照)
  2. 正規表現マッチングと取出しによる CloudWatch Logs 内の「WARN」,「ERROR」イベントをもつ全注文に対して結合(athena-cloudwatch 参照)
  3. ホスト名を取得するための EC2インベントリと「WARN」,「ERROR」で記録された受注処理のステータス情報の結合(athena-cmdb 参照)
  4. 影響のある注文の顧客連絡先の詳細データを取得するために DocumentDB データを結合 (athena-docdb 参照)
  5. 配送状況と詳細追跡情報を取得するための DynamoDBデータとの結合 (athena-dynamodb 参照)
  6. 影響のある注文データの支払い状況を取得するためのHBaseデータに対する結合 (athena-hbase 参照)

データソースコネクタ登録

アナリストは Athena クエリエディタの「データソース接続フロー」からデータソースコネクタを登録することが出来ます。

  1. クエリエディタのデータソースを選択します
  2. 以下のスクリーンショットで表示されているように接続するデータソースを選択します。また Query Federation SDK を利用した固有のデータソースコネクタの構築を選択することも可能です。
  3. 登録を完了するための画面で残りのステップを実行します。これらは、データソースに対するコネクタ関数の設定(以下のスクリーンショットで表示)やクエリで使用されるカタログ名としての名称を選択、説明文の入力が含まれます。

分析クエリ例

データソースコネクタの登録が完了すると、カスタマーサービスのアナリストは、SQLクエリをで影響のあった注文を特定するために、以下のようなサンプルのクエリを記述することが可能です。このようにすることで企業のビジネススピードを向上することが可能です。

以下のビデオでは、フェデレーテッド・クエリ例のデモをご覧いただけます。

WITH logs 
     AS (SELECT log_stream, 
                message                                          AS 
                order_processor_log, 
                Regexp_extract(message, '.*orderId=(\d+) .*', 1) AS orderId, 
                Regexp_extract(message, '(.*):.*', 1)            AS log_level 
         FROM 
     "lambda:cloudwatch"."/var/ecommerce-engine/order-processor".all_log_streams 
         WHERE  Regexp_extract(message, '(.*):.*', 1) != 'WARN'), 
     active_orders 
     AS (SELECT * 
         FROM   redis.redis_db.redis_customer_orders), 
     order_processors 
     AS (SELECT instanceid, 
                publicipaddress, 
                state.NAME 
         FROM   awscmdb.ec2.ec2_instances), 
     customer 
     AS (SELECT id, 
                email 
         FROM   docdb.customers.customer_info), 
     addresses 
     AS (SELECT id, 
                is_residential, 
                address.street AS street 
         FROM   docdb.customers.customer_addresses),
     shipments 
     AS ( SELECT order_id, 
                 shipment_id, 
                 from_unixtime(cast(shipped_date as double)) as shipment_time,
                 carrier
        FROM lambda_ddb.default.order_shipments),
     payments
     AS ( SELECT "summary:order_id", 
                 "summary:status", 
                 "summary:cc_id", 
                 "details:network" 
        FROM "hbase".hbase_payments.transactions)
         
SELECT _key_            AS redis_order_id, 
       customer_id, 
       customer.email   AS cust_email, 
       "summary:cc_id"  AS credit_card,
       "details:network" AS CC_type,
       "summary:status" AS payment_status,
       status           AS redis_status, 
       addresses.street AS street_address, 
       shipments.shipment_time as shipment_time,
       shipments.carrier as shipment_carrier,
       publicipaddress  AS ec2_order_processor, 
       NAME             AS ec2_state, 
       log_level, 
       order_processor_log 
FROM   active_orders 
       LEFT JOIN logs 
              ON logs.orderid = active_orders._key_ 
       LEFT JOIN order_processors 
              ON logs.log_stream = order_processors.instanceid 
       LEFT JOIN customer 
              ON customer.id = customer_id 
       LEFT JOIN addresses 
              ON addresses.id = address_id 
       LEFT JOIN shipments
              ON shipments.order_id = active_orders._key_
       LEFT JOIN payments
              ON payments."summary:order_id" = active_orders._key_
 さらに Athenaではクエリ内で指定した S3バケットに全クエリの結果を保存します。もしあなたのユースケースで S3へデータを取り込むことが必須の場合には、データソースの登録、S3へのデータ取込み、パーティションの作成と 
       Glue catalog 内のメタデータの作成のために 
       CTAS statement or 
       INSERT INTO 文を利用、データフォーマットを 
        サポートフォーマット へ変換するなどの Athena のフェデレーテッド・クエリの機能を利用することが可能です。 
       

まとめ

このブログでは、新しい Athenaのフェデレーテッド・クエリの機能を紹介しました。例を使って、アカウント内の AWS Lambda からアクセス可能なデータソースにAthenaから接続するための Athena のデータソースコネクタの登録、利用方法を見てきました。最終的には、フェデレーテッド・クエリを使って、より迅速な分析が可能になるだけでなく、データレイクであるS3へETL(データの取出し、変換、ロード)処理が可能になることを確認しました。

Athena の フェデレーテッド・クエリは、us-east-1 (バージニア北部)でプレビューとして有効になっています。以下の Athena FAQ内のステップによって今からプレビューを開始してください。
さらに詳細な機能については、こちらのドキュメント Connect to a Data Source documentation をご覧ください。
既存のコネクタを利用して開始する場合は、以下の Connect to a Data Source guide をご参照ください。
Athena Query Federation SDK を利用した自己所有データソースコネクタの開発方法は、GitHub にあるAthena example をご覧ください。

  Janak Agarwal は Athena のプロダクトマネージャーです。

原文: Query any data source with Amazon Athena’s new federated query
翻訳: SA 丹羽