Practical use cases for AWS AppSync Pipeline Resolvers – Part 3: Heterogeneous Data Sources
This article was written by Salman Moghal, Application Architect, AWS, and Abdul Kitana, Security Architect, AWS
Overview
AWS AppSync is a fully managed service that allows developers to deploy serverless GraphQL backends in the AWS cloud. It provides features that developers can use to create modern data-driven applications, making it easy to query multiple databases, microservices, and APIs from a single GraphQL endpoint. Customers can leverage AppSync real-time capabilities, offline data synchronization, built-in server-side caching, fine-grained access control, security, support for business logic in the API layer using GraphQL resolvers, and more. In this article, we focus on how to implement and orchestrate backend logic directly in the AppSync GraphQL API layer using pipeline resolvers.
Resolvers are built-in functions in GraphQL that “resolve” types or fields defined in the GraphQL schema with the data in the data sources. Resolvers in AppSync can be of two types: unit resolvers or pipeline resolvers. Pipeline resolvers offer the ability to serially execute operations against multiple data sources in a single API call, triggered by queries, mutations, or subscriptions.
AppSync Pipeline Resolvers significantly reduce client-side application complexity and help enforce server-side business logic controls by orchestrating requests to multiple data sources. They can be used to enforce certain aspects of the application’s logic at the API layer.
This is a three-part article series in which we use a sample application to highlight how pipeline resolvers can be used to solve common problems, such as:
- Part 1: Implement conditional checks on the server-side before creating records in a backend database. In this example, we implement a user’s quota management feature directly in the GraphQL API.
- Part 2: Perform nested database queries, i.e. given a key that is present in two DynamoDB tables, aggregate data across tables.
- Part 3: Interact with data from heterogeneous data sources that may or may not be directly supported by AWS AppSync in a single call, i.e. DynamoDB using its built-in AppSync data source support and S3 buckets using a Lambda data source.
You can follow the links above to access the other articles in the series. In the first article, you can find a CloudFormation template that can be used to deploy all the configured services in your AWS account, as well as the steps necessary to create users to test the different use cases.
Our sample application is a simple web app where users sign in and publish posts to their profile. Users have a quota or subscription that determines how many posts they can publish, and they can also add other users as friends. Finally, the content of the posts can be formatted in HTML and needs to support sizes of up to a few megabytes. We provide full implementation details of the use cases listed above using an AppSync GraphQL API.
Use case #3 – Heterogeneous data sources
Pipeline resolvers can be used to access different types of data sources in one query or mutation. The data sources can be database services such as Amazon RDS, DynamoDB, or Elasticsearch, or a Lambda data source. With Lambda as a data source, we can interact with backend services that are not directly supported by AppSync, for example, S3 buckets.
In our application, users might want to upload large posts, and storing them in DynamoDB might not be an option because of size limitations. To work around the DynamoDB item size limit (400 KB per item), we can use Amazon S3 to store and retrieve the contents of users’ posts.
In this example, we use a new feature of the AppSync service called Direct Lambda Resolvers. We demonstrate how a Direct Lambda Resolver can be bound to both Mutation and Query fields, and how we can still use a pipeline resolver function to perform additional control logic before invoking it. This approach enables us to treat a Direct Lambda Resolver as a microservice that interacts with its own data source. Our Lambda implementation is generic enough to handle multiple operations performed against S3 as a data source, i.e. create contents in the S3 bucket and retrieve contents from the S3 bucket. A user can call a Mutation to create contents in the S3 bucket, and similarly, users can invoke a Query to retrieve the content of their posts from the S3 bucket. The S3 object key for each post content is constructed based on the userId + postId attributes using a pipeline resolver function.
Let’s first review our Pipeline Resolver architecture. At a high level, both Create Post Content and Get Post Content look similar. The only difference is the type of field they are bound to: the Create Post Content resolver is bound to a Mutation type field, whereas the Get Post Content resolver is bound to a Query type field.
As we can see in the GraphQL schema, the createPostContent Mutation field and the getPostContent Query field are linked to the pipeline resolvers:
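The relevant schema fields look roughly like the sketch below. The input and return type names here are illustrative only; the actual type definitions are part of the schema deployed by the CloudFormation template.

input CreatePostContentInput {
  postId: ID!
  content: String!
}

input GetPostContentInput {
  postId: ID!
}

type PostContentUpload {
  etag: String
}

type PostContent {
  content: String
}

type Mutation {
  createPostContent(input: CreatePostContentInput!): PostContentUpload
}

type Query {
  getPostContent(input: GetPostContentInput!): PostContent
}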
Next, we take a closer look at the two pipeline resolvers. Starting with the CreatePostContent pipeline resolver, we notice the get_post_content_s3_key function has request and response mapping templates. However, the create_post_content_in_s3 function does not, as this function uses the S3BlogContent Lambda function as a data source. When we disable the VTL request and response mapping templates, AppSync treats the Lambda data source as a Direct Lambda Resolver.
Next, we take a look at the GetPostContent pipeline resolver shown below. Notice we are reusing the get_post_content_s3_key function, and that we are also reusing the S3BlogContent Lambda data source. As a Direct Lambda Resolver, the get_post_content_from_s3 function does not have request and response mapping templates. The Lambda logic can be coded to handle multiple parent types and fields; we check how this is possible by introspecting the Lambda event object later in this section.
Both the CreatePostContent and GetPostContent pipeline resolvers invoke the shared get_post_content_s3_key pipeline function. The request mapping template of this function simply executes a DynamoDB Query operation with a hash key and a filter expression. The hash key is set to postId in the query expression, and a filter expression is used to further reduce the results to posts belonging to the given userId. Since we are using a DynamoDB Query operation, which targets a set of records in the Posts table by postId, there is no need to use the userId-index global secondary index defined on the table. We created this index specifically to help speed up DynamoDB scan operations in the previous use case.
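A request mapping template along the following lines implements that query. This is a sketch for illustration and may differ slightly from the template deployed by the CloudFormation stack, but it performs the same Query operation with a filter expression:

{
    "version": "2017-02-28",
    "operation": "Query",
    "query": {
        "expression": "postId = :postId",
        "expressionValues": {
            ":postId": $util.dynamodb.toDynamoDBJson($context.arguments.input.postId)
        }
    },
    "filter": {
        "expression": "userId = :userId",
        "expressionValues": {
            ":userId": $util.dynamodb.toDynamoDBJson($context.identity.username)
        }
    }
}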
The response mapping template for the get_post_content_s3_key pipeline function looks at the result retrieved from DynamoDB. If the query returned no items, the function throws an error, which blocks the execution of the Direct Lambda Resolver. This approach also allows us to simplify our Lambda logic by not testing for empty results. If the query operation retrieves results from DynamoDB and there are no errors, the pipeline function passes the results to the next function in the pipeline, which is linked to a Direct Lambda Resolver.
#if($context.error)
$util.error($context.error.message, $context.error.type)
#end
#set ($count = $context.result.items.size())
#if ($count <= 0)
$util.error("Unknown postId: ${context.arguments.input.postId}, or userId: ${context.identity.username}")
#end
## Pass back the result from DynamoDB.
$util.toJson($context.result)
There are no request or response mapping templates for Direct Lambda Resolvers and no need to use VTL; therefore, the create_post_content_in_s3 and get_post_content_from_s3 pipeline functions simply invoke the same Lambda data source, i.e. S3BlogContent. We create this data source next, but let us first take a look at the Lambda event object that AppSync sends through to the Direct Lambda Resolver.
Lambda receives an AppSync event object containing several key fields. A sample AppSync Lambda event is shown below. In Lambda code, we can introspect the info object to determine the type of logic we want to perform. This gives us a lot of flexibility to code the business logic the way we want. In order to create or retrieve content from the S3 bucket, we construct the S3 object key dynamically based on the userId and the postId passed in the Lambda event object. We take a look at how we select the userId and the postId in the Lambda business logic next.
{
"arguments": {
"input": {
"postId": "1001",
"content": "<html><body>post contents - 1001</body></html>"
}
},
"identity": {
"claims": {
"sub": "256475df-cea6-43b6-bb6b-bd9c434a421f",
"aud": "m46cd9iugaot4pqn797sccvm8",
"email_verified": true,
"event_id": "17b0df60-bab4-4ced-85e9-185a55e7da42",
"token_use": "id",
"auth_time": 1603003763,
"iss": "https://cognito-idp.us-east-2.amazonaws.com/us-east-XYZ",
"cognito:username": "user1",
"exp": 1603053330,
"iat": 1603049730,
"email": "user1@amazon.com"
},
"defaultAuthStrategy": "ALLOW",
"groups": null,
"issuer": "https://cognito-idp.us-east-2.amazonaws.com/us-east-XYZ",
"sourceIp": [
"A.B.C.D"
],
"sub": "256475df-cea6-43b6-bb6b-bd9c434a421f",
"username": "user1"
},
"source": null,
"request": {
"headers": {
// list of headers
}
},
"prev": {
"result": {
"items": [
{
"postId": "1001",
"title": "title1",
"userId": "user1",
"content": "first post"
}
],
"nextToken": null,
"scannedCount": 1,
"startedAt": null
}
},
"info": {
"selectionSetList": [],
"selectionSetGraphQL": "",
"parentTypeName": "Mutation",
"fieldName": "createPostContent",
"variables": {}
},
"stash": {}
}
The Lambda function manages data ingestion to an S3 bucket. If you executed the CloudFormation template, you should already have a Lambda function called appsync-direct-lambda-resolver-function. If you’re creating it manually:
- In the Lambda console, under Lambda > Functions, click Create Function
- Enter a function name, i.e. appsync-direct-lambda-resolver-function
- Set the Runtime as nodejs12.x
- Update the timeout for the function to something larger than the default, for example: 30 seconds
- Add an environment variable called CONTENT_BUCKET and set it to the S3 bucket Lambda should connect to
- Paste in the following code contents:
const AWS = require('aws-sdk');
const s3 = new AWS.S3();
const bucketName = process.env.CONTENT_BUCKET;
exports.handler = async (event, context) => {
// we don't need to validate event.prev object or event.prev.result.items length
// because get_post_content_s3_key pipeline function only passes control over to Direct Lambda
// Resolver when DynamoDB Query operation returns exactly one valid result. As per our
// DynamoDB Post schema, there should never be multiple rows for a given userId and postId
let resp = {};
try {
// create contents in s3 bucket
if (event.info.fieldName === 'createPostContent') {
if (!event.arguments.input.postId || !event.arguments.input.content) {
const errorMessage = 'missing required parameters in createPostContent';
console.error('Exception occurred: ', errorMessage);
throw new Error(errorMessage);
}
const params = {
Body: event.arguments.input.content,
ContentType: 'text/plain',
Bucket: bucketName,
Key: event.prev.result.items[0].userId + '/' + event.arguments.input.postId
};
console.log('Creating object in bucket: ' + bucketName + ', s3 params: ' + JSON.stringify(params));
const data = await s3.putObject(params).promise();
resp = {
etag: data.ETag
};
}
// get contents from s3 bucket
else if(event.info.fieldName === 'getPostContent') {
if (!event.arguments.input.postId) {
const errorMessage = 'missing required parameters in getPostContent';
console.error('Exception occurred: ', errorMessage);
throw new Error(errorMessage);
}
const params = {
Bucket: bucketName,
Key: event.prev.result.items[0].userId + '/' + event.arguments.input.postId
};
console.log('Retrieving object from bucket: ' + bucketName + ', s3 params: ' + JSON.stringify(params));
const data = await s3.getObject(params).promise();
const content = data.Body.toString('utf-8');
resp = {
content: content
};
}
else {
const errorMessage = 'unsupported operation: ' + event.info.fieldName;
console.error('Exception occurred: ', errorMessage);
throw new Error(errorMessage);
}
}
catch (ex) {
console.error('Exception occurred: ', ex.message);
const promise = new Promise((resolve, reject) => {
reject(ex.message);
});
return promise;
}
return resp;
};
The code inspects the event.info.fieldName attribute to determine which GraphQL operation invoked the Lambda function and what type of parameters it should expect. These parameters are passed in through the Mutation or Query operations. The Lambda logic above simply figures out whether to create an S3 object containing the contents or whether to read an existing S3 object.

As indicated earlier, in order to store or retrieve post contents from the S3 bucket, Lambda uses the userId from the event.prev.result.items array and the postId from Lambda’s event.arguments.input object. The S3 object key is therefore constructed as userId/postId. Note the items array always contains one element because of the way we have structured the Posts table. When a user calls the createPostContent mutation, the content is sent in the argument as event.arguments.input.content. Lambda takes the content from this attribute and stores it in the S3 bucket using the S3 object key format above. An S3 putObject API call is used to create the object, and it returns the ETag of the created S3 object. Lambda simply passes this ETag back to the AppSync service.
Instead of using the userId in the event.prev.result.items array, we could use the value from event.identity.username. However, in order to leverage the DynamoDB data and to ensure the user indeed has an existing post in the Posts table, we use the userId present in the event.prev.result.items array. Consequently, this means that we cannot execute the createPostContent mutation or the getPostContent query if the user’s post does not exist in the Posts table. We’re leveraging both AppSync’s built-in resolvers in the pipeline as well as Lambda to perform distinct checks and organize the business logic in the pipeline.
Lambda retrieves the name of the S3 bucket from an environment variable called CONTENT_BUCKET. Set this environment variable prior to testing the createPostContent mutation or the getPostContent query. If you used the provided CloudFormation template, the environment variable is automatically set to the name of the bucket that was created; refer to the stack output in the CloudFormation service console to determine the name of the S3 content bucket.
Lambda must have an IAM role assigned to it. This IAM role must have permissions to read and write objects in the S3 bucket. A sample IAM policy for Lambda is provided below. Replace <YOUR_S3_BUCKET_NAME> with your bucket name. If you deployed the CloudFormation template, the Lambda execution role is automatically created for you and the S3 bucket name is automatically set.
AppsyncResolverLambdaExecutionRole:
  Type: 'AWS::IAM::Role'
  Properties:
    AssumeRolePolicyDocument:
      Version: '2012-10-17'
      Statement:
        - Effect: Allow
          Principal:
            Service:
              - lambda.amazonaws.com
          Action:
            - sts:AssumeRole
    Path: "/"
    ManagedPolicyArns:
      - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
    Policies:
      - PolicyName: 'allow_s3'
        PolicyDocument:
          Version: '2012-10-17'
          Statement:
            - Effect: Allow
              Action:
                - s3:GetObject
                - s3:PutObject
              Resource:
                Fn::Join:
                  - ""
                  - - "arn:aws:s3:::"
                    - "<YOUR_S3_BUCKET_NAME>"
                    - "/*"
Once the Lambda function is created, we can add it as a new data source in the AppSync API. Again, if you deployed the CloudFormation template at the beginning of this article, you should already have a Lambda data source linked to AppSync called S3BlogContent.
- Under AppSync > Data Sources
- Click New Data Source
- Specify a data source name, i.e. S3BlogContent
- Select AWS Lambda Function as the data source type, then select the region and the Lambda function created in the previous step, i.e. appsync-direct-lambda-resolver-function
You should now have the Lambda function as a data source in your GraphQL API.
We can now test the createPostContent mutation and the getPostContent query in the AppSync console. There are two ways to prepare the Posts table prior to testing.
- If there is an existing post created by the currently logged in user, note down the postId and follow the steps below to invoke the createPostContent mutation and the getPostContent query operation.
- Alternatively, create some sample data directly in the DynamoDB console in the Posts table for user1.
Let’s assume the Posts table does not contain any user posts. Create a new post by invoking a createPost mutation. The createPost mutation operation returns a postId that we use in the next step. You can skip this step if you already have a valid postId present in the Posts table for user1.
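For reference, the call looks roughly like this; the exact argument and return fields of createPost are defined by the schema from Part 1 of this series, so the shape shown here is illustrative only:

mutation {
  createPost(input: {title: "title1", content: "first post"}) {
    postId
  }
}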
Now invoke the createPostContent mutation and pass the postId returned above:
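For example, using the sample postId from above (the etag selection here assumes the mutation’s return type exposes the ETag value that our Lambda function sends back):

mutation {
  createPostContent(input: {
    postId: "1001",
    content: "<html><body>post contents - 1001</body></html>"
  }) {
    etag
  }
}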
We can confirm that a folder named after the userId was created in the S3 bucket, with the postId as the file name.
Finally, let’s retrieve the content of the S3 object that we just created by calling the getPostContent query. This operation only takes the postId as a parameter:
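A sketch of the query, assuming the return type exposes the content field that the Lambda function returns:

query {
  getPostContent(input: {postId: "1001"}) {
    content
  }
}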
As you can see, both operations work as expected.
Conclusion
With pipeline resolvers, developers can compose operations and orchestrate and execute business logic directly in AppSync, making it easy to interact with multiple data sources with a single GraphQL API call and saving network bandwidth and resources.
In this article, we demonstrated how to take advantage of Direct Lambda Resolvers to build flexible serverless business logic for your GraphQL API with AppSync, in your programming language of choice. We also demonstrated how we can easily mix and match Direct Lambda Resolvers as pipeline functions inside a Pipeline Resolver. In the previous articles, we implemented a user quota management system with pipeline resolvers directly in AppSync, without the need to maintain any additional backend infrastructure, and performed data aggregation from different DynamoDB tables leveraging pipeline resolvers.