业务背景&痛点
- 流式处理的业务场景,经常会遇到实时消息数据需要与历史存量数据关联查询或者聚合,比如电商常见的订单场景,订单表做为实时事实表,是典型的流式消息数据,通常会在kafka中,而客户信息,商品SKU表是维度表,通常存在业务数据库或者数仓中,是典型的离线数据。实时订单数据在实时处理时通常需要事实表与维度表join做reference补全,以便拿到订单详情并实时统计当天或截至当天的所有订单的商品分布详情。
- 流式计算通常采用Flink做为数据处理平台,上文中提到的实时和离线数据join的场景,Flink提供了Hive/ jdbc/Hudi/ filesystem各种connector实现与离线数据的提取和读写,这样一来在Flink 应用程序中,即可使用Table,Sql API来join关联流态表和离线表数据,实现聚合计算等操作
使用Flink Sql 离线表Join流态表的常规lookup join,是通过Flink hive sql connector或者filesystem connector,对离线hive库表或者S3上离线数据建Flink Table,然后对kafka消息流中的数据建流态表,然后直接做量表做join操作
该方式架构如下图所示:
该方式主要面临的问题是:
- lookup维度表数据只会在首次拉起Flink应用的时候,保存在task manager state中,后续持续查询或者开窗聚合等操作时,是不会再次拉取维度表数据,业务需要定期重启Flink应用,或者刷新维度表数据到临时表,以便join聚合时和最新的维度数据关联:
- 每次需要重新全量拉取维度表数据,存在冷启动问题,且维度表数据量大的时候(如上千万注册用户信息表,上万的商品SKU属性字段),造成很大IO开销,存在性能瓶颈
- Flink的checkpoint机制在持续查询或者开窗聚合时,需要保存state状态及处理数据到检查点快照中,造成state 快照数据膨胀
解决方案思路
基于以上业务难点,本文提出一种解决方案思路,即通过Alluxio缓存层,将hive维度表数据自动加载至Alluxio UFS缓存中,同时通过Flink时态表join,把维度表数据做成持续变化表上某一时刻的视图
同时使用Flink的Temporal table function表函数,传递一个时间参数,返回Temporal table这一指定时刻的视图,这样实时动态表主表与这个Temporal table表关联的时候,可以关联到某一个版本(历史上某一个时刻)的维度数据
优化后的整体架构如下图所示:
方案实施落地Detail
本文以Kafka中用户行为日志数据做为实时流态的事实表数据,hive上用户信息数据做为离线维度表数据,采用Alluxio+Flink temproal 的demo,来验证其flink join优化的解决方案
实时事实表
本实例中我们使用json-data-generator开源组件模拟的用户行为json数据,实时写入kafka中,通过Flink kafka connector转换为持续查询的Flink流态表,从而做为实时join的时候的Fact事实表数据
用户行为json模拟数据如下格式:
[{ "timestamp": "nowTimestamp()",
"system": "BADGE",
"actor": "Agnew",
"action": "EXIT",
"objects": ["Building 1"],
"location": "45.5,44.3",
"message": "Exited Building 1"
}]
包含用户行为的业务时间,登录系统,用户署名,行为activity动作,操作涉及对象,位置信息,及相关文本消息字段。我们在
Flink Sql 中建选择主要字段建事实表如下
CREATE TABLE logevent_source (`timestamp` string,
`system` string,
actor STRING,
action STRING
) WITH (
'connector' = 'kafka',
'topic' = 'logevent',
'properties.bootstrap.servers' = 'b-6.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092,b-5.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092,b-1.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092 (http://b-6.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092%2Cb-5.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092%2Cb-1.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092/)',
'properties.group.id' = 'testGroup6',
'scan.startup.mode'='latest-offset',
'format' = 'json'
);
Alluxio缓存维度表
Alluxio是大数据技术堆栈的分布式缓存,它提供了一个统一的UFS文件系统可以对接底层S3,hdfs数据,在读写Alluxio UFS的时候,可以针对S3,HDFS分布式存储层实现warm up,显著提升吞吐量和减少网络开销,且与上层计算引擎如Hive,spark,Trino都有深度的集成,很适合做为离线维度数据的缓存加速器
AWS EMR对Alluxio提供了良好的集成,可以通过boostrap启动脚本方式,在EMR创建时自动部署Alluxio组件并启动Alluxio master、worker进程,详细EMR安装和部署Alluxio步骤可以参考另一篇文章 Alluxio EMR 集成实践
在集成Alluxio的AWS EMR集群中,使用Alluxio中创建hive离线维表数据的缓存表方法如下:
hive-env.sh中设置设置client jar包:
$ export HIVE_AUX_JARS_PATH=/<PATH_TO_ALLUXIO>/client/alluxio-2.2.0-client.jar:${HIVE_AU
确保安装部署alluxio的EMR集群上ufs已配置,并且表或者db路径已创建
alluxio fs mkdir alluxio://ip-xxx-xxx-xxx-xxx.ap-southeast-1.compute.internal:19998/s3/customer
alluxio fs chown hadoop:hadoop alluxio://ip-xxx-xxx-xxx-xxx.ap-southeast-1.compute.internal:19998/s3/customer
在AWS EMR集群上,创建hive表路径指向alluxio namespace uri:
!connect jdbc:hive2://xxx.xxx.xxx.xxx:10000/default;
hive> CREATE TABLE customer(
c_customer_sk bigint,
c_customer_id string,
c_current_cdemo_sk bigint,
c_current_hdemo_sk bigint,
c_current_addr_sk bigint,
c_first_shipto_date_sk bigint,
c_first_sales_date_sk bigint,
c_salutation string,
c_first_name string,
c_last_name string,
c_preferred_cust_flag string,
c_birth_day int,
c_birth_month int,
c_birth_year int,
c_birth_country string,
c_login string,
c_email_address string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION 'alluxio://ip-xxx-xxx-xxx-xxx.ap-southeast-1.compute.internal:19998/s3/customer';
OK
Time taken: 3.485 seconds
如上所示,该Alluxio表location指向的路径即为hive维度表所在S3路径,因此对Customer用户维度信息表的写入操作会自动同步到alluxio缓存中。
创建好Alluxio hive离线维度表后,在flink sql中,可以通过hive的catalog,连接到hive元数据,即可以查看到alluxio缓存表的详细信息:
CREATE CATALOG hiveCatalog WITH ( 'type' = 'hive',
'default-database' = 'default',
'hive-conf-dir' = '/etc/hive/conf/',
'hive-version' = '3.1.2',
'hadoop-conf-dir'='/etc/hadoop/conf/'
);
-- set the HiveCatalog as the current catalog of the session
USE CATALOG hiveCatalog;
show create table customer;
create external table customer(
c_customer_sk bigint,
c_customer_id string,
c_current_cdemo_sk bigint,
c_current_hdemo_sk bigint,
c_current_addr_sk bigint,
c_first_shipto_date_sk bigint,
c_first_sales_date_sk bigint,
c_salutation string,
c_first_name string,
c_last_name string,
c_preferred_cust_flag string,
c_birth_day int,
c_birth_month int,
c_birth_year int,
c_birth_country string,
c_login string,
c_email_address string
)
row format delimited fields terminated by '|'
location 'alluxio://ip-xxx-xxx-xxx-xxx.ap-southeast-1.compute.internal:19998/s3/30/customer'
TBLPROPERTIES (
'streaming-source.enable' = 'false',
'lookup.join.cache.ttl' = '12 h'
)
如上图所示,可以看到该维度表location路径是alluxio缓存ufs路径的uri,业务程序读写该维度表时,alluxio会自动更新缓存中的customer维度表数据,并异步写入到alluxio的backend storage的S3表路径,实现数据湖的表数据同步更新。
Flink Temporal 时态表join
Flink时态表(Temporal table)也是动态表的一种,时态表的每条记录都会有一个或多个时间字段相关联,当我们事实表join维度表的时候,通常需要获取实时的维度表数据做lookup,所以通常需要在事实表create table或者join时,通过proctime()函数指定事实表的时间字段,同时在join时,通过FOR SYSTEM_TIME AS OF语法,指定维度表lookup时对应的事实表时间版本的数据
在本Demo示例中,客户信息在hive离线表作为一个变化的维度表的角色,客户行为在kafka中作为事实表的角色,因此在flink kafka source table中,通过proctime()指定时间字段,然后在flink hive table做join 时,使用FOR SYSTEM_TIME AS OF指定lookup的kafka source table的时间字段,从而实现Flink temporal 时态表join业务处理
如下所示,Flink Sql中通过Kafka connector创建用户行为的事实表,其中ts字段即为时态表join时的时间戳:
CREATE TABLE logevent_source (`timestamp` string,
`system` string,
actor STRING,
action STRING,
ts as PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'logevent',
'properties.bootstrap.servers' = 'b-6.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092,b-5.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092,b-1.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092 (http://b-6.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092%2Cb-5.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092%2Cb-1.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092/)',
'properties.group.id' = 'testGroup-01',
'scan.startup.mode'='latest-offset',
'format' = 'json'
);
Flink 离线维度表与流式实时表具体join方法如下:
select a.`timestamp`,a.`system`,a.actor,a.action,b.c_login from
(select *, proctime() as proctime from user_logevent_source) as a
left join customer FOR SYSTEM_TIME AS OF a.proctime as b on a.actor=b.c_last_name;
如上代码示例,在事实表logevent_source join lookup维度表时,通过proctime函数获取到维度表的瞬时最新的版本数据,保障join时的一致性和实时性
同时,该维度表数据已经在alluxio cache,因此读取时性能远高于离线读取s3上的表数据
通过hive切换S3和alluxio路径的customer信息维度表,对比测试flink join可以看出alluxio缓存后性能明显优势
通过alter table方便切换本地和cache的location路径:
alter table customer set location "s3://xxxxxx/data/s3/30/customer";
alter table customer set location "alluxio://ip-xxx-xxx-xxx-xxx.ap-southeast-1.compute.internal:19998/s3/30/customer";
选取某一split数据分片的TaskManager日志:
2022-06-29 02:54:34,791 INFO com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem [] - Opening 's3://salunchbucket/data/s3/30/customer/data-m-00029' for reading
2022-06-29 02:54:39,971 INFO org.apache.flink.table.filesystem.FileSystemLookupFunction [] - Loaded 433000 row(s) into lookup join cache
2022-06-29 03:25:14,476 INFO com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem [] - Opening 's3://salunchbucket/data/s3/30/customer/data-m-00029' for reading
2022-06-29 03:25:16,397 INFO org.apache.flink.table.filesystem.FileSystemLookupFunction [] - Loaded 433000 row(s) into lookup join cache
在JobManager上查看Timeline,对比alluxio和s3路径下job的执行时间可以看到更加清楚
可以看到, 单个task查询提升1倍以上,整体job性能提升更加明显
其他需要考虑的问题
持续Join每次都需要拉取维度数据做join,Flink的checkpoint state是否一直膨胀导致TM的RockDB撑爆或者内存溢出?
state自带有ttl机制,可以设置ttl过期策略,触发Flink清理过期state数据,Flink Sql可以通过Hint方式设置
insert into logevent_sink
select a.`timestamp`,a.`system`,a.actor,a.action,b.c_login from
(select *, proctime() as proctime from logevent_source) as a
left join
customer/*+ OPTIONS('lookup.join.cache.ttl' = '5 min')*/ FOR SYSTEM_TIME AS OF a.proctime as b
on a.actor=b.c_last_name;
Flink Table/Streaming API类似:
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.days(7))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupInRocksdbCompactFilter()
.build();
ValueStateDescriptor<Long> lastUserLogin =
new ValueStateDescriptor<>("lastUserLogin", Long.class);
lastUserLogin.enableTimeToLive(ttlConfig);
StreamTableEnvironment.getConfig().setIdleStateRetentionTime(min, max);
设置后重新启动lookup join,从Flink TM日志中可以看到,ttl到期后,会触发清理并重新拉取hive 维表数据:
2022-06-29 04:17:09,161 INFO org.apache.flink.table.filesystem.FileSystemLookupFunction
[] - Lookup join cache has expired after 5 minute(s), reloading
此外,可以通过配置flink state retain,减少checkpoint时候快照数量,从而减少快照时候state的占用空间
Flink job中配置:
-D state.checkpoints.num-retained=5
设置后,可以看到s3 checkpoint路径上,Flink Job会自动清理历史快照,只保留最近的5次快照数据,从而确保checkpoint快照数据不会堆积
[hadoop@ip-172-31-41-131 ~]$ aws s3 ls s3://salunchbucket/data/checkpoints/7b9f2f9becbf3c879cd1e5f38c6239f8/
PRE chk-3/
PRE chk-4/
PRE chk-5/
PRE chk-6/
PRE chk-7/
附录
Alluxio整体架构
Alluxio on EMR 快速部署
在 Amazon EMR 中利用 Alluxio 的分层存储架构
EMR Alluxio集成detail
Flink Temporal Join 详细
本篇作者