亚马逊AWS官方博客

基于Amazon Lambda 和 DuckDB 构建轻量 OLAP 引擎

主流的OLAP数据分析引擎,比如Redshift, Spark,都是采用分布式架构将计算任务分发到各个工作节点上。利用大规模的机器集群来提高数据处理吞吐量。 但是如果我们只是分析处理几个G的数据,去搭建一个大数据集群,有点大材小用。数据大小能够加到单个主机内存,但又没有合适的工具可以使用。DuckDB就为这种场景应用而生。在单个主机,单进程完成数据分析工作。

DuckDB是基于SqlLite演化而来。它定位和SqlLite也非常类似。DuckDB于OLAP 的关系就和SqlLite于OLTP关系一样独特。最开始DuckDB宣传标语也是“DuckDB – The SQLite for Analytics”。 未来DuckDB是否能达SqlLite的普及程度,还有很长一段路要走。正如项目文档说明那样:

“有许多数据库管理系统(DBMS)。但是没有 one-size-fits-all 的数据库系统。所有这些都需要不同的权衡,以更好地适应特定的用例。DuckDB也不例外。”

DuckDB有丰富的OLAP功能和强悍单机性能:

1.完整的SQL语法支持。

2.列式向量查询计算引擎。

3.丰富的数据分析函数。

4.通用存储文件,支持计算存储分离。

5.支持事务ACID。

6.C++实现计算引擎。

笔者第一次了解到DuckDB功能后,就开始尝试将DuckDB部署到AWS Lambda。 整个过程比预想的要顺利很多。本文总结整个部署过程,结合serverless 灵活性能和DuckDB的数据处理功能,搭建轻量的OLAP引擎,

最后会基于NYC Taxi 2020和2021两年行程数据测试 DuckDB on Lambda 的性能和极限。

创建Lambda Layer:

Lambda Layer提供了一种方便的方法来打包库和其他可与 Lambda 函数搭配使用的依赖项。使用层可以缩小上传的部署存档的大小,并加快代码的部署速度。

层是可以包含其他代码或数据的 .zip 文件存档。层可以包含库、自定义运行时、数据或配置文件。层可促进代码共享和责任分离,以便您可以更快地迭代编写业务逻辑。由于Duckdb有依赖原生代码库,则必须使用 Linux 开发计算机编译和构建这些库,以便二进制文件与 Amazon Linux  兼容。需要在Amazon Linux 制作Lambda Layer.

详细说明请参考:https://docs.aws.amazon.com/zh_cn/lambda/latest/dg/configuration-layers.html

第一步,安装并制作压缩包。

pip install duckdb -t python/

zip -r duckdb.zip python

aws s3 cp duckdb.zip s3://<s3_bucket>

第二步:发布lambda layer

aws lambda publish-layer-version --layer-name duckdb \\
--content S3Bucket=<s3_bucket>,S3Key=duckdb.zip \\
--compatible-runtimes python3.7

创建Lambda :

你可以选择在界面控制台上或者使用命令行创建lambda函数。Lambda配置为10G内存。创建时候需要选择上一步骤制作的lambda layer。整个过比较简单,具体步骤这里就不赘述。

编写函数代码:

Lambda函数会支持3个输入参数:

1.parquet 文件列表。

2.执行处理语句。

3.结果输出路径。

本文重点关键DuckDB核心处理能力,读者可以自己考虑用API Gateway封装成http接口调用,方便使用。

函数代码主要实现以下逻辑:

1.安装加载httpfs插件

2.初始化AWS认证信息

3.将输入文件映射到视图:input_table

4.执行查询分析语句。将计算结果存储到输出表。

5.建输出结果表保存至

Python 源代码如下:

import json
import os
def lambda_handler(event, context):
import duckdb

env=os.environ
# to start an in-memory database
con = duckdb.connect(database=':memory:')

home_directory="/tmp/duckdb/"
if not os.path.exists(home_directory) :
    os.mkdir(home_directory)

#1.install httpfs plugin
con.execute(f"SET home_directory='{home_directory}';INSTALL httpfs;LOAD httpfs;")

#2.init aws credential
con.execute(f"SET s3_region='{env['AWS_REGION']}';")
con.execute(f"SET s3_access_key_id='{env['AWS_ACCESS_KEY_ID']}';")
con.execute(f"SET s3_secret_access_key='{env['AWS_SECRET_ACCESS_KEY']}';")
con.execute(f"SET s3_session_token='{env['AWS_SESSION_TOKEN']}';")

#3.maping input files to view input_table.
tbs=','.join(["'"+file+"'"  for file in event['input_files']])
sql="CREATE VIEW input_table AS SELECT * FROM read_parquet(["+tbs+"]);";
print(sql)
con.execute(sql)

#4.store query result to output table.
query=event['query']
con.execute(f" create table output_table AS {query};")
output_file=event['output_file']

#5.copy output table to s3 bucket.
con.execute(f"COPY output_table TO '{output_file}' (FORMAT PARQUET);")

1
return {
    'statusCode': 200,
    'body': json.dumps('Query Executed!')
}

数据准备:

我从AWS Marketplace 获取测试2020至2021年黄车行程数据: New York City Taxi and Limousine Commission (TLC) Trip Record Data 。 需要提前将数据复制你所在的region,防止跨区域数据传输影响测试时间。

文件名:yellow_tripdata_2020-00.parquet – yellow_tripdata_2021-12.parquet

总记录数据55,228,574 ,

文件数量:24个文件

文件大小: 大约900M

样例数据:

{
  "VendorID": 1,
  "tpep_pickup_datetime": "2020-01-01T00:28:15.000Z",
  "tpep_dropoff_datetime": "2020-01-01T00:33:03.000Z",
  "passenger_count": 1,
  "trip_distance": 1.2,
  "RatecodeID": 1,
  "store_and_fwd_flag": "N",
  "PULocationID": 238,
  "DOLocationID": 239,
  "payment_type": 1,
  "fare_amount": 6,
  "extra": 3,
  "mta_tax": 0.5,
  "tip_amount": 1.47,
  "tolls_amount": 0,
  "improvement_surcharge": 0.3,
  "total_amount": 11.27,
  "congestion_surcharge": 2.5
}

功能测试:

创建测试输入:

Lambda执行输出:

Duration: 1533.82 ms Billed Duration: 1534 ms Memory Size: 10240 MB Max Memory Used: 155 MB

数据输出:

性能测试:

性能测试实验一共准备5个查询语句,然后逐渐增加输入文件来观察执行事件的变化。

Query 1:

select * from input_table

不过滤数据,将多个输入文件夹合并成一个大的数据文件输出到s3

Query 2:

select * from input_table where passenger_count >2

会过滤出17%数据,输出到s3上

Query 3:

select * from input_table where passenger_count >3

会过滤出11%数据,输出到s3上

Query 4:

select strftime(tpep_pickup_datetime, '%d/%m/%Y') pickup_date ,count(*) trip_count,
               sum(passenger_count) total_passenger,sum(total_amount) amount 
               from input_table group by strftime(tpep_pickup_datetime, '%d/%m/%Y') 

按天统计乘客数量,费用等汇总信息。最后存储24条汇总数据到s3。大小为17.1 KB

测试结果:

总结:

对于查询4:  Duckdb 在Lambda上的性能表现令人满意。 对于900M,6千万条数据,24个parquet文件,能够在5秒内完成日期汇总统计。

从查询2,3: 能够看到Duckdb局限。由于是基于单进程处理数据, 随着数据量增加,查询的时长也会线性增加。 而其它OLAP数据库能够借助分布式的架构来均衡工作负载。

查询 1表现:  在输出数据量是2千5百万时候,数据文件大小是700M,Lambda函数就报错无法执行。

由于DuckDB只能输出单个Parquet文件到时S3,综合全部测试结果,可以看出输出数量大小会严重影响数据。未来可以考虑将文件保留惊喜测试。

DuckDB还是一个比较新的框架,行业内没有大规模在生产环境应用案例。读者可以根据自己的场景来测试DuckDB。但它的定位和性能表现也让很多技术专家充满想象,欢迎大家一起交流探讨。

本篇作者

李国建

李国建,专业服务团队,AWS大数据架构师。主要为客户提供数据处理和分析相关的架构设计和技术咨询服务。包括数据分析系统性能优化 ,数据平台架构和规划,实时流处理架构,数据湖架构等。在零售,金融等行业有丰富实践经验。