使用 Amazon Athena 查询分析 Apache Paimon 数据
使用 Zero ETL 将关系型数据库入仓到 Redshift 进行分析
本文将探索使用 Amazon Athena 来对 Apache Paimon 表中的数据进行查询分析,包括使用 Athena for Apache Spark 以及开发 Athena Paimon Connector,使用 Athena Federated Query 这两种方式来连接和查询 Paimon 表。
1. 使用 Athena for Apache Spark 查询分析 Apache Paimon 数据
Athena for Apache Spark 使用 Spark 来进行交互式数据分析和探索,不需要用户关心底层计算资源的规划、配置和管理,通过提供的 Notebook 环境,用户可以直接提交 Spark 代码或者 SQL 来处理和接收结果数据,而不需要进行额外的配置。Athena for Apache Spark 目前已经支持 Hive 表格式 和 Apache Iceberg,Hudi,Delta Lake 这三个开源的非 Hive 表格式。接下来我们使用 Athena for Apache Spark 来查询和分析 Paimon 表数据。
首先,我们在 Amazon Athena 中创建一个 WorkGroup,并选择 Apache Spark 作为分析引擎,其余的保留默认选项即可。
使用默认配置创建好 WorkGroup 后,系统将自动为该 WorkGroup 对应的 Service Role,为了能让 Athena for Apache Spark 使用 Amazon Glue Data Catalog,需要给该 Service Role 添加 Glue 相关的权限。方便起见,可以给该 Service Role 在 IAM 中添加“AWSGlueConsoleFullAccess” 这个 Policy。
然后,我们在 Notebook editor 中选择创建的 WorkGroup,创建 Notebook,如下图所示:
在 Apache Spark properties 选项选择“Custom”,并填入以下属性(需先将相关的 Jar 包下载后放到 S3 中,Athena for Apache Spark 的 runtime 是 Spark 3.2 版本,建议下载对应版本的 Apache Paimon Jar 包):
{
"spark.jars": "s3://path/of/paimon-spark-3.2.jar,s3://path/of/paimon-hive-connector-common.jar",
"spark.sql.catalog.paimon": "org.apache.paimon.spark.SparkCatalog",
"spark.sql.catalog.paimon.warehouse": "s3://path/of/paimondb/",
"spark.sql.extensions": "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions"
}
创建好 Notebook 之后,就可以以交互式的方式进行 Paimon 表数据的查询分析。在 Notebook 中,我们既可以执行 Spark 代码,也可以通过 Magic Command “%%sql” 来执行 Spark SQL 语句,Athena for Apache Spark 还可以直接读取存储在 Glue Data Catalog 中的元数据信息。
1.1 使用 Glue Data Catalog,并选择使用 Apache Paimon 数据库
use paimon_db;
1.2 使用 Spark SQL 来查询统计 Paimon 数据表信息
select province_code, item_id, sum(item_cnt) as item_sold_cnt
from dwd_order
group by province_code, item_id
order by item_sold_cnt desc
limit 5;
1.3 使用 PySpark 代码来查询统计 Paimon 数据表信息
from pyspark.sql import functions as sf
df = spark.read.format("paimon")\
.load("s3://PAIMON/TABLE/PATH")\
.groupBy('province_code', 'item_id') \
.agg(sf.sum('item_cnt').alias('item_sold_cnt')) \
.orderBy(sf.desc('item_sold_cnt')) \
.select('province_code', 'item_id', 'item_sold_cnt') \
.limit(5)
df.show()
1.4 使用 PySpark 对 Paimon 表进行 Time Travel 方式读取数据
df = spark.read \
.option("scan.snapshot-id", 50) \
.format("paimon") \
.load("s3://PAIMON/TABLE/PATH")\
.select('order_no','province_code', 'item_id', 'item_cnt')
df.show(5)
1.5 使用 PySpark 对 Paimon 表进行 Batch Incremental 方式读取数据
df = spark.read \
.format("paimon") \
.option("incremental-between", "30,35") \
.load("s3://PAIMON/TABLE/PATH") \
.select('id', 'item_name', 'item_spec', 'item_cat1_code')
df.show(5)
1.6 使用 Spark SQL 创建 Paimon 分区表,并插入数据
set hive.exec.dynamic.partition.mode=nonstrict
CREATE TABLE dwd_table (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING
) PARTITIONED BY (dt, hh) TBLPROPERTIES (
'primary-key' = 'dt,hh,user_id'
);
insert into dwd_table values (1, 1, 'buy', '20240229', 12),
(2, 2, 'buy', '20240229', 12), (3, 3, 'buy', '20240225', 11);
创建表并插入数据后, 可以查看建立的分区信息:
以上是使用 Athena 的 Spark Engine 以 Notebook 的方式来对 Apache Paimon 中的数据进行查询分析。总体来看,Athena for Apache Spark 可以比较好地支持 Apache Paimon 的各项功能,如 Batch Read,Batch Write,Time Travel,Create Table 等,但由于 Athena for Apache Spark 的 runtime 基于 Spark 3.2 版本,Athena for Apache Spark 还不支持 Paimon 的 Streaming Read / Write 功能。
2. 使用 Athena Federated Query 查询分析 Apache Paimon 数据
Amazon Athena 允许用户以标准 SQL 的方式查询分析存储在 Amazon S3 中的非结构化、半结构化和结构化的数据,如 CSV,JSON,Apache Parquet,Apache ORC 等,使用户可以快速地从数据中获得洞察。对于非 Amazon S3 中的数据或者还不支持的数据格式,可以通过 Athena Federated Query 方式来构建数据管道来抽取目标数据源的数据,用 SQL 来跨数据源进行查询分析,具有很好的开放能力。
Athena Federated Query 使用运行在 AWS Lambda 中的 Data Source Connector 来运行联邦查询。Data Source Connector 是目标数据源和 Athena 之间的桥梁。Athena 已经提供了像 Amazon CloudWatch Logs,Amazon DynamoDB,Amazon DocumentDB,Amazon RDS 等预先构建好的 Data Source Connector。用户也可以通过 Athena Query Federation SDK 来构建自定义的 Data Source Connector,本文将基于此 SDK 来构建 Apache Paimon Data Source Connector,实现通过标准 SQL 来查询 Apache Paimon 表中的数据。
2.1 构建 Apache Paimon Data Source Connector
如何使用的 Athena Query Federation SDK 来构建 Data Source Connector 可以参考这个说明文档。Athena 和 Data Source Connector 的协作过程如下图所示:
- 用户提交 Athena SQL 查询
- Athena 引擎对 SQL 进行语法分析后,生成逻辑执行计划和物理执行计划
- 物理计划执行过程中, 将对目标数据集的基本操作代理给 Data Source Connector 执行。比如获取目标表的字段元数据信息,根据条件进行数据查询等操作
- Data Source Connector 和目标数据源进行交互,执行Athena 引擎代理过来的具体操作,并将执行结果返回给 Athena 引擎
- Athena 引擎按执行计划对数据进行处理,如数据合并、聚合计算等,并将最后的执行结果返回给用户
使用的 Athena Query Federation SDK 构建自定义的 Data Source Connector 只需要用户实现 MetadataHandler 和 RecordHandler 这两个接口,其中:
- MetadataHandler 接口主要是获取数据库、表、分区等元数据信息,并返回 Athena 引擎底层数据源支持的优化选项,比如是否支持分区减枝、谓词下推、Limited 扫描
- RecordHandler 接口主要功能是实现具体的数据获取,根据 Athena 引擎给的查询条件从目标数据源获取数据,并将数据转换成 Athena 的数据格式
结合 Athena Query Federation SDK 以及 Apache Paimon 的开源代码,本文实现了 Athena Paimon Data Source Connector,具体代码见 Github Repo。代码结构如下 UML 图所示:
- PaimonMetadataHandler: 该类继承MetadataHandler, 通过 Apache Paimon 的 Catalog 获取数据库、表、分区等元数据信息, 并返回Athena 支持 分支修剪、Limited 扫描以及查询条件下推等优化方法
- PainmonRecordHandler: 该类继承RecordHandler, 通过 Apache Paimon 的 Catalog 实现数据过滤,数据查询
- PaimonCompositeHandler: 该类组合PaimonMetadataHandler 和 PainmonRecordHandler 两个类, 作为 Connector Lambda function 的入口
- AthenaTypeUtils: Apache Paimon 数据类型和Athena 数据类型映射的工具类
- PushdownUtils: 根据Athena 给的查询条件, 构建分支优化、Limited 扫描、谓词下推等优化场景下的Apache Paimon 查询条件
- ValueExtractUtils: 从Paimon中提取数据的工具类
2.2 部署 Apache Paimon Data Source Connector
部署 Apache Paimon Data Source Connector 时,本地需要安装 Git,Java 11+,Docker,AWS SAM CLI,AWS CLI 环境,并配置好 AWS Credentials。Athena Data Source Connector 以 AWS Lambda Function 的方式运行,而 Lambda 在以 Zip 包方式上传部署时,要求解压缩后的大小不能超过 250MB。由于开发的 Paimon Data Source Connector 依赖的包体较大,解压后超过 250MB 的限制,所以在部署该 Connector 时,将采用自定义 Lambda 镜像的方式进行部署。
以 AWS “us-west-2” Region 为例,部署 Apache Paimon Data Source Connector 过程如下:
2. 编译、打包成镜像,并推送镜像到 Amazon ECR 仓库中
# 1. git clone 代码
git clone https://github.com/Moonlight-CL/athena-paimon-federated-query.git
# 2. 进入 athena-paimon 目录,编译 Paimon Connector
mvn clean package -DskipTests
# 3. Docker Cli 认证以及创建 ECR 私有镜像仓库:athena-connector
aws ecr get-login-password --region us-west-2 | docker login --username AWS --password-stdin YourAccoutID.dkr.ecr.us-west-2.amazonaws.com
aws ecr create-repository --repository-name athena-connector --region us-west-2 --image-scanning-configuration scanOnPush=true --image-tag-mutability MUTABLE
# 4. 构建本地 Docker镜像,打 Tag,并推送镜像到 AWS ECR 远程仓库
docker build --platform linux/amd64 -t athena-paimon .
docker tag athena-paimon:latest YourAccoutID.dkr.ecr.us-west-2.amazonaws.com/athena-connector:latest
docker push YourAccoutID.dkr.ecr.us-west-2.amazonaws.com/athena-connector:latest
2. 创建 AWS SAM 部署时需要的配置文件 samconfig.yaml,内容如下
version: 0.1
default:
deploy:
parameters:
stack_name: 部署时的 CloudFormation Stack 名,如:athena-paimon-connector
region: us-west-2
s3_bucket: 保存部署模版的 S3 bucket
s3_prefix: 保存部署模版的 S3 路径
confirm_changeset: true
capabilities: CAPABILITY_IAM
parameter_overrides:
- AthenaCatalogName = 在 Athena 中的 Catalog 名称,也是 Lambda 的名称,如:paimon
- SpillBucket = Athena 计算过程中 Spill 数据的保存 S3 bucket,如:paimon-resc
- SpillPrefix = Athena 计算过程中 Spill 数据的保存路径,如:athena-spill
- PaimonDataBucket = Paimon 数据存储的 S3 bucket,如:paimon-resc
- PaimonDataPrefix = Paimon 数据存储的 S3 路径,如: data/paimon/
- AWSAccessKey = 访问 Paimon S3 数据的 Access Key
- AWSSecretKey = 访问 Paimon S3 数据的 Secret Key
- AWSSessionToken = 可选,访问 Paimon S3数据可以基于SessionToken来认证
- LambdaTimeout = Lambda 超时时间(秒),如:900
- LambdaMemory = Lambda 的内存大小,如:3008
- DisableSpillEncryption = 是否禁用 Spill 数据加密,如:true
3. 使用 SAM 命令进行部署
sam deploy -t athena-paimon.yaml --region us-west-2 \
--s3-bucket emr-resc --image-repository YourAccoutID.dkr.ecr.us-west-2.amazonaws.com/athena-connector \
--config-file ./samconfig.yaml
部署过程将会产生 AWS CloudFormation 堆栈,并创建 AWS Lambda 函数以及相关的执行角色,部署成功后如下图所示:
2.3 查询分析 Apache Paimon 数据
Apache Paimon Data Source Connector 部署成功后,就可以利用 Athena Federated Query 功能进行 Paimon 数据的查询分析。指定数据表进行查询时,表名需按照"lambda:"."schema"."table"的形式指定,其中的 function_name 和用 SAM 部署时指定的 AthenaCatalogName 一致。
1)查询数据库、数据表以及表字段元数据信息
-- show databases in `lambda:paimon`;
-- show tables in `lambda:paimon`.paimon_db;
describe `lambda:paimon`.paimon_db.dwd_order;
2)使用标准 SQL 查询统计 Paimon 表中数据如下
select province_code, item_id, sum(item_cnt) item_sold_cnt
from "lambda:paimon"."paimon_db"."dwd_order"
group by province_code, item_id
order by item_sold_cnt desc
limit 5;
可以看到查询结果与之前使用 Athena for Spark 使用 Spark SQL 查询结果一致,查询耗时 6.58 秒,扫描 1.56 MB 的底层数据。
3)使用 Athena 的聚合函数并结合过滤条件进行查询
select array_agg(item_cat3_code) filter (where id < 4)
from "lambda:paimon"."paimon_db"."dwd_order";
从查询结果可以看出,通过创建自定义的 Apache Paimon Data Source Connector,结合 Athena 的 Federated Query 功能,可以充分利用 Athena 的相关函数对 Paimon 数据进行聚合查询、统计分析,并且通过实现谓词下推优化,将过滤条件下推到 Paimon 层执行过滤,减少底层数据扫描量。
以上是通过自定义实现 Apache Paimon Data Source Connector,部署成 AWS Lambda Function 后,利用 Athena Federated Query 特性,实现通过标准的 SQL 对 Apache Paimon 数据库、表元数据以及业务数据的查询统计分析,具有以下优势:
- Athena 和 Apache Paimon Data Source Connector 都是 Serverless 的,不需要管理底层资源,不需要部署其他计算引擎。
- 通过标准 SQL 对 Paimon 数据进行查询分析,结合 Apache Paimon 强大的数据摄取和流式数据湖能力,可以快速从业务数据中获得业务洞察。
这种方式目前不支持 Create Table 以及 Alter Table 特性。执行的查询分析是基于 Paimon 的 Batch Read,不过这点与目前 Apache Paimon 的 Trino 和 Presto 引擎能力相同。
总结
本文探索了两种通过 Amazon Athena 对 Apache Paimon 数据进行查询、统计的方法,其中:
- 通过 Athena for Spark 引擎方式:
- 这种方式通过 Spark SQL 或者 PySpark 代码的方式来查询 Paimon 表数据
- 可以支持 Apache Paimon 的 Batch Read,Time Travel Query,Batach Incremental Query,创建数据表等功能
- Serverless 架构,不需要关心底层计算资源的管理,并以 Notebook 的方式进行查询交互
- 通过 Athena Federated Query 结合自定义 Data Source Connector 方式:
- 以标准 SQL 查询统计 Paimon 数据,并支持分区减枝、Limited Scan、谓词下推等优化,并能充分利用 Athena 函数和并行计算能力
- 同样也是 Serverless 架构,不需要关心底层计算资源的管理
- 仅支持 Apache Paimon Batch Read 查询
这两种方式充分表明亚马逊云科技服务的开放性,正是这种开放能力可使 Apache Paimon 在 Amazon Athena 平台中可以比较好地适配运行。