亚马逊AWS官方博客
Amazon Redshift 的新增功能 – 适用于 Kinesis Data Streams 的流式摄取和适用于 Apache Kafka 的托管流式处理正式推出
十年前,在我加入 AWS 仅几个月后,Amazon Redshift 就正式推出。多年来,该工具加入诸多功能以提高性能并更易于使用。借助 Amazon Redshift,您现在可以使用 SQL 跨数据仓库、运营数据库和数据湖分析结构化和半结构化数据。最近,Amazon Redshift Serverless 正式发布,可让用户更轻松地运行和扩展分析,而无需管理数据仓库基础设施。
为了尽快处理来自实时应用程序的数据,客户正在采用流式处理引擎,例如 Amazon Kinesis 和 Amazon Managed Streaming for Apache Kafka。以前,要将流式数据加载到您的 Amazon Redshift 数据库中,您必须配置一个流程,以便于加载之前在 Amazon Simple Storage Service (Amazon S3) 中暂存数据。这样做会产生一分钟或更长时间的延迟,具体取决于数据量。
今天,我很高兴与大家分享 Amazon Redshift Streaming Ingestion 的正式发布。借助这项新功能,Amazon Redshift 每秒可以从 Amazon Kinesis Data Streams 和 Amazon MSK 将数百兆字节的数据提取到 Amazon Redshift 实体化视图中,然后在几秒钟内对其进行查询。
流式摄取受益于通过实体化视图优化查询性能的能力,并可让用户更有效地使用 Amazon Redshift 进行运营分析和作为实时控制面板的数据源。流式摄取的另一个有趣使用案例是分析来自游戏玩家的实时数据,以优化他们的游戏体验。这种全新的集成还有助于实现物联网设备分析、点击流分析、应用程序监控、欺诈检测和实时排行榜。
我们来看看该方法的实际应用。
配置 Amazon Redshift Streaming Ingestion
除了管理权限外,可以在 Amazon Redshift 中完全使用 SQL 配置 Amazon Redshift 流式摄取。这对于缺少 AWS 管理控制台访问权限或配置 AWS 服务之间集成的专业知识的企业用户特别有用。
您可以分三个步骤设置流式摄取:
- 创建或更新 AWS Identity and Access Management (IAM) 角色以允许访问您使用的流式平台(Kinesis Data Streams 或 Amazon MSK)。请注意,IAM 角色应具有允许 Amazon Redshift 担任该角色的信任策略。
- 创建外部架构以连接到流式服务。
- 创建引用外部架构中流对象(Kinesis 数据流或 Kafka 主题)的实体化视图。
之后,您可以查询实体化视图以在分析工作负载中使用来自流的数据。流式摄取适用于 Amazon Redshift 预置的集群和新的无服务器选项。为了最大限度地提升简单性,我将在本演练中使用 Amazon Redshift Serverless。
为了准备环境,我需要一个 Kinesis 数据流。在 Kinesis 控制台中,我在导航窗格中选择 Data streams(数据流),然后选择 Create data stream(创建数据流)。对于 Data stream name(数据流名称),我使用 my-input-stream
,然后将所有其他选项设置为默认值。几秒钟后,Kinesis 数据流就准备就绪。请注意,默认情况下我使用的是按需容量模式。在开发或测试环境中,您可以选择具有一个分片的预置容量模式来优化成本。
现在,我创建了一个 IAM 角色来授予 Amazon Redshift 访问 my-input-stream
Kinesis 数据流的权限。在 IAM 控制台中,我使用以下策略创建了一个角色:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesis:DescribeStreamSummary",
"kinesis:GetShardIterator",
"kinesis:GetRecords",
"kinesis:DescribeStream"
],
"Resource": "arn:aws:kinesis:*:123412341234:stream/my-input-stream"
},
{
"Effect": "Allow",
"Action": [
"kinesis:ListStreams",
"kinesis:ListShards"
],
"Resource": "*"
}
]
}
为了让 Amazon Redshift 担任此角色,我使用了以下信任策略:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "redshift.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
在 Amazon Redshift 控制台中,我从导航窗格中选择 Redshift serverless,然后创建一个新的工作组和命名空间,类似于我在此博客文章执行的操作。创建命名空间时,在 Permissions(权限)部分中,我从下拉菜单中选择 Associate IAM roles(关联 IAM 角色)。然后,我选择刚才创建的角色。请注意,只有当信任策略允许 Amazon Redshift 担任该角色时,该角色才会可供选择。之后,我使用默认选项完成命名空间的创建。几分钟后,无服务器版数据库就可供使用。
在 Amazon Redshift 控制台中,我在导航窗格中选择 Query editor v2(查询编辑器 v2)。我通过从资源列表中选择来连接新的无服务器版数据库。现在,我可以使用 SQL 来配置流式摄取。首先,我创建一个映射到流式服务的外部架构。因为我将使用模拟的 IoT 数据作为示例,因此称之为外部架构传感器
。
CREATE EXTERNAL SCHEMA sensors
FROM KINESIS
IAM_ROLE 'arn:aws:iam::123412341234:role/redshift-streaming-ingestion';
为了访问流中的数据,我创建了一个从流中选择数据的实体化视图。通常,实体化视图包含基于查询结果的预先计算的结果集。在这种情况下,查询正在从流中读取,而 Amazon Redshift 是数据流的使用者。
流式数据将作为 JSON 数据进行摄取,因此我有两个选择:
- 将所有 JSON 数据保留在一列中,然后使用 Amazon Redshift 功能查询半结构化数据。
- 将 JSON 属性提取到自己的单独列中。
让我们看看这两种选择的优缺点。
SELECT
语句中的 approximate_arrival_timestamp
、partition_key
、shard_id
和 sequence_number
列由 Kinesis Data Streams 提供。流中的记录位于 kinesis_data
列中。refresh_time
列由 Amazon Redshift 提供。
为了将 JSON 数据保留在 sensor_data
实体化视图的单列中,我使用 JSON_PARSE 函数:
CREATE MATERIALIZED VIEW sensor_data AUTO REFRESH YES AS
SELECT approximate_arrival_timestamp,
partition_key,
shard_id,
sequence_number,
refresh_time,
JSON_PARSE(kinesis_data, 'utf-8') as payload
FROM sensors."my-input-stream";
CREATE MATERIALIZED VIEW sensor_data AUTO REFRESH YES AS
SELECT approximate_arrival_timestamp,
partition_key,
shard_id,
sequence_number,
refresh_time,
JSON_PARSE(kinesis_data) as payload
FROM sensors."my-input-stream";
我使用了 AUTO REFRESH YES
参数,因此当流中有新数据时,实体化视图的内容会自动刷新。
为了将 JSON 属性提取到 sensor_data_extract
实体化视图的单独列中,我使用 JSON_EXTRACT_PATH_TEXT 函数:
CREATE MATERIALIZED VIEW sensor_data_extract AUTO REFRESH YES AS
SELECT approximate_arrival_timestamp,
partition_key,
shard_id,
sequence_number,
refresh_time,
JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kinesis_data, 'utf-8'),'sensor_id')::VARCHAR(8) as sensor_id,
JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kinesis_data, 'utf-8'),'current_temperature')::DECIMAL(10,2) as current_temperature,
JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kinesis_data, 'utf-8'),'status')::VARCHAR(8) as status,
JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kinesis_data, 'utf-8'),'event_time')::CHARACTER(26) as event_time
FROM sensors."my-input-stream";
将数据加载到 Kinesis 数据流中
要将数据放入 my-input-stream
Kinesis Data Stream 中,我使用以下 random_data_generator.py
Python 脚本模拟来自 IoT 传感器的数据:
import datetime
import json
import random
import boto3
STREAM_NAME = "my-input-stream"
def get_random_data():
current_temperature = round(10 + random.random() * 170, 2)
if current_temperature > 160:
status = "ERROR"
elif current_temperature > 140 or random.randrange(1, 100) > 80:
status = random.choice(["WARNING","ERROR"])
else:
status = "OK"
return {
'sensor_id': random.randrange(1, 100),
'current_temperature': current_temperature,
'status': status,
'event_time': datetime.datetime.now().isoformat()
}
def send_data(stream_name, kinesis_client):
while True:
data = get_random_data()
partition_key = str(data["sensor_id"])
print(data)
kinesis_client.put_record(
StreamName=stream_name,
Data=json.dumps(data),
PartitionKey=partition_key)
if __name__ == '__main__':
kinesis_client = boto3.client('kinesis')
send_data(STREAM_NAME, kinesis_client)
我启动该脚本并查看正在放入流中的记录。它们使用 JSON 语法并包含随机数据。
查询来自 Amazon Redshift 的流式数据
为了比较两个实体化视图,我从每个视图中选择前十行:
- 在
sensor_data
实体化视图中,流中的 JSON 数据位于payload
列中。我可以使用 Amazon Redshift JSON 函数来访问以 JSON 格式存储的数据。 - 在
sensor_data_extract
实体化视图中,流中的 JSON 数据已提取到不同的列中:sensor_id
、current_temperature
、status
和event_time
。
现在,我可以在分析工作负载中使用这些视图中的数据,并且结合我的数据仓库、操作数据库和数据湖中的数据。我可以将这些视图中的数据与 Redshift ML 结合使用,以此训练机器学习模型或使用预测分析。由于实体化视图支持增量更新,因此这些视图中的数据可以有效地用作控制面板的数据源,例如,使用 Amazon Redshift 作为 Amazon Managed Grafana 的数据源。
可用性和定价
适用于 Kinesis Data Streams 的 Amazon Redshift 流式摄取和适用于 Apache Kafka 的托管流式处理现已在所有商用 AWS 区域正式推出。
使用 Amazon Redshift 流式摄取不会产生额外费用。有关更多信息,请参阅 Amazon Redshift 定价。
在数据仓库和数据湖中使用低延迟流式数据从未如此简单。欢迎与我们分享您使用此新功能构建了什么!
– Danilo