亚马逊AWS官方博客

使用 Amazon Kinesis Data Firehose 和 Amazon EMR 中的 Apache Spark 优化流式数据处理

Original URL: https://aws.amazon.com/blogs/big-data/optimizing-downstream-data-processing-with-amazon-kinesis-data-firehose-and-amazon-emr-running-apache-spark/

对于大多数公司而言,处理不断增加的数据量并整合新数据源充满挑战。  通常,AWS 客户会收到来自各种连接设备和传感器的海量消息,这些消息必须先经过有效注入和处理,之后才能执行进一步分析。  通常 Amazon S3 是适合保存所有类型数据的地点。  但是,数据在 Amazon S3 中的存储方式会对后续数据处理的效率和成本产生重大影响。  具体而言,如果 Apache Spark 处理的是大量小文件而不是较少的大文件,则可能会因文件操作量大而承受巨大负担。  在这些文件中,用于打开每个文件、读取元数据信息和关闭文件都会占用几毫秒时间。大量文件操作占用的总时间较多,这会导致处理缓慢。这篇博文将介绍如何使用 Amazon Kinesis Data Firehose 将传送到 Amazon S3 的大量小消息合并为较大消息。  这样可以加快运行 EMR 服务 中运行的 Spark 的的处理速度。与 Amazon Kinesis 数据流相似,Kinesis Data Firehose 接受的最大传入消息大小为 1 MB。  如果单个消息大于 1 MB,则可以在将其纳入流之前对其进行压缩。  但是,在文件量比较庞大时,大小为 1 MB 或更小的消息或文件就往往会显得过小。  对于文件到底应该多大,我们并没有什么“标准答案”。但是,对于许多数据集来说,大小为 1 MB 会造成文件数量和文件操作数过多。这篇文章还会展示如何使用 Apache Spark 读取 S3 中的压缩文件(这些文件没有正确的文件扩展名,Spark 处理完后以 Parquet 格式存储在 Amazon S3 中)。

解决方案概览

我们在此博文中遵循的步骤如下:

  1. 创建 Virtual Private Cloud (VPC) 和 Amazon S3 存储桶。
  2. 创建 Kinesis 数据流和 AWS Lambda 函数,使用 Lambda 处理来自 Kinesis 数据流的消息。
  3. 对 Kinesis Data Firehose 进行配置,以在步骤 2 中,将消息传递到由 Lambda 函数发送给 Kinesis Data Firehose, 再由 Kinesis Data Firehose 发送到 Amazon S3。此步骤还会预置 Amazon EMR 集群以处理 Amazon S3 中的数据。
  4. 使用在 Amazon EC2 上运行的自定义代码生成测试数据
  5. 从 Amazon EMR 集群的主实例运行示例 Spark 程序,以从 Amazon S3 读取文件,将其转换为 parquet 格式并写回 Amazon S3 目标。

下图展示了各项服务如何协同工作:

图中的 AWS Lambda 函数读取消息,向消息附加其他数据,并在发送到 Amazon Kinesis Data Firehose 之前使用 gzip 压缩它们。执行此操作原因在于,大多数客户在数据送达 Amazon S3 之前,需要对数据进行一些充实处理。

Amazon Kinesis Data Firehose 可以将传入的多个小消息缓冲到更大的记录中,然后再将它们传送到 Amazon S3 存储桶。它根据两个条件执行此操作:缓冲区大小(最大 128 MB)和缓冲区间隔(最多 900 秒)。只要满足这些条件中的任何一个,就会触发记录传送。

Apache Spark 作业从 Amazon S3 读取消息,处理数据后以 Parquet 格式存储它们。使用 Parquet 时,数据以列式格式存储,支持更高效的扫描,并支持 Amazon Athena 等服务的临时查询或进一步处理。

注意事项

发送到 Kinesis Data Firehose 的记录最大大小为 1,000 KB。如果您的消息大小大于此值,则最佳方法是在将消息发送到 Kinesis Data Firehose 之前压缩消息。Kinesis Data Firehose 还会在将消息写入 Kinesis Data Firehose 数据流后对其进行压缩。遗憾的是,这并不能克服消息大小限制,因为这种压缩发生在写入消息之后。当 Kinesis Data Firehose 将先前压缩过的消息传递到 Amazon S3 时,消息将写为没有文件扩展名的对象。例如,如果消息在写入 Kinesis Data Firehose 之前使用 gzip 压缩,则系统会在不适用 .gz 扩展名的情况下将其传递到 Amazon S3。如果您使用 Apache Spark 进行下游处理,则会出现问题,因为系统需要“.gz”扩展名。

在这篇博文的后续内容中,我们会介绍如何使用 Amazon S3 API 操作读取文件,从而克服此问题。

先决条件和假设

要按照这篇博文中列举的步骤操作,您需要满足以下条件:

  • 提供 AWS 服务访问权限的 AWS 账户。
  • 具有访问密钥和秘密访问密钥的 AWS Identity and Access Management (IAM) 用户,用于配置 AWS CLI
  • 模板和代码仅适用于美国东部(弗吉尼亚北部)区域。

此外,请注意以下事项:

  • 我们在同一个 VPC 中配置所有服务,以简化网络考虑事项。
  • 重要提示AWS CloudFormation 模板和我们提供的示例代码使用硬编码的用户名和密码以及开放的安全组。这些仅用于测试目的。如果不做任何修改,它们不适用于生产用途。

实施解决方案

您可以使用这个可下载模板进行一键部署。默认情况下,此模板在美国东部(弗吉尼亚北部)区域启动。请勿更改为其他区域。该模板仅适用于美国东部(弗吉尼亚北部)区域。要直接通过控制台启动,请选择 Launch Stack 按钮。

此模板采用以下参数。某些参数具有默认值,您无法编辑这些参数。这些预定义的名称硬编码在代码中。对于某些参数,您必须提供值。下表提供了更多详细信息。

对于此参数 请提供此值
StackName 提供堆栈名称。

ClientIP

 

允许使用 SSH 连接到集群的客户端的 IP 地址范围。
FirehoseDeliveryStreamName Amazon Firehose 传输流的名称。默认值设置为“AWSBlogs-LambdaToFireHose”。
InstanceType EC2 实例类型。
KeyName 用于启用登录访问权限的现有 EC2 密钥对的名称。
KinesisStreamName Amazon Kinesis Stream 的名称。默认值设置为“AWS-Blog-BaseKinesisStream”
Region AWS 区域,默认设置为 us-east-1 — 美国东部(弗吉尼亚北部)。请勿更改此设置,因为该脚本开发为仅可在此区域中工作。

EMRClusterName

 

EMR 集群的名称。
S3BucketName 您账户中创建的存储桶名称。为此存储桶制定唯一的名称。此存储桶用于存储 Spark 代码的消息和输出。

指定模板详细信息后,请选择 Next。在 Options 页面上,再次选择 Next。在 Review 页面上,选中 I acknowledge that AWS CloudFormation might create IAM resources with custom names 复选框,以及 I acknowledge that AWS CloudFormation might require the following capability: CAPABILITY_AUTO_EXPAND。然后单击 Create 按钮。

如果使用此一步式解决方案,您可以跳到步骤 7:生成测试数据集并加载到 Kinesis 数据流中。

要单独创建各个组件,请执行以下步骤。

1.使用 AWS CloudFormation 模板配置 Amazon VPC 并创建 Amazon S3 存储桶

在此步骤中,我们将设置 VPC、公共子网、互联网网关、路由表和安全组。安全组有两条入站访问规则。第一条入站规则允许从提供的客户端 IP CIDR 范围访问 TCP 端口22 (SSH),第二条入站规则允许从同一安全组中的任何主机访问任何 TCP 端口。我们会将此 VPC 和子网用于后续步骤中创建的所有其他服务。除了这些资源之外,我们还将创建一个标准的 Amazon S3 存储桶,其中包含提供的存储桶名称,用于存储传入数据和已处理数据。您可以使用此可下载的 AWS CloudFormation 模板来设置先前的组件。要直接通过控制台启动,请选择 Launch Stack

此模板采用以下参数。下表提供了更多详细信息。

对于此参数 执行此操作
StackName 提供堆栈名称。
S3BucketName 提供唯一的 Amazon S3 存储桶。此存储桶会在您的账户中创建。
ClientIp 提供添加到安全组入站规则的 CIDR IP 地址范围。您可以从“checkip.amazon.com”网址获取您当前的 IP 地址。

指定模板详细信息后,请选择 Next。在 Review 页面上,选择 Create

堆栈启动完成后,它应返回类似于以下内容的输出。

StackName 名称
VPCID Vpc-xxxxxxx
SubnetID subnet-xxxxxxxx
SecurityGroup sg-xxxxxxxxxx
S3BucketDomain <S3_BUCKET_NAME>.s3.amazonaws.com
S3BucketARN arn:aws:s3:::<S3_BUCKET_NAME>

记好输出结果,因为您要在下一步中使用此信息。您可以在 AWS 管理控制台中查看堆栈输出,也可以使用以下 AWS CLI 命令:

$ aws cloudformation describe-stacks --stack-name <stack_name> --region us-east-1 --query 'Stacks[0].Outputs'

2.  使用 AWS CloudFormation 模板创建必要的 IAM 角色

在此步骤中,我们要设置两个 AWS IAM 角色。AWS Lambda 函数将使用其中一个 IAM 角色来允许访问 Amazon S3 服务、Amazon Kinesis Data Firehose、Amazon CloudWatch Logs 和 Amazon EC2 实例。  Amazon Kinesis Data Firehose 服务使用第二个 IAM 角色来访问 Amazon S3 服务。您可以使用此可下载的 CloudFormation 模板来设置先前的组件。要直接通过控制台启动,请选择 Launch Stack

此模板采用以下参数。下表提供了更多详细信息。

对于此参数 执行此操作
StackName 提供堆栈名称。

指定模板详细信息后,请选择 Next。在 Options 页面上,再次选择 Next。在 Review 页面上选中 I acknowledge that AWS CloudFormation might create IAM resources with custom names。选择 Create

堆栈启动完成后,它应返回类似于以下内容的输出。

LambdaRoleArn arn:aws:iam::<ACCOUNT_NUMBER>:role/small-files-lamdarole
FirehoseRoleArn arn:aws:iam::<ACCOUNT_NUMBER>:role/small-files-firehoserole

堆栈启动完成后,它将返回包含有关已创建资源的信息的输出结果。记好输出结果,因为您要在下一步中使用此信息。您可以在 AWS 管理控制台中查看堆栈输出,也可以使用以下 AWS CLI 命令:

$ aws cloudformation describe-stacks --stack-name <stack_name> --region us-east-1 --query 'Stacks[0].Outputs'

3.使用 AWS CloudFormation 模板配置 Amazon Kinesis Data Firehose 数据流

在此步骤中,我们将配置 Amazon Kinesis Data Firehose 与 Amazon S3,并将 S3 设置为传入消息的目标。我们为压缩格式选择 Uncompressed 选项,缓冲选项为 128 MB 大小,间隔为 300 秒。您可以使用此可下载的 AWS CloudFormation 模板来设置先前的组件。要直接通过控制台启动,请选择 Launch Stack

此模板采用以下参数。下表提供了更多详细信息。

对于此参数 执行此操作
StackName 提供堆栈名称。
FirehoseDeliveryStreamName 提供 Amazon Kinesis Data Firehose 传输流的名称。默认值设置为“AWSBlogs-LambdaToFirehose”
Role 提供在步骤 2 中创建的 Kinesis Data Firehose IAM 角色 ARN。
S3BucketARN 选择 S3BucketARN。您可以从步骤 1 AWS CloudFormation 输出中获取此信息。

指定模板详细信息后,请选择 Next。在 Options 页面上,再次选择 Next。在 Review 页面上,选择 Create

4.使用 AWS CloudFormation 模板创建 Kinesis 数据流和 Lambda 函数

在此步骤中,我们设置了 Kinesis 数据流和 AWS Lambda 函数。我们可以使用 AWS Lambda 函数处理 Kinesis 数据流中的传入消息。此外还会创建事件源映射,作为此模板的一部分。这会为 Kinesis 数据流源的 AWS Lambda 函数添加一个触发器。有关创建事件源映射的更多信息,请参阅创建事件源映射。此 Kinesis 数据流使用 10 个分片创建,Lambda 函数使用 Java 8 运行时创建。我们分配 1920 MB 的内存大小和 300 秒的超时。您可以使用此可下载的 AWS CloudFormation 模板来设置先前的组件。要直接通过控制台启动,请选择 Launch Stack

此模板采用以下参数。下表提供了详细信息。

对于此参数 执行此操作
StackName 提供堆栈名称。
KinesisStreamName 提供 Amazon Kinesis 流的名称。默认值设置为“AWS-Blog-BaseKinesisStream”
Role 提供为第二个 AWS CloudFormation 模板中包含的 Lambda 函数创建的 IAM 角色。从第二个 AWS CloudFormation 模板的输出中获取值。
S3Bucket 提供使用第一个 AWS CloudFormation 模板创建的现有 Amazon S3 存储桶的名称。不要使用域名。仅提供存储桶名称。
Region 选择 AWS 区域。默认设置为 us-east-1 — 美国东部(弗吉尼亚北部)。

指定模板详细信息后,请选择 Next。在 Options 页面上,再次选择 Next。在 Review 页面上,选择 Create

5.使用 AWS CloudFormation 模板配置 Amazon EMR 集群

在此步骤中,我们设置一个 Amazon EMR 5.16.0 集群,使它包含 “Spark”、”Ganglia” 和 “Hive” 应用程序。我们使用一个主节点和两个核心节点创建此集群,并使用 r4.xlarge 实例类型。该模板使用 AWS Glue Metastore 作为 Amazon EMR hive Metastore。此 Amazon EMR 集群用于处理由 Amazon Kinesis Data Firehose 数据流创建的 Amazon S3 存储桶中的消息。您可以使用此可下载的 AWS CloudFormation 模板来设置先前的组件。要直接通过控制台启动,请选择 Launch Stack

此模板采用以下参数。下表提供了更多详细信息。

对于此参数 执行此操作
EMRClusterName 提供 EMR 集群的名称。
ClusterSecurityGroup 选择在第一个 AWS CloudFormation 模板中创建的安全组 ID。
ClusterSubnetID 选择在第一个 AWS CloudFormation 模板中创建的子网 ID。
AllowedCIDR 提供允许连接到集群的客户端的 IP 地址范围。
KeyName 提供现有 EC2 密钥对的名称,以访问 Amazon EMR 集群。

指定模板详细信息后,请选择 Next。在 Options 页面上,再次选择 Next。在 Review 页面上,选择 Create

堆栈启动完成后,它应返回类似于以下内容的输出。

EMRClusterMaster ssh hadoop@ec2-XX-XXX-XXX-XXX.us-east-1.compute.amazonaws.com -i <KEY_PAIR_NAME>.pem

记好输出结果,因为您要在下一步中使用此信息。您可以在 AWS 管理控制台中查看堆栈输出,也可以使用以下 AWS CLI 命令:

$ aws cloudformation describe-stacks --stack-name <stack_name> --region us-east-1 --query 'Stacks[0].Outputs'

6.使用 AWS CloudFormation 模板创建 Amazon EC2 实例以生成测试数据

在此步骤中,我们要设置 Amazon EC2 实例并安装 open-jdk 1.8 版。创建此 EC2 实例的 AWS CloudFormation 脚本运行另外两个步骤。首先,它会下载并安装 open-jdk 1.8 版。其次,它将 Java 程序 jar 文件下载到 EC2 实例的 ec2-user 主目录。我们使用这个 Java 程序生成大小约为 900 KB 的测试数据消息。然后,我们将其发送给在前面的步骤中创建的 Kinesis 数据流。Java jar 文件名为:“sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar”。

您可以使用此可下载的 AWS CloudFormation 模板来设置先前的组件。要直接通过控制台启动,请选择 Launch Stack

此模板采用以下参数。下表提供了更多详细信息。

对于此参数 执行此操作
EC2SecurityGroup 选择通过第一个 AWS CloudFormation 模板创建的安全组 ID。
EC2Subnet 选择通过第一个 AWS CloudFormation 模板创建的子网。
InstanceType 选择提供的实例类型。默认情况下,选择的是 r4.4xlarge 实例。
KeyName 用于启用对 EC2 实例的 SSH 访问权限的现有 EC2 密钥对的名称。

指定模板详细信息后,请选择 Next。在 Options 页面上,再次选择 Next。在 Review 页面上选中“I acknowledge that AWS CloudFormation might create IAM resources with custom names”选项,然后单击 Create 按钮。

堆栈启动完成后,它应返回类似于以下内容的输出。

EC2Instance ssh ec2-user@<Public-IP> -i <KEY_PAIR_NAME>.pem

记好输出结果,因为您要在下一步中使用此信息。您可以在 AWS 管理控制台中查看堆栈输出,也可以使用以下 AWS CLI 命令:

$ aws cloudformation describe-stacks --stack-name <stack_name> --region us-east-1 --query 'Stacks[0].Outputs'

7.生成测试数据集并加载到 Kinesis 数据流中

在成功创建前述所有 AWS CloudFormation 堆栈之后,登录到在步骤 6 中创建的 EC2 实例。使用 CloudFormation 堆栈模板输出中显示的“ssh”命令。此模板会复制“sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar”文件,该文件用于生成测试数据,并将所生成的数据发送给 Amazon 数据流。您可以在此 Git 代码库中找到与此示例 Kinesis 生成器对应的代码。

确保您的 EC2 实例的安全组允许来自您的 IP 地址的 ssh 端口 22(入站)的流量。如果不允许,请更新安全组的入站访问权限。

  • ssh ec2-user@<Public IP Address of the EC2 Instance> -i <SSH_KEY_PAIR_NAME>.pem

运行以下命令以生成一些测试数据。

$ cd;

 

$ ls -ltra sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar

-rwxr-xr-x 1 ec2-user ec2-user 27536802 Oct 29 21:19 sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar

 

$java -Xms1024m -Xmx25600m -XX:+UseG1GC -cp sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar com.optimize.downstream.entry.Main 10000

 

此 java 程序使用 PutRecords API 方法,该方法允许通过单独一个 HTTP 请求发送大量记录。有关此问题的更多信息,您可以查看这篇 AWS 博文。运行上面的 java 程序后,您将看到以下输出,表明消息正在发送到 Kinesis 数据流。

“Starting producer and consumer.....
Inserting a message into blocking queue before sending to Kinesis Firehose and Message number is : 0
Producer Thread # 9 is going to sleep mode for 500 ms.
Inserting a message into blocking queue before sending to Kinesis Firehose and Message number is : 1
Inserting a message into blocking queue before sending to Kinesis Firehose and Message number is : 2
Inserting a message into blocking queue before sending to Kinesis Firehose and Message number is : 3
::
::
Record sent to Kinesis Stream.Record size is ::: 5042850 KB
Sending a record to Kinesis Stream with 5 messages grouped together.
Record sent to Kinesis Stream.Record size is ::: 5042726 KB
Sending a record to Kinesis Stream with 5 messages grouped together.
Record sent to Kinesis Stream.Record size is ::: 5042729 KB”

运行示例 Kinesis 生成器 jar 时,请注意消息数为 10,000。该程序生成测试数据消息,不能用于取代负载测试工具。该程序是为了演示本文中提供的用例而创建的。

生成所有消息并将其发送到 Amazon Kinesis 数据流后,程序将正常退出。

JSON 输入消息格式示例如下:

"processedDate":"2018/10/30 19:05:19",
"currentDate":"2018/10/30 19:05:07",
"hashDeviceId":"0c2745e4-c2d6-4d43-8339-9c2401e80e92",
"deviceId":"94581b5f-a117-484a-8e3c-4fcc2dbd53b7",
"accelerometerSensorList":[
{
"accelerometer_Y":8,
"gravitySensor_X":5,
"accelerometer_X":9,
"gravitySensor_Z":4,
"accelerometer_Z":1,
"gravitySensor_Y":5,
"linearAccelerationSensor_Z":3,
"linearAccelerationSensor_Y":9,
"linearAccelerationSensor_X":9
},
{
"accelerometer_Y":1,
"gravitySensor_X":3,
"accelerometer_X":5,
"gravitySensor_Z":5,
"accelerometer_Z":7,
"gravitySensor_Y":9,
"linearAccelerationSensor_Z":6,
"linearAccelerationSensor_Y":5,
"linearAccelerationSensor_X":3
},
{

},
{

},
:
:
],
"tempSensorList":[
{
"kelvin":585.4928040286752,
"celsius":43.329574923775425,
"fahrenheit":50.13864584530086
},
{
"kelvin":349.95625855125814,
"celsius":95.68423052685313,
"fahrenheit":7.854854574219985
},
{

},
{

},
:
:

],
“illuminancesSensorList”:[
{
“illuminance”:44.65135784368194
},
{
“illuminance”:98.15404017082403
},
{

},
{

},
:
:
],
“gpsSensorList”:[
{
“altitude”:4.38273213294682,
“heading”:7.416314616289915,
“latitude”:5.759723677991661,
“longitude”:1.4732885894731842
},
{
“altitude”:9.816473807569487,
“heading”:5.118919157684835,
“latitude”:3.581361614110458,
“longitude”:1.3699272610616127
},
{

},
{

},
:
:
}

 

登录 Kinesis Data Streams 控制台,然后选择在步骤 4 中创建的 Kinesis 数据流。  选择 Monitor 选项卡以查看图形。运行数据生成实用程序至少 15 分钟以生成足够的数据。

8.使用 AWS Lambda 处理 Kinesis Data Streams 消息

根据先前介绍的设置,我们还使用 AWS Lambda 函数(名称是 LambdaForProcessingKinesisRecords)来处理来自 Kinesis 数据流的消息。此 Lambda 函数读取每个消息的内容并附加“额外数据”。 这表明来自 Kinesis 数据流的传入消息已经被系统所读取,并且附加了一些额外信息,以使消息大小超过 1 MB。 部分客户有这样的用例,需要通过添加额外信息来充实传入的消息。在 AWS Lambda 函数为传入消息附加额外数据之后,会将它们发送到 Amazon Kinesis Data Firehose。由于 Kinesis Data Firehose 仅接受小于 1 MB 的消息,因此我们必须在发送消息之前对其进行压缩。在 Lambda 函数中,我们在将消息发送到 Kinesis Data Firehose 之前使用 gzip 来压缩消息。除了压缩每条消息之外,我们还在压缩每条消息之后,为每条消息附加一个换行字符(“/n”),以分隔各条消息。

在创建 Kinesis Data Firehose 时,我们将缓冲区大小设置为 128 MB,缓冲区的持续时间为 900 秒。这有助于将传入的压缩消息合并为更大的消息,并发送到指定的 Amazon S3 存储桶。

AWS Lambda 函数在读取 Kinesis Data Streams 中的原始消息后,为这些消息附加以下内容。

"testAdditonalDataList": [
{
"dimesnion_X": 9,
"dimesnion_Y": 2,
"dimesnion_Z": 2
},
{
"dimesnion_X": 3,
"dimesnion_Y": 10,
"dimesnion_Z": 5
}
{

},
{

},
:
:
]

如果我们在将消息发送到 Kinesis Data Firehose 之前不压缩消息,则会在 Amazon CloudWatch Logs 中引发此错误消息。

以下是我们在 AWS Lambda 函数中压缩消息的代码段。完整的代码可以在这个 Git 代码库中找到。

private String sendToFireHose(String mergedJsonString)
{
PutRecordResult res = null;
try {
//To Firehose -
System.out.println("MESSAGE SIZE BEFORE COMPRESSION IS : " + mergedJsonString.toString().getBytes(charset).length);
System.out.println("MESSAGE SIZE AFTER GZIP COMPRESSION IS : " + compressMessage(mergedJsonString.toString().getBytes(charset)).length);
PutRecordRequest req = new PutRecordRequest()
.withDeliveryStreamName(firehoseStreamName);

// Without compression – Send to Firehose
//Record record = new Record().withData(ByteBuffer.wrap((mergedJsonString.toString() + “\r\n”).getBytes()));

// With compression – send to Firehose
Record record = new Record().withData(ByteBuffer.wrap(compressMessage((mergedJsonString.toString() + “\r\n”).getBytes())));
req.setRecord(record);
res = kinesisFirehoseClient.putRecord(req);
}
catch (IOException ie) {
ie.printStackTrace();
}
return res.getRecordId();
}

您可以检查所提供的存储桶,以查看消息是否正在流入存储桶。Amazon S3 存储桶应显示类似于以下示例的内容:

您会看到,从 Kinesis Data Firehose 生成的文件不带任何扩展名。默认情况下,除非您选择某个压缩选项,否则 Kinesis Data Firehose 不会为 Amazon S3 存储桶中生成的文件提供任何扩展名。但在我们的用例中,由于未压缩输入消息的大小超过 1 MB,我们在发送到 Kinesis Data Firehose 之前将其压缩。由于消息已经压缩,我们没有在 Kinesis Data Firehose 中选择任何压缩选项,以防消息双重压缩,导致下游 Spark 应用程序无法处理这种双重压缩的消息。

9.使用 Apache Spark 程序及 Amazon EMR 读取数据并将数据转换为 parquet 格式

正如我们在前面的屏幕截图中看到的那样,默认情况下,Kinesis Data Firehose 不会为写入 Amazon S3 存储桶的文件生成任何文件扩展名。这在使用 Apache Spark 读取文件时会产生问题。默认情况下,如果文件经过压缩,Apache Spark 会检查有效的文件扩展名。本例中使用的是 gzip 压缩,因此系统会寻找 <filename>.gz,找到后才能成功读取相应文件。

为了解决这个问题,我们可以使用 Amazon S3 API 操作(尤其是 AmazonS3Client 类)列出所有 Amazon S3 密钥,并使用 Spark 的 parallelize 方法来读取文件的内容。读取文件内容后,我们可以使用 GZipInputStream 类对其进行解压缩。请参见下面的代码段。完整的代码可以在此 Git 代码库中找到。

val allLinesRDD = spark.sparkContext.parallelize(s3ObjectKeys).flatMap
{ key => Source.fromInputStream
(
new GZipInputStream(s3Client.getObject(bucketName, key).getObjectContent: InputStream)
).getLines
}

var finalDF = spark.read.json(allLinesRDD).toDF()

成功完成 Amazon EMR 集群创建后,使用以下命令登录 Amazon EMR 主机。您可以从 AWS CloudFormation 堆栈 5(步骤 5)的输出参数“EMRClusterMaster”中获取“ssh”登录命令。

  • ssh hadoop@ec2-XX-XX-XX-XX.compute-1.amazonaws.com -i <KEYPAIR_NAME>.pem
  • 确保安全端口 22 以打开,以连接到 Amazon EMR 主机。

使用以下 Spark submit 命令运行 Spark 程序。

spark-submit --class com.optimize.downstream.process.ProcessFilesFromS3AndConvertToParquet --master yarn --deploy-mode client s3://aws-bigdata-blog/artifacts/aws-blog-optimize-downstream-data-processing/appjars/spark-process-1.0-SNAPSHOT-jar-with-dependencies.jar <S3_BUCKET_NAME> fromfirehose/<YEAR>/ output-emr-parquet/

注意更改上面这条 Spark 命令中的 S3_BUCKET_NAME 和 YEAR 值。

参数编号 属性
1 –class com.optimize.downstream.process.ProcessFilesFromS3AndConvertToParquet
2 –master yarn
3 –deploy-mode client
4 s3://aws-bigdata-blog/artifacts/aws-blog-avoid-small-files/appjars/spark-process-1.0-SNAPSHOT-jar-with-dependencies.jar
5 S3_BUCKET_NAME 在 AWS CloudFormation 模板中创建的 Amazon S3 存储桶名称。源文件在此存储桶中创建。
6 <INPUT S3 LOCATION> “fromfirehose/<YYYY>/”.输入文件会在已创建的存储桶下的此 Amazon S3 键位置中创建。“YYYY”表示当时年份。例如“fromfirehose/2018/”
7 <OUTPUT S3 LOCATION> 提供将在上面指定的 Amazon S3 存储桶下创建的输出目录名称。例如:“output-emr-parquet/”

 

程序运行完毕后,您可以检查 Amazon S3 输出位置以查看以 parquet 格式写入的文件。

迁移后清理

完成并测试此解决方案后,停止任务并删除 AWS CloudFormation 堆栈来清理资源。如果在已创建的 Amazon S3 存储桶中有任何文件,则堆栈删除操作将失败。在删除 AWS CloudFormation 模板之前,务必清理已创建的 Amazon S3 存储桶。

小结

在这篇博文中,我们描述了通过将传入消息发送到 Amazon Kinesis Data Firehose 来避免在 Amazon S3 中创建小文件的具体操作流程。我们还使用A pache Spark 和 Amazon EMR 集群完成了以 parquet 格式读取和存储数据的过程。

 


关于作者

Srikanth Kodali 的照片Srikanth KodaliAmazon Web Services 的一位高级 IOT 数据分析架构师。 他与 AWS 客户配合工作,为构建物联网数据和分析解决方案提供指导和技术协助,帮助客户在使用 AWS 时提高其解决方案的价值。