Amazon Kinesis Data Analytics Studio 使客户能够轻松地实时分析流数据,并使用标准SQL、Python和Scala构建由Apache Flink 支持的流处理应用程序。 只需在AWS管理控制台中单击几下,客户就可以启动serverless notebook来查询数据流并在几秒钟内获得结果。
Apache Flink是一个用于处理数据流的开源框架和引擎。它具有高可用性和可扩展性,为流处理应用程序提供了高吞吐量和低延迟。Kinesis Data Analytics降低了构建和管理Apache Flink应用程序的复杂性。
运行Apache Flink工作负载的客户面临着一个艰巨的挑战: 开发他们的分布式流处理应用程序,却无法真正了解他们的应用程序执行的数据处理步骤。Kinesis Data Analytics Studio将Apache Zeppelin 笔记本电脑的易用性与Apache Flink处理引擎的强大功能相结合,在完全管理的产品中提供高级流式分析功能。此外,它还加快了流处理应用程序的开发和运行,从而不断生成实时见解。
在本文中,我们将向您介绍Kinesis Data Analytics Studio,并开始使用Apache Flink(Pyflink)的Python API从Amazon Kinesis数据流交互式查询数据。本文我们使用Kinesis Data Stream作为数据源。Kinesis Data Analytics Studio还与Amazon Managed Streaming for Apache Kafka(Amazon MSK)、Amazon Simple Storage Service(Amazon S3)以及Apache Flink支持的各种其他数据源兼容。
准备:
- Kinesis Data Stream
- Cloud9
创建 Kinesis Data Stream
进入Cloud9, 新建一个terminal,并执行下列cli创建一个名为teststream的消息队列:
$ aws kinesis create-stream \
--stream-name teststream \
--shard-count 1 \
--region ap-northeast-1
创建一个Kinesis Data Analytics Studio notebook
您可以通过以下步骤开始与数据流交互:
- 打开AWS管理控制台并导航至Amazon Kinesis/Data Analytics/Streaming applications
- 选择主页上的Studio选项卡,然后选择Create Studio Notebook。
- 选择Create with custom settings, 输入Studio笔记本的名称,并让Kinesis Data Analytics Studio为此创建AWS IAM角色。
- 选择一个AWS Glue数据库来存储Kinesis Data Analytics Studio使用的源和目标周围的元数据。
- 选择创建Studio notebook。
创建应用程序后,选择Run以启动Apache Flink应用程序。这将需要几分钟的时间来完成,此后可以点击Open in Apache Zeppelin打开。
在Cloud9中创建样本数据
在cloud9中创建ticker.py文件,并复制如下代码到文件内并保存
import datetime
import json
import random
import boto3
import time
STREAM_NAME = "teststream"
price = 100
def get_data():
global price
price = price + (random.random()*2-1)*10
price = 0 if price < 0 else price
return {
#'EVENT_TIME': datetime.datetime.now().isoformat(),
'ticker': random.choice(['BTC','ETH','BSC','SOL']),
'price': price,
'event_time': datetime.datetime.now().isoformat()
}
def generate(stream_name, kinesis_client):
while True:
data = get_data()
print(data)
time.sleep(1)
kinesis_client.put_record(
StreamName=stream_name,
Data=json.dumps(data),
PartitionKey="partitionkey")
if __name__ == '__main__':
generate(STREAM_NAME, boto3.client('kinesis'))
运行代码,程序会将模拟的数据发送到Kinesis中。这里注意,由于Cloud9使用临时凭证,所以有可能会出现token过期的问题,重新运行即可。
编写Studio代码实现自定义聚合
首先我们创建一个source table,用于定义数据的schema以及watermark:
%flink.ssql(type=update)
create table stock_from_flink (
ticker varchar(6),
price double,
event_time TIMESTAMP(3),
WATERMARK for event_time AS event_time - INTERVAL '5' SECOND
)
PARTITIONED BY (ticker)
WITH(
'connector' = 'kinesis',
'stream' = 'teststream',
'aws.region' = 'ap-northeast-1',
'scan.stream.initpos' = 'LATEST',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
)
点击右上角的执行按钮,然后添加新的paragraph并拷贝自定义聚合函数,这里我们定义计算窗口内的蜡烛图四元组,最大值,最小值,初始值和最新值:
%flink.pyflink
class CountAndSumAggregateFunction(AggregateFunction):
def get_value(self, accumulator):
return Row(accumulator[0], accumulator[1], accumulator[2], accumulator[3])
def create_accumulator(self):
return Row(-1, 0,-1,0)
def accumulate(self, accumulator, row: Row):
accumulator[0] = min(accumulator[0],row[1]) if accumulator[0] > 0 else row[1]
accumulator[1] = max(accumulator[1],row[1])
accumulator[2] = accumulator[2] if accumulator[2] > 0 else row[1]
accumulator[3] = row[1]
def retract(self, accumulator, row: Row):
pass
def merge(self, accumulator, accumulators):
pass
def get_accumulator_type(self):
return DataTypes.ROW(
[DataTypes.FIELD("minp", DataTypes.DOUBLE()),
DataTypes.FIELD("maxp", DataTypes.DOUBLE()),
DataTypes.FIELD("initialp", DataTypes.DOUBLE()),
DataTypes.FIELD("lastedp", DataTypes.DOUBLE())])
def get_result_type(self):
return DataTypes.ROW(
[DataTypes.FIELD("minp", DataTypes.DOUBLE()),
DataTypes.FIELD("maxp", DataTypes.DOUBLE()),
DataTypes.FIELD("initialp", DataTypes.DOUBLE()),
DataTypes.FIELD("lastedp", DataTypes.DOUBLE())])
function = CountAndSumAggregateFunction()
agg = udaf(function,
result_type=function.get_result_type(),
accumulator_type=function.get_accumulator_type(),
name=str(function.__class__.__name__))
创建新的paragraph,粘贴主程序,调用自定义聚合函数,我们定义一个10秒钟的窗口方便数据观察:
%flink.pyflink
input_table = st_env.from_path("stock_from_flink")
new_table3 = input_table.window(Tumble.over("10.seconds").on("event_time").alias("ten_seconds_window")) \
.group_by("ten_seconds_window, ticker") \
.aggregate(agg.alias("minp","maxp","initialp","lastedp")) \
.select(" ticker as ticker, minp as min_price, maxp as max_price, initialp as initial_price, lastedp as latest_price, ten_seconds_window.end as epoch_time")
最后一个paragraph我们用来展示我们聚合后的结果:
%flink.pyflink
z.show(new_table3, stream_type="update")
点击执行后,我们会很快接收到数据并生成窗口:
我们还可以通过不同的可视化图形来观察数据
同时,同传统的应用一样,Studio提供了Apache Flink Dashboard, 方便查询程序运行时的状况。
总结:
Kinesis Data Analytics Studio使Apache Flink应用程序开发的速度大大加快。此外,所有这些都是通过丰富的可视化、可扩展且用户友好的界面来实现的,并能协作开发,灵活选择语言,使得流式工作负载性能强大。用户可以按照本文所述段落,或者选择将他们升级为具有持久状态的针对Apache Flink的Kinesis Data Analytics应用程序。
参考文档:
https://zeppelin.apache.org/docs/0.9.0/interpreter/flink.html
https://docs.aws.amazon.com/kinesisanalytics/latest/java/how-notebook.html
本篇作者