Simplify out of band AWS AppSync real-time subscriptions with Amazon EventBridge

This article was written by Josh Kahn, Principal Specialist Solutions Architect, AWS

September 14, 2021: Amazon Elasticsearch Service has been renamed to Amazon OpenSearch Service.

Real-time, up-to-date data is valuable for many categories of applications. From sports scores to stock quotes to delivery apps and many others, real-time data is essential in building engaging frontend applications on both mobile and desktop. Modern architectures for these types of applications make use of long-lived, two-way connections that allow the client to subscribe to updates pushed from the server.

AWS AppSync is a managed GraphQL service that can enable real-time data through the use of GraphQL subscriptions. Subscriptions are implemented and managed between the client and backend in AppSync via WebSocket connections. Any data source exposed by an AppSync API can support subscriptions, including pre-integrated data sources such as Amazon DynamoDB and Amazon OpenSearch Service (successor to Amazon Elasticsearch Service) in addition to alternative data sources such as Amazon Neptune or Amazon QLDB.

With AWS AppSync, subscriptions are invoked in response to a mutation or change in data. In other words, when data is mutated or modified via a GraphQL operation, AppSync will notify subscribers of that data change on successful completion of the mutation. In short, a mutation publishes data which is sent to clients subscribed to it.

In working with AppSync, one of the more common patterns that we have implemented, both with customers and in our own projects, is publishing “out-of-band” database updates to subscribers. An “out-of-band” update occurs when some system alters data independently of an AppSync mutation. For example, in a recent post on this blog, my colleagues described a system in which changes in location prompted updates to a DynamoDB table that in turn needed to be pushed to subscribed clients. Sample code associated with that post made use of an AWS Lambda function that is invoked by Amazon EventBridge when geofencing conditions are met. The function then performs a backend mutation via the AppSync API, which in turn sends the new data to subscribers.

Invoking a Lambda function to perform a mutation in response to data change is a common approach to solving for notifications on out-of-band database updates. While effective, that approach still requires the use of a function that must be maintained over time. In this post, we describe a solution that leverages EventBridge, a serverless event bus, to perform the same work without the need to write and maintain a Lambda function to do so.

EventBridge is used in event-driven architectures to choreograph operations across systems or domains. It makes it easier to connect applications in a scalable, resilient manner. Producers publish events (or state changes) that can be routed to targets via a rich set of rules and filtering capabilities. Producers and consumers are loosely coupled and routing is defined via configuration. For example, an online ordering system may publish events when an order is created or shipped that can be consumed by other systems. Consumers can also include APIs or webhooks via an EventBridge feature called API destinations. We’ll use API destinations to trigger an AppSync subscription in response to an out-of-band update.

Scenario

Let’s expand the order fulfillment system mentioned earlier to demonstrate using EventBridge API destinations to trigger out-of-band subscriptions. The system makes use of several components, each built and managed by a separate team. Our team is building the backend for the frontend using AppSync. Our frontend application needs to be updated with the current order status so that customers know when orders have been fulfilled, shipped, etc.

The ordering system itself is a black box, owned by another team. We can reason that there is a database involved, but our team does not have direct access to the database nor a stream of data changes. Instead, the ordering system publishes order status changes to a custom EventBridge event bus called “orders”. The message payload for an order status update looks like the following:

{
	"id": "b051312994104931b0980d1ad1c5340f",
	"detail-type": "Order Status Update",
	"source": "orders.system",
	"time": "2021-07-02T09:59:48Z",
	"detail": {
		"order-id": "a0cd6966711e4d7a80a770bbe656ed34",
		"status": "SHIPPED",
		"previous-status": "IN_PROGRESS",
		"status-detail": "Ship complete. No change."
	}
}

Messages published to EventBridge can be routed, filtered, and transformed via configuration. To do this, we create an EventBridge rule, add an API destination for our AppSync endpoint, and create a target for the rule that transforms the data before sending to AppSync.

Build the AppSync GraphQL API

We start to build a GraphQL API in AppSync with the schema, part of which is shown below. More details on designing a GraphQL schema can be found in AppSync documentation.

type StatusUpdate {
	orderId: ID!
	status: Status!
	prevStatus: Status
	updatedAt: AWSDateTime!
}

enum Status {
	PENDING
	IN_PROGRESS
	SHIPPED
	DELIVERED
	COMPLETE
}

type Mutation {
	publishStatusUpdate(orderId: ID!,
        status: Status!,
        prevStatus: Status,
        updatedAt: AWSDateTime!): StatusUpdate
}

type Subscription {
	onStatusUpdate(orderId: ID!): StatusUpdate
		@aws_subscribe(mutations: [ "publishStatusUpdate" ])
}

The onStatusUpdate subscription will be triggered when the publishStatusUpdate mutation is performed. While we may in some cases want to maintain a view of order status in our own data store, here we will simply pass along status updates to subscribers by using a local resolver attached to a NONE data source, which allows us to publish notifications without updating the data source. In this case, the resolver mapping templates will be used solely to shape the request and response as part of a local operation in AppSync itself.

## Request Template
{
"version": "2018-05-29",
"payload": $util.toJson($ctx.args)
}

## Response Template
$util.toJson($ctx.result)
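To make the flow concrete, the following Python sketch models what happens conceptually when the mutation runs against the NONE data source: the arguments are passed through unchanged, and only clients subscribed with a matching orderId are notified. This is not AppSync code. The names `resolve_locally` and `notify` and the in-memory subscriber map are hypothetical and exist only for illustration.

```python
def resolve_locally(args):
    """Model the request/response templates attached to the NONE data source."""
    request = {"version": "2018-05-29", "payload": args}
    # A NONE data source performs no I/O; the payload becomes the result.
    return request["payload"]


def notify(result, subscriptions):
    """Return the IDs of subscribers whose orderId argument matches the result."""
    return [
        client_id
        for client_id, order_id in subscriptions.items()
        if order_id == result["orderId"]
    ]


# Hypothetical subscribers: client ID mapped to the orderId each subscribed with.
subscriptions = {"client-a": "123", "client-b": "456"}
update = resolve_locally({
    "orderId": "123",
    "status": "SHIPPED",
    "prevStatus": "IN_PROGRESS",
    "updatedAt": "2021-07-02T21:15:13Z",
})
print(notify(update, subscriptions))  # prints ['client-a']
```

Because nothing is persisted, a client that connects after the mutation has run will not see earlier updates. That is the trade-off of passing notifications through without maintaining our own view of order status.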

Configure EventBridge API destinations and routing rules

With AppSync configured, we can move on to configuring EventBridge. Permissions to set up rules, targets, etc. are beyond the scope of this post, though you can learn more about EventBridge policies and architectures in the re:Invent 2020 session “Building event-driven applications with Amazon EventBridge”.

To start, configure a new API destination in the EventBridge console, AWS CLI, or your favorite infrastructure as code tool. A sample using CloudFormation follows; note that AWS CDK requires the use of L1 constructs for this use case.

The API destination endpoint is the AppSync GraphQL endpoint, including /graphql with POST as the HTTP method. A rate limit is optional but can be useful in scenarios when the downstream system is rate limited.

Select Create a new connection to configure authorization for the endpoint. Here we use an API key for simplicity, but we can also use an OAuth provider, such as Amazon Cognito, or basic credentials.

For AppSync, enter the name of the API key header (x-api-key) and the provisioned key. The API key is stored in AWS Secrets Manager. EventBridge also supports additional credentials to sign each invocation to the destination if needed. Using CloudFormation, we can create the same API destination as follows:

  ApiDestination:  ## EventBridge API destination pointing to AppSync API
    Type: AWS::Events::ApiDestination
    Properties:
      ConnectionArn: !GetAtt ApiDestinationConnection.Arn
      HttpMethod: POST
      InvocationEndpoint: !GetAtt GraphQLApi.GraphQLUrl

  ApiDestinationConnection:  ## Connection for EventBridge API Destination
    Type: AWS::Events::Connection
    Properties:
      AuthorizationType: API_KEY
      AuthParameters:
        ApiKeyAuthParameters:
          ApiKeyName: x-api-key
          ApiKeyValue: !GetAtt GraphQLApiKey.ApiKey
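Conceptually, each invocation of this API destination is an HTTP POST to the GraphQL endpoint that carries the API key in the x-api-key header. A rough Python equivalent is sketched below using only the standard library. The endpoint URL, key value, and the helper name `build_graphql_request` are placeholders assumed for illustration; the request is only constructed here, never sent.

```python
import json
import urllib.request


def build_graphql_request(endpoint, api_key, query, variables):
    """Build (but do not send) a GraphQL-over-HTTP POST request."""
    body = json.dumps({"query": query, "variables": variables}).encode("utf-8")
    return urllib.request.Request(
        endpoint,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json", "x-api-key": api_key},
    )


# Placeholder values; substitute your own endpoint and API key.
request = build_graphql_request(
    "https://example.appsync-api.us-east-1.amazonaws.com/graphql",
    "da2-exampleapikey",
    "mutation M($orderId: ID!) { publishStatusUpdate(orderId: $orderId) { orderId } }",
    {"orderId": "123"},
)
print(request.get_method(), request.full_url)
```

With the connection in place, EventBridge supplies the header on every call, so the key never needs to appear in the rule or the event itself.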

Next, create a rule in EventBridge to route order status update events to AppSync. Rules are tied to a particular event bus, in this case the orders event bus (all AWS accounts also have a default bus). Rules can be created via the AWS Console, CLI, or infrastructure as code.

EventBridge rules use event patterns to match events and send them to targets. Event patterns are defined using a familiar, JSON-like syntax. Event patterns are flexible and can be used for a wide variety of use cases. Check out the documentation for more examples. For our use case, we want to match all order status update events from the ordering system (in the AWS Console, you can also test the event pattern against the earlier example message):

{
	"source": [ "orders.system" ],
	"detail-type": [ "Order Status Update" ]
}
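The matching rule behind this pattern can be illustrated with a small Python sketch: every field named in the pattern must be present in the event, and the event's value must equal one of the values listed for that field. This is a deliberately simplified model that covers only exact-value matching and nested objects, not the full pattern syntax EventBridge supports. The function name `matches` is an assumption for illustration.

```python
def matches(pattern, event):
    """Return True when every field in the pattern matches the event.

    Simplified model: a list in the pattern matches when the event value
    equals any listed value; a nested dict recurses into the event.
    """
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif actual not in expected:
            return False
    return True


pattern = {
    "source": ["orders.system"],
    "detail-type": ["Order Status Update"],
}
event = {
    "detail-type": "Order Status Update",
    "source": "orders.system",
    "detail": {"order-id": "123", "status": "SHIPPED"},
}
print(matches(pattern, event))  # prints True
```

Fields that the pattern does not mention, such as detail here, are ignored, which is why a short pattern can match every order status update regardless of its contents.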

After configuring the event pattern, select the event bus and configure the API destination as the target of the rule. In the AWS Console, you can allow EventBridge to create an IAM role to invoke the API destination or use an existing role.

Before moving forward, expand the Configure input section. This is where the magic happens. Using an input transformer, we can map values from the event payload shown earlier to a properly formatted GraphQL request. An input transformer is composed of two parts: an input path and an input template. In the input path, we use JSON path expressions to map elements of the incoming event to variables that can be used in the input template. The template can produce a string or JSON object and defines the information to be passed to the target. If you are familiar with using an HTTP POST to perform GraphQL operations (many libraries handle this for you under the covers), the template should look like a familiar request body.

The query itself is the key component of the input template. Though contained within a single string in the template, the operation itself should look familiar:

mutation PublishStatusUpdate(
	$orderId: ID!,
	$status: Status!,
	$prevStatus: Status,
	$updatedAt: AWSDateTime!
) {
	publishStatusUpdate(
		orderId: $orderId,
		status: $status,
		prevStatus: $prevStatus,
		updatedAt: $updatedAt) {
		orderId
		status
		prevStatus
		updatedAt
	}
}

Note that any data that should be included in subscription notifications should be included in the mutation selection set.

Finally, create the rule and target. Using CloudFormation, we can define the same rule and target as follows:

  RouteToAppSyncRule:  ## Route order status updates to AppSync endpoint
    Type: AWS::Events::Rule
    Properties:
      EventBusName: !GetAtt OrdersEventBus.Name
      EventPattern:
        source:
          - orders.system
        detail-type:
          - Order Status Update
      Targets:
        - Id: appsync-order-status-update
          Arn: !GetAtt ApiDestination.Arn
          RoleArn: !GetAtt EventBridgeRole.Arn
          InputTransformer:
            InputPathsMap:
              orderId: "$.detail.order-id"
              status: "$.detail.status"
              prevStatus: "$.detail.previous-status"
              updatedAt: "$.time"
            InputTemplate: |
              {
                "query": "mutation PublishStatusUpdate($orderId:ID!, $status:Status!, $prevStatus:Status, $updatedAt:AWSDateTime!) { publishStatusUpdate(orderId:$orderId, status:$status, prevStatus:$prevStatus, updatedAt:$updatedAt) { orderId status prevStatus updatedAt } }",
                "operationName": "PublishStatusUpdate",
                "variables": {
                  "orderId": "<orderId>",
                  "status": "<status>",
                  "prevStatus": "<prevStatus>",
                  "updatedAt": "<updatedAt>"
                }
              }
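To see what the input transformer in the rule above produces, the following Python sketch imitates its two steps: resolve each path in the paths map against the event, then substitute the resulting values for the `<name>` placeholders in the template. It is a simplified model, not EventBridge's implementation. It handles only plain dotted paths and does not reproduce EventBridge's escaping rules, and the helper names `resolve_path` and `transform` are assumptions. The template is shortened by omitting the "query" field.

```python
import json
import re

# Copied from the rule definition above.
INPUT_PATHS = {
    "orderId": "$.detail.order-id",
    "status": "$.detail.status",
    "prevStatus": "$.detail.previous-status",
    "updatedAt": "$.time",
}

# Shortened template: the real one also carries the mutation in "query".
INPUT_TEMPLATE = """{
  "operationName": "PublishStatusUpdate",
  "variables": {
    "orderId": "<orderId>",
    "status": "<status>",
    "prevStatus": "<prevStatus>",
    "updatedAt": "<updatedAt>"
  }
}"""


def resolve_path(event, path):
    """Resolve a simple dotted path such as '$.detail.order-id'."""
    value = event
    for key in path[2:].split("."):  # drop the leading "$."
        value = value[key]
    return value


def transform(event, paths, template):
    """Substitute each <name> placeholder with the value its path selects."""
    values = {name: resolve_path(event, path) for name, path in paths.items()}
    return re.sub(r"<(\w+)>", lambda m: str(values[m.group(1)]), template)


event = {
    "detail-type": "Order Status Update",
    "source": "orders.system",
    "time": "2021-07-02T09:59:48Z",
    "detail": {
        "order-id": "a0cd6966711e4d7a80a770bbe656ed34",
        "status": "SHIPPED",
        "previous-status": "IN_PROGRESS",
    },
}

body = json.loads(transform(event, INPUT_PATHS, INPUT_TEMPLATE))
print(body["variables"])
```

The resulting variables object is what AppSync receives alongside the mutation, so the names in the paths map must line up with the variables the mutation declares.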

Next, we can verify that order status update events trigger a notification to subscribers using the AWS CLI and AWS Console. Open the AppSync Console, select the API created earlier, and click on Queries in the left-hand menu. Using the query browser with its built-in WebSocket client, we create a GraphQL subscription by entering the following (note that the order ID value does not matter; it just needs to match the order ID in the payload sent to EventBridge):

subscription OnStatusUpdate {
	onStatusUpdate(orderId: "123") {
		orderId
		prevStatus
		status
		updatedAt
	}
}

Click the Run button (orange) at the top of the query browser. You should see a waiting indicator to the right; the browser is now subscribed to updates using a secure WebSocket connection. A similar subscription would be used in the front-end application to subscribe to order updates pushed via WebSockets, leveraging a GraphQL client such as the Amplify libraries API client.

We can use the AWS CLI to publish an event to the orders event bus. In our scenario, a database change would trigger a similar event. First, create a file named status-update.json and add the following:

[
  {
    "EventBusName": "orders",
    "Source": "orders.system",
    "DetailType": "Order Status Update",
    "Detail": "{ \"order-id\": \"123\", \"status\": \"SHIPPED\", \"previous-status\": \"IN_PROGRESS\" }"
  }
]

In your terminal, enter the following command:

aws events put-events --entries file://status-update.json
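Hand-escaping the Detail string is error-prone, because it must be a JSON document serialized as a string inside the outer JSON. As an optional alternative, the entries file can be generated with a short Python script such as the sketch below. The helper name `build_entries` is an assumption for illustration; the bus name, source, and detail type mirror the values used in this post.

```python
import json


def build_entries(order_id, status, previous_status, bus_name="orders"):
    """Build the list passed to `aws events put-events --entries`."""
    detail = {
        "order-id": order_id,
        "status": status,
        "previous-status": previous_status,
    }
    return [
        {
            "EventBusName": bus_name,
            "Source": "orders.system",
            "DetailType": "Order Status Update",
            # Detail must be a JSON document serialized as a string.
            "Detail": json.dumps(detail),
        }
    ]


entries = build_entries("123", "SHIPPED", "IN_PROGRESS")
print(json.dumps(entries, indent=2))
```

Redirecting the script's output to status-update.json produces a file equivalent to the one shown earlier, with the quoting handled by json.dumps.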

After a few moments, you should see an update in the AppSync query browser such as:

{
	"data": {
		"onStatusUpdate": {
			"prevStatus": "IN_PROGRESS",
			"status": "SHIPPED",
			"updatedAt": "2021-07-02T21:15:13Z",
			"orderId": "123"
		}
	}
}

You can try the solution in your own account with the following template using the AWS CloudFormation console:

---
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31

Parameters:
  ProjectName:
    Type: String
    Default: aws-appsync-eventbridge-destination

Resources:
  OrdersEventBus: ## EventBridge Event Bus for processing orders
    Type: AWS::Events::EventBus
    Properties:
      Name: orders

  ApiDestination:  ## EventBridge API destination pointing to AppSync API
    Type: AWS::Events::ApiDestination
    Properties:
      ConnectionArn: !GetAtt ApiDestinationConnection.Arn
      HttpMethod: POST
      InvocationEndpoint: !GetAtt GraphQLApi.GraphQLUrl

  ApiDestinationConnection:  ## Connection for EventBridge API Destination
    Type: AWS::Events::Connection
    Properties:
      AuthorizationType: API_KEY
      AuthParameters:
        ApiKeyAuthParameters:
          ApiKeyName: x-api-key
          ApiKeyValue: !GetAtt GraphQLApiKey.ApiKey

  RouteToAppSyncRule:  ## Route order status updates to AppSync endpoint
    Type: AWS::Events::Rule
    Properties:
      EventBusName: !GetAtt OrdersEventBus.Name
      EventPattern:
        source:
          - orders.system
        detail-type:
          - Order Status Update
      Targets:
        - Id: appsync-order-status-update
          Arn: !GetAtt ApiDestination.Arn
          RoleArn: !GetAtt EventBridgeRole.Arn
          DeadLetterConfig:
            Arn: !GetAtt DLQ.Arn
          InputTransformer:
            InputPathsMap:
              orderId: "$.detail.order-id"
              status: "$.detail.status"
              prevStatus: "$.detail.previous-status"
              updatedAt: "$.time"
            InputTemplate: |
              {
                "query": "mutation PublishStatusUpdate($orderId:ID!, $status:Status!, $prevStatus:Status, $updatedAt:AWSDateTime!) { publishStatusUpdate(orderId:$orderId, status:$status, prevStatus:$prevStatus, updatedAt:$updatedAt) { orderId status prevStatus updatedAt } }",
                "operationName": "PublishStatusUpdate",
                "variables": {
                  "orderId": "<orderId>",
                  "status": "<status>",
                  "prevStatus": "<prevStatus>",
                  "updatedAt": "<updatedAt>"
                }
              }

  DLQ:
    Type: AWS::SQS::Queue

  DLQPolicy:
    Type: AWS::SQS::QueuePolicy
    Properties:
      Queues:
        - !Ref DLQ
      PolicyDocument:
        Statement:
          - Action: "sqs:SendMessage"
            Effect: Allow
            Resource: !GetAtt DLQ.Arn
            Principal:
              Service: events.amazonaws.com
            Condition:
              ArnEquals:
                aws:SourceArn: !GetAtt RouteToAppSyncRule.Arn

  EventBridgeRole:  ## Service role to invoke Api Destination
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: events.amazonaws.com
            Action: "sts:AssumeRole"
      Path: /
      Policies:
        - PolicyName: eventbridge-invoke-api-destination
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action: events:InvokeApiDestination
                Resource:
                  - !Sub "arn:aws:events:${AWS::Region}:${AWS::AccountId}:api-destination/${ApiDestination}/*"
  
  GraphQLApi:  ## AppSync API
    Type: AWS::AppSync::GraphQLApi
    Properties:
      AuthenticationType: API_KEY
      Name: !Ref ProjectName

  GraphQLApiKey:  ## API Key for AppSync
    Type: AWS::AppSync::ApiKey
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId

  GraphQLSchema:  ## Schema for sample backend for frontend
    Type: AWS::AppSync::GraphQLSchema
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      Definition: |
        type StatusUpdate {
          orderId: ID!
          status: Status!
          prevStatus: Status
          updatedAt: AWSDateTime!
        }
        enum Status {
          PENDING
          IN_PROGRESS
          SHIPPED
          DELIVERED
          COMPLETE
        }
        type Query {
          getStatus(orderId: ID!): Boolean
        }
        type Mutation {
          publishStatusUpdate(orderId: ID!, status: Status!, prevStatus: Status, updatedAt: AWSDateTime!): StatusUpdate
        }
        type Subscription {
          onStatusUpdate(orderId: ID!): StatusUpdate
            @aws_subscribe(mutations: [ "publishStatusUpdate" ])
        }
        schema {
          query: Query
          mutation: Mutation
          subscription: Subscription
        }

  NoneDataSource:  ## Local data source for AppSync
    Type: AWS::AppSync::DataSource
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      Name: NoneDataSource
      Type: NONE

  PublishStatusUpdateMutationResolver:  ## Resolver for publishStatusUpdate mutation
    Type: AWS::AppSync::Resolver
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      TypeName: Mutation
      FieldName: publishStatusUpdate
      DataSourceName: !GetAtt NoneDataSource.Name
      RequestMappingTemplate: |
        {
          "version": "2018-05-29",
          "payload": $util.toJson($ctx.args)
        }
      ResponseMappingTemplate: |
        $util.toJson($ctx.result)


Outputs:
  AppSyncEndpoint:
    Value: !GetAtt GraphQLApi.GraphQLUrl
    Description: Endpoint for GraphQL API

  ApiKey:
    Value: !GetAtt GraphQLApiKey.ApiKey

Conclusion

In this post, we have described how to use Amazon EventBridge to simplify publishing out-of-band database updates to AppSync subscribers. By using EventBridge rules and input transformers, we created a GraphQL mutation payload that was sent to AppSync using an API destination without the need to write a Lambda function or specific code that would need to be maintained. Updates or changes could easily be implemented with your infrastructure as code tool of choice. With AppSync and EventBridge, you can seamlessly push data changes and events from your cloud backend all the way to thousands of connected front-end web and mobile clients with built-in WebSockets connection management leveraging flexible serverless technologies that scale automatically according to demand.

For more details, refer to the AWS AppSync Developer Guide and Amazon EventBridge Developer Guide.