亚马逊AWS官方博客

使用 AWS Glue 对非原生 JDBC 数据源运行 ETL 作业

Original URL:https://aws.amazon.com/cn/blogs/big-data/use-aws-glue-to-run-etl-jobs-against-non-native-jdbc-data-sources/

AWS Glue 是一项完全托管的ETL(提取、转换和加载) 服务,可以帮助您更轻松地准备和加载数据以进行分析。在 AWS 管理控制台上,简单点击几下,就可以创建和运行 ETL 作业。只需要将 AWS Glue 指向您的数据源,AWS Glue 就可以发现您的数据,并将相关的元数据(例如,表定义和结构)存储在 AWS Glue的数据目录中。只要建立 IP 连接,AWS Glue 就可以在 AWS 或其他地方,使用 JDBC 驱动程序建立原生连接器与数据源的连接。在本文中,我们将演示如何连接到 AWS Glue目前还没有提供原生支持的数据源。我们将分步介绍如何连接两个这样的数据源(IBM DB2 和 SAP Sybase)并运行 ETL 作业。另外,您也可以使用同一流程连接到任何其他 JDBC 可访问的数据库。

AWS Glue 数据源

AWS Glue 通过使用 JDBC 协议原生支持以下数据存储:

  • 可以公开访问的数据库
    • Amazon Aurora
    • MariaDB
    • Microsoft SQL Server
    • MySQL
    • Oracle
    • PostgreSQL

有关更多信息,请参阅 AWS Glue 开发人员指南中的将连接添加到您的数据存储部分。

数据湖是部署在 AWS 上增长最快的架构之一。用于提取、清理、转换和结构化数据的 ETL 流程对于此架构至关重要。能够灵活地与广泛的数据库引擎进行互操作,可以加速数据湖架构的采用。

对于 AWS Glue 原生不支持的数据源,例如 IBM DB2、Pivotal Greenplum、SAP Sybase 或任何其他关系数据库管理系统 (RDBMS),您可以从 Amazon S3 导入自定义数据库连接器到 AWS Glue 作业。在这种情况下,必须从 AWS Glue 脚本建立到数据源的连接以提取数据,而不是使用 AWS Glue 连接。要了解更多信息,请参阅 AWS Glue 开发人员指南中的提供您自己的自定义脚本部分。

为 IBM DB2 数据源设置 ETL 作业

第一个示例展示了如何将 AWS Glue ETL 作业连接到 IBM DB2 实例、转换源数据库中的数据,以及将其以 Apache Parquet 格式存储在 Amazon S3 中。要使用外部 JDBC 驱动程序成功创建 ETL 作业,必须定义以下内容:

  • 作业脚本在 S3 中的存放位置
  • 临时目录在 S3 中的存放位置
  • JDBC 驱动程序在 S3 中的存放位置
  • Parquet 数据(输出)在 S3 中的存放位置
  • 作业的 IAM 角色

默认情况下,AWS Glue 使用以下建议格式的脚本存储桶名称和临时目录:

s3://aws-glue-scripts-<ACCOUNT_ID>-<REGION>/<USER>
s3://aws-glue-temporary-<ACCOUNT_ID>-<REGION>/<USER>

对于 JDBC 驱动程序,您可以创建一个类似的存放位置:

s3://aws-glue-jdbc-drivers-<ACCOUNT_ID>-<REGION>/<USER>

同样对于 Parquet 数据(输出),您可以创建一个类似的存放位置:

s3://aws-glue-data-output-<ACCOUNT_ID>-<REGION>/<USER>

请记得,将 AWS Glue 作业和 S3 存储桶放在同一 AWS 区域中,有助节省跨区域数据传输费用。在本文中,我们将使用美国东部(俄亥俄)区域 (us-east-2)。

创建 IAM 角色

下一步是设置 ETL 作业将使用的 IAM 角色:

  1. 登录到 AWS 管理控制台,然后搜索 IAM

  1. 在 IAM 控制台上,在左侧导航面板中选择角色
  2. 选择创建角色。受信任实体的角色类型必须是AWS 服务,再选中 AWS Glue

  1. 选择下一步: 权限
  2. 搜索策略AWSGlueServiceRole,然后选中它。

      6. 继续搜索SecretsManagerReadWrite。此策略允许 AWS Glue 作业访问存储在 AWS Secrets Manager 中的数据库凭证。

注意:此策略当前是过于开放的,在此仅用于测试目的。实际场景中,您应该创建一个自定义策略,仅允许访问要在 ETL 作业中使用的密钥。

  1. 选择此策略,然后选择下一步: 查看
  2. 给您的角色命名,例如 GluePermissions并确认选择了两个策略。

  1. 选择创建角色

现在您已经创建了 IAM 角色,接下俩要将 JDBC 驱动程序上传到前面指定的 Amazon S3 中的位置。对于此示例,我们将使用 DB2 驱动程序,该驱动程序在 IBM Support 网站上可以找到。

存储数据库凭证

最佳实践是将数据库凭证存储在安全存储中。在本例中,我们使用 AWS Secrets Manager 来安全存储凭证。按照以下步骤创建这些凭证:

  1. 打开控制台,然后搜索Secrets Manager
  2. 在 AWS Secrets Manager 控制台中,选择存储新密钥
  3. 选择密钥类型下,选择其他密钥类型
  4. 密钥键/中,为以下每个参数设置一行:
    • db_username
    • db_password
    • db_url(例如,jdbc:db2://10.10.12.12:50000/SAMPLE
    • db_table
    • driver_name (db2.jcc.DB2Driver)
    • output_bucket:(例如,aws-glue-data-output-1234567890-us-east-2/User
  5. 选择下一步
  6. 对于密钥名称,使用DB2_Database_Connection_Info
  7. 选择下一步
  8. 禁用自动轮换复选框保持选中状态。
  9. 选择下一步
  10. 选择存储

在 AWS Glue 中添加作业

接下来是按照以下步骤创建 AWS Glue 作业:

  1. 在 AWS 管理控制台中,搜索 AWS Glue。

  1. 在左侧的导航面板中,在ETL 下选择作业
  2. 选择添加作业

  1. 填写基本的作业属性
  2. 为作业命名(例如,db2-job)。
  3. 选择先前创建的 IAM 角色 (GluePermissions)。
  4. 选择执行类型为Spark
  5. 选择Glue版: Spark2.4 , Python3
  6. 对于此作业运行,选择您将编写的新脚本

  1. 脚本库和作业参数部分,为依赖 JAR 路径选择 JDBC 驱动程序的位置。

  1. 选择下一步
  2. 连接页面上,选择下一步
  3. 在摘要页面上,选择保存作业并编辑脚本。这将创建作业并打开脚本编辑器。

在编辑器中,使用以下脚本替换已有代码。重要提示:脚本的第 47 行用来表示映射源表中字段到目标字段,同时删除空字段以节省 Parquet 目标中的空间,最后以 Parquet 格式写入 Amazon S3。

import sys
import boto3
import json
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job


## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Getting DB credentials from Secrets Manager
client = boto3.client("secretsmanager", region_name="us-east-2")

get_secret_value_response = client.get_secret_value(
        SecretId="DB2_Database_Connection_Info"
)

secret = get_secret_value_response['SecretString']
secret = json.loads(secret)

db_username = secret.get('db_username')
db_password = secret.get('db_password')
db_url = secret.get('db_url')
table_name = secret.get('db_table')
jdbc_driver_name = secret.get('driver_name')
s3_output = "s3://" + secret.get('output_bucket') + "/" + table_name

# Connecting to the source
df = glueContext.read.format("jdbc").option("driver", jdbc_driver_name).option("url", db_url).option("dbtable", table_name).option("user", db_username).option("password", db_password).load()

df.printSchema()
print df.count()

datasource0 = DynamicFrame.fromDF(df, glueContext, "datasource0")

# Defining mapping for the transformation
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("EMPNO", "string", "EMPNO", "string"), ("FIRSTNME", "string", "FIRSTNME", "string"), ("MIDINIT", "string", "MIDINIT", "string"), ("LASTNAME", "string", "LASTNAME", "string"), ("WORKDEPT", "string", "WORKDEPT", "string"), ("PHONENO", "string", "PHONENO", "string"), ("HIREDATE", "date", "HIREDATE", "date"), ("JOB", "string", "JOB", "string"), ("EDLEVEL", "integer", "EDLEVEL", "integer"), ("SEX", "string", "SEX", "string"), ("BIRTHDATE", "date", "BIRTHDATE", "date"), ("SALARY", "double", "SALARY", "double"), ("BONUS", "double", "BONUS", "double"), ("COMM", "double", "COMM", "double")], transformation_ctx = "applymapping1")
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

# Writing to destination
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": s3_output}, format = "parquet", transformation_ctx = "datasink4")

job.commit()

  1. 选择保存
  2. 选择屏幕右侧的黑色 X 以关闭编辑器。

运行 ETL 作业

现在,您已经创建了作业,下一步就是按照以下步骤执行它:

  1. 作业页面上,选择刚才创建的作业。在操作菜单上,选择运行作业,然后确认您要运行作业。等待它执行完成。

  1. 在作业显示为成功后,选择日志以读取作业的输出。

  1. 在作业的输出中,您将找到执行 df.printSchema() 的结果和 df.count() 的消息。

同样,如果您在 S3 中定位到输出对象的存储桶,您将找到 ETL 作业的 Parquet 结果。

现在您已经使用 AWS Glue 创建了一个 ETL 作业,该作业使用外部 JDBC 驱动程序连接到了已有数据库。它使您能够执行所需的任何转换。

为 SAP Sybase 数据源设置 ETL 作业

在本节中,我们将介绍如何针对 SAP Sybase 数据源创建 AWS Glue ETL 作业。上一节中提到的流程同样适用于 Sybase 数据源,但需要在作业中进行一些更改:

  1. 创建作业时,为 JDBC 依赖项选择正确的 JAR。
  2. 在脚本中,从 AWS Secrets Manager 更改对要使用的密钥的引用:
get_secret_value_response = client.get_secret_value(
        SecretId="Sybase_Database_Connection_Info"
)

并如下所示更改第 47 行中字段的映射:

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("au_id", "string", "au_id", "string"), ("au_lname", "string", "au_lname", "string"), ("au_fname", "string", "au_fname", "string"), ("phone", "string", "phone", "string"), ("address", "string", "address", "string"), ("city", "string", "city", "string"), ("state", "string", "state", "string"), ("country", "string", "country", "string"), ("postalcode", "string", "postalcode", "string")], transformation_ctx = "applymapping1")

 

成功执行新的 ETL 作业后,输出包含的信息与 DB2 数据源生成的信息类型相同。

请注意,每个 JDBC 驱动程序都有自己的细微差别和不同的许可条款,在使用它们之前应加以了解。

最大化 JDBC 读取并行性

使用大型数据源时要注意的一点是内存消耗。在某些情况下,将所有数据读入单个执行程序时会生成“内存不足”错误。其中一种优化方法是依靠可以使用 Apache Spark 和 AWS Glue 实现的读取并行性。要了解更多信息,请参阅 Apache Spark SQL 模块

您可以使用以下选项:

  • partitionColumn:分区字段
  • lowerBound:下界
  • upperBound:上界
  • numPartitions:最大分区数量。这与 lowerBound (包含)和 upperBound (不包含)一起成为 WHERE 子句表达式中的分区步幅,该表达式用于拆分 partitionColumn。未设置时,此默认设置为 SparkContext.defaultParallelism。
  • 这些选项指定表读取的并行性。lowerBound 和 upperBound 决定分区的步幅,但是它们不过滤表中的行。因此,Spark 会分区并返回表中的所有行。例如:
df = glueContext.read.format("jdbc").option("driver", jdbc_driver_name).option("url", db_url).option("dbtable", table_name).option("user", db_username).option("password", db_password).option("partitionColumn", <column_name>).option("lowerBound", 1).option("upperBound", 100).option("numPartitions", 2).load()

请务必注意分区的数量,因为过多的分区也可能导致 Spark 使外部数据库系统崩溃。

小结

您可以使用本文中描述的流程,连接到能够通过 JDBC 驱动程序访问的任何数据源并运行 AWS Glue ETL 作业。这些包括新一代通用分析数据库,例如 Greenplum 等。

您可以使用分区和下推谓词来提高对这些数据集的查询效率。有关更多信息,请参阅管理 AWS Glue 中用于 ETL 输出的分区。该技术为开启了在混合环境中移动数据和馈送数据湖的大门。

 


其他阅读资源

如果您认为本文有用,不妨前往阅读使用 AWS Glue 中的分区数据一文。


本篇作者

Kapil Shardha

Kapil Shardha 是技术客户经理,负责协助企业客户采用 AWS。他具有基础设施自动化和开发运营背景。

William Torrealba

William Torrealba 是一名 AWS 解决方案架构师,负责协助客户采用 AWS。他具有应用程序开发、高可用性分布式系统、自动化和开发运营背景。