Amazon Web Services ブログ

Amazon EventBridge を用いたアウトオブバンドの AWS AppSync リアルタイムサブスクリプションの簡素化

この記事は、AWS のプリンシパルスペシャリストソリューションアーキテクト、Josh Kahn によって書かれました。

リアルタイムの最新のデータは、多くのカテゴリのアプリケーションにとって有益です。スポーツスコアから株価、配信アプリなど多くのものまで、モバイルとデスクトップの両方で魅力的なフロントエンドアプリケーションを構築するには、リアルタイムデータが不可欠です。これらのタイプのアプリケーションのモダンアーキテクチャでは、クライアントがサーバーからプッシュされた更新をサブスクライブできるようにする、長時間の双方向接続を使用します。

AWS AppSync は、GraphQL サブスクリプションを使用してリアルタイムデータを可能にすることができるマネージド型の GraphQL サービスです。サブスクリプションは、WebSocket 接続 を介して AppSync でクライアントとバックエンドの間で実装および管理されます。AppSync API によって公開されるデータソースは、Amazon NeptuneAmazon QLDB などの代替データソースに加えて、Amazon DynamoDBAmazon Elasticsearch Service などの事前統合データソースなどを含めて、サブスクリプションを適用可能です。

AWS AppSync では、ミューテーションやデータの変更に応じてサブスクリプションが呼び出されます。つまり、GraphQL 操作によってデータが変更されると、AppSync はミューテーションが正常に完了したときにそのデータの変更をサブスクライバに通知します。要するに、ミューテーションは、サブスクライブしているクライアントに送信されるデータをパブリッシュします。

AppSyncを使った作業において、顧客と自分自身のプロジェクトの両方で実装したことがある、より一般的なパターンの1つは、AppSync外部(アウトオブバンド)でのデータベース更新をサブスクライバにパブリッシュすることです。AppSync外部(アウトオブバンド)の更新は、一部のシステムが AppSync ミューテーションとは無関係にデータを変更した場合に発生します。たとえば、このブログの最近の投稿では、位置情報の変更によって DynamoDB テーブルの更新を行い、その変更が順にサブスクライブしているクライアントへとプッシュされる必要があるシステムについて説明しています。その投稿と一緒に投稿されたサンプルコードは、Amazon Location Serviceのジオフェンス イベントルールにマッチする場合に Amazon EventBridge によって呼び出される AWS Lambda 関数によって対応しています。この関数は、AppSync API を介してバックエンドのミューテーションを実行し、新しいデータをサブスクライバに送信します。

Lambda 関数を呼び出してデータの変更に応答してミューテーションを実行することは、アウトオブバンドのデータベースの更新に関する通知を解決するための一般的なアプローチです。効果的である一方で、このアプローチでは、長期間メンテナンスされなければならない関数の使用が必要です。この投稿では、サーバーレスイベントバスである EventBridge を活用して、Lambda 関数を記述してメンテナンスすることなく、同じ作業を実行するソリューションについて説明します。

EventBridge はイベント駆動型アーキテクチャで、システムまたはドメイン間でオペレーションを実行するために使用されます。これにより、スケーラブルで回復力のある方法でアプリケーションを簡単に接続できます。プロデューサーは、豊富なルールとフィルタリング機能を使用してターゲットにルーティングできるイベント(または状態の変更)をパブリッシュします。プロデューサーとコンシューマは疎結合されており、ルーティングは設定によって定義されます。たとえば、オンライン注文システムでは、他のシステムで使用される注文が作成または送信されたときにイベントをパブリッシュすることができます。コンシューマーとして、API destinations と呼ばれる EventBridge 機能を介して API または Webhook を含めることもできます。API destinationsを使用して、アウトオブバンドの更新に応答して AppSync サブスクリプションをトリガーします。

シナリオ

前述した受注処理システムを拡張して、EventBridge API destinationsを使用してアウトオブバンドサブスクリプションをトリガーするデモをみてみましょう。システムは複数のコンポーネントを使用し、それぞれが別々のチームによって構築および管理されます。私たちのチームは AppSync を使用してフロントエンドのバックエンドを構築しています。注文がいつ履行、送信されたかなどを顧客が知るために、フロントエンドアプリケーションは現在の注文のステータスに応じて更新される必要があります。
注文システム自体はブラックボックスで、別のチームが所有しています。データベースが関与していると推測することはできますが、私たちのチームはデータベースに直接アクセスすることも、データ変更のストリームもありません。代わりに、注文システムは「orders」と呼ばれるカスタム EventBridge イベントバスに注文ステータスの変更をパブリッシュします。注文ステータスの更新のメッセージペイロードは、次のようになります。

{
    "id": "b051312994104931b0980d1ad1c5340f",
    "detail-type": "Order Status Update",
    "source": "orders.system",
    "time": "2021-07-02T09:59:48Z",
    "detail": {
        "order-id": "a0cd6966711e4d7a80a770bbe656ed34",
        "status": "SHIPPED",
        "previous-status": "IN_PROGRESS",
        "status-detail": "Ship complete. No change."
    }
}
JSON

EventBridge にパブリッシュされたメッセージは、設定を介してルーティング、フィルタリング、および変換できます。これを行うには、EventBridge ルールを作成し、AppSync エンドポイントの API destinationsを追加し、AppSync に送信する前にデータを変換するルールを設定したターゲットを作成します。

AppSync GraphQL API を構築する

スキーマを使用して AppSync で GraphQL API を構築し始めます。その一部を以下に示します。GraphQL スキーマの設計の詳細については、AppSync のドキュメントを参照してください。

type StatusUpdate { 
    orderId: ID! 
    status: Status! 
    prevStatus: Status 
    updatedAt: AWSDateTime!
} 

enum Status { 
    PENDING 
    IN_PROGRESS 
    SHIPPED 
    DELIVERED 
    COMPLETE 
} 

type Mutation { 
    publishStatusUpdate(orderId: ID!, 
        status: Status!, 
        prevStatus: Status, 
        updatedAt: AWSDateTime!): StatusUpdate 
} 

type Subscription { 
    onStatusUpdate(orderId: ID!): StatusUpdate 
        @aws_subscribe(mutations: [ "publishStatusUpdate" ]) 
}
JSON

onStatusUpdate サブスクリプションは、publishStatusUpdate ミューテーションが実行されたときにトリガーされます。場合によっては、独自のデータストアで注文ステータスのビューを保持したい場合がありますが、ここでは、NONEデータソースに接続されたローカルリゾルバを使用して、ステータスの更新をサブスクライバに渡します。これにより、データソースを更新せずに通知をパブリッシュできます。この場合、リゾルバマッピングテンプレートは、AppSync 自体のローカル操作の一部としてリクエストとレスポンスを整形するためだけに使用されます。

## Request Template
{
"version": "2018-05-29",
"payload": $util.toJson($ctx.args)
}

## Response Template
$util.toJson($ctx.result)
Java

EventBridge API の宛先とルーティングルールの設定

AppSync の設定を終えたので、EventBridge の設定に移ります。ルール、ターゲットなどを設定する権限についてはこの投稿で扱う内容の対象外ですが、EventBridge のポリシーとアーキテクチャの詳細については、re:Invent 2020 session “Building event-driven applications with Amazon EventBridge”を参照してください。

設定を開始するには、EventBridge コンソール、AWS CLI、またはお気に入りの infrastructure as code ツール(CloudFormation を使用したサンプルを次に示します。AWS CDK では、このユースケースに L1 constructsを使用する必要があります)で新しい API destinationを設定します。

API destinationエンドポイントは AppSync GraphQL エンドポイントで、HTTP メソッドとして POST を使用した /graphql を指定します。レート制限はオプションですが、ダウンストリームシステムがレート制限されているシナリオで役立ちます。

[Create a new connection] を選択して、エンドポイントの承認を設定します。ここでは簡単にするために API キーを使用しますが、Amazon Cognito などの OAuth プロバイダー、または基本的な認証情報も使用できます。

AppSync 用の、API キーヘッダーの名前 (x-api-key) とプロビジョニングされたキーを入力します。API キーは AWS Secrets Managerに格納されます。EventBridge は、必要に応じて宛先への各呼び出しに署名するための追加の認証情報もサポートします。CloudFormation を使用すると、次のように同じ API のdestinationを作成できます。

ApiDestination:  ## EventBridge API destination pointing to AppSync API
    Type: AWS::Events::ApiDestination
    Properties:
      ConnectionArn: !GetAtt ApiDestinationConnection.Arn
      HttpMethod: POST
      InvocationEndpoint: !GetAtt GraphQLApi.GraphQLUrl

  ApiDestinationConnection:  ## Connection for EventBridge API Destination
    Type: AWS::Events::Connection
    Properties:
      AuthorizationType: API_KEY
      AuthParameters:
        ApiKeyAuthParameters:
          ApiKeyName: x-api-key
          ApiKeyValue: !GetAtt GraphQLApiKey.ApiKey
YAML

次に、EventBridge の AppSync に注文ステータス更新イベントをルーティングするルールを作成します。ルールは特定のイベントバスに関連付けられており、この場合、ordersイベントバスとなります。(すべての AWS アカウントにはdefaultバスがあります)。ルールは、AWS コンソール、CLI、またはinfrastructure as codeを使用して作成できます。

EventBridge ルールは、イベントパターンを使用してイベントを一致させ、ターゲットに送信します。イベントパターンは、使い慣れた JSON のような構文を使用して定義されます。イベントパターンは柔軟性があり、さまざまなユースケースに使用できます。その他の例については、ドキュメントを参照してください。今回のユースケースでは、注文システムからのすべての注文ステータス更新イベントを照合します(AWS コンソールでは、前のメッセージ例に対してイベントパターンをテストすることもできます)。

{
    "source": [ "orders.system" ], 
    "detail-type": [ "Order Status Update" ] 
}
JSON

イベントパターンを設定したら、イベントバスを選択し、API destination ターゲットを次のように設定します。AWS コンソールでは、EventBridge に対してAPI destinationの呼び出しをするためのIAM ロール作成を許可するか、既存のロールを使用することができます。

先に進む前に、[Configure input] セクションをクリックして展開します。これが魔法が起こる場所です。input transformer を使用して、前に示したイベントペイロードの値を、適切にフォーマットされた GraphQL リクエストにマッピングできます。input transformerは、入力パスと入力テンプレートの 2 つの部分で構成されています。入力パスでは、JSON パス式を使用して、入力イベントの要素を入力テンプレートで使用できる変数にマッピングします。テンプレートは文字列または JSON オブジェクトを生成でき、ターゲットに渡す情報を定義するために使用されます。HTTP POST を使用して GraphQL 操作を実行するのに慣れている場合 (多くのライブラリが内部的にこれを処理します)、テンプレートは見覚えのあるリクエストボディであるでしょう。

クエリ自体は、入力テンプレートの主要コンポーネントです。テンプレート内の単一の文字列内に含まれていますが、操作自体は見覚えがあるはずです。

mutation PublishStatusUpdate(
    $orderId: ID!,
    $status: Status!,
    $prevStatus: Status,
    $updatedAt: AWSDateTime!
) {
    publishStatusUpdate(
        orderId: $orderId,
        status: $status,
        prevStatus: $prevStatus,
        updatedAt: $updatedAt) {
        orderId
        status
        prevStatus
        updatedAt
    }
}
JSON

サブスクリプション通知に含めるべきデータはすべて、ミューテーション選択セットに含める必要があります。

最後に、ルールとターゲットを作成します。CloudFormation を使用して、同じルールとターゲットを次のように定義できます。



RouteToAppSyncRule:  ## Route order status updates to AppSync endpoint
    Type: AWS::Events::Rule
    Properties:
      EventBusName: !GetAtt OrdersEventBus.Name
      EventPattern:
        source:
          - orders.system
        detail-type:
          - Order Status Update
      Targets:
        - Id: appsync-order-status-update
          Arn: !GetAtt ApiDestination.Arn
          RoleArn: !GetAtt EventBridgeRole.Arn
          InputTransformer:
            InputPathsMap:
              orderId: "$.detail.order-id"
              status: "$.detail.status"
              prevStatus: "$.detail.previous-status"
              updatedAt: "$.time"
            InputTemplate: |
              {
                "query": "mutation PublishStatusUpdate($orderId:ID!, $status:Status!, $prevStatus:Status, $updatedAt:AWSDateTime!) { publishStatusUpdate(orderId:$orderId, status:$status, prevStatus:$prevStatus, updatedAt:$updatedAt) { orderId status prevStatus updatedAt } }",
                "operationName": "PublishStatusUpdate",
                "variables": {
                  "orderId": "<orderId>",
                  "status": "<status>",
                  "prevStatus": "<prevStatus>",
                  "updatedAt": "<updatedAt>"
                }
              }
YAML

次に、AWS CLI と AWS コンソールを使用して、注文ステータスの更新イベントによってサブスクライバに通知がトリガーされることを確認できます。AppSync コンソール を開き、前に作成した API を選択し、左側のメニューの [クエリ] をクリックします。組み込みの WebSocket クライアントでクエリブラウザを使用して、次のように入力して GraphQL サブスクリプションを作成します(注文 ID の値はあくまで一例で、EventBridge に送信されるペイロードと同じである必要があります)。

subscription OnStatusUpdate {
    onStatusUpdate(orderId: "123") {
        orderId
        prevStatus
        status
        updatedAt
    }
}
JSON

クエリブラウザの上部にある [実行] ボタン (オレンジ) をクリックします。右側に待機インジケータが表示されるはずです。ブラウザはセキュアなWebSocket接続を使用してアップデートをサブスクライブしています。Amplify libraries API クライアントなどの GraphQL クライアントを活用して、WebSocket 経由でプッシュされた注文更新をサブスクライブするために、フロントエンドアプリケーションでも同様のサブスクリプションが使用されます。

AWS CLI を使用して、Orders イベントバスにイベントをパブリッシュできます。このシナリオでは、データベースの変更によって同様のイベントがトリガーされます。まず、status-update.json という名前のファイルを作成し、以下を追加します。

[ 
    {
        "EventBusName": "orders", 
        "Source": "orders.system", 
        "DetailType": "Order Status Update", 
        "Detail": "{ \"order-id\": \"123\", \"status\": \"SHIPPED\", \"previous-status\": \"IN_PROGRESS\" }" 
    } 
]
JSON

ターミナルで、次のコマンドを入力します。

aws events put-events --entries file://status-update.json
Bash

しばらくすると、AppSync クエリブラウザーに次のような更新が表示されます。

{
	"data": {
		"onStatusUpdate": {
			"prevStatus": "IN_PROGRESS",
			"status": "SHIPPED",
			"updatedAt": "2021-07-02T21:15:13Z",
			"orderId": "123"
		}
	}
}
JSON

AWS CloudFormation コンソール を使用して、次のテンプレートを使用して、ご自身のアカウントでソリューションを試すことができます。

---
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31

Parameters:
  ProjectName:
    Type: String
    Default: aws-appsync-eventbridge-destination

Resources:
  OrdersEventBus: ## EventBridge Event Bus for processing orders
    Type: AWS::Events::EventBus
    Properties:
      Name: orders

  ApiDestination:  ## EventBridge API destination pointing to AppSync API
    Type: AWS::Events::ApiDestination
    Properties:
      ConnectionArn: !GetAtt ApiDestinationConnection.Arn
      HttpMethod: POST
      InvocationEndpoint: !GetAtt GraphQLApi.GraphQLUrl

  ApiDestinationConnection:  ## Connection for EventBridge API Destination
    Type: AWS::Events::Connection
    Properties:
      AuthorizationType: API_KEY
      AuthParameters:
        ApiKeyAuthParameters:
          ApiKeyName: x-api-key
          ApiKeyValue: !GetAtt GraphQLApiKey.ApiKey

  RouteToAppSyncRule:  ## Route order status updates to AppSync endpoint
    Type: AWS::Events::Rule
    Properties:
      EventBusName: !GetAtt OrdersEventBus.Name
      EventPattern:
        source:
          - orders.system
        detail-type:
          - Order Status Update
      Targets:
        - Id: appsync-order-status-update
          Arn: !GetAtt ApiDestination.Arn
          RoleArn: !GetAtt EventBridgeRole.Arn
          DeadLetterConfig:
            Arn: !GetAtt DLQ.Arn
          InputTransformer:
            InputPathsMap:
              orderId: "$.detail.order-id"
              status: "$.detail.status"
              prevStatus: "$.detail.previous-status"
              updatedAt: "$.time"
            InputTemplate: |
              {
                "query": "mutation PublishStatusUpdate($orderId:ID!, $status:Status!, $prevStatus:Status, $updatedAt:AWSDateTime!) { publishStatusUpdate(orderId:$orderId, status:$status, prevStatus:$prevStatus, updatedAt:$updatedAt) { orderId status prevStatus updatedAt } }",
                "operationName": "PublishStatusUpdate",
                "variables": {
                  "orderId": "<orderId>",
                  "status": "<status>",
                  "prevStatus": "<prevStatus>",
                  "updatedAt": "<updatedAt>"
                }
              }

  DLQ:
    Type: AWS::SQS::Queue

  DLQPolicy:
    Type: AWS::SQS::QueuePolicy
    Properties:
      Queues:
        - !Ref DLQ
      PolicyDocument:
        Statement:
          - Action: "sqs:SendMessage"
            Effect: Allow
            Resource: !GetAtt DLQ.Arn
            Principal:
              Service: events.amazonaws.com
            Condition:
              ArnEquals:
                aws:SourceArn: !GetAtt RouteToAppSyncRule.Arn

  EventBridgeRole:  ## Service role to invoke Api Destination
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: events.amazonaws.com
            Action: "sts:AssumeRole"
      Path: /
      Policies:
        - PolicyName: eventbridge-invoke-api-destination
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action: events:InvokeApiDestination
                Resource:
                  - !Sub "arn:aws:events:${AWS::Region}:${AWS::AccountId}:api-destination/${ApiDestination}/*"
  
  GraphQLApi:  ## AppSync API
    Type: AWS::AppSync::GraphQLApi
    Properties:
      AuthenticationType: API_KEY
      Name: !Ref ProjectName

  GraphQLApiKey:  ## API Key for AppSync
    Type: AWS::AppSync::ApiKey
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId

  GraphQLSchema:  ## Schema for sample backend for frontend
    Type: AWS::AppSync::GraphQLSchema
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      Definition: |
        type StatusUpdate {
          orderId: ID!
          status: Status!
          prevStatus: Status
          updatedAt: AWSDateTime!
        }
        enum Status {
          PENDING
          IN_PROGRESS
          SHIPPED
          DELIVERED
          COMPLETE
        }
        type Query {
          getStatus(orderId: ID!): Boolean
        }
        type Mutation {
          publishStatusUpdate(orderId: ID!, status: Status!, prevStatus: Status, updatedAt: AWSDateTime!): StatusUpdate
        }
        type Subscription {
          onStatusUpdate(orderId: ID!): StatusUpdate
            @aws_subscribe(mutations: [ "publishStatusUpdate" ])
        }
        schema {
          query: Query
          mutation: Mutation
          subscription: Subscription
        }

  NoneDataSource:  ## Local data source for AppSync
    Type: AWS::AppSync::DataSource
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      Name: NoneDataSource
      Type: NONE

  PublishStatusUpdateMutationResolver:  ## Resolver for putStatusUpdate mutation
    Type: AWS::AppSync::Resolver
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      TypeName: Mutation
      FieldName: publishStatusUpdate
      DataSourceName: !GetAtt NoneDataSource.Name
      RequestMappingTemplate: |
        {
          "version": "2018-05-29",
          "payload": $util.toJson($ctx.args)
        }
      ResponseMappingTemplate: |
        $util.toJson($ctx.result)


Outputs:
  AppSyncEndpoint:
    Value: !GetAtt GraphQLApi.GraphQLUrl
    Description: Endpoint for GraphQL API

  ApiKey:
    Value: !GetAtt GraphQLApiKey.ApiKey
YAML

まとめ

この投稿では、Amazon EventBridge を使用して AppSync サブスクライバへのアウトオブバンドのデータベースの更新の配信を簡素化する方法について説明しました。EventBridge ルールとinput transformersにより、API destinationを使用して AppSync に送信される GraphQL ミューテーション ペイロードを作成しました。このペイロードは、Lambda 関数やメンテナンスが必要な特定のコードを記述する必要はありません。更新や変更は、選択したinfrastructure as codeツールを使用して簡単に実装できます。AppSync と EventBridge を使用すると、需要に応じて自動的に拡張する柔軟なサーバーレステクノロジーを活用した組み込みの WebSockets 接続管理により、クラウドバックエンドから数千の接続されたフロントエンド Web およびモバイルクライアントにデータの変更やイベントをシームレスにプッシュできます。
詳細については、AWS AppSync 開発者ガイドおよび Amazon EventBridge 開発者ガイドを参照してください。

翻訳はソリューションアーキテクト福本が担当しました。原文はこちらです。