使用 Amazon EMR 和 Apache Spark 创建 ETL 管道

了解如何使用 Amazon EMR 和 Apache Spark 构建用于批处理的 ETL 管道。
发布时间:2023 年 3 月 30 日
数据工程
数据分析
Python
Spark
EMR
Apache Spark
S3
教程
亚马逊云科技
Olawale Olaleye
亚马逊云科技使用经验
200 - 中级
完成所需时间
30 - 45 分钟
所需费用
0.30 美元
前提条件

注册 / 登录 亚马逊云科技账户
有权创建亚马逊云科技资源的 IAM 用户。
Python 基本知识

上次更新时间
2023 年 3 月 30 日

对于为任何组织构建强大的数据工程管道而言,提取、转换、加载(ETL,有时称为摄取、转换、导出)都至关重要。从本质上讲,如果使用适当的工具和服务设计和构建 ETL 管道,它将为任何组织在批处理和实时处理方面带来较高的价值。但是,设计和构建这样一个管道非常耗时,而且相关大数据领域中的工具和框架数量如此之多,还需要不同的技能组合。所幸的是,如果您使用的是 EMR 和 Spark,这项任务就会非常容易。

在许多组织中,批量 ETL 都是一种常见的使用场景。本教程将提供一个起点,可以帮助您使用 Amazon EMR (Amazon Elastic MapReduce) 和 Apache Spark 在亚马逊云科技中构建更复杂的数据管道。下面介绍了具体操作步骤。

我们将使用 PySpark 与 Spark 集群进行交互。借助 PySpark,可以使用 Python API 编写 Spark 应用程序。

学习目标

在本指南中,您将:

  • 创建并设置 Amazon EMR 集群
  • 在 EMR 上提交 PySpark 作业
  • 集成 Amazon EMR 与 Amazon S3

让我们开始吧!

使用场景和问题陈述

在本教程中,假设您有一个供应商,该供应商会在每个月末提供增量销售数据。该文件以 CSV 文件的形式存储到 S3,然后需要对其进行处理,使其可供数据分析师进行查询和分析。

架构

为了实现此数据管道,我们将使用 EMR 集群,并使用 Spark 作为分布式处理引擎。我们还将使用 S3 存储:

  • RAW 数据(即输入和未处理的数据)
  • CLEANSED 数据(即输出和经过处理的数据)

我们需要构建这样一个数据管道:从 S3 存储桶获取这个新的销售文件,使用 Amazon EMR 进行所需的转换处理,并将清理和转换后的数据保存到目标 S3 存储桶,以便稍后使用 Amazon Athena 进行查询。

执行步骤

若要实现数据处理管道,我们需要创建一个 EMR 集群来运行 ETL 作业,还需要创建一个 S3 存储桶来存储原始数据和经过处理的数据。下面,我们开始执行集群任务吧。

步骤 1:创建 EMR 集群

创建 EMR 集群之前,我们需要创建一个 Key Pair,稍后需要使用它访问 EMR 集群的主节点。那么,我们马上创建一个吧。

1. 登录您的亚马逊云科技账户,前往 EC2 控制台,然后点击左侧菜单栏中的 Key Pairs(密钥对)。接着点击 Create Key Pair(创建密钥对)。

2. 为密钥对命名 (mykey-emr),然后点击 Create Key Pair(创建密钥对)。

3. 现在我们继续创建 Amazon EMR cluster。为此,在控制台上前往 Amazon EMR,然后点击 Create cluster(创建集群)以创建一个 EMR 集群。

4. 为 EMR 集群命名,这里将 Cluster name(集群名称)设置为 MyDemoEMRCluster 并进行如下选择:

  • Software configuration(软件配置)部分,选择 EMR 的最新版本
  • Application bundle(应用程序捆绑包)部分,选择 Spark
  • Security and access(安全与访问)部分,选择正确的 EC2 密钥对(在上一步中创建的密钥对)

保留所有其他选项的默认设置,然后点击 Create cluster(创建集群)。此操作将创建包含三个实例的集群。

5. 集群创建需要一些时间,几分钟后,您将看到集群已启动并运行,状态为 Waiting(这意味着集群现已准备就绪,正等待执行任何 ETL 作业)。

步骤 2:创建 Amazon S3 存储桶

现在,我们创建一个 Amazon S3 存储桶,并在其中创建两个子文件夹,用于存储 RAW 和 CLEANSED 数据。

1. 前往 Amazon S3 控制台,然后点击 Create bucket(创建存储桶)。

2. 创建一个存储桶(如 etl-batch-emr-demo)。

3. 创建存储桶后,创建两个子文件夹,分别命名为:

  • cleaned_data
  • raw_data

4. 将销售数据集 CSV 文件上传到存储桶中的文件夹 raw_data 下。

步骤 3:提交 PySpark 作业

我们现已将数据集上传到 S3,接下来可以从 EMR 集群提交 PySpark 作业了。

  1. 登录亚马逊云科技管理控制台,然后进入 Amazon EMR 控制台

  2. 在左侧导航窗格的 EMR on EC2(EC2 上的 EMR)下,选择 Clusters(集群),然后选择要在其中检索公共 DNS 名称的 myDemoEMRCluster 集群。

  3. 在集群详细信息页面的 Summary(摘要)部分,记下 Primary node public DNS(主节点公共 DNS)显示的值。

4. 从终端通过 SSH 访问 EMR 集群的主节点

ssh -i "mykey-emr.pem" root@ec2-18-219-203-79.us-east-2.compute.amazonaws.com

5. 复制 PySpark 代码 etl-job.py 并将其保存到主目录下的 Primary Node,然后进行以下更改并保存文件:

  • S3_INPUT_DATA = 's3://<YOUR_BUCKET_LOCATION_OF_RAW_DATA>'
  • S3_OUTPUT_DATA = 's3://<YOUR_BUCKET_LOCATION_OF_CLEANED_DATA>'

6. 提交 PySpark job,并等待作业完成。

sudo spark-submit etl-job.py 

7. 作业完成后,查看 S3 存储桶中的文件夹 cleaned_data,您将看到经过转换和处理的新数据,新数据以 Parquet 格式进行存储。

步骤 4:使用 Amazon Athena 验证输出

cleansed 数据现以 Parquet 格式存储在 Amazon S3 中,但为了让数据分析师或数据科学家更容易使用这类数据,最好能支持数据库表的形式,以便通过 SQL 查询数据。

若要实现该集成,可以执行以下两个步骤:

  1. 我们需要运行 Glue 爬网程序,基于 S3 数据创建一个 Amazon Glue Data Catalog 表。
  2. 完成后,我们可以在 Amazon Athena 中运行查询,验证输出。

步骤 5:创建 Amazon Glue Data Catalog

1. 前往 Amazon Glue 控制台,在左侧导航窗格中选择 Crawlers(爬网程序),然后点击 Create crawler(创建爬网程序)。

2. 为 Glue 爬网程序设置名称 (my-crawler-1)。

3. 对于数据源,添加包含清洗和处理后数据的 S3 存储桶 (s3://etl-batch-emr-demo/cleaned_data)。 

4. 创建 IAM 角色 (AWSGlueServiceRole-default) 并添加该角色。您可以创建一个角色并附加以下策略(有关更多详细信息,可以参阅此文档):

  • AWSGlueServiceRole 亚马逊云科技托管策略,该策略授予 Data Catalog 所需的权限

  • 内联策略,该策略授予数据源(位于 S3_INPUT_DATA)权限

5. 点击 Add database(添加数据库)创建一个数据库,然后从下拉菜单中选择该数据库 (my_demo_db)。 

6. 查看并验证所有详细信息,然后点击 Create crawler(创建爬网程序)。 

7. 创建爬网程序后,选择该爬网程序,然后点击 Run(运行)。

8. 爬网程序完成运行后,您将看到 detected tables。

我们现已创建了 Glue Data Catalog 表,接下来可以前往 Amazon Athena,使用 SQL 查询数据。

到目前为止,我们已经从 Amazon S3 中提取了数据,然后通过 Glue ETL (pySpark) 作业将数据转换为 Parquet 格式,从而转换了数据。最后,我们将使用 Amazon Athena 对清理后的数据进行分析。

步骤 6:使用 Amazon Athena 标准 SQL 查询输出数据

1. 打开 Athena 查询编辑器。可以将 Data source(数据源)保留为默认的 AwsDataCatalog,并选择 my_demo_db 作为 Database(数据库),如以下截图所示,然后运行以下查询。 

SELECT * FROM "my_demo_db"."cleaned_data" limit 10;

2. 现在,可以执行其他 SQL 查询来分析数据。例如,如果我们想知道每个 region per segment wise 的 forecast_monthly_revenue,可以运行以下查询:

SELECT 
 region, 
 segment, 
 SUM(forecasted_monthly_revenue) as forecast_monthly_revenue 
FROM "my_demo_db"."cleaned_data" 
GROUP BY segment, region;

清理资源

现在您已经完成了本实操教程,为避免产生额外的费用,接下来可以删除以下所有资源:

  • 删除 EMR 集群
  • 删除 Amazon S3 存储桶
aws s3 rb s3://<YOUR_BUCKET_LOCATION> --force
  • 删除 Glue 数据库

总结

恭喜您!您已完成使用 Amazon EMR 和 Apache Spark 创建 ETL 管道的教程。

在本教程中,我们学习了如何构建 ETL 管道,此类管道可以应用于各种批处理使用场景,如电子商务销售数据分析。我们还学习了如何从 S3 中提取数据,然后使用简单的 Glue ETL (pySpark) 作业按需转换数据。最后,通过 Amazon Athena 使用 SQL 对数据进行了分析。