亚马逊AWS官方博客
AppSync实现行情数据订阅/发布
在一个交易系统中,一次完整的交易,大致可有三个步骤:接收行情→分析行情→发出买卖指令并成交。可以看出,行情数据是非常基本也是非常重要的部分。特别是对于量化高频的交易者来说,行情数据的精度和实时性就尤其重要。
AWS AppSync是一个完全托管的服务,允许在AWS Cloud中部署无服务器GraphQL后端。它提供了开发人员可以用来创建现代数据驱动应用程序的功能,允许从单个GraphQL端点轻松查询多个数据库、微服务和API。客户可以利用AppSync实时功能、离线数据同步、内置服务器端缓存、细粒度访问控制、安全性、使用GraphQL解析器支持API层中的业务逻辑等。在本文中,我们将重点介绍AWS AppSync内置功能,以实现实时行情发布订阅的用例。
推荐方案
如上图,需要发布的数据,通过flink聚合后,结果被Lambda程序推入到Appsync中;或者数据无需聚合,以单笔的方式直接发布到AppSync中。AppSync通过和客户端的websocket通道,把这些数据推送给订阅者。
在我们接下来的Demo中,我们使用MSK(AWS 托管的Kafka)作为流数据处理平台,使用Lambda作为流数据的消费端程序平台。客户在生产实现中,可以把Lambda程序替换为基于EC2的kafka消费程序,获得更低的时延。
准备
- Cloud9:程序开发环境
- MSK(Amazon Managed Streaming for Apache Kafka)
- Lambda函数(在VPC内)
1. 创建Kafka 集群
参见链接:https://docs.aws.amazon.com/zh_cn/msk/latest/developerguide/create-cluster.html
2.在Cloud9中关联kafka集群
3.为kafka创建主题
登入cloud9,请确保上面第2步已经完成。为kafka创建一个心的topic。参见链接:https://docs.aws.amazon.com/zh_cn/msk/latest/developerguide/create-topic.html 在这里我们使用multipartition作为主题的名称
4.创建虚拟行情数据生成程序
登入cloud9,创建一个新的python文件,文件请下载
https://github.com/sun-biao/appsync-demo/blob/main/ticker_kafka.py
修改代码中的bootstrap_servers 值,如果topic的名称发生了改变,请一并修改topicName参数。
5.创建一个VPC内的Lambda
我们使用Lambda来模拟行情推送程序,作用是从kafka拉取行情数据推送到发布系统上。这里的Lambda需要能够访问Kafka,请将Lambda配置到kafka所在的vpc内,并确保Lambda可以访问kafka和Internet. 参考链接下面链接。本文使用Python3.9 作为开发语言。 https://docs.aws.amazon.com/lambda/latest/dg/configuration-vpc.html
AppSync 设置
- 在console中打开Appsync服务https://us-west-2.console.aws.amazon.com/appsync/
- 点击Create API
- 选择Build from scratch 并点击右侧的Start
- 输入名称TickerDemo并点击Create
- 在创建好的AppSync API界面上,点击左侧Schema按钮,并在出现的输入框中粘贴如下内容,之后点击右上角Save Schema
- 点击界面左侧的Data Sources链接,然后点击右侧Create data source
- 在Data source name中输入名称tickerresolover,Data source type里选择 None,点击Create
- 回到Schema页面,找到右侧Mutation下的tickerlocal(…): Tickerdata!,点击右侧的Attach按钮。
- 在弹出的Data source name中选择刚刚创建的tickerresolover
- 在下面弹出的Configure the request mapping template输入框中复制如下语句:
- 点击右上角Save Resolver
- 点击左侧setting链接,记录下API URL和API KEY
配置行情推送程序
将kafka作为数据源与Lambda绑定
- 打开之前创建的Lambda程序,在Function overview中,点击Add trigger
- 数据源选择MSK,然后选中之前创建的MSK集群。
- 确认Enable trigger 被选中,Topic name 设置为之前创建的multipartition
- 点击保存
点击Lambda程序的Code标签,替换为如下代码。注意替换代码中的API_KEY和API_URL
- 配置Lambda程序的权限,使Lambda具有Kafka读取权限
模拟AppSync消费端程序
我们使用python程序来订阅AppSync中发布的数据。将文件https://github.com/sun-biao/appsync-demo/blob/main/appsyncrealtimeclient.py 下载到Cloud9,并修改文件中的API_KEY和API_URL。
测试
现在,Cloud9中有两个python程序,ticker_kafka.py 和 appsyncrealtimeclient.py。分别运行两个程序,会看到屏幕上会显示发送的消息和接收的消息,确定我们的订阅成功。
同时,我们也可以测试订阅的过滤功能,我们将 appsyncrealtimeclient.py文件中的
‘query’: ‘subscription mysub { tickerlocal { id ticker price } }’
修改为
‘query’: ‘subscription mysub { tickerlocal(ticker:”BTC”) { id ticker price } }’
我们就可以实现只订阅BTC数据的目标。
总结:
在这篇文章中,我们展示了如何利用AWS AppSync实时功能来解决行情系统中的一个重要用例。使用实时行情更新解决方案,您可以根据基于最佳实践设计的参考架构,在自己的AWS帐户中部署可扩展的无服务器架构。最重要的是,您不需要管理服务器或基础设施来实现可扩展和可靠的实时后端。您可以自定义解决方案和代码,现在可以通过访问解决方案页面开始构建它。
参考文档: