一、概述
虽然 AWS DynamoDB 提供了高性能、可扩展的 NoSQL 解决方案,但在某些业务场景下存在局限性:
复杂查询需求:
- AWS DynamoDB 不支持复杂的 JOIN 操作
- 缺乏灵活的多条件查询能力
- 不支持完整的 SQL 语法
业务适配性:
- 现有系统可能深度依赖 SQL 特性
- 团队更熟悉关系型数据库操作
- 需要与其他 MySQL 系统集成
成本考虑:
- AWS DynamoDB 按读写容量计费
- 大量复杂查询可能导致成本升高
- 数据规模增长时预算难以控制
基于以上因素,客户要求将数据迁移至 MySQL,以满足以下需求:
- 提供更灵活的查询能力
- 降低开发维护成本
- 更好地满足业务需求
- 实现与现有系统的无缝集成
二、方案概述
以下是基于 AWS Glue 从 AWS DynamoDB 迁移数据到 AWS Aurora MySQL 的方案概述(使用 Crawler 爬取 AWS Aurora MySQL 表结构)。
项目概述:
使用 AWS Glue 实现从 AWS DynamoDB 到 AWS Aurora MySQL 的数据自动化迁移方案。
技术架构:
- 数据源:AWS DynamoDB
- 目标库:AWS Aurora MySQL 数据库
- ETL 工具:AWS Glue
- 开发语言:Python/PySpark
实现步骤:
目标数据库结构探索
- 使用 AWS Glue Crawler 爬取 AWS Aurora MySQL 表结构
- 将 AWS Aurora MySQL 表结构信息保存到 AWS Glue Data Catalog
- 用于后续 ETL 作业的 schema 映射
ETL 作业开发
- 创建 Glue ETL Job
- 从 AWS DynamoDB 读取源数据
- 根据爬取的 AWS Aurora MySQL 表结构进行数据转换
- 写入 AWS Aurora MySQL 目标表
任务调度
- 按需手动启动迁移作业,或通过 AWS Glue Trigger 按计划定期触发(示例见第三节末尾)
三、方案部署
以下是基于亚马逊云科技现有服务,将数据快速且平滑地迁移到 AWS Aurora MySQL 环境的具体操作步骤,方便客户快速搭建环境并完成数据迁移配置。
为 AWS Glue Crawler 创建一个安全组,方便抓取 AWS Aurora MySQL 的数据库表结构
aws ec2 create-security-group --group-name glue-crawler-sg --description "Security group for Glue Crawler" --vpc-id vpc-0a12ed3c51c66f3c8
通过 describe 命令获取已经创建的安全组
SG_ID=$(aws ec2 describe-security-groups --filters "Name=group-name,Values=glue-crawler-sg" --query 'SecurityGroups[0].GroupId' --output text)
为该安全组添加自引用入站规则(AWS Glue 要求该安全组对自身放行所有 TCP 端口,出站规则保持默认的全部放行即可)
aws ec2 authorize-security-group-ingress --group-id $SG_ID --protocol tcp --port 0-65535 --source-group $SG_ID
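可选:执行如下命令确认自引用规则已经生效($SG_ID 为上文获取的安全组 ID):
aws ec2 describe-security-groups --group-ids $SG_ID --query 'SecurityGroups[0].IpPermissions'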
创建 AWS Glue 的数据库连接(注意:此处需要根据您已有的数据库信息构建如下命令)
需要准备的连接参数如下:
- JDBC_CONNECTION_URL: JDBC 连接地址,例如 jdbc:mysql://xxx-aurora-serverless-v2.cluster-cadhydlq2qny.ap-southeast-1.rds.amazonaws.com:3306/xxx_db
- SECRET_ID: 数据库密码对应的密钥 ID,例如 rds!cluster-e71e8aba-5c72-47e3-bc08-a38cf735c6e9(本文使用 AWS Secrets Manager 保存数据库密码,因此需要找到对应的密钥 ID)
- SubnetId: 数据库所在子网,例如 subnet-0937f925b8018d822
- SecurityGroupIdList: 安全组列表,例如 sg-0021c104d18dc4222
- AvailabilityZone: 可用区,例如 ap-southeast-1a
aws glue create-connection --connection-input '{
  "Name": "mysql-connection",
  "ConnectionType": "JDBC",
  "ConnectionProperties": {
    "JDBC_CONNECTION_URL": "jdbc:mysql://xxx-aurora-serverless-v2.cluster-cadhydlq2qny.ap-southeast-1.rds.amazonaws.com:3306/xxx_db",
    "JDBC_ENFORCE_SSL": "false",
    "SECRET_ID": "rds!cluster-e71e8aba-5c72-47e3-bc08-a38cf735c6e9"
  },
  "PhysicalConnectionRequirements": {
    "SubnetId": "subnet-0937f925b8018d822",
    "SecurityGroupIdList": ["sg-0021c104d18dc4222"],
    "AvailabilityZone": "ap-southeast-1a"
  }
}'
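连接创建完成后,可以用如下命令确认连接信息是否正确(mysql-connection 为上文创建的连接名称):
aws glue get-connection --name mysql-connection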
接下来,我们按照最小权限原则,根据从 AWS DynamoDB 导出数据并导入 AWS Aurora MySQL 所需的权限创建 IAM 角色。
创建一个 create_role.sh 脚本文件
#!/bin/bash
# 设置变量
ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
ROLE_NAME="XXX-GlueJobRole"
# 创建信任策略
cat << EOF > trust-policy.json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "glue.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
EOF
# 创建权限策略
cat << EOF > permission-policy.json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "glue:*",
        "s3:*",
        "cloudwatch:*",
        "logs:*",
        "ec2:*",
        "secretsmanager:GetSecretValue",
        "rds-db:connect"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:*"
      ],
      "Resource": "*"
    }
  ]
}
EOF
# 创建角色
aws iam create-role \
--role-name $ROLE_NAME \
--assume-role-policy-document file://trust-policy.json
# 创建策略
aws iam create-policy \
--policy-name XXX-GlueJobPolicy \
--policy-document file://permission-policy.json
# 附加策略
aws iam attach-role-policy \
--role-name $ROLE_NAME \
--policy-arn arn:aws:iam::$ACCOUNT_ID:policy/XXX-GlueJobPolicy
aws iam attach-role-policy \
--role-name $ROLE_NAME \
--policy-arn arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
# 清理临时文件
rm trust-policy.json permission-policy.json
ROLE_ARN=$(aws iam get-role --role-name $ROLE_NAME --query 'Role.Arn' --output text)
echo "Role ARN: $ROLE_ARN"
echo "Role creation completed!"
创建 Crawler 获取数据库表结构,并在 AWS Glue Data Catalog 中创建对应的 Table
aws glue create-crawler \
--name "mysql-crawler" \
--role "XXX-GlueJobRole" \
--targets '{
"JdbcTargets": [
{
"ConnectionName": "mysql-connection",
"Path": "xxx_db/xxx_files",
"Exclusions": [],
"EnableAdditionalMetadata": []
}
]
}' \
--database-name "xxx-mysql" \
--recrawl-policy '{
"RecrawlBehavior": "CRAWL_EVERYTHING"
}' \
--schema-change-policy '{
"UpdateBehavior": "UPDATE_IN_DATABASE",
"DeleteBehavior": "DEPRECATE_IN_DATABASE"
}' \
--lineage-configuration '{
"CrawlerLineageSettings": "DISABLE"
}'
启动 Crawler 来抓取表结构
aws glue start-crawler --name "mysql-crawler"
Crawler 运行完成后,将在 AWS Glue Data Catalog 的 Table 中看到抓取到的表结构
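可以使用如下命令查看 Crawler 的运行状态,并在其运行结束后列出 Data Catalog 中生成的表(xxx-mysql 为上文指定的 Data Catalog 数据库名):
aws glue get-crawler --name "mysql-crawler" --query 'Crawler.State' --output text
aws glue get-tables --database-name "xxx-mysql" --query 'TableList[].Name' --output text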
接下来创建 AWS S3 存储桶,用于存放 ETL 脚本代码和 AWS Glue 的临时文件。创建 create_job.sh 脚本,只需要修改为您的 Region 即可;当然,请根据您的表结构对其中的 ETL 代码做相应修改。
#!/bin/bash
# 设置变量
ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
REGION="ap-southeast-1"
BUCKET_NAME="aws-glue-assets-${ACCOUNT_ID}-${REGION}"
# 创建 AWS S3 存储桶
aws s3api create-bucket \
--bucket ${BUCKET_NAME} \
--region ${REGION} \
--create-bucket-configuration LocationConstraint=${REGION}
# 创建必要的文件夹结构
aws s3api put-object --bucket ${BUCKET_NAME} --key scripts/
aws s3api put-object --bucket ${BUCKET_NAME} --key temporary/
aws s3api put-object --bucket ${BUCKET_NAME} --key sparkHistoryLogs/
cat << 'EOF' > etl_script.py
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, to_timestamp, udf, lit, when
from pyspark.sql.types import *
import uuid
# 获取作业参数
args = getResolvedOptions(sys.argv, [
    'JOB_NAME',
    'glue_database',       # AWS Glue 数据目录数据库名
    'glue_table',          # AWS Glue 数据目录表名
    'dynamodb_table_arn',  # AWS DynamoDB 表 ARN
    's3_bucket',           # AWS S3 存储桶名称
    's3_prefix'            # AWS S3 前缀路径
])
# 创建 UUID 生成器 UDF
def generate_uuid():
    return str(uuid.uuid4())

generate_uuid_udf = udf(generate_uuid, StringType())
# 初始化
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# 从 AWS DynamoDB 读取数据
dyf_dynamodb = glueContext.create_dynamic_frame.from_options(
    connection_type="dynamodb",
    connection_options={
        "dynamodb.export": "ddb",
        "dynamodb.s3.bucket": args['s3_bucket'],
        "dynamodb.s3.prefix": args['s3_prefix'],
        "dynamodb.tableArn": args['dynamodb_table_arn'],
        "dynamodb.unnestDDBJson": True
    },
    transformation_ctx="AmazonDynamoDB_node1729663842557"
)
# 转换为 DataFrame
df = dyf_dynamodb.toDF()
# 打印现有的列名
print("现有列名:", df.columns)
# 添加 UUID 作为 id 列
df_with_id = df.withColumn("id", generate_uuid_udf())
# 为所有可能缺失的列添加默认值
columns_with_defaults = {
    "fileId": ("string", ""),
    "recipientEmail": ("string", ""),
    "dateAdded": ("timestamp", None),
    "expiryDate": ("timestamp", None),
    "dateShared": ("timestamp", None),
    "downloadCount": ("int", 0),
    "downloadLimit": ("int", 0),
    "downloads": ("string", ""),
    "fileName": ("string", ""),
    "folderId": ("string", ""),
    "notify": ("boolean", False),
    "ownerEmail": ("string", ""),
    "ownerId": ("string", ""),
    "ownerName": ("string", ""),
    "classification": ("string", ""),
    "ipLimit": ("string", ""),
    "documentId": ("string", ""),
    "stampId": ("string", ""),
    "stampDate": ("timestamp", None),
    "pwKey": ("string", ""),
    "pwErrorDetail": ("string", ""),
    "pwStatus": ("string", ""),
    "size": ("long", 0),
    "type": ("string", ""),
    "userType": ("string", ""),
    "downloadType": ("string", "")
}
# 添加缺失的列并设置默认值
df_with_defaults = df_with_id
for col_name, (col_type, default_value) in columns_with_defaults.items():
    if col_name not in df_with_id.columns:
        if col_type == "timestamp" and default_value is None:
            df_with_defaults = df_with_defaults.withColumn(col_name, lit(None).cast("timestamp"))
        else:
            df_with_defaults = df_with_defaults.withColumn(col_name, lit(default_value))
# 数据类型转换
select_expressions = []
for col_name, (col_type, _) in columns_with_defaults.items():
    select_expressions.append(col(col_name).cast(col_type))
# 添加 id 列到选择表达式
select_expressions.insert(0, col("id").cast("string"))
# 执行选择和类型转换
df_processed = df_with_defaults.select(select_expressions)
# 数据验证
print("记录总数:", df_processed.count())
print("Schema 信息:")
df_processed.printSchema()
# 显示样本数据
print("样本数据:")
df_processed.show(5)
# 转回 DynamicFrame
dyf_processed = DynamicFrame.fromDF(df_processed, glueContext, "processed_data")
# 写入 AWS Aurora MySQL
glueContext.write_dynamic_frame.from_catalog(
    frame=dyf_processed,
    database=args['glue_database'],
    table_name=args['glue_table'],
    transformation_ctx="MySQL_node1729671565717",
    additional_options={
        "writeMode": "append"
    }
)
job.commit()
EOF
# 上传脚本到 AWS S3
aws s3 cp etl_script.py s3://${BUCKET_NAME}/scripts/xxx_etl.py
# 验证上传
aws s3 ls s3://${BUCKET_NAME}/scripts/
# 设置存储桶策略
cat << EOF > bucket-policy.json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "GlueAccess",
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::${ACCOUNT_ID}:role/XXX-GlueJobRole"
      },
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:DeleteObject",
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::${BUCKET_NAME}",
        "arn:aws:s3:::${BUCKET_NAME}/*"
      ]
    }
  ]
}
EOF
aws s3api put-bucket-policy \
--bucket ${BUCKET_NAME} \
--policy file://bucket-policy.json
# 启用版本控制
aws s3api put-bucket-versioning \
--bucket ${BUCKET_NAME} \
--versioning-configuration Status=Enabled
# 清理临时文件
rm etl_script.py bucket-policy.json
# 打印重要信息
echo "Bucket created: ${BUCKET_NAME}"
echo "Script location: s3://${BUCKET_NAME}/scripts/xxx_etl.py"
至此,所需的资源和文件都已准备就绪,下面根据 ETL 脚本在 AWS S3 上的位置和名称创建 AWS Glue Job。
aws glue create-job \
--name "dynamodb-to-mysql" \
--role "XXX-GlueJobRole" \
--command '{
"Name": "glueetl",
"ScriptLocation": "s3://aws-glue-assets-xxx-ap-southeast-1/scripts/xxx_etl.py",
"PythonVersion": "3"
}' \
--default-arguments '{
"--enable-metrics": "true",
"--enable-spark-ui": "true",
"--spark-event-logs-path": "s3://aws-glue-assets-xxx-ap-southeast-1/sparkHistoryLogs/",
"--enable-job-insights": "true",
"--enable-observability-metrics": "true",
"--enable-glue-datacatalog": "true",
"--enable-continuous-cloudwatch-log": "true",
"--job-bookmark-option": "job-bookmark-disable",
"--job-language": "python",
"--TempDir": "s3://aws-glue-assets-xxx-ap-southeast-1/temporary/",
"--glue_database": "xxx-mysql",
"--glue_table": "xxx_db_xxx_files",
"--dynamodb_table_arn": "arn:aws:dynamodb:ap-southeast-1:xxx:table/xxx-files",
"--s3_bucket": "aws-glue-assets-xxx-ap-southeast-1",
"--s3_prefix": "temporary/ddbexport/"
}' \
--connections '{
"Connections": ["mysql-connection"]
}' \
--max-retries 0 \
--timeout 2880 \
--worker-type "G.1X" \
--number-of-workers 10 \
--glue-version "4.0" \
--execution-property '{
"MaxConcurrentRuns": 1
}' \
--execution-class "STANDARD"
您可以直接在 AWS Web Console 启动 Job,也可以使用如下命令启动 Job
aws glue start-job-run \
--job-name "dynamodb-to-mysql" \
--arguments '{
"--glue_database": "xxx-mysql",
"--glue_table": "xxx_db_xxx_files",
"--dynamodb_table_arn": "arn:aws:dynamodb:ap-southeast-1:xxx:table/xxx-files",
"--s3_bucket": "aws-glue-assets-xxx-ap-southeast-1",
"--s3_prefix": "temporary/ddbexport/"
}'
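start-job-run 会返回 JobRunId,可以使用如下命令查看该 Job 的运行记录和状态:
aws glue get-job-runs --job-name "dynamodb-to-mysql" --query 'JobRuns[0].[Id,JobRunState]'
如果后续需要按计划周期性地执行迁移作业(即方案概述中提到的任务调度),可以为该 Job 创建一个计划触发器。以下是一个简单示例,其中触发器名称和 cron 表达式均为假设值,请按实际需求调整:
# 示例:触发器名称与执行时间均为假设值
aws glue create-trigger \
  --name "dynamodb-to-mysql-daily" \
  --type SCHEDULED \
  --schedule "cron(0 2 * * ? *)" \
  --actions '[{"JobName": "dynamodb-to-mysql"}]' \
  --start-on-creation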
四、总结
该方案通过 AWS Glue 实现了从 AWS DynamoDB 到 AWS Aurora MySQL 的数据迁移,利用 Crawler 自动探索 AWS Aurora MySQL 表结构并存储到 AWS Glue Data Catalog,再通过 ETL Job 完成数据的抽取、转换和加载过程,具有全托管、自动化程度高、可扩展性强等优势。