Front-End Web & Mobile

Practical use cases for AWS AppSync Pipeline Resolvers – Part 1: Quota Management

This article was written by Salman Moghal, Application Architect, AWS, and Abdul Kitana, Security Architect, AWS

Overview

AWS AppSync is a fully managed service that allows to deploy Serverless GraphQL backends in the AWS cloud. It provides features that developers can use to create modern data driven applications allowing to easily query multiple databases, microservices, and APIs from a single GraphQL endpoint. Customers can leverage AppSync real-time capabilities, offline data synchronization, built-in server-side caching, fine-grained access control, security, support for business logic in the API layer using GraphQL resolvers, and more. In this article, we focus on how to implement and orchestrate backend logic directly in the AppSync GraphQL API layer using pipeline resolvers.

Resolvers are built-in functions in GraphQL that “resolve” types or fields defined in the GraphQL schema with the data in the data sources. Resolvers in AppSync can be of two types: unit resolvers or pipeline resolvers. Pipeline resolvers offer the ability to serially execute operations against multiple data sources in single API call, triggered by queries, mutations, or subscriptions.

AppSync Pipeline Resolvers significantly simplify client-side application complexity and help enforce server-side business logic controls by orchestrating requests to multiple data sources. They can be used to enforce certain aspects of the application’s logic at the API layer.

This is an article series in three parts where we use a sample application to highlight how pipeline resolvers can be used to solve common problems, such as:

You can follow the links above to access the other articles in the series. In this article we explain the backend configuration as well as the initial setup using AWS CloudFormation, then move on to the first use case.

Our sample application is a simple web app where users sign-in and publish posts to their profile. Users have a quota or subscription that determine how many posts they can publish, and they can also add others users as friends. Finally, the content of the posts can be formatted in HTML and require to support a size of up to a few megabytes.  We provide full implementation details of the use-cases listed above using an AppSync GraphQL API.

 

Initial Setup

Use the following CloudFormation template to build the entire solution, including the required Amazon Cognito User Pool, DynamoDB tables, GraphQL schema, resolvers, and data sources including a Lambda function. An S3 bucket is also created where Lambda stores and retrieves blog content. You can quickly deploy the sample AppSync backend in your own account with the following template using the AWS CloudFormation console:

Description: AWSAppSync Patterns Blog Infrastructure
Parameters:
  DdbPostTableGSI1:
    Type: String
    Description: Post Table global secondary index name
    Default: userId-index
Resources:
  # ----------------------------------------------------------
  # create dynamodb tables
  DdbUsersTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: "users"
      ProvisionedThroughput: 
        ReadCapacityUnits: 2
        WriteCapacityUnits: 2
      AttributeDefinitions:
        -
          AttributeName: "userId"
          AttributeType: "S"
      KeySchema:
        -
          AttributeName: "userId"
          KeyType: "HASH"
  DdbPostsTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: "posts"
      ProvisionedThroughput: 
        ReadCapacityUnits: 2
        WriteCapacityUnits: 2
      AttributeDefinitions:
        -
          AttributeName: "userId"
          AttributeType: "S"
        -
          AttributeName: "postId"
          AttributeType: "S"
      KeySchema:
        -
          AttributeName: "postId"
          KeyType: "HASH"
      GlobalSecondaryIndexes:
        - 
          IndexName: !Ref DdbPostTableGSI1
          KeySchema: 
            - 
              AttributeName: "userId"
              KeyType: "HASH"
          Projection:
            ProjectionType: "ALL"
          ProvisionedThroughput:
            ReadCapacityUnits: 2
            WriteCapacityUnits: 2
  # ----------------------------------------------------------
  # create lambda resources
  AppsyncResolverLambdaPermissionPolicy:
    Type: 'AWS::Lambda::Permission'
    Properties:
      FunctionName: !Ref AppsyncResolverLambda
      Action: 'lambda:InvokeFunction'
      Principal: "appsync.amazonaws.com"
  AppsyncResolverLambdaExecutionRole:
    Type: 'AWS::IAM::Role'
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path : "/"
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyName: 'allow_s3'
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
            - Effect: Allow
              Action:
              - s3:GetObject
              - s3:PutObject
              Resource: 
                Fn::Join: 
                  - ""
                  - 
                    - "arn:aws:s3:::"
                    - !Ref S3BucketBlogPostContent
                    - "/*"
  AppsyncResolverLambda:
    Type: 'AWS::Lambda::Function'
    DependsOn: AppsyncResolverLambdaExecutionRole
    Properties:
      Runtime: nodejs12.x
      Timeout: 30
      Description: 'AppSync  post content from S3'
      FunctionName: 'appsync-direct-lambda-resolver-function'
      Handler: 'index.handler'
      Role: !GetAtt AppsyncResolverLambdaExecutionRole.Arn
      Environment:
        Variables:
          CONTENT_BUCKET: !Ref S3BucketBlogPostContent
      Code:
        ZipFile: |
          const AWS = require('aws-sdk');
          const s3 = new AWS.S3();
          const bucketName = process.env.CONTENT_BUCKET;

          exports.handler = async (event, context) => {
              //console.log('Event: ' + JSON.stringify(event));
              //console.log('Context: ' + JSON.stringify(context));
              //console.log('Bucket: ' + bucketName);
              
              // we don't need to validate event.prev object or event.prev.result.items length
              // because get_post_content_s3_key pipeline function only passes control over to Direct Lambda
              // Resolver when DynamoDB Query operation returns exactly one valid result.  As per our
              // DynamoDB Post schema, there should never be multiple rows for a given userId and postId

              let resp = {};
              try {
                // create contents in s3 bucket
                if (event.info.fieldName === 'createPostContent') {
                    if (!event.arguments.input.postId || !event.arguments.input.content) {
                        const errorMessage = 'missing required parameters in createPostContent';
                        console.error('Exception occurred: ', errorMessage);
                        throw new Error(errorMessage);
                    }
                    const params = {
                        Body: event.arguments.input.content,
                        ContentType: 'text/plain',
                        Bucket: bucketName,
                        Key: event.prev.result.items[0].userId + '/' + event.arguments.input.postId
                    };
                    console.log('Creating object in bucket: ' + bucketName + ', s3 params: ' + JSON.stringify(params));
                    const data = await s3.putObject(params).promise();
                    resp = {
                        etag: data.ETag 
                    };
                }
                // get contents from s3 bucket
                else if(event.info.fieldName === 'getPostContent') {
                    if (!event.arguments.input.postId) {
                        const errorMessage = 'missing required parameters in getPostContent';
                        console.error('Exception occurred: ', errorMessage);
                        throw new Error(errorMessage);
                    }
                    const params = {
                      Bucket: bucketName,
                      Key: event.prev.result.items[0].userId + '/' + event.arguments.input.postId
                    };
                    console.log('Retrieving object from bucket: ' + bucketName + ', s3 params: ' + JSON.stringify(params));
                    const data = await s3.getObject(params).promise();
                    const content = data.Body.toString('utf-8');
                    resp = {
                        content: content
                    };
                }
                else {
                    const errorMessage = 'unsupported operation' + event.info.fieldName;
                    console.error('Exception occurred: ', errorMessage);
                    throw new Error(errorMessage);
                }
              }
              catch (ex) {
                console.error('Exception occurred: ', ex.message);
                const promise = new Promise((resolve, reject) => {
                  reject(ex.message);
                });
                return promise;
              }

              return resp;
          };
  # ----------------------------------------------------------
  # create cogito resources
  SNSRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
        - Effect: "Allow"
          Principal:
            Service:
            - "cognito-idp.amazonaws.com"
          Action:
          - "sts:AssumeRole"
      Policies:
      - PolicyName: "CognitoSNSPolicy"
        PolicyDocument:
          Version: "2012-10-17"
          Statement:
          - Effect: "Allow"
            Action: "sns:publish"
            Resource: "*"
          - Effect: "Deny"
            Action: "sns:publish"
            Resource: "arn:aws:sns:*:*:*"
  UserPool:
    Type: "AWS::Cognito::UserPool"
    Properties:
      UserPoolName: appsync-patterns-blog-user-pool
      AutoVerifiedAttributes:
      - email
      MfaConfiguration: "OPTIONAL"
      SmsConfiguration:
        ExternalId: appsync-patterns-blog-external
        SnsCallerArn: !GetAtt SNSRole.Arn
      Schema:
      - Name: email
        AttributeDataType: String
        Mutable: true
        Required: true
  UserPoolClient:
    Type: "AWS::Cognito::UserPoolClient"
    Properties:
      ClientName: appsync-patterns-blog-client
      GenerateSecret: false
      UserPoolId: !Ref UserPool
      AllowedOAuthFlows:
      - code
      AllowedOAuthFlowsUserPoolClient: true
      AllowedOAuthScopes:
      - email
      - phone
      - openid
      CallbackURLs:
      - https://aws.amazon.com/cognito/
      LogoutURLs:
      - https://aws.amazon.com/cognito/
      DefaultRedirectURI: https://aws.amazon.com/cognito/
      ExplicitAuthFlows:
      - ALLOW_USER_PASSWORD_AUTH
      - ALLOW_USER_SRP_AUTH
      - ALLOW_REFRESH_TOKEN_AUTH
      PreventUserExistenceErrors: ENABLED
      SupportedIdentityProviders:
      - COGNITO

  UserPoolDomain:
    Type: AWS::Cognito::UserPoolDomain
    Properties: 
      Domain: !Sub 'appsync-patterns-blog-${AWS::AccountId}'
      UserPoolId: !Ref UserPool
  # ----------------------------------------------------------
  # create appsync resources
  AppSyncIamRole:
    Type: 'AWS::IAM::Role'
    Properties:
      RoleName: IamRoleForAppSyncToDynamoDB
      Description: Allow AppSync to access DynamoDB tabes
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - appsync.amazonaws.com 
            Action:
              - 'sts:AssumeRole'
      Policies:
        - PolicyName: AllowAccessForAppsyncResolvers
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                - dynamodb:GetItem
                - dynamodb:PutItem
                - dynamodb:Query
                - dynamodb:Scan
                - dynamodb:UpdateItem
                Resource: 
                - !Join
                  - ''
                  - - !Sub 'arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/'
                    - !Ref DdbUsersTable
                    - '*'
                - !Join
                  - ''
                  - - !Sub 'arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/'
                    - !Ref DdbPostsTable
                    - '*'
              - Effect: Allow
                Action:
                - lambda:invokeFunction
                Resource: 
                  - !GetAtt AppsyncResolverLambda.Arn
  GraphQLApi:
    Type: AWS::AppSync::GraphQLApi
    Properties:
      Name: appsync-patterns-blog
      AuthenticationType: AMAZON_COGNITO_USER_POOLS
      UserPoolConfig:
        AwsRegion: !Ref AWS::Region
        DefaultAction: ALLOW
        UserPoolId: !Ref UserPool
      AdditionalAuthenticationProviders: 
        - AuthenticationType: AWS_IAM
  UsersDynamoDBTableDataSource:
    Type: "AWS::AppSync::DataSource"
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      Name: DdbUsersTable
      Type: AMAZON_DYNAMODB
      ServiceRoleArn: !GetAtt AppSyncIamRole.Arn
      DynamoDBConfig:
        AwsRegion: !Ref AWS::Region
        TableName: !Ref DdbUsersTable          
  PostsDynamoDBTableDataSource:
    Type: "AWS::AppSync::DataSource"
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      Name: PostsDynamoDBTable
      Type: AMAZON_DYNAMODB
      ServiceRoleArn: !GetAtt AppSyncIamRole.Arn
      DynamoDBConfig:
        AwsRegion: !Ref AWS::Region
        TableName: !Ref DdbPostsTable                       
  LambdaDataSource:
    Type: "AWS::AppSync::DataSource"
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      Name: S3BlogContent
      Type: AWS_LAMBDA
      ServiceRoleArn: !GetAtt AppSyncIamRole.Arn
      LambdaConfig:
        LambdaFunctionArn: !GetAtt AppsyncResolverLambda.Arn
  S3BucketBlogPostContent:
    Type: 'AWS::S3::Bucket'
    Properties: {}
  GraphQLSchema:
    Type: "AWS::AppSync::GraphQLSchema"
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      Definition: |
        type Mutation {
          createPost(input: CreatePostInput!): Post
          createPostContent(input: CreatePostContentInput!): String
        }
        type Query {
          getFriendsPosts: PostConnection
          getPostContent(input: GetPostContentInput!): String
        }
        input CreatePostInput {
          title: String!
          content: String!
        }
        input CreatePostContentInput {
          postId: ID!
          content: String!
        }
        input GetPostContentInput {
          postId: ID!
        }
        type Post {
          userId: String!
          postId: ID!
          title: String!
          content: String!
        }
        type SubscriptionDetails {
          subscription_tier: String!
          maxPosts: Int!
        }
        type User {
          userId: String!
          email: String!
          name: String!
          subscription: SubscriptionDetails
          friends: [String]
        }
        type PostConnection {
          items: [Post]
          nextToken: String
        }
  # Usecase 1 - AppSync Pipeline Resolver - Mutation - createPost
  CreatePostPipelineResolver:
    Type: "AWS::AppSync::Resolver"
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      TypeName: Mutation
      FieldName: createPost 
      Kind: PIPELINE
      PipelineConfig: 
        Functions:
          - !GetAtt GetSubscriptionLimitFunction.FunctionId
          - !GetAtt CheckNumberOfPostsAgainstLimitFunction.FunctionId
          - !GetAtt CreatePostFunction.FunctionId
      RequestMappingTemplate: "{}"
      ResponseMappingTemplate: "$util.toJson($context.result)"  
    DependsOn: GraphQLSchema
  # AppSync Pipeline Resolver Function - get_subscription_limit
  GetSubscriptionLimitFunction:
    Type: AWS::AppSync::FunctionConfiguration 
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      Name: get_subscription_limit
      DataSourceName: !GetAtt UsersDynamoDBTableDataSource.Name
      FunctionVersion: "2018-05-29"
      RequestMappingTemplate: |
        {
            "version" : "2017-02-28",
            "operation" : "GetItem",
            "key" : {
                "userId" : $util.dynamodb.toDynamoDBJson($context.identity.username)
            }
        }
      ResponseMappingTemplate: |
        #set($result = {})
        #set($result.limit = $context.result.subscription.maxPosts)

        #if($context.error)
          $util.error($context.error.message, $context.error.type)
        #end

        $util.toJson($result) 
  # AppSync Pipeline Resolver Function - check_number_of_posts_against_limit
  CheckNumberOfPostsAgainstLimitFunction:
    Type: AWS::AppSync::FunctionConfiguration 
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      Name: check_number_of_posts_against_limit
      DataSourceName: !GetAtt PostsDynamoDBTableDataSource.Name
      FunctionVersion: "2018-05-29"
      RequestMappingTemplate: |
        {
            "version": "2017-02-28",
            "operation": "Scan",
            "index": "userId-index",
            "filter": {
                "expression": "userId = :userId",
                "expressionValues": {
                    ":userId": $util.dynamodb.toDynamoDBJson($context.identity.username)
                }
            }
        }
      ResponseMappingTemplate: |
        #set ($limit = $context.prev.result.limit)
        #set ($count = $context.result.items.size())
        #if ($limit <= $count)
          $util.error("Posts Limit Reached: limit: ${limit}, posts by ${context.identity.username}: ${context.result.items.size()}")
        #end

        #if($context.error)
          $util.error($context.error.message, $context.error.type)
        #end

        $util.toJson($result)
  # AppSync Pipeline Resolver Function - create_post
  CreatePostFunction:
    Type: AWS::AppSync::FunctionConfiguration 
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      Name: create_post
      DataSourceName: !GetAtt PostsDynamoDBTableDataSource.Name
      FunctionVersion: "2018-05-29"
      RequestMappingTemplate: |
        {
            "version" : "2017-02-28",
            "operation" : "PutItem",
            "key" : {
                "postId": $util.dynamodb.toDynamoDBJson($utils.autoId())
            },
            "attributeValues" : {
                "userId" : $util.dynamodb.toDynamoDBJson($context.identity.username),
                "title": $util.dynamodb.toDynamoDBJson($context.arguments.input.title),
                "content": $util.dynamodb.toDynamoDBJson($context.arguments.input.content)
            },
            "condition": {
                "expression": "attribute_not_exists(#postId) AND attribute_not_exists(#userId)",
                "expressionNames": {
                  "#postId": "postId",
                  "#userId": "userId"
                }
            }
        }
      ResponseMappingTemplate: |
        #if($context.error)
          $util.error($context.error.message, $context.error.type)
        #end
        
        $util.toJson($context.result)
  
  # Usecase 2 - AppSync Pipeline Resolver - Query - getFriendsPosts
  GetFriendsPostsPipelineResolver:
    Type: AWS::AppSync::Resolver
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      FieldName: getFriendsPosts
      TypeName: Query
      Kind: PIPELINE
      PipelineConfig:
        Functions:
          - !GetAtt GetListOfFriendsFunction.FunctionId
          - !GetAtt GetFriendsPostsFunction.FunctionId
      # following represents BEFORE segment in pipeline resolver
      RequestMappingTemplate: |
        $util.qr($context.stash.put("userId", $context.identity.username))
        {}
      # following represents AFTER segment in pipeline resolver
      ResponseMappingTemplate: |
        $util.toJson($context.prev.result)
    DependsOn: GraphQLSchema

  # AppSync Pipeline Resolver Function - get_list_of_friends
  # Description: Function to get list of friends for current userId
  GetListOfFriendsFunction:
    Type: AWS::AppSync::FunctionConfiguration
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      Name: get_list_of_friends
      DataSourceName: !GetAtt UsersDynamoDBTableDataSource.Name
      FunctionVersion: "2018-05-29"
      RequestMappingTemplate: |
        {
          "operation": "GetItem",
          "key": {
            "userId": $util.dynamodb.toDynamoDBJson($context.stash.userId)
          }
        }
      ResponseMappingTemplate: |
        $util.toJson($context.result.friends)

  # AppSync Pipeline Resolver Function - get_friends_posts
  # Description: Function performs "scan" operation on userId column in posts table
  GetFriendsPostsFunction:
    Type: AWS::AppSync::FunctionConfiguration
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      Name: get_friends_posts
      DataSourceName: !GetAtt PostsDynamoDBTableDataSource.Name
      FunctionVersion: "2018-05-29"
      RequestMappingTemplate: |
        #set($expressionArr = [])
        #set($expressionValueMap = {})

        #foreach($friend in ${context.prev.result})
          ## build a template or placeholder
          #set($expressionTemplate = ":t" + $foreach.count)
          ## now build expression array
          #set($partialExpressionStr = "userId = ${expressionTemplate}")
          $util.qr($expressionArr.add($partialExpressionStr))
          ## also build expression value map
          $util.qr($expressionValueMap.put($expressionTemplate, $util.dynamodb.toString($friend)))
        #end
        ## lets now build the final expression with OR conditions
        #set($expressionStr = "")
        #foreach($expr in ${expressionArr})
          #if($foreach.count == $expressionArr.size())
            #set($expressionStr = "${expressionStr}${expr}")
          #else
            #set($expressionStr = "${expressionStr}${expr} OR ")
          #end
        #end
        {
          "operation": "Scan",
          "index": "userId-index",
          "filter": {
            #if(!$expressionArr.isEmpty())
              "expression": $util.toJson($expressionStr),
              "expressionValues" : $util.toJson($expressionValueMap)
            #else
              #set($expressionStr = "attribute_not_exists(postId)")
              "expression": $util.toJson($expressionStr),
            #end
          },
          "limit": $util.defaultIfNull($context.arguments.limit, 86400),
          "nextToken": $util.toJson($util.defaultIfNullOrEmpty($context.arguments.nextToken, null)),
        }
      ResponseMappingTemplate: |
        $util.toJson($context.result)

  # Usecase 3 - AppSync Resolver - Mutation - createPostContent
  # CreatePostContentResolver:
  #   Type: AWS::AppSync::Resolver
  #   Properties:
  #     ApiId: !GetAtt GraphQLApi.ApiId
  #     DataSourceName: !GetAtt LambdaDataSource.Name
  #     FieldName: createPostContent
  #     TypeName: Mutation
  #   DependsOn: GraphQLSchema

  # Usecase 3 - AppSync Pipeline Resolver - Query - getPostContent
  # GetPostContentResolver:
  #   Type: AWS::AppSync::Resolver
  #   Properties:
  #     ApiId: !GetAtt GraphQLApi.ApiId
  #     DataSourceName: !GetAtt LambdaDataSource.Name
  #     FieldName: getPostContent
  #     TypeName: Query
  #   DependsOn: GraphQLSchema

  # Usecase 3 - AppSync Pipeline Resolver - Mutation - createPostContent
  CreatePostContentPipelineResolver:
    Type: "AWS::AppSync::Resolver"
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      TypeName: Mutation
      FieldName: createPostContent 
      Kind: PIPELINE
      PipelineConfig: 
        Functions:
          - !GetAtt GetPostContentS3KeyFunction.FunctionId
          - !GetAtt CreatePostContentInS3Function.FunctionId
      RequestMappingTemplate: "{}"
      ResponseMappingTemplate: "$util.toJson($context.result)"
    DependsOn: GraphQLSchema

  # AppSync Pipeline Resolver Function - create_post_content_in_s3
  CreatePostContentInS3Function:
    Type: AWS::AppSync::FunctionConfiguration 
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      Name: create_post_content_in_s3
      DataSourceName: !GetAtt LambdaDataSource.Name
      FunctionVersion: "2018-05-29"

  # Usecase 3 - AppSync Pipeline Resolver - Query - getPostContent
  GetPostContentPipelineResolver:
    Type: "AWS::AppSync::Resolver"
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      TypeName: Query
      FieldName: getPostContent 
      Kind: PIPELINE
      PipelineConfig: 
        Functions:
          - !GetAtt GetPostContentS3KeyFunction.FunctionId
          - !GetAtt GetPostContentFromS3Function.FunctionId
      RequestMappingTemplate: "{}"
      ResponseMappingTemplate: "$util.toJson($context.result)"
    DependsOn: GraphQLSchema
  # AppSync Pipeline Resolver Function - get_post_content_s3_key
  GetPostContentS3KeyFunction:
    Type: AWS::AppSync::FunctionConfiguration 
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      Name: get_post_content_s3_key
      DataSourceName: !GetAtt PostsDynamoDBTableDataSource.Name
      FunctionVersion: "2018-05-29"
      RequestMappingTemplate: |
        {
            "version" : "2017-02-28",
            "operation" : "Query",
            "query" : {
              "expression": "postId = :postId",
              "expressionValues" : {
                ":postId" : $util.dynamodb.toDynamoDBJson($context.arguments.input.postId)
              }
            },
            "filter": {
                "expression": "userId = :userId",
                "expressionValues" : {
                  ":userId" : $util.dynamodb.toDynamoDBJson($context.identity.username)
                }
            },
            "scanIndexForward": true,
            "limit": $util.defaultIfNull(${context.arguments.limit}, 1000),
            "nextToken": $util.toJson($util.defaultIfNullOrBlank($context.arguments.nextToken, null))
        }
      ResponseMappingTemplate: |
        #set ($count = $context.result.items.size())
        #if ($count <= 0)
          $util.error("Unknown postId: ${context.arguments.input.postId}, or userId: ${context.identity.username}")
        #end

        #if($context.error)
            $util.error($context.error.message, $context.error.type)
        #end
        ## Pass back the result from DynamoDB. **
        $util.toJson($context.result)  
  # AppSync Pipeline Resolver Function - get_post_content_from_s3
  GetPostContentFromS3Function:
    Type: AWS::AppSync::FunctionConfiguration 
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      Name: get_post_content_from_s3
      DataSourceName: !GetAtt LambdaDataSource.Name
      FunctionVersion: "2018-05-29"
Outputs:
  S3BucketBlogPostContent:
    Value: !Ref S3BucketBlogPostContent
    Description: S3 Bucket Name for Blog Post Content

Once the CloudFormation template above is deployed, it creates an AppSync API with the following GraphQL schema:

type Mutation {
  createPost(input: CreatePostInput!): Post
  createPostContent(input: CreatePostContentInput!): String
}
type Query {
  getFriendsPosts: PostConnection
  getPostContent(input: GetPostContentInput!): String
}
input CreatePostInput {
  title: String!
  content: String!
}
input CreatePostContentInput {
  postId: ID!
  content: String!
}
input GetPostContentInput {
  postId: ID!
}
type Post {
  userId: String!
  postId: ID!
  title: String!
  content: String!
}
type SubscriptionDetails {
  subscription_tier: String!
  maxPosts: Int!
}
type User {
  userId: String!
  email: String!
  name: String!
  subscription: SubscriptionDetails
  friends: [String]
}
type PostConnection {
  items: [Post]
  nextToken: String
}

There are two predominant types in the GraphQL schema above, User and Post.  These types represent records stored in the DynamoDB tables Users and Posts, respectively:

Users Table

Attribute 1 Attribute 2 Attribute 3 Attribute 4 Attribute 5
userId name email subscription friends

 

  • userId is a unique identifier for each user in the Users table.  AppSync automatically links the user ID to the Cognito’s sub user attribute present in the JWT token, which is an unique identifier for the user.
  • subscription represents user’s quota based on their subscription tier.  This field contains an object that has two fields, i.e. subscription_tier and maxPosts.  maxPosts is a numeric field that indicates how many posts a user is allowed to publish in this tier.
  • friends contain a list of user IDs.  These make up for user’s friends.  Note that this attribute could be set to an empty array [] or can be completely missing from the specific user entry.

 

Posts Table

Attribute 1 Attribute 2 Attribute 3 Attribute 4
postId userId title content

 

  • postId is a unique identifier for each post in Posts table.
  • userId represents who the post belongs to.  This is a global secondary index defined for this attribute, and it is not the sort key.
  • content represents the post contents in string format.  We demonstrate how we can store large articles in a S3 bucket as DynamoDB is limited at 400Kb for each record size.

The use cases we discuss in all articles in this series leverage the Users and Posts tables as well as the resources deployed in the CloudFormation template provided earlier.

Before diving into the use cases, we must register a user in the Cognito User Pool deployed by the CloudFormation stack mentioned previously.  In order to do so:

  • Navigate to Amazon Cognito in the AWS Management Console and access the appsync-patterns-blog-user-pool
  • Navigate to App Integration > App Client Settings on the left-hand-side menu.
  • Click Launch Hosted UI to launch Cognito hosted UI login screen.
  • Click the Sign-up link at the bottom of the dialog box.
  • Register a new username user1 with a valid email address.  Wait for Cognito to send a confirmation code to the email address and finish the sign-up process.  You may be redirected to the Amazon Cognito product launch page, feel free to close the browser window.
  • Verify the user1 exists in the appsync-patterns-blog-user-pool and the account status is set to CONFIRMED as seen below.
  • Optionally, repeat these registration steps and create more users in the user pool.

 

Use case #1 – Quota management

Our first use case deals with implementing simple business logic that is executed before sending a request to a data source such as a DynamoDB table.  We show how to implement a user quota check feature with pipeline resolvers. In this example, users submit posts that are stored in a Posts table. Users have subscription tiers with assigned quotas that dictate the maximum number of posts they can publish with each user’s subscription limits stored in a separate Users table. Using Pipeline Resolvers, we validate the number of posts each user has stored in the Posts table, and check that against the limit we retrieve from the Users table.

 

 

As per the pipeline architecture above, we create three pipeline functions.  These functions enforce the logic we require:

  • get_subscription_limit – retrieves the maximum number of posts a user can publish from the Users table.  The table contains a subscription attribute (not to be confused with GraphQL subscription operations) with information about the user’s subscription details.  An example value may look like this:
{
    "subscription_tier": "Premium",
    "maxPosts" : 5
}
  • check_number_of_posts_against_limit –  checks the number of posts the user has submitted against the limit that was pulled in the previous function. If the number of posts exceeds the limit then an error is returned to the user, otherwise we proceed to the last function.
  • create_post: Adds the post the user submitted to the Posts table (if the user is under their allocated limit).

A createPost mutation operation is defined in the GraphQL schema as follows:

type Mutation {
	createPost(input: CreatePostInput!): Post
}
input CreatePostInput {
	title: String!
	content: String!
}
type Post {
	userId: String!
	postId: ID!
	title: String!
	content: String!
}

The Create Post pipeline resolver, linked to the createPost mutation field, is defined as follows.  Notice the three pipeline functions that we explained above, as seen in our pipeline architecture diagram.

 

 

The first pipeline function get_subscription_limit reads a record from the Users table and retrieves the subscription data to determine what is the maximum limit of records a user is allowed to create in the Posts table.  The request template for this function is straight forward as it performs a simple DynamoDB GetItem operation.  The userId is retrieved from the AppSync built-in $context.identity object from the request.  It contains the identity information about the logged in user.

We use the AppSync built-in $util utility helpers in our resolver templates providing useful built-in helper methods.  In this specific case, $util.dynamodb makes it easier to read and write data to DynamoDB.

{
    "version": "2017-02-28",
    "operation": "GetItem",
    "key": {
        "userId": $util.dynamodb.toDynamoDBJson($context.identity.username)
    }
}

The response template for the get_subscription_limit function is where we receive the user’s quota and assign it to the $result.limit variable.  We access this variable in our next pipeline function to evaluate the user quota.  If there is any error executing the GetItem operation, the response resolver catches it and throws an error with contextual information.  If there are no errors, the result from the GetItem operation is passed to next pipeline function in the sequence.

#set($result = {})
#set($result.limit = $context.result.subscription.maxPosts)

#if($context.error)
    $util.error($context.error.message, $context.error.type)
#end

$util.toJson($result)

Next, we build the check_number_of_posts_against_limit function.  This pipeline function operates on the Posts table.  It executes a DynamoDB scan operation to list all records in the table that match a filter expression and lists the records that belong to the currently logged in user.

{
    "version": "2017-02-28",
    "operation": "Scan",
    "index": "userId-index",
    "filter": {
        "expression": "userId = :userId",
        "expressionValues": {
            ":userId": $util.dynamodb.toDynamoDBJson($context.identity.username)
        }
    }
}

The response template for the check_number_of_posts_against_limit function first assigns $result.limit that was set in the previous get_subscription_limit pipeline function to a variable $limit. The variable $context.prev indicates that we want to access data set in the previous pipeline function, therefore $context.prev.result.limit refers to the $result.limit in the get_subscription_limit pipeline function.

Now we need to access the total number of records the function check_number_of_posts_against_limit request resolver retrieved after the filter was applied. These records (or items) are available in $context.result.items as an array.  We use the VTL size() function to retrieve the total count. The variable   $context.result.scannedCount contains the total number of records the scan operation accessed in the table without applying the filter condition, therefore it is not useful in our use case.  Note that $ctx and $context can be used interchangeably.  We assign the size value in $context.result.items.size() to the $count variable.

Finally, to determine whether the user has breached their maximum posts limit, we can compare the variables $limit and $count. If $limit is less than or equal to $count, we throw an error denying the user from creating another post in the Posts table.  Otherwise, since user has not breached their maximum posts limit, we proceed to our next pipeline function where we create a record in the Posts table.

#set ($limit = $context.prev.result.limit)
#set ($count = $context.result.items.size())
#if ($limit <= $count)
  $util.error("Posts Limit Reached: limit: ${limit}, posts by ${context.identity.username}: ${context.result.items.size()}")
#end

#if($context.error)
  $util.error($context.error.message, $context.error.type)
#end

$util.toJson($result)

Finally, we add the last pipeline function called create_post.  This function simply creates an entry in the Posts table.  It automatically generates the postId using the utility helper $util.autoId().  The value for the userId is retrieved from the AppSync $context.identity object as before.  The title and content values are then stored in Posts table.

There is a filter condition added to the PutItem operation that prohibits duplicate entries from being created in Posts table, guaranteeing a record containing specific postIds and userIds must always be unique.

{
    "version" : "2017-02-28",
    "operation" : "PutItem",
    "key" : {
        "postId": $util.dynamodb.toDynamoDBJson($utils.autoId())
    },
    "attributeValues" : {
        "userId" : $util.dynamodb.toDynamoDBJson($context.identity.username),
        "title": $util.dynamodb.toDynamoDBJson($context.arguments.input.title),
        "content": $util.dynamodb.toDynamoDBJson($context.arguments.input.content)
    },
    "condition": {
        "expression": "attribute_not_exists(#postId) AND attribute_not_exists(#userId)",
        "expressionNames": {
              "#postId": "postId",
              "#userId": "userId"
        }
    }
}

The response template for the create_post function is rather simple.  It simply returns the result of the DynamoDB PutItem operation to the caller.  If there is any error executing the PutItem operation, the response resolver catches it and throws an error with proper contextual information.

#if($context.error)
    $util.error($context.error.message, $context.error.type)
#end

$util.toJson($context.result)

Before testing our pipeline resolver, let’s create sample records in the DynamoDB Users table.  Using the DynamoDB console, create the following records:

{
  "email": "user1@email.com",
  "friends": [
    "user2"
  ],
  "name": "Test User 1",
  "subscription": {
    "subscription_tier": "Premium",
    "maxPosts" : 5
  },
  "userId": "user1"
}

Now, we can test by using the query console in AppSync.  Before executing the query, AppSync prompts you to log in to the user pool. Use the user1 user you created during the setup after deploying the CloudFormation template.  We can see the first post is created successfully:

 

 

However, once we pass the posts limit (set at 5), we receive an error when trying to create more posts:

 

 

Conclusion

With pipeline resolvers, developers can compose operations, orchestrate and execute business logic directly in AppSync, allowing to easily interact with multiple data sources with a single GraphQL API call, saving network bandwidth and resources.

In this post, we showcased how to combine multiple data sources in a pipeline resolver to create powerful and flexible backend business logic to enforce data integrity in our application. We went over a simple scenario of implementing user quota system directly in AppSync without the need to maintain any additional backend infrastructure. In the next articles we show you how to aggregate data across tables using pipeline resolvers and how to use Direct Lambda Resolvers in a pipeline to interact with an S3 bucket where posts content is stored.