亚马逊AWS官方博客
使用 Amazon Redshift 设计数据湖架构的 ETL 和 ELT 模式:第 2 部分
先决条件
在开始之前,请确保您满足以下先决条件:
- 这篇文章使用了 US-West-2(俄勒冈)区域中可公开访问的 AWS 示例数据集。建议您使用 US-West-2(俄勒冈)区域进行测试运行,以降低因数据移动而造成的跨区域网络延迟和费用。
- 在同一区域拥有一个 AWS 账户。
- 您已向您的 AWS 账户授予
AdministratorAccess
策略(对于生产环境,应进一步限制此策略)。 - 您的数据湖中已有一个名为
eltblogpost
的 Amazon S3 存储桶,用于存储从 Amazon Redshift 卸载的数据。由于存储桶名称在所有 AWS 账户中必须是唯一的,因此请在提供的示例代码中使用合适的唯一存储桶名称替换eltblogpost
。 - 您已安装 AWS CLI 并配置为与您的 AWS 账户一起使用。
- 您拥有一个名为
redshift-elt-test-s3-policy
的 IAM 策略,并授予名为eltblogpost
的 Amazon S3 存储桶以下读取和写入权限: - 您拥有一个名为
redshift-elt-test-sampledata-s3-read-policy
的 IAM 策略,授予名为awssampledbuswest2
的 Amazon S3 存储桶(托管用于此演练的示例数据)只读权限。 - 您拥有一个名为
redshift-elt-test-role
的 IAM 角色,该角色为 redshift.amazonaws.com 和 glue.amazonaws.com 以及以下 IAM 策略(对于生产环境,您应根据需要进一步限制)所信任:redshift-elt-test-s3-policy
redshift-elt-test-sampledata-s3-read-policy
AWSGlueServiceRole
AWSGlueConsoleFullAccess
- 记下
redshift-elt-test-role
IAM 角色的 ARN。 - 您拥有具有以下参数的 Amazon Redshift 集群:
- 集群名称为
rseltblogpost
。 - 数据库名称为
rselttest
。 - 四个 dc2.large 节点。
- 名为
redshift-elt-test-role
的关联 IAM 角色。 - 一个公开可用的终端节点。
- 名为
eltblogpost-parameter-group
的集群参数组,用于更改并发扩展 - 集群工作负载管理设置为手动。
- 集群名称为
- 您拥有 SQL Workbench/J(或您选择的其他工具),并且可以成功连接到集群。
- 您在同一区域中拥有具有 PostgreSQL 客户端 CLI (psql) 的 EC2 实例,并且可以成功连接到集群。
- 您拥有一个名为
eltblogpost
的 AWS Glue 目录数据库,作为 Amazon Athena 和 Redshift Spectrum 查询的元数据目录。
将数据加载到 Amazon Redshift 本地存储
这篇文章使用星型模式基准 (SSB) 数据集。它在 S3 存储桶 (s3://awssampledbuswest2/ssbgz/
) 中公开提供,任何有权访问 Amazon S3 的经过身份验证的 AWS 用户都可以使用。
要将数据加载到 Amazon Redshift 本地存储,需完成以下步骤:
- 从 SQL Workbench/J 连接到集群。
- 通过 SQL Workbench/J 从 Github 存储库执行 CREATE TABLE 语句,以从 SSB 数据集创建表。下图显示了表列表。
- 从 Github 存储库执行 COPY 语句。此步骤使用
s3://awssampledbuswest2/ssbgz/
中可用的示例数据将数据加载到创建的表中。切记要将 ARN 替换为您先前记下的 IAM 角色 ARN。 - 要验证每个表是否正确加载,请运行以下命令:
以下结果表显示了 SSB 数据集中每个表的行数:
除了记录计数之外,您还可以检查每个表中的一些示例记录。
使用 Amazon Redshift 执行 ELT 和 ETL 并卸载到 S3
以下是本演练的大致步骤:
- 您希望从加载到 Amazon Redshift 本地存储中的销售点 (POS) 数据中,预先聚合一些您的最终用户经常问及的数据。
- 然后,您想要以开放的、分析优化和经过压缩的 Parquet 文件格式将聚合数据从 Amazon Redshift 卸载到数据湖 (S3)。您还希望对已卸载到数据湖中的数据优化分区,以帮助提高最终用户的查询性能并最终降低成本。
- 您想要使用 Redshift Spectrum 在数据湖中查询已卸载的数据。您还希望与其他 AWS 服务共享数据,例如:使用 Athena 的按使用量付费和无服务器临时和按需查询模型查询数据;使用 AWS Glue 和 Amazon EMR 对卸载的数据执行 ETL 操作,以及与储存在数据湖中的其他数据集(例如 ERP、财务或第三方数据)进行数据集成;以及通过 Amazon SageMaker 利用这些数据进行机器学习。
请执行以下步骤:
- 要计算必要的预聚合,请从您的 SQL Workbench/J 执行 Github 存储库上可用的以下三个 ELT 查询:
- ELT 查询 1 – 按制造商、类别和品牌,汇总每年、每月、每个供应商区域的收入进行查询。
- ELT 查询 2 – 按品牌,汇总每年、每月、每个供应商区域和城市的收入进行查询。
- ELT 查询 3 – 按客户所在城市、供应商所在城市、月份和年份按时间向下进行查询。
- 要将聚合数据以 Parquet 文件格式卸载到 S3,并进行适当分区的以帮助数据湖中已卸载数据的访问模式,请从您的 SQL Workbench/J 执行 Github 存储库上可用的三个 UNLOAD 查询。要使用 Redshift Spectrum 查询卸载的数据,您需要满足以下条件:
- 一个 Amazon Redshift 集群和一个可以连接到集群并执行 SQL 命令的 SQL 客户端(SQL Workbench/J 或您选择的其他工具)。该集群和 S3 中的数据文件必须位于同一区域。
- 在 Amazon Redshift 中设置外部架构,以引用外部数据目录中的数据库,并提供 IAM 角色 ARN 以授权您的集群代表您访问 S3。 最佳实践是在 AWS Glue 中拥有一个外部数据目录。您现在可以创建一个 AWS Glue 爬网程序。
- 在 AWS CLI 中,运行以下代码(替换<Your AWS Account>):
aws glue create-crawler --cli-input-json file://mycrawler.json --region us-west-2
您还可以根据使用场景设置爬网程序定期运行。例如,对于每 30 分钟卸载一次数据,您可以安排爬网程序每 35 分钟运行一次,以使 AWS Glue 目录表保持更新。但是,在本文中未配置任何计划。
- 创建 AWS Glue 爬网程序之后,请使用以下命令从 AWS CLI 手动运行它:
aws glue start-crawler --name "eltblogpost_redshift_spectrum_etl_elt_glue_crawler" --region us-west-2
- AWS Glue 爬网程序运行完成后,请转到 AWS Glue 控制台,以查看数据库
eltblogpost
下的以下三个 AWS Glue 目录表:monthly_revenue_by_region_manufacturer_category_brand
monthly_revenue_by_region_city_brand
yearly_revenue_by_city
- 现在您已经在 AWS Glue 中创建了一个名为
etlblogpost
的外部数据目录,接下来通过 SQL Workbench/J 使用以下 SQL 在名为eltblogpost
的持久集群中创建一个外部架构(替换<Your AWS Account>):您现在可以使用 Spectrum 查询您先前建立的三个 AWS Glue 目录表。
- 转到 SQL Workbench/J 并运行以下示例查询:
- 在 1992 年 3 月为非洲区域
贡献最多收入的十大品牌(按类别和制造商):
- 所有品牌在 1995 年在
美洲
地区的月收入: - 1992-1995 年 12 月的供应商所在城市
ETHIOPIA
4 的年收入:
- 在 1992 年 3 月为非洲区域
当数据在 S3 中并已在 AWS Glue 目录中完成分类时,您可以使用 Athena、AWS Glue、Amazon EMR、Amazon SageMaker、Amazon QuickSight 以及其他许多与 S3 无缝集成的 AWS 服务来查询相同的目录表。
使用 Redshift Spectrum 加速 ELT 和 ETL 并卸载到 S3
假设您需要使用熟悉的 SQL 在存储在数据湖 (S3) 冷存储中的大型数据集上,预先聚合一组最终用户经常请求的指标,然后将聚合的指标卸载到数据湖中以供下游使用。
以下是本演练的大致步骤:
- 这是一个批处理工作负载,需要对相当数量的关系和结构化数据进行标准 SQL 连接和聚合。您希望利用 Redshift Spectrum 对存储在 S3 中的数据执行所需的 SQL 转换,并将转换后的结果卸载回 S3。
- 您希望使用 Redshift Spectrum 从数据湖中查询已卸载的数据(如果您已拥有 Amazon Redshift 集群);使用 Athena 的按使用付费和无服务器临时按需查询模型查询其中的数据;使用 AWS Glue 和 Amazon EMR 对卸载的数据执行 ETL 操作,以及与数据湖中的其他数据集进行数据集成;以及通过 Amazon SageMaker 利用这些数据进行机器学习。
由于 Redshift Spectrum 让您可以直接从数据湖查询数据而无需加载到 Amazon Redshift 本地存储中,因此您可以使用 Redshift Spectrum 启动一个短期的集群以大规模执行 ELT,并在工作完成时终止集群。您可以使用 AWS CloudFormation 自动启动和终止短期集群。这样,您只需支付 Amazon Redshift 集群为工作负载提供支持期间的费用。短期集群还可以避免因来自实时用户的交互式查询而导致当前持久性集群超载。对于本文中的示例,请使用现有的集群 rseltblogpost。
本文使用由 AWS 提供的名为 tickit
的公共示例数据集,任何有权访问 S3 的经过身份验证的 AWS 用户都可以使用:
- Sales – s3://awssampledbuswest2/tickit/spectrum/sales/
- Event – s3://awssampledbuswest2/tickit/allevents_pipe.txt
- Date – s3://awssampledbuswest2/tickit/date2008_pipe.txt
- Users – s3://awssampledbuswest2/tickit/allusers_pipe.txt
出于性能原因,Redshift Spectrum 的最佳实践是将维度表加载到短期集群的本地存储中,并将外部表用于事实表 Sales
。
请执行以下步骤:
- 从 SQL Workbench/J 连接到集群。要使用 Redshift Spectrum 从数据湖 (S3) 查询数据,您需要具备以下条件:
- 一个 Amazon Redshift 集群和一个可以连接到集群并执行 SQL 命令的 SQL 客户端(SQL Workbench/J 或您选择的其他工具)。该集群和 S3 中的数据文件必须位于同一区域。
- 在 Amazon Redshift 中设置外部架构,以引用外部数据目录中的数据库,并提供 IAM 角色 ARN 以授权您的集群代表您访问 S3。最佳实践是在 AWS Glue 中拥有一个外部数据目录。
- 您已经创建的名为
eltblogpost
的 AWS Glue 目录数据库。 - Redshift 集群中您已创建的名为
spectrum_eltblogpost
的外部架构。
- 执行 Github 存储库上可用的 SQL,以名为
spectrum_eltblogpost
的相同外部架构创建名为sales
的外部表。如上一节中所示,您还可以使用 AWS Glue 爬网程序来创建外部表。 - 执行 Github 存储库上可用的 SQL 以创建维度表,以将数据加载到 Amazon Redshift 本地存储中,这是实现 Redshift Spectrum 性能的最佳实践。
- 执行 Github 存储库上可用的 COPY 语句,将
s3://awssampledbuswest2/tickit/
中的示例数据加载到维度表。将 IAM 角色 ARN 替换为您前面记下的与集群关联的 IAM 角色 ARN。 - 要验证每个表是否具有正确的记录计数,请执行以下命令:
以下结果表显示了
tickit
数据集中每个表的行数:除了记录计数之外,您还可以检查每个表中的一些示例记录。
- 要计算必要的预聚合,请从您的 SQL Workbench/J 执行 Github 存储库上可用的以下三个 ELT 查询:
- ELT 查询 1 – 在给定日期的总销售量。
- ELT 查询 2 – 出售给每个买家的总数量。
- ELT 查询 3 – 以历史总销售额计,处于 99.9% 百分位的活动。
- 要将聚合数据以 Parquet 文件格式卸载到 S3,并进行适当分区的以帮助数据湖中已卸载数据的访问模式,请从您的 SQL Workbench/J 执行 Github 存储库上可用的三个 UNLOAD 查询。
- 要使用 Redshift Spectrum 查询已卸载的数据,您可以创建一个新的 AWS Glue 爬网程序,也可以修改此前名为
eltblogpost_redshift_spectrum_etl_elt_glue_crawler
的爬网程序。从 AWS CLI 使用以下来代码更新现有的爬网程序(替换<Your AWS Account>):aws glue update-crawler --cli-input-json file://mycrawler.json --region us-west-2
- 成功创建爬网程序之后,使用以下命令从 AWS CLI 手动运行它:
aws glue start-crawler --name "eltblogpost_redshift_spectrum_etl_elt_glue_crawler" --region us-west-2
- 爬网程序运行完成后,请转到 AWS Glue 控制台。目录数据库
eltblogpost
中包含以下其他目录表:- total_quantity_sold_by_date
- total_quantity_sold_by_buyer_by_date
- total_price_by_eventname
- 现在可以使用 Spectrum 查询前面的三个目录表。转到 SQL Workbench/J 并运行以下示例查询:
-
- 2008 年 2 月和 3 月售出数量最多的 10 天:
- 2008 年 2 月和 3 月购买量排名前 10 的买家:
- 总价的前 10 个活动名称:
-
当数据在 S3 中并已在 AWS Glue 目录中完成分类时,您可以使用 Amazon Athena、AWS Glue、Amazon EMR、Amazon SageMaker、Amazon QuickSight 以及其他许多与 S3 无缝集成的 AWS 服务来查询相同的目录表。
使用并发扩展在 ELT 和卸载并行运行时进行扩展
假设您有混合工作负载并行运行,在打开并发扩展的情况下,在集群中并行运行 UNLOAD 查询和 ELT 作业。启用并发扩展后,当您需要处理的并发读取查询(包括 UNLOAD 查询)增加时,Amazon Redshift 会自动添加额外的集群容量。默认情况下,集群的并发扩展模式是关闭的。在本文中,您将为集群启用并发扩展模式。
请执行以下步骤:
-
- 转到名为
eltblogpost-parameter-group
的集群参数组,然后完成以下操作:- 将
max_concurrency_scaling_clusters
更新为5
。 - 为接下来步骤中的 UNLOAD 作业创建一个名为
Queue 1
的新队列,并将并发扩展模式设置为Auto
,再创建一个名为unload_query
的查询组。
- 将
- 进行这些更改之后,请重新启动集群以使更改生效。
- 在本文中,使用 psql 客户端从先前设置的 EC2 实例连接到集群
rseltblogpost
。 - 打开一个连接到 EC2 实例的 SSH 会话,然后将以下九个文件从 Github 存储库复制到 EC2 实例中的路径:/home/ec2-user/eltblogpost/ 。
- 查看
concurrency-elt-unload.sh
脚本,该脚本并行运行以下八个作业:- SSB 数据集的 ELT 脚本,该脚本一次启动一个查询。
-
tickit
数据集的 ELT 脚本,该脚本一次启动一个查询。 - 并行启动三个针对 SSB 数据集的卸载查询。
- 并行启动三个针对
tickit
数据集的卸载查询。
- 在脚本运行时运行
concurrency-elt-unload.sh
,您将看到以下示例输出:以下是脚本的响应时间:real 2m40.245s user 0m0.104s sys 0m0.000s
- 运行以下查询以验证某些 UNLOAD 查询在并发扩展集群中运行(在下面的查询输出中找到 “
which_cluster = Concurrency Scaling
”):SELECT query, Substring(querytxt,1,90) query_text, starttime starttime_utc, (endtime-starttime)/(1000*1000) elapsed_time_secs, case when aborted= 0 then 'complete' else 'error' end status, case when concurrency_scaling_status = 1 then 'Concurrency Scaling' else 'Main' end which_cluster FROM stl_query WHERE database = 'rselttest' AND starttime between '2019-10-20 22:53:00' and '2019-10-20 22:56:00’ AND userid=100 AND querytxt NOT LIKE 'padb_fetch_sample%' AND (querytxt LIKE 'create%' or querytxt LIKE 'UNLOAD%') ORDER BY query DESC;
请参阅查询的以下输出:
- 注释掉六个 UNLOAD 查询文件(
ssb-unload<1-3>.sql
和tickit-unload<1-3>.sql
)中的以下 SET 语句,以强制让所有六个 UNLOAD 查询在主集群中运行:set query_group to 'unload_query';
换言之,为 UNLOAD 查询禁用并发扩展模式。
- 运行
concurrency-elt-unload.sh
脚本。在脚本运行时,您将看到以下示例输出:以下是脚本的响应时间:real 3m40.328s user 0m0.104s sys 0m0.000s
以下显示了 Redshift 集群的工作负载管理设置:
- 运行以下查询以验证所有查询都在主集群中运行(在下面的查询输出中找到“
which_cluster = Main
”):SELECT query, Substring(querytxt,1,90) query_text, starttime starttime_utc, (endtime-starttime)/(1000*1000) elapsed_time_secs, case when aborted= 0 then 'complete' else 'error' end status, case when concurrency_scaling_status = 1 then 'Concurrency Scaling' else 'Main' end which_cluster FROM stl_query WHERE database = 'rselttest' AND starttime between '2019-10-20 23:19:00' and '2019-10-20 23:24:00’ AND userid=100 AND querytxt NOT LIKE 'padb_fetch_sample%' AND (querytxt LIKE 'create%' or querytxt LIKE 'UNLOAD%') ORDER BY query DESC;
请参阅查询的以下输出:
在启用并发扩展的情况下,端到端运行时改进了 37.5%(快了 60 秒)。
小结
本文分步演练了 Amazon Redshift 的常见 ELT 和 ETL 设计模式的一些简单示例。这些示例中使用了 Amazon Redshift 的一些关键功能,例如 Amazon Redshift Spectrum、并发扩展以及最近新增的对数据湖导出的支持。
与往常一样,AWS 欢迎反馈。欢迎在评论中提出想法或问题。
关于作者
- 转到名为