亚马逊AWS官方博客

使用AWS托管MSK Connector和EMR Flink从AWS RDS进行CDC数据消费

前言

对于CDC(change data capture)进行近实时的消费和计算的需求,是越来越多企业在大数据领域日益迫切的需求。比如,互联网券商在如下场景需要对数据做实时的计算:

  1. 股票价格的实时变化进行账户持仓分析
  2. 盘中对成交价格的实时计算和分析

因为大多数的企业在现有的大数据平台上,已经搭建了T+1的大数据平台,我们希望在步改变现有的数据分析路径和逻辑的前提下:

  1. 使用RDS read replica进行CDC数据变更的捕捉
  2. 使用MSK托管Connector进行数据CDC的发布和订阅
  3. 使用基于EMR的Flink进行MSK订阅数据的计算
  4. 进行Secret Manager, IAM 和MSK的集成

架构设计

整体架构图如下所示:

设计要点:

  • 开启RDS binlog使用RDS/Aurora read replica进行数据抽取,避免影响生产系统
  • 托管MSK Connector,无需维护Connector实例
  • 托管EMR,免运维运行Flink
  • 集成AWS Secret Manager,无需明文在托管MSK Connector中暴露数据库用户名和密码

EMR版本为6.4,Flink版本为1.13

整体过程

AWS RDS

1. 开启RDS二进制日志,以及二进制日志的留存时间

call mysql.rds_set_configuration('binlog retention hours', 168);

2. 编辑RDS参数组二进制日志设置为, format为ROW

创建MSK

这里使用同时支持IAM和明文认证的MSK集群。

如下为MSK的配置,其中replication factor为3,auto create topic 为true。

auto.create.topics.enable=true
default.replication.factor=3
min.insync.replicas=2
num.io.threads=8
num.network.threads=5
num.partitions=24
num.replica.fetchers=4
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
unclean.leader.election.enable=true
zookeeper.session.timeout.ms=18000
log.retention.hours=6
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
max.incremental.fetch.session.cache.slots=10000

如下为MSK broker配置,采用3 az,每az 1 broker:

选择上一步创建好的MSK配置:

认证方式同时选择

  • Unauthenticated access (后续Flink端在测试过程中使用plaintext方式连接)
  • IAM role-based authentication

查看集群连接字符串:

测试MSK;

创建客户端配置文件client-config.properties

# Sets up TLS for encryption and SASL for authN.
security.protocol = SASL_SSL

# Identifies the SASL mechanism to use.
sasl.mechanism = AWS_MSK_IAM

# Binds SASL client implementation.
sasl.jaas.config = software.amazon.msk.auth.iam.IAMLoginModule required;

# Encapsulates constructing a SigV4 signature based on extracted credentials.
# The SASL client bound by "sasl.jaas.config" invokes this class.
sasl.client.callback.handler.class = software.amazon.msk.auth.iam.IAMClientCallbackHandler

1. 使用console producer生产数据:

kafka-console-consumer.sh --bootstrap-server b-2.xxxxx-msk-flink.xxxxxx.c4.kafka.ap-southeast-1.amazonaws.com:9098,b-4.xxxxxx-msk-flink.xxxxxx.c4.kafka.ap-southeast-1.amazonaws.com:9098,b-3.xxxxx-msk-flink.xxxxxx.c4.kafka.ap-southeast-1.amazonaws.com:9098 --topic junjie-xxxxxx-topic --from-beginning --consumer.config client-config.properties

2. 使用相同客户端配置,在另一终端消费数据

至此,MSK集群创建完毕,并能够成功进行数据的生产和消费。

创建托管MSK Connector

1. 创建MSK Connector worker configuration

MSK worker configuration是影响 Kafka Connect 集群中的配置存储、工作分配以及偏移量和任务状态存储的设置和参数, 比如对JSON数据的格式转换,这里我们会忽略元数据的转换,只集中在数据库表数据的转换。MSK connector worker configuration的配置如下:

key.converter.schemas.enable=false
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider
config.providers=secretManager
config.providers.secretManager.param.aws.region=ap-southeast-1

其中针对schema的转换我们设置为false,对key使用StringConverter,对value的转换使用JsonConveter。

因为在MSK 连接器连接数据库需要使用到用户名和密码,在本文中我们使用AWS Secret Manager来保存对应数据库的用户名和密码。

config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider

创建完成的MSK Connector如下:

2. 在Secret Manager中创建保存数据库用户名和密码的Secret

3. 创建MSK Debezium Connector plugin

https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.6.1.Final/debezium-connector-mysql-1.6.1.Final-plugin.tar.gz下载debezioum mysql plugin

解压缩之后把下载之后的plugin压缩成zip包上传到S3

使用该上传的zip包制作custom plugin

4. 创建MSK Connector

创建MSK Connector过程中

  • 选择上一步准备好的custom plugin

  • 使用创建MSK过程中创建的MSK,并选择认证方式为IAM

  • 完善Connector Configuration

完整配置如下:

connector.class=io.debezium.connector.mysql.MySqlConnector
database.history.producer.sasl.mechanism=AWS_MSK_IAM
database.history.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
database.user=${secretManager:xxxxxx-database-secret:dbusername}
database.server.id=2234
tasks.max=1
database.history.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
database.history.producer.security.protocol=SASL_SSL
database.history.kafka.topic=xxxxxx.quotation
database.history.kafka.bootstrap.servers=b-2.xxxxx-msk-flink.xxxxxx.c4.kafka.ap-southeast-1.amazonaws.com:9098,b-1.xxxx-msk-flink.xxxxxx.c4.kafka.ap-southeast-1.amazonaws.com:9098
database.server.name=junjie-sgp-xxxxx
database.history.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
database.history.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
database.history.consumer.security.protocol=SASL_SSL
database.port=3306
include.schema.changes=true
database.hostname=junjie-sgp-xxxx.xxxxxxx.ap-southeast-1.rds.amazonaws.com
database.password=${secretManager:xxxxx-database-secret:dbpassword}
database.history.consumer.sasl.mechanism=AWS_MSK_IAM
database.include.list=xxx

注意这里使用AWS secret manager中的secret来连接到数据库:

database.user=${secretManager:xxxxx-database-secret:dbusername}
database.password=${secretManager:xxxxx-database-secret:dbpassword}

配置MSK的连接参数为IAM连接字符串

database.history.kafka.bootstrap.servers=b-2.xxxxx-msk-flink.xxxxx.c4.kafka.ap-southeast-1.amazonaws.com:9098,b-1.xxxx-msk-flink.xxxxx.c4.kafka.ap-southeast-1.amazonaws.com:9098

选择上述过程中的worker configuration,并配置所需的MSK connector所需的角色(参考:https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-service-execution-role.html

选择日志组,让connector的日志打到cloudwatch日志组:

创建完成后,观察对应日志组输出:

验证MSK Connector是否可以成功从RDS里消费CDC数据:

查看自动创建的topics:

kafka-topics.sh --bootstrap-server b-2.xxxx-msk-flink.xxxxxx.c4.kafka.ap-southeast-1.amazonaws.com:9098,b-4.xxxx-msk-flink.xxxxxx.c4.kafka.ap-southeast-1.amazonaws.com:9098,b-3.xxxx-msk-flink.xxxxxx.c4.kafka.ap-southeast-1.amazonaws.com:9098 --list --command-config client-config.properties

消费其中一个表的CDC,其中topic的格式为<connector里的database.server.name>.<databas ename>.<table name>, 如junjie-sgp-hstong.hst.flink

kafka-console-consumer.sh --bootstrap-server b-2.xxxxxx-msk-flink.xxxxxx.c4.kafka.ap-southeast-1.amazonaws.com:9098,b-4.xxxxxx-msk-flink.xxxxxx.c4.kafka.ap-southeast-1.amazonaws.com:9098,b-3.xxxxxx-msk-flink.xxxxxx.c4.kafka.ap-southeast-1.amazonaws.com:9098 --topic junjie-sgp-xxxxxx.xxx.flink --from-beginning --consumer.config client-config.properties

于对应数据库插入测试数据,观测消费端输出:

至此,MSK Connector可以成功从数据库消费数据.

创建Flink on EMR

Flink on EMR集群配置如下:

EMR 版本: 6.4.0

Flink 版本: 1.13.1

本文采用单master节点的EMR。

创建完成后使用master DNS登陆EMR集群:

准备flink的Flink table connector,下载Source connector到/lib/flink/lib:

curl -O https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka_2.11/1.13.1/flink-connector-kafka_2.11-1.13.1.jar

测试验证

使用flink SQL Client进行验证

登陆EMR master节点,并运行如下命令进入flink sql终端:

cd /lib/flink && ./bin/yarn-session.sh –detached
./bin/sql-client.sh
SET execution.checkpointing.interval = 1min;

进入Flink SQL交互客户端

创建flink table,这里对应数据库里的table<flink>:

CREATE TABLE Flink (
      `event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,  -- from Debezium format
      `origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL, -- from Debezium format
      `record_time` TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
      `id` BIGINT,
      `name` STRING,
      `addr` STRING,
       WATERMARK FOR event_time AS event_time
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'junjie-sgp-xxxx.xxx.flink', -- created by debezium connector, corresponds to CUSTOMER table in Amazon Aurora database.
      'properties.bootstrap.servers' = 'b-5.xxxxxx-msk-flink.xxxxxx.c4.kafka.ap-southeast-1.amazonaws.com:9092,b-1.xxxxx-msk-flink.xxxxxx.c4.kafka.ap-southeast-1.amazonaws.com:9092,b-2.xxxxx-msk-flink.xxxxxx.c4.kafka.ap-southeast-1.amazonaws.com:9092',
      'properties.group.id' = 'xxxxx-table-group',
      'scan.startup.mode' = 'earliest-offset',
      'value.format' = 'debezium-json'
    );

执行Flink 表的查询

select * from Flink;

在数据库进行数据插入和更新

数据插入

数据更新

总结

  • 使用托管的MSK和MSK连接器,对RDBMS的CDC数据进行近实时的提取,减少了维护MSK和MSK连接器的运维负担。
  • 使用托管的EMR+Flink,能够近实时地消费在MSK发布的CDC数据。
  • 客户可以基于这一架构进行CDC数据的计算,进行流式架构的推进。

参考:

  1. https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/mysql_rds_set_configuration.html
  2. https://catalog.us-east-1.prod.workshops.aws/workshops/c2b72b6f-666b-4596-b8bc-bafa5dcca741/en-US/mskconnect/source-connector-setup
  3. https://aws.amazon.com/blogs/aws/introducing-amazon-msk-connect-stream-data-to-and-from-your-apache-kafka-clusters-using-managed-connectors/

本篇作者

李俊杰

亚马逊云科技解决方案架构师,负责云计算方案的咨询与架构设计,同时致力于容器方面研究和推广。在加入亚马逊云科技之前曾在金融行业IT部门负责传统金融系统的现代化改造,对传统应用的改造,容器化具有丰富经验。

张世浩

亚马逊云科技解决方案架构师,负责企业客户的解决方案咨询与架构设计优化,现致力于容器和IoT相关领域的研究。