亚马逊AWS官方博客

实时数据处理中的AWS lambda (python) 性能优化

在数据处理领域,相比常见的离线批处理,今天越来越多的实时/准实时处理在各大企业中应用。在AWS云服务体系,与数据湖相关的实时处理可通过AWS Lambda / Glue Streaming / Amazon EMR / Kinesis Data Analytics 等多种方式实现。

这篇博客主要介绍基于事件触发AWS Lambda (python) 的实时数据处理所要面临的性能优化方向与实践,实时数据处理往往需要端到端低延迟为目标,该总结为从需求架构、程序、触发、IO等多个角度出发,说明性能优化可选用的方法及最佳实践。

一、需求及架构方面的优化

一个好的架构可以从根本上优化处理性能,所以在项目开始阶段,建议首先从架构层面考虑好以达到最佳性能。比如,是否可以减少历史数据的调用。

另外,实时处理由于有较紧的数据处理SLA,如果在需求评估时发现部分需求点可能对SLA有所影响,我们可以考虑从需求上作出调整。

二、程序上的调优

在程序上,我们可以从逻辑及语言自身特性上做出优化。

1. 逻辑的基本优化

(1)首先,考虑把程序初始化从循环体内移到循环体外,示例如下:

for key in s3_source:
	s3=boto3.resource('s3')
	…

s3=boto3.resource('s3')
for key in s3_source:

(2)再次,减少不必要的包引用或处理,比如import <library> 等,这常见于经过多次改版的程序。

(3)只选择需要的字段

AWS Lambda里往往会使用到pandas包的dataframe,dataframe中的字段并不一定都是我们所需要的。除了减少数据量,还可以通过减少需要处理的dataframe字段数目来降低dataframe的内存消耗,降低处理时长。

比如在python使用pandas读取 csv 文件时,使用 usecols 参数指定所需字段,可以有效减少内存使用。

(4)然后,把加工新增的字段放在排序操作之后,比如:

df = df.sort_values(['id', 'eventTime'], ignore_index=True)
df['idx'] = df.id  # 新增字段

而不是:

df['idx'] = df.id  # 新增字段
df = df.sort_values(['id', 'eventTime'], ignore_index=True)
这是为了减少待排序的数据量,对于海量数据排序操作会有明显的性能提升。

(5)使用条件过滤的时候,尽可能把过滤条件在数据处理过程中前移,其基本思想也是为了减少需要处理的数据量。

2. 基于不同语言如python对应的优化

(1)使用矢量化计算代替行操作

行操作是一次对一个值或一次对一组值进行运算。使用矢量化计算代替行操作的好处在于能充分利用CPU效率,在实践中往往能极大地提升处理性能。以下面两种处理方式为例。

方式A:

def calculatetime(df):
    if (df['starttime_y'] <= df['endtime_y']):
        return df['endtime_y']-df['starttime_y']
df[index_name] = df.apply(calculatetime, axis = 1)

方式B:

df.loc[(df['starttime_y'] <= df['endtime_y']), index_name] = df['endtime_y'] - df['starttime_y']

方式A使用apply函数,该函数并不能应用到矢量化计算(vectorization)的优势,而是相当于每条记录遍历处理,与在for循环中计算类似,而方式B则是利用了dataframe 的矢量化计算进行列操作。经测试,方式A比方式B可以多出100x – 1000x的处理时长。所以应该尽可能使用如方式B那样的矢量化计算,以达到较优的计算性能。

(2)数据类型替换

对于大量的数据,使用空间占用更少的数据类型(如:int, category替代object类型),或者降低同数据类型的占位数,可以有效地降低空间的占用,从而提升I/O及数据处理的效率。

(3)对象删除及垃圾回收

为了更高效地释放已不再使用的对象所占用的内存空间,我们可以选择手动删除对象及进行垃圾对象回收。示例如下:

import gc
var1 = ...
var2 = ...
del var1, var2
gc.collect()
gc.garbage

需要留意的是,对于大部分的情况,我们不需手工调用gc.collect(),python与很多语言类似,有自动垃圾回收的机制,特别是内存不足的情况下。

三、触发(trigger)方面的改进

(1)SQS Visibility timeout
当我们使用Amazon SQS(一种消息队列服务)触发Lambda时,可以通过调整SQS Visibility timeout 参数加快下一条消息的处理。该参数用于设置消息从一个消费者队列中接收但不可被其它消息消费者所看到的时长,所以该参数会影响到消息每次处理的等待时长。在创建或编辑SQS 时可调整以下配置项以修改该参数:

(2)SQS delete message

我们可以在程序里删除已处理完毕的SQS message,加快下一条message的接收。

示例:

queue_url = 'https://sqs.<region>.amazonaws.com/...'
sqs_client = boto3.client('sqs')
receiptHandle = event["Records"][0]["receiptHandle"]
sqs_client.delete_message(QueueUrl=queue_url, ReceiptHandle=receiptHandle)

四、I/O(Input/Output)的处理

由于物理存储的时延性,I/O(Input/Output)往往是实时数据处理的瓶颈之一。以下通过I/O合并、File compaction的示例来说明如何提升I/O处理效率。

(1)替代pandas的IO

对于pandas,内存生成原数据数倍大小的数据集,我们通过使用第三方库如pyarrow (或awswrangler) 替代pandas的I/O访问,利用内存映射计算特性(详见pyarrow介绍)可以提升数据的读写性能,也能更方便地支持批量读取(同一个bucket key下读取多个同schema的文件),同时在写入S3时支持动态分区(即无需在代码中指定分区的键值)。代码示例如下:

# Read S3 parquet files in the specific S3 bucket key
import pyarrow as pa
df = pa.parquet.ParquetDataset(path, filesystem=pa.fs.S3FileSystem())
.read_pandas().to_pandas()

# Write S3 parquet with data partitioning
table = pa.Table.from_pandas(df)
partition_cols = ['year', 'month', 'day']
pa.dataset.write_dataset(table, path, basename_template=file_name, format="parquet", 
partitioning=partition_cols, filesystem=pa.fs.S3FileSystem(), 
existing_data_behavior='delete_matching',
partitioning_flavor='hive')

# Read S3 csv files in the specific S3 bucket key
import awswrangler
df = awswrangler.s3.read_csv(path)

(2)基于内存处理后再落盘

在某些情况下,可以通过内存处理来达到Read/Write合并的效果。比如某个处理需要写入Amazon S3,读取出来加工,再写回,反复读写,就可以考虑在内存处理好,最后再统一存储/持久化,即只保留一次input及一次output。

(3)File compaction

对于同一个partition路径下高频生成小文件的情况,一般我们都会加上一个file compaction的逻辑,把大量的小文件合并成一个大文件,减少读文件的次数,从而提高每次访问的性能。代码示例如下:

df = pq.ParquetDataset(
path, filesystem=pa.fs.S3FileSystem()).read_pandas().to_pandas()
s3=boto3.resource('s3')
s3_source = s3.Bucket(bucket).objects.filter(Prefix=key)
count_obj = sum(1 for _ in s3_source.all())
if count_obj>1:
for key1 in s3_source:
copy_source = {
'Bucket': bucket,
'Key': key1.key
}
move_after_key = KEY + "pre-compaction/"+"/".join(key1.key.split('/')[1:])
if(len(list(s3.Bucket(bucket).objects.filter(Prefix=move_after_key)))==0):
s3.Bucket(bucket).copy(copy_source, move_after_key)
s3.Object(bucket,key1.key).delete()

iob_df=io.BytesIO()
df.to_parquet(iob_df,index=False)
boto3.client('s3', region_name=REGION).put_object(Bucket=bucket,
Key=key+target_file,Body=iob_df.getvalue())

五、其他

在应用了以上方法后,如果性能依然不满足用户需求,我们可考虑multiprocessing / async / callback异步处理以加快处理速度,但需要注意线程安全、对象锁及需要更多时间的异步测试。

另外,AWS Lambda有内存、执行时长的限制(见参考1),Lambda基于事件触发处理数据,更适合少量或中等数据量的场景。如果需要处理海量数据或考虑特别用户需求时,需要结合Glue、EMR、Redshift等服务进行选择。

以上都是正常情况下的优化方式,但有时候程序运行变慢可能是程序有bug或有异常数据,比如异常数据引起的数据扩散,导致处理的数据量剧增,这时候我们需要对代码的业务逻辑层进行优化。

结论

在实际客户案例中,优化前从SQS触发Lambda执行需要接近4分钟去处理,在应用了以上主要的优化方案后,平均只需约1分钟可完成处理,性能提升显著,内存消耗更低,从而能满足数据处理实时性的需求。

通过本文,你可以快速了解到实时数据处理中,AWS Lambda可应用的性能优化方法及实践。

如你有其它的问题或建议,欢迎留言给我们。

参考

[1] New for AWS Lambda – Functions with Up to 10 GB of Memory and 6 vCPUs

本篇作者

樊在阔

亚马逊云科技专业服务团队的数据架构师。他帮助客户设计和构建数据驱动解决方案,提供专家技术咨询、最佳实践指南和基于亚马逊云科技平台实施服务。

孔庆强

AWS专业服务团队的大数据顾问,十多年从事数据湖仓及数据分析,为客户提供数据建模、数据治理及整体专业数据解决方案。在个人爱好方面,喜爱健身、音乐、旅行。

李烨炜

AWS专业服务团队大数据咨询顾问。专注于企业级客户云上数据架构与数据平台设计等相关咨询服务。