AWS Database Blog

Capture graph changes using Neptune Streams

August 30, 2023: Amazon Kinesis Data Analytics has been renamed to Amazon Managed Service for Apache Flink. Read the announcement in the AWS News Blog and learn more.

Many graph applications can benefit from the ability to capture changes to items stored in an Amazon Neptune database, at the point in time when such changes occur. Amazon Neptune now supports Neptune Streams, a fully managed feature of Neptune that reliably logs every change to your graph as it happens, in the order that it is made, and then makes these changes available via an HTTP REST API. Neptune Streams is currently available in Lab Mode.

In this blog post, I review the Neptune Streams feature, provision a streams based application that uses a polling framework provided by Neptune to integrate Neptune with an Amazon ElastiCache for Redis data store, and describe how you can use this same polling framework to build your own streams based graph applications.

Neptune Streams overview

Using Neptune Streams you can:

  • Replicate changes in a source Neptune database to another Neptune database
  • Replicate changes in a source Neptune database to another database such as Amazon DynamoDB or Amazon Aurora
  • Archive changes in a source Neptune database to Amazon S3
  • Index content that changes in a source Neptune database in Amazon OpenSearch
  • Calculate aggregations and graph metrics based on graph changes in a source Neptune database and cache them in Amazon ElastiCache
  • Publish changes in a source Neptune database to an Amazon Kinesis data stream, and use Amazon Kinesis Data Analytics to gain actionable insights into the changes in your graph data
  • Act on graph changes in a source Neptune database and issue additional queries against the graph data in order to modify or enrich the data or assert data constraints

If you enable Neptune Streams for an existing Neptune database, you may want to seed your target system with a copy of the source data before applying the changes from the stream. If your target is another Neptune database in the same Region as the source, you can use a snapshot to seed the target. For other replication scenarios, you can use neptune-export, which exports property graph data to either CSV or JSON, and RDF data to Turtle, to capture data from the source ready to be imported it into the target.

Stream records

When Neptune Streams is enabled, Neptune makes changes to your graph data and adds change records to the stream using the same transaction. This ensures that the stream is consistent with the graph, and that changes published on the stream appear in the same order as they occurred in the graph.

Neptune’s graph data is composed of quads. Requests to a Neptune Gremlin or SPARQL endpoint are executed in the context of an implicit transaction. Write requests typically create, modify, and delete more than one element (quad) per transaction. A Gremlin request to create a vertex with two properties results in three elements being created. An example of this is:

g.addV('Person').property('firstName', 'John').property('lastName', 'Smith')

Each of these elements appears as a separate record in the stream, stamped with the commit number – and, within the commit, the operation number – of the transaction to which it belongs:

{
  "lastEventId": {
    "commitNum": 1,
    "opNum": 3
  },
  "lastTrxTimestamp": 1570703182847,
  "format": "GREMLIN_JSON",
  "records": [
    {
      "eventId": {
        "commitNum": 1512,
        "opNum": 1
      },
      "data": {
        "id": "86f89fb0-d214-41eb-812c-6f89285d4e1f",
        "type": "vl",
        "key": "label",
        "value": {
          "value": "Person",
          "dataType": "String"
        }
      },
      "op": "ADD"
    },
    {
      "eventId": {
        "commitNum": 1,
        "opNum": 2
      },
      "data": {
        "id": "86f89fb0-d214-41eb-812c-6f89285d4e1f",
        "type": "vp",
        "key": "firstName",
        "value": {
          "value": "John",
          "dataType": "String"
        }
      },
      "op": "ADD"
    },
    {
      "eventId": {
        "commitNum": 1,
        "opNum": 3
      },
      "data": {
        "id": "86f89fb0-d214-41eb-812c-6f89285d4e1f",
        "type": "vp",
        "key": "lastName",
        "value": {
          "value": "Smith",
          "dataType": "String"
        }
      },
      "op": "ADD"
    }
  ],
  "totalRecords": 3
}

Stream endpoints

You retrieve records from a stream by issuing an HTTP GET request to your database’s SPARQL or Gremlin stream endpoint. Query string parameters allow you to control the point in the stream from which you start reading, and the number of records returned. The following example queries the Gremlin stream endpoint for the first three property graph change records at the beginning of the stream:

curl "https://<neptune-endpoint>:8182/gremlin/stream?iteratorType=TRIM_HORIZON&limit=3"

TRIM_HORIZON is one of three iterators that allow you to control the point from which your reader starts querying the stream. See the documentation for details of this and other parameters.

Integrating Neptune Streams with AWS Lambda

A common pattern when building streams based applications on AWS is to use an AWS Lambda function to process records from an Amazon Kinesis data stream, Amazon Kinesis Data Firehose delivery stream, or Amazon DynamoDB stream.

With this first release of Neptune Streams, you can integrate Lambda with Neptune Streams using a polling framework provided by Neptune through a CloudFormation template. This framework allows you to author a stream handler, and then register it with a Lambda function provided by the polling framework. The framework uses an AWS Step Functions and DynamoDB based workflow to schedule the host Lambda’s execution and checkpoint the stream processing.

Neptune Streams are not sharded. This is to ensure that all the changes to your graph are written to a stream and then made available to stream readers in the same order in which they were applied to the database. Neptune’s polling framework ensures that a single instance of the host Lambda function polls the stream for any given use case. You can, however, run multiple instances of the polling framework together with other stream reader applications against the stream in order to run different stream processing use cases concurrently. There is no limit on the number of stream readers that you can create and use concurrently in this fashion.

Using Neptune Streams and AWS Lambda to build event driven metrics

The demo application described in this post simulates the growth of a social network. Whenever an edge is added to this network, the corresponding change record on the Neptune stream causes a handler hosted by a Lambda function and executed by the polling framework to calculate the indegree centrality of the edge’s ‘out’ or ‘to’ vertex and store the result in ElastiCache for Redis.

The indegree centrality of a vertex is a count of the number of incoming edges attached to that vertex. Centrality measures can be used to determine the relative importance of each vertex in a graph. In our social network example, indegree centrality can be interpreted as an indicator of a person’s ‘popularity’: the more popular a person is, the higher their indegree centrality value.

For relatively small graphs, those comprising several hundred or a few thousand vertices, we can calculate the indegree centrality of all vertices with a simple Gremlin query (see the TinkerPop Recipes for this and other centrality calculations):

g.V().group().by().by(inE().count())

However, for large graphs containing many thousands or even millions of vertices, this calculation can take a long time to complete. The same is true of many other operations that have to touch all or most of the graph in order to compute a result. For operations intended to calculate an aggregation or graph metric, we can often replace a single, long running ‘graph global’ query with an event driven solution that recalculates or populates some part of the results on a per event basis – triggered by a vertex or edge being added or deleted, or a property value changing, for example. This is the solution we’ve adopted here, with Neptune Streams providing a series of events that trigger a local recalculation of indegree centrality for the target vertex contained in each event.

By storing the results in a cache, we allow immediate access to the required aggregation, measure, or metric. With our sample application, we can query for the top three most popular members of our social network and return their details, including their centrality measures, with millisecond response times, irrespective of the size of the graph.

Solution overview

The solution presented in this blog post creates the following resources:

  • Neptune VPC with three private and three public subnets
  • Neptune cluster comprising a single r4.xlarge instance, with appropriate subnet, parameter, and security groups
  • Neptune workload Lambda functions for writing data to Neptune
  • ElastiCache for Redis cluster with appropriate subnet group
  • Lambda, DynamoDB, and Step Functions based polling framework with DynamoDB and CloudWatch VPC endpoints
  • Amazon SageMaker Jupyter notebook instance with Neptune and ElastiCache query content

  1. The Workload Manager AWS Lambda function starts and stops the Neptune Workload Lambda function.
  2. The Neptune Workload Lambda function writes vertices and edges to the Amazon Neptune database, simulating the growth of a social network.
  3. Changes to the graph data are published to the Neptune stream.
  4. The Stream Handler hosted by the polling framework’s Lambda function polls the Neptune stream for batches of change records. For each new vertex, it updates the vertex count in the Amazon ElastiCache cluster. For each new edge, it queries Neptune to recalculate the indegree centrality of the edge’s target vertex, and publishes the result to the ElastiCache cluster.
  5. You can use an Amazon SageMaker hosted Jupyter notebook to query the ElastiCache and Neptune clusters, retrieve the vertex count value and review the details of the vertices with the highest degree cardinalities.

The components in the gray box represent the core of the solution. The components outside the gray box act as a data generator and data client for the purposes of the demo. The underlying polling framework components, including an AWS Step Functions workflow and Amazon DynamoDB table, have been omitted from the diagram.

Amazon ElastiCache for Redis was chosen to cache the results of the centrality calculations because, being an in-memory data store, it offers submillisecond latencies for both reads and writes. As a fully managed service, it automates common management tasks, provides for high availability, and is easily scaled as the application grows. ElastiCache provides both Redis and Memcached compatible engines. The Redis engine was chosen for this solution because it offers sorted sets, which we use to provide fast ranking based on centrality.

Launch the solution

Launch the solution stack from the AWS CloudFormation console by choosing one of the Launch Stack buttons in the following table. Acknowledge that AWS CloudFormation will create IAM resources, and then choose Create. It takes about twenty minutes to create the stack.

Region View Launch
US East (N. Virginia) View
US East (Ohio) View
US West (Oregon) View
Asia Pacific (Mumbai) View
Asia Pacific (Seoul) View
Asia Pacific (Singapore) View
Asia Pacific (Sydney) View
Asia Pacific (Tokyo) View
EU (Frankfurt) View
EU (Ireland) View
EU (London) View
EU (Stockholm) View

The stack creates three output parameters:

  • SageMakerNotebook – Link to an Amazon SageMaker hosted Jupyter notebook that you can use to query the ElastiCache and Neptune clusters.
  • StartWorkload – CLI command you can use to start the AWS Lambda based Neptune workload
  • StopWorkoad – CLI command you can use to stop the Neptune workload

Open your Jupyter notebooks

Click on the SageMakerNotebook link. In the Jupyter window, open the Neptune/neptune-streams directory. This directory contains two notebooks:

  • centrality-counts.ipynb – Queries the ElastiCache and Neptune clusters for details of the top three most popular members of a social network whose data is stored in Neptune.
  • stream-viewer.ipynb – Shows a tabular view of the contents of the Neptune stream, allowing you to browse backwards and forwards in the stream.

Open the centrality-counts notebook and choose Run All from the Cell dropdown menu. If you haven’t yet started the Neptune workload, the results will be empty. A preview of the centrality-counts notebook with empty results is shown below.

Start the Neptune workload

This next step assumes that you have installed the AWS Command Line Interface (CLI) on your local machine.

Open a terminal and run the StartWorkload CLI command from the CloudFormation output. This will trigger the Neptune Workload Lambda function, which writes vertices and edges to Neptune, simulating the growth of a social network. The workload initially seeds the network with a thousand vertices, and thereafter creates a mix of vertices and edges.

Wait a few seconds (the Step Functions workflow may currently be in an idle state from it not having found any stream records in the period just prior to you starting the workload) and then re-run the centrality-counts notebook. This time you should see some centrality details for the most popular members of the network. A preview of the centrality-counts notebook with the most popular members of the network is shown below.

View the Neptune stream contents

Open the stream-viewer notebook and choose Run All from the Cell dropdown menu. The notebook displays 10 records from the beginning of the Neptune stream. You can use the slider to position the viewer elsewhere in the stream, and the Next button to advance to the next 10 records. A preview of the stream viewer is shown below.

Code overview

The code for the demo’s Neptune stream handler function can be found in neptune_stream_handler.py in the neptune-streams-demo package. This package also contains the code for the Workload Manager and Neptune Workload Lambda functions.

neptune_stream_handler.py contains three classes: VertexMetrics, VertexMetricsService, and NeptuneStreamHandler.

VertexMetrics uses a Redis client to store vertex metrics, including vertex counts and vertex degree centrality values, in ElastiCache:

class VertexMetrics:
    
    def __init__(self, elasticache_endpoint):
        self.ec = redis.StrictRedis(host=elasticache_endpoint, port=6379)
        
    def most_popular_vertices(self):
        return self.ec.zrevrange('degree_centrality', 0, 2, withscores=True)
        
    def vertex_count(self):
        return self.ec.get('vertex_count')
        
    def set_vertex_count(self, count):
        self.ec.set('vertex_count', count)
        
    def increment_vertex_count(self):
        self.ec.incr('vertex_count')
        
    def decrement_vertex_count(self):
        self.ec.decr('vertex_count')
        
    def update_degree_centrality(self, v_id, centrality):
        self.ec.zadd('degree_centrality', {v_id:centrality})

The VertexMetricsService uses a Gremlin client to query Neptune to calculate a vertex’s indegree centrality, and a VertexMetrics instance to update a vertex’s metrics in ElastiCache with its newly calculated indegree centrality value:

class VertexMetricsService:
    
    def __init__(self, neptune_endpoint, elasticache_endpoint):
        GremlinUtils.init_statics(globals())
        gremlin_utils = GremlinUtils(Endpoints(neptune_endpoint=neptune_endpoint))
        self.vertext_metrics = VertexMetrics(elasticache_endpoint)
        self.neptune_connection = gremlin_utils.remote_connection()
        self.g = gremlin_utils.traversal_source(connection=self.neptune_connection)
        
    def __init_vertex_count(self):
        count = self.g.V().count().next()
        self.vertext_metrics.set_vertex_count(count)

    def __increment_vertex_count(self):
        if self.vertext_metrics.vertex_count() is None:
            self.__init_vertex_count()
        self.vertext_metrics.increment_vertex_count()
        
    def __decrement_vertex_count(self):
        if self.vertext_metrics.vertex_count() is None:
            self.__init_vertex_count()
        self.vertext_metrics.decrement_vertex_count()
        
    def __update_degree_centrality(self, v_id):
        centrality = self.g.V(v_id).inE().count().next()
        self.vertext_metrics.update_degree_centrality(v_id, centrality)
        
    def handle_event(self, op, data):
        
        type = data['type']

        if op == ADD_OPERATION:
            if type == 'vl':
                self.__increment_vertex_count()
            if type == 'e':
                self.__update_degree_centrality(data['to'])
                
        if op == REMOVE_OPERATION:
            if type == 'vl':
                self.__decrement_vertex_count()
            if type == 'e':
                self.__update_degree_centrality(data['to'])
            
        
    def close(self):
        self.neptune_connection.close()

The VertexMetricService.handle_event() method is called for each record in the Neptune stream. If the record describes an operation that adds or removes a vertex, the service increments or decrements the vertex count in VertexMetrics accordingly. If the record describes an operation that adds or removes an edge, the service recalculates the indegree centrality for the ‘out’ or ‘to’ vertex of the edge via a query back to Neptune. The method ignores all other operations.

The last class here, NeptuneStreamHandler, contains the handle_records() handler method that will be invoked by the polling framework’s host Lambda function:

class NeptuneStreamHandler(AbstractHandler):

    def handle_records(self, stream_log):
        
        params = json.loads(os.environ['AdditionalParams'])
        svc = VertexMetricsService(
            params['neptune_endpoint'], 
            params['elasticache_endpoint'])
        
        records = stream_log[RECORDS_STR]
        
        try:
            for record in records:
            
                svc.handle_event(record[OPERATION_STR], record[DATA_STR])
                yield HandlerResponse(
                    record[EVENT_ID_STR][OP_NUM_STR], 
                    record[EVENT_ID_STR][COMMIT_NUM_STR], 
                    1)
            
        except Exception as e:
            logger.error('Error occurred - {}'.format(str(e)))
            raise e
        finally:
            svc.close()

This handler method constructs a VertexMetricsService, passing it the Neptune and ElastiCache endpoints retrieved from the AdditionalParams environment variable. We describe how AdditionalParams can be used to pass custom configuration information to your handler later in this post. The handler then pulls a batch of records from the stream and invokes VertexMetricService.handle_event() for each record.

Each time a record is processed, the handler yields a HandlerResponse containing the operation number, the commit to which the operation belongs, and the count of the number of records processed since the last time the handler yielded a response. The framework then uses this response to update its lease, thereby ensuring that in the next poll, the records that are given to the handler follow on from the ones that have just been processed successfully.

Create your own polling application

To create your own polling application you must author a stream handler, publish it to S3, and then create a polling workflow using our predefined CloudFormation template. This CloudFormation template creates the Lambda polling framework. The framework uses a host Lambda function to execute your stream handler, and a Step Functions and DynamoDB based workflow to schedule and checkpoint the stream processing.

Authoring a stream handler to be used by the polling framework is similar to authoring a Lambda function. Your streams handler class must inherit from the polling framework’s AbstractHandler and implement a handle_records() method. The following code shows a simple handler that logs each record in the Neptune stream:

import lambda_function
import logging
from commons import *
from handler import AbstractHandler, HandlerResponse

logger = logging.getLogger('MyHandler')
logger.setLevel(logging.INFO)

class MyHandler(AbstractHandler):

  def handle_records(self, stream_log):
        
    records = stream_log[RECORDS_STR]
    
    last_op_num = None
    last_commit_num = None    
    count = 0
    
    try:
      for record in records:
      
        # Handle record
        logger.info(record)
      
        # Update local checkpoint info
        last_op_num = record[EVENT_ID_STR][OP_NUM_STR]
        last_commit_num = record[EVENT_ID_STR][COMMIT_NUM_STR]
      
        count += 1
      
    finally:

      try:   
        yield HandlerResponse(last_op_num, last_commit_num, count)     
      except Exception as e:
        logger.error('Error occurred - {}'.format(str(e)))
        raise e

When the polling framework’s host Lambda function executes your handler, the function passes the handler a batch of stream records up to the number specified by the StreamRecordsBatchSize CloudFormation parameter. We discuss the CloudFormation parameters in more detail below.

Checkpointing

The handle_records() method has a stream_log parameter that the framework uses to pass your handler a batch of stream records. From within the handle_records() method, you are expected to yield one or more HandlerResponse objects containing the details of the last record successfully processed (its operation and commit numbers) and the count of records processed since the last time the handler yielded a response.

In the example shown above, we yield a response after having processed all the records in the batch. In our earlier vertex metrics example, we yielded a response after each record. The demo code includes a finally() block that ensures that if the handler fails while processing a record, the handler still attempts to yield a response with the details of the last record that it successfully processed. As a result, following a record processing failure, the next time the framework invokes the handler, the handler will start from the failed record that caused the previous invocation to terminate.

Additional parameters

Many handlers will require additional configuration in order to function correctly. For example, our demo’s handler, which queries Neptune to calculate the degree centrality for each new edge’s target vertex and then stores the result in an ElastiCache instance, requires endpoint information for both the Neptune and ElastiCache clusters.

You can supply additional parameters to your handler via the AdditionalParams CloudFormation parameter. The string value you supply to the CloudFormation template can then be accessed from your handler via an AdditionalParams environment variable.

For our demo application, we’ve chosen to supply the Neptune and ElastiCache endpoint values as a JSON object:

{
  "elasticache_endpoint": "neptune-streams-xxx.euw2.cache.amazonaws.com",
  "neptune_endpoint": "neptune-streams-xxx.eu-west-2.neptune.amazonaws.com"
}

Publishing your handler

Once you’ve finished authoring your stream handler, create a deployment package in the form of a ZIP archive and upload it to S3.

CloudFormation

To install and run your stream handler, create a CloudFormation stack using the polling framework’s root CloudFormation template:

Region View Launch
US East (N. Virginia) View
US East (Ohio) View
US West (Oregon) View
Asia Pacific (Mumbai) View
Asia Pacific (Seoul) View
Asia Pacific (Singapore) View
Asia Pacific (Sydney) View
Asia Pacific (Tokyo) View
EU (Frankfurt) View
EU (Ireland) View
EU (London) View
EU (Stockholm) View

The template has over 20 parameters, allowing it to be configured for a wide variety of use cases. Some parameters are optional, several have default values, but a few require populating with values specific to your environment.

Environment

  • Supply an ApplicationName. This will be used to refer to the resources created by the CloudFormation template. Our demo application is called vertex-metrics.
  • Supply the networking details for the VPC in which the Lambda polling framework will run. This will usually be the same VPC as your Neptune VPC. The required networking parameters include VPC, SubnetIds, SecurityGroupIds, and RouteTableIds.

Neptune

  • Supply the NeptuneStreamEndpoint. This is of the form https://<cluster>:<port>/gremlin/stream or https://<cluster>:<port>/sparql/stream.
  • If your source Neptune database has IAM database authentication enabled, you must set IAMAuthEnabledOnSourceStream to true, and supply the StreamDBClusterResourceId.

Handler

  • Supply the LambdaS3Bucket and LambdaS3Key for your handler package.
  • Supply the name of your StreamRecordsHandler. In our demo application our NeptuneStreamHandler class is located in a neptune_stream_handler file at the root of our ZIP archive: hence the StreamRecordsHandler value is NeptuneStreamHandler.

The remaining handler parameters are all optional:

  • If your handler requires any additional parameters, supply an AdditionalParams string value. As discussed above, in our demo application we pass a JSON object containing the Neptune and ElastiCache endpoints to our handler.
  • LambdaMemorySize, LambdaRuntime, and LambdaLoggingLevel allow you to configure the Lambda runtime. By default, the LambdaMemorySize is set to 128 MB.
  • If your handler requires additional permissions to access other AWS resources, you can attach additional IAM-managed policies to the host Lambda function’s execution role. Supply a comma-delimited list of the Amazon Resource Names (ARNs) of the policies containing these additional permissions to the ManagedPolicies
  • StreamRecordsBatchSize, MaxPollingInterval, and MaxPollingWaitTime allow you to control the number of records retrieved per batch, the host Lambda function’s timeout value in seconds, and the wait time, in seconds, between successive polls of the stream. The host Lambda function will continue processing batches from the stream until its execution time reaches approximately 90% of the MaxPollingInterval. It then waits up to MaxPollingWaitTime before polling once again. For continuous polling, set MaxPollingWaitTime to 0.

Polling framework

All of the remaining CloudFormation parameters are optional:

  • DDBReadCapacity and DDBWriteCapacity control the provisioned throughput for the DynamoDB table used for checkpointing. The default value for both parameters is 5.
  • StepFunctionFallbackPeriod and StepFunctionFallbackPeriodUnit control the recovery period for Step Function failures.
  • CreateDDBVPCEndPoint and CreateMonitoringEndPoint create the necessary DynamoDB and CloudWatch endpoints in your VPC. Both parameters are true by default. If an endpoint already exists in your VPC, set the corresponding CloudFormation parameter to false.
  • If you want to be notified via email whenever polling fails more than twice in a row, set CreateCloudWatchAlarm to true and supply a NotificationEmail. By default, CreateCloudWatchAlarm is set to false.

While at first this looks like a lot of parameters, for many applications you need only supply values for ApplicationName, VPC, SubnetIds, SecurityGroupIds, RouteTableIds, LambdaS3Bucket, LambdaS3Key, and StreamRecordsHandler.

Conclusion

Neptune Streams makes it easy to react to changes in your graph data as they happen. Using a stream handler, you can react to events in your graph database and publish changes in your graph data to other AWS Managed Services. In this post I’ve shown how you can use the AWS Lambda polling framework provided by a Neptune CloudFormation template to host a stream handler that calculates vertex metrics and publishes the results to Amazon ElastiCache. By authoring your own handlers, you can quickly and easily build event driven graph applications.

Additional resources

The code for the demo, including the CloudFormation templates, Jupyter notebooks, stream handler, and Lambda functions, can be downloaded from the Amazon Neptune Samples GitHub repository.

The Amazon Neptune resources homepage has links to documentation, blog posts, videos, and code repositories with samples and tools.

Before you begin designing your database, we also recommend that you consult the AWS Reference Architectures for Using Graph Databases, where you can inform your choices about graph data models and query languages, and browse examples of reference deployment architectures.

 

 


About the Author

 

Ian Robinson is a member of the Database Services Customer Advisory Team at AWS. He is a co-author of ‘Graph Databases’ and ‘REST in Practice’ (both from O’Reilly) and a contributor to ‘REST: From Research to Practice’ (Springer) and ‘Service Design Patterns’ (Addison-Wesley).