亚马逊AWS官方博客

在 Amazon EMR 上运行 PySpark 报表业务

关于Spark和Amazon EMR

在大数据领域,Apache Spark是非常流行的集群计算框架,用于快速处理、查询和分析大数据。Spark是基于Scala语言编写的框架,但如果您从未使用过Scala也无需担心。Spark除了提供Scala/Java支持外,还提供了Python API接口。使用PySpark,您可以通过Python直接使用Spark SQL、Spark Streaming和Spark MLlib等核心组件。大多数数据科学家和分析专家都习惯使用Python,因为它具有丰富的第三方库集。PySpark使更广泛的用户群体可以利用分布式处理大数据,而不仅局限于传统意义上的大数据工程师。

Amazon EMR服务提供托管的Hadoop环境,可以在几分钟内启动集群并自动完成Spark环境的配置。EMR提供PySpark、PySpark3支持,还自带Jupyter Notebook功能,可以方便地在NoteBook中对python代码进行调试。EMR还提供了多种高级功能,比如与AWS S3和Glue数据目录集成、可使用Spot实例降低成本、可通过EMR步骤功能提交多阶段任务等等。

Spark在业界有非常广泛的应用场景。在这篇博客中,我们会向您展示如何通过EMR快速、高效地编写和提交报表任务。本文中的示例将使用pyspark.sql模块操作Spark DataFrame,从S3中读取数据,经过一系列转换,再将合并报表保存到S3和Glue数据目录中。示例中所使用的数据为模拟数据,您可以在自有环境中通过开源工具生成任意大小的数据集进行PySpark的学习和实验。

 

数据集

本文使用TPC-H开源工具产生模拟数据集。TPC-H是一个用于支持商业决策的测试基准,它可以逼真地模拟数据仓库的实际应用环境,进行大量的数据分析、执行高复杂度的查询和回答关键的商业问题。我们将通过TPC-H的数据工具生成一些模拟数据表,并通过PySpark实现合并报表操作。

通过以下命令,可以在Amazon Linux2的EC2实例中生成总大小为1G的数据表并上传至指定的S3存储桶。在此之前,请先创建一个新的S3存储桶,并将命令中的<YOUR BUCKET NAME>替换为存储桶的名称。如果稍后想用更大对数据集和EMR集群进行实验,TPC-H工具也可以支持生成10G、100G甚至更大的数据集。该工具可生成的数据共包括客户信息、订单信息等在内的10张结构化数据表,不同数据表之间可以通过客户编号、订单编号、零部件编号等数个字段进行关联。详细的表结构可以参考此链接中的第13页。

sudo yum install -y git make gcc
git clone https://github.com/gregrahn/tpch-kit.git
cd tpch-kit/dbgen
make MACHINE=LINUX DATABASE=POSTGRESQL
export DSS_CONFIG=/home/ec2-user/tpch-kit/dbgen
export DSS_QUERY=$DSS_CONFIG/queries
mkdir $DSS_CONFIG/data
export DSS_PATH=$DSS_CONFIG/data
./dbgen -vf -s 1
aws s3 sync ./data/ s3://<YOUR BUCKET NAME>/

注:有关aws cli的安装,请参考https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-install.html

有关s3的命令使用,请参考https://docs.aws.amazon.com/cli/latest/reference/s3/

 

启动EMR Spark集群

通过控制台启动集群

要在 EMR中使用Spark,请EMR登录控制台,在创建集群界面点击“转到高级选项”,并在软件配置界面勾选Spark。在创建Spark集群时,建议同时勾选将AWS数据目录“用于Spark表元数据”。元数据包含用Spark创建的database、table等元信息。不同于Hive Metastore,Glue Catalog可以提供持久性的元数据存储,这些元数据的生命周期并不会随EMR集群终止,这便于您随时启动和关闭EMR集群来进行数据操作。

 

在EMR集群中使用Spot实例作为任务节点是节约成本的一种有效方法。任务节点是可选的计算型节点,不负责HDFS数据存储。根据数据大小和代码逻辑的复杂程度,可以适当配置任务节点来加速计算。Spot实例有可能在运行任务时被回收,但Spark在各个层面有多重失败重试机制,可以保证大多数情况下任务的继续运行。

如果需要通过SSH登录master节点,需要在安全性设置中事先选择EC2密钥对。然后点击“创建集群”。

 

PySpark编程和调试

通常来讲,PySpark shell可以作为一个简易的交互式入口。如果在创建集群时配置了EC2秘钥对和相应的安全组规则,可以SSH进入Master节点使用“pyspark”命令进入PySpark Shell。但通常notebook可以更好地支持编程和数据分析,便于进行添加叙述性文本、进行可视化等操作。在EMR中,可以直接使用EMR Notebook(一个置于 Amazon EMR 控制台中的 Jupyter Notebook 环境,详情请参考此链接)。

使用EMR Notebook

EMR Notebook可以通过控制台创建,并且不需要单独为Notebook支付任何费用。Notebook的内容存放在S3中,在集群关闭后仍然保留,可以加载到新的EMR集群进行使用。要创建EMR Notebook,在EMR控制台中依次点击“笔记本”-“创建笔记本”:

 

在Notebook准备就绪后,可以从控制台打开Jupyter Notebook并尝试编写代码:

 

使用Spark SQL API和DataFrame编写报表任务 

Spark SQL是Spark用于处理结构化和半结构化数据的一个高级模块。Spark SQL中的DataFrame是一种以命名列方式组织的分布式数据集。简单来讲,DataFrame类似于传统关系型数据库中的表结构,代表了对底层数据源(例如csv、json、parquet文件)的抽象化。Spark DataFrame的用法之一是进行SQL查询:在关系型数据库中对单表进行的查询操作,都可以在DataFrame中直接执行SQL语句实现。除此之外,DataFrame还提供了一套与Pandas DataFrame类似的API。对已经在Python和R中熟悉DataFrame的用户而言,可以快速熟悉PySpark DataFrame的API,对S3、HDFS和HBase等数据存储进行访问。

基于PySpark API提供的强大功能,我们可以快速编写出复杂的业务逻辑。常用的API功能包括:

创建SparkSession

Spark中所有功能的入口是SparkSession类。可以使用如下代码创建一个基本的SparkSession对象:

>>>from pyspark.sql import SparkSession
>>>spark = SparkSession.builder.getOrCreate()

在notebook中创建了SparkSession后,可以在EMR控制台中看到相应的应用程序记录。执行代码时,EMR Notebook通过livy接口向Spark集群传输指令。

读取存在S3中的数据

如果事先为EMR的角色配置了访问S3的权限,则可以在EMR中直接访问保存在S3中的数据。比如,当我们想要访问partsupp表中的数据时,可以使用如下的代码指定表结构并加载数据:

>>>from pyspark.sql.types import *
>>>path = 's3://<YOUR BUCKET NAME>/partsupp.tbl'
>>>schema = StructType([
    StructField('partkey',IntegerType(),True),
    StructField('suppkey',IntegerType(),True),
    StructField('availqty',IntegerType(),True),
    StructField('supplycost',DecimalType(),True),
    StructField('comment',StringType(),True)
])
>>>partsuppDF = spark.read.csv(str(path), schema=schema, sep='|')

打印数据和表结构

PySpark DataFrame提供了包括count、first、head、show、printSchema在内的常用API。详情可以参见pyspark.sql.DataFrame API文档

 

  • 打印数据结构:
>>>partsuppDF.printSchema()
root
 |-- partkey: integer (nullable = true)
 |-- suppkey: integer (nullable = true)
 |-- availqty: integer (nullable = true)
 |-- supplycost: decimal(10,0) (nullable = true)
 |-- comment: string (nullable = true)
  • 查询总行数:
>>>print("Count: ", partsuppDF.count())
Count:  40000000
  • 打印行:
>>>partsuppDF.first()
Row(partkey=1, suppkey=2, availqty=3325, supplycost=Decimal('772'), comment=', even theodolites. regular, final theodolites eat after the carefully pending foxes. furiously regular deposits sleep slyly. carefully bold realms above the ironic dependencies haggle careful')

>>>partsuppDF.head(1)
[Row(partkey=1, suppkey=2, availqty=3325, supplycost=Decimal('772'), comment=', even theodolites. regular, final theodolites eat after the carefully pending foxes. furiously regular deposits sleep slyly. carefully bold realms above the ironic dependencies haggle careful')]

>>>partsuppDF.take(1)
[Row(partkey=1, suppkey=2, availqty=3325, supplycost=Decimal('772'), comment=', even theodolites. regular, final theodolites eat after the carefully pending foxes. furiously regular deposits sleep slyly. carefully bold realms above the ironic dependencies haggle careful')]

>>>partsuppDF.show(n=2, truncate=5)
+-------+-------+--------+----------+-------+
|partkey|suppkey|availqty|supplycost|comment|
+-------+-------+--------+----------+-------+
|      1 |      2 |    3325 |       772 |  , ...|
|      1 |  12...|    8076 |       993 |  ve...|
+-------+-------+--------+----------+-------+
only showing top 2 rows

数据过滤

>>>partsuppDF.filter("partkey=1").show()
+-------+-------+--------+----------+--------------------+
|partkey|suppkey|availqty|supplycost|                comment|
+-------+-------+--------+----------+--------------------+
|       1|       2|     3325|         772|, even theodolite...|
|       1| 125002|     8076|         993|ven ideas. quickl...|
|       1| 250002|     3956|         337|after the fluffil...|
|       1| 375002|     4069|         358|al, regular depen...|
+-------+-------+--------+----------+--------------------+

列操作

  • 选择一列或多列:
>>>partsuppDF.select("partkey").show(2)
+-------+
|partkey|
+-------+
|       1|
|       1|
+-------+
  • 添加或替换数据列:
>>>partsuppDF.withColumn('newkey', partsuppDF['partkey']*2)
DataFrame[partkey: int, suppkey: int, availqty: int, supplycost: decimal(10,0), comment: string, newkey: int]

拼接(join)

常见的并表场景包括想要将多个小表横向合并为大表,这时可以直接使用DataFrame.join对数据进行操作。比如,我们想要将lineitem表和之前导入的partsupp表通过“suppkey”字段进行并表:

>>>lineitemDF = spark.read.csv(path, schema=schema, sep='|')
>>>lineitemDF = lineitemDF.join(partsuppDF, ['suppkey'], 'outer')

聚合(aggregation)

并表后可以根据实际业务需求对字段进行聚合。PySpark可以先使用选定的字段生成pyspark.sql.GroupedData对象,再通过agg函数进行聚合。pyspark.sql.functions中包含了聚合操作常用的求和、求平均数等功能。示例代码如下:

>>>import pyspark.sql.functions as f
>>>lineitemDF.groupby('orderkey').agg(f.count('linenumber'), 
                                             f.sum('supplycost').alias('sumcost'), 
                                             f.sum('profit').alias('sumprofit'), 
                                             f.avg('quantity').alias('avgquantity'), 
                                             f.avg('discount').alias('avgdiscount'))

将结果写入到S3和Glue Catalog

如果在创建集群时Glue数据目录设置中勾选了“用于Spark表元数据”的选项,我们就可以在写入S3的同时将DataFrame的元数据写入Glue数据目录。

 

>>>customerDF.write.format('parquet').mode('overwrite').option('path', 's3://<YOUR BUCKET NAME>/<YOUR TABLE NAME>/').saveAsTable('<YOUR TABLE NAME>')
 
       

 

使用EMR步骤功能提交PySpark任务

准备工作

在接下来的步骤中,我们将提供一个名为pyspark_job.py的示例脚本。您也可以选择自行编写脚本进行实验。如果要使用提供的示例脚本,在提交任务之前,请先进行如下的准本工作:

  • 在Glue中创建一个新的database
  • 将脚本中的<YOUR GLUE DATABASE NAME> 替换为新创建的database名称,并将<YOUR TABLE NAME>替换为想要的输出表格名称
  • 将脚本中的四处<YOUR BUCKET NAME>替换为之前存放数据文件的S3存储桶名称
  • 修改完成后保存并将py脚本上传至S3
  • 如果EMR默认角色没有足够的权限,可能会导致任务失败。这时可以在控制台中编辑EMR_EC2_DefaultRole并赋予其访问Glue的权限。生产环境中,请遵循权限最小化原则,这里为了简化实验步骤,可以直接添加AWS托管策略:AWSGlueServiceRole
  • 如果您的账户之前在Glue尚未上线时使用了Amazon Athena数据目录,需要升级到Glue数据目录才能正常运行示例脚本。详细步骤可参照此链接

 

通过控制台添加步骤功能

EMR提供了步骤功能,可以直接运行存储在S3中的代码,而无需登录主节点进行Spark-submit。此功能可以支持提交.py文件。

要添加步骤,选择“集群”-“步骤”-“添加步骤”。

 

“应用程序位置”处请选择之前上传至S3的pyspark_job.py文件路径:

提交成功后可以在“步骤”菜单中可以看到当前步骤的状态和日志信息。

 

创建EMR一次性集群运行PySpark任务

通过控制台创建一次性集群

对于一些要求低成本的离线任务场景,可以借助步骤功能提交PySpark作业的工作流,并在工作流完成后自动关闭集群。离线任务可以利用S3和Glue数据目录存储计算结果和元数据,实现计算和存储分离。

要创建一次性集群,在“创建集群”-“软件与步骤”-“添加步骤”中勾选“最后的步骤完成后,集群自动终止”。

步骤完成后,我们可以在控制台中看到集群自动终止。

 

对并表后的数据进行查询

报表任务完成之后,可以在相应的Glue数据目录中看到新建的表,以及S3路径下新生成的数据文件:

 

常见的业务场景可能会要求对聚合后的数据表进行即席查询(ad-hoc query),即用户根据自己的需求随时调整查询条件。这种情景下使用Presto可以获得极佳的性能。Amazon Athena可以轻松实现交互式SQL查询的需求,无需预配置服务器,只需要在控制台中输入SQL语句就可以对之前存入Glue中的数据目录进行即时查询,查询所需时间可能低至数秒。

 

如果您之前使用了提供的示例脚本进行任务,可以使用Athena进行查询。比如,我们想要获取营收排名前十的客户名单,可以在Athena控制台中输入如下SQL语句:

SELECT custkey,name,totalrevenue FROM “tpch-data”.”customer” ORDER BY totalrevenue DESC limit 10;

 

结语

在此博客中,我们介绍了如何高效地利用Amazon EMR和PySpark编写和运行报表任务。参照博客中的操作进行实践后,您应该已经上手了EMR,并且能够通过PySpark编写自定义的业务逻辑。

要了解其他PySpark API功能,请参考PySpark文档

要了解EMR,请参考Amazon EMR网页
 
 

本篇作者

柯逸楠

AWS解决方案架构师,具有丰富的数据分析和挖掘经验,负责基于AWS云平台的解决方案咨询和设计。