亚马逊AWS官方博客

Amazon MWAA 性能优化实践

概述

MWAA 是 Apache Airflow 的托管式编排服务,让您能够更轻松地在云中大规模设置和操作数据管道;Apache Airflow 是一种开源工具,用于以编程方式编写、调度和监视统称为工作流程的各种流程和任务序列。借助 Amazon MWAA,您可以使用 Apache Airflow 和 Python 来创建工作流程,而无需管理底层基础设施以实现可扩展性、可用性和安全性。本文将深入探讨如何在大规模数据调度场景下优化 Amazon MWAA 的性能,为数据工程师和 DevOps 团队提供实用的最佳实践指南。

Apache Airflow 架构

Apache Airflow 的核心架构组件:

Scheduler

Scheduler 是 Airflow 的核心组件,负责周期性触发工作流并将任务提交给执行器运行. 它的主要职责包括:

  • 周期性扫描 DAG 文件夹,检测新的或更新的 DAG 定义
  • 根据 DAG 的调度设置触发工作流
  • 将准备就绪的任务提交给执行器
  • 监控任务执行状态并更新元数据库

WebServer

WebServer 提供了 Airflow 的用户界面,允许用户通过 HTTP 请求与 Airflow 交互。其主要功能包括:

  • 展示 DAG 和任务的执行状态
  • 提供任务日志查看接口
  • 允许用户手动触发、暂停或恢复任务
  • 提供 DAG 代码查看和编辑功能(在某些配置下)

Executor

执行器负责实际运行任务的组件。Airflow 支持多种执行器,如 LocalExecutor、CeleryExecutor和KubernetesExecutor 等;在 MWAA 中,使用的是 CeleryExecutor,它允许任务分布在多个 Worker 节点上执行。

Worker

Worker 是执行具体 DAG 任务的节点,在使用 CeleryExecutor 时,Worker 会监听 Celery 任务队列,接收并执行分配给它的任务。

DAG 文件夹

这是存放 DAG 定义文件的目录,调度器和执行器都需要能够访问这个文件夹以读取 DAG 定义。

元数据数据库

元数据数据库存储 Airflow 的各种状态信息,包括 DAG 运行历史、任务状态、变量和连接信息等。

Amazon MWAA 架构

Amazon MWAA 在保留 Apache Airflow 核心功能的同时,利用云服务的特点进行了优化和扩展:

  1. 基于 AWS Fargate 的容器化部署:MWAA 的调度器和 Worker 运行在 AWS Fargate 容器中,连接到用户 VPC 的私有子网;
  2. 托管元数据数据库:每个 MWAA 环境都有一个由 AWS 管理的 Aurora PostgreSQL 数据库作为元数据数据库;
  3. 灵活的访问模式:支持公共和私有两种 Apache Airflow Web 服务器访问模式;
  4. 集成 AWS 安全服务:通过 AWS Identity and Access Management(IAM)控制访问权限;
  5. 自动扩展能力:MWAA 可以根据工作负载自动调整资源。

MWAA 性能优化策略

要在大规模数据调度场景下充分发挥 MWAA 的性能,我们需要从多个角度进行优化:

1. 选择合适的 Environment Class

MWAA 提供了多种 Environment Class 选项,从 mw1.small 到 mw1.4xlarge 不等:

选择合适的 Environment Class 对性能至关重要:

  • 评估工作负载:分析您的 DAG 复杂度、任务数量和资源需求。
  • 监控资源使用:使用 AWS CloudWatch 监控 CPU、内存和网络使用情况。
  • 逐步扩展:从较小的 Environment Class 开始,并密切监控性能指标,根据需要逐步升级。

2. 优化 Scheduler 性能

Scheduler 性能有很多方面因素影响,比如 Scheduler 的实例配置,Scheduler 的参数配置以及优化 DAG 文件设计,下面我们可以分别进行讨论:

Scheduler 调度器实例配置

在 MWAA v2 版本中,可以配置多个调度器实例来提高性能:

  • 增加调度器数量:对于复杂的工作流和大量 DAG,增加调度器数量可以显著提高处理能力。
  • 调整 max_threads 参数:根据 DAG 复杂度和数量调整每个调度器的最大线程数。

建议从 2 个调度器实例开始,逐步增加并观察性能改善情况。

优化 Scheduler 调度器参数配置

  • scheduler.dag_dir_list_interval:这个参数控制调度器扫描 DAG 目录以查找新文件的频率,默认值:300 秒(5 分钟);如果新 DAG 添加不频繁,可以适当增加这个值,增加这个值可以减少 Scheduler 的工作负载。所以对于稳定的生产环境,可以考虑将此值设置为 600 秒(10 分钟)或更高,对于频繁更新 DAG 的开发环境,可以保持默认值或略微降低。
  • scheduler.min_file_process_interval:这个参数决定了每个 DAG 文件被解析的频率,默认值:30 秒;如果 DAG 更新不频繁,可以增加此值,如果 DAG 解析时间较长,也应增加此值。可以监控 DAG 解析时间(dag_processing.total_parse_time),将此值设置为略高于平均解析时间。
  • scheduler.parsing_processes:这个参数控制调度器可以并行运行多少个进程来解析 DAG,默认值:(2 * vCPU数) – 1,通常,减少此值可以改善任务调度,如果有大量 DAG,可以增加此值;对于集群<100 DAGs 时可以使用默认值。
  • .airflowignore :一个特殊的配置文件,用于指定 Airflow 应该忽略的目录或文件。它的工作原理类似于 .gitignore 文件,使用正则表达式模式来匹配应该被忽略的文件或目录名;使用 .airflowignore 减少需要解析的文件数量,可以显著提高调度器的性能。

优化 DAG 文件设计

DAG 的结构和复杂度直接影响调度器的工作负载,建议从以下几个方面进行优化:

  • 简化 DAG 结构,复杂的 DAG 结构不仅难以维护,还会增加调度器的负担
  • 减少任务依赖,过多的任务依赖会增加调度器的计算复杂度,尽量减少不必要的依赖关系
  • 使用 TaskGroups,对于相关的任务,使用 TaskGroups 可以提高 DAG 的可读性和管理效率
  • 避免过度使用分支操作符,虽然 BranchPythonOperator 很有用,但过度使用会增加 DAG 的复杂度,需要尽可能简化分支逻辑
  • 加快 DAG 加载速度,将不必要的代码(如数据处理、API 调用)移到任务执行函数中,避免在 DAG 文件顶级执行
  • 使用动态 DAG 生成,对于需要生成大量相似 DAG 的场景,使用动态 DAG 生成可以减少 DAG 文件数量,提高加载效率
    from airflow.models import DAG
    from airflow.operators.python_operator import PythonOperator
    
    def create_dag(dag_id, schedule, default_args):
        dag = DAG(dag_id, schedule_interval=schedule, default_args=default_args)
        
        with dag:
            task = PythonOperator(
                task_id='task',
                python_callable=lambda: print(f"This is DAG {dag_id}")
            )
        
        return dag
    
    # 动态生成多个DAG
    for i in range(10):
        dag_id = f'dynamic_dag_{i}'
        globals()[dag_id] = create_dag(dag_id, '@daily', default_args)
    
  • 减少使用并行任务,如果可能,考虑将并行任务序列化执行,除非并行执行能显著提高效率
    # 优化前(并行结构)
    start >> [task1, task2, task3] >> end
    
    # 优化后(线性结构)
    start >> task1 >> task2 >> task3 >> end
    

3. 配置 Worker 动态伸缩

MWAA 支持 WebServer 和 Worker 节点的动态伸缩,这对于处理变化的工作负载至关重要:

  • 设置合适的最小和最大 Worker 数:根据基础负载和峰值负载设置,MWAA 根据(tasks running + tasks queued)/(tasks per worker)=(required workers)来判断是否进行扩缩当前的 worker 节点数量。

将最小 Worker 数设置为能处理基本负载的数量,最大数设置为峰值负载的 1.5 倍。

4. 避免保存持久化数据

MWAA 每一个 task 的存储空间上限是 20 GB,这个限制由它底层 ECS Fargate 1.4 特性决定的,同时,临时数据可能会被存放在不同的/tmp 目录中,而且并不会保证后续的任务会被调度到相同的 fargate 容器,也就是说数据暂时保存在某个容器的/tmp 目录中,下一个任务也可能无法访问到这些数据。所以考虑到上述特性,我们应当遵循以下最佳实践:

  • 避免存储持久化数据:不要依赖 MWAA 环境来存储需要长期保存的数据。
  • 使用外部存储服务:对于需要在任务之间共享或长期保存的数据,使用如 Amazon S3 这样的外部存储服务。
  • 优化数据处理流程:设计工作流时,尽量减少中间数据的存储需求,临时文件应当注意在任务完成或异常退出时删除,避免累计无用数据。

例如,在下面的代码示例中我们直接从 S3 读取数据,在内存中处理,然后将结果直接写回 S3,这种方法避免了在 MWAA 环境中存储任何持久化数据,同时也减少了对临时存储的依赖。

import boto3
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
dag = DAG(
    'optimized_data_processing_boto3',
    default_args=default_args,
    description='An optimized DAG for data processing using boto3',
    schedule_interval=timedelta(days=1),
)
def process_data(**kwargs):
    # 初始化S3客户端
    s3_client = boto3.client('s3')
    
    # 从S3读取数据
    input_bucket = 'my-input-bucket'
    input_key = 'input_data.csv'
    response = s3_client.get_object(Bucket=input_bucket, Key=input_key)
    input_data = response['Body'].read().decode('utf-8')
    
    # 处理数据(这里仅为示例,实际处理逻辑可能更复杂)
    processed_data = input_data.upper()
    
    # 将处理后的数据直接写回S3,而不是保存在本地
    output_bucket = 'my-output-bucket'
    output_key = 'processed_data.csv'
    s3_client.put_object(
        Body=processed_data.encode('utf-8'),
        Bucket=output_bucket,
        Key=output_key
    )
process_task = PythonOperator(
    task_id='process_data_boto3',
    python_callable=process_data,
    dag=dag,
)
process_task

5. 其他维护事项

将 metadata 数据导出到 S3

metadata 用来存储 workflow 和任务的状态,将元数据以 CSV 文件的格式导出到 S3 可以进行离线分析 Airflow 环境的运行状况。

实现方法:

  1. 创建一个 DAG,定期查询 Aurora PostgreSQL 数据库中的特定表(如 DagRun, TaskFail, TaskInstance)。
  2. 将查询结果写入 CSV 文件,并存储到 Amazon S3 桶中。
  3. 使用 boto3 库与 S3 交互,确保 MWAA 环境有适当的 S3 写入权限。

示例代码:

from airflow.decorators import dag, task
from airflow import settings
import os
import boto3
from airflow.utils.dates import days_ago
from airflow.models import DagRun, TaskFail, TaskInstance
import csv, re
from io import StringIO

DAG_ID = os.path.basename(__file__).replace(".py", "")

MAX_AGE_IN_DAYS = 30 
S3_BUCKET = '<your-export-bucket>'
S3_KEY = 'files/export/{0}.csv' 

# You can add other objects to export from the metadatabase,
OBJECTS_TO_EXPORT = [
    [DagRun,DagRun.execution_date], 
    [TaskFail,TaskFail.execution_date], 
    [TaskInstance, TaskInstance.execution_date],
]
 
@task()
def export_db_task(**kwargs):
    session = settings.Session()
    print("session: ",str(session))
 
    oldest_date = days_ago(MAX_AGE_IN_DAYS)
    print("oldest_date: ",oldest_date)

    s3 = boto3.client('s3')

    for x in OBJECTS_TO_EXPORT:
        query = session.query(x[0]).filter(x[1] >= days_ago(MAX_AGE_IN_DAYS))
        print("type",type(query))
        allrows=query.all()
        name=re.sub("[<>']", "", str(x[0]))
        print(name,": ",str(allrows))

        if len(allrows) > 0:
            outfileStr=""
            f = StringIO(outfileStr)
            w = csv.DictWriter(f, vars(allrows[0]).keys())
            w.writeheader()
            for y in allrows:
                w.writerow(vars(y))
            outkey = S3_KEY.format(name[6:])
            s3.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue())
 
@dag(
    dag_id=DAG_ID,
    schedule_interval=None,
    start_date=days_ago(1),
    )
def export_db():
    t = export_db_task()

metadb_to_s3_test = export_db()

通过 aws cli 运行此命令将 python 脚本放入 DAGs 目录:

aws s3 cp your-dag.py s3://your-environment-bucket/dags/

定期清理 metadata 元数据数据库

每个 MWAA 环境都有一个由 AWS 管理的 Aurora PostgreSQL 数据库作为元数据数据库,用户只能访问并不能自行管理,可以通过指标进行监控元数据的资源剩余情况,定期清理元数据数据库是保持 MWAA 环境性能的关键。

监控元数据库指标:

清理元数据库代码:

from airflow import settings
from airflow.utils.dates import days_ago
from airflow.models import DagTag, DagModel, DagRun, ImportError, Log, SlaMiss, RenderedTaskInstanceFields, TaskInstance, TaskReschedule, XCom
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from time import sleep

from airflow.version import version
major_version, minor_version = int(version.split('.')[0]), int(version.split('.')[1])
if major_version >= 2 and minor_version >= 6:
    from airflow.jobs.job import Job
else:
    # The BaseJob class was renamed as of Apache Airflow v2.6
    from airflow.jobs.base_job import BaseJob as Job

# Delete entries for the past 30 days. Adjust MAX_AGE_IN_DAYS to set how far back this DAG cleans the database.
MAX_AGE_IN_DAYS = 30
MIN_AGE_IN_DAYS = 0
DECREMENT = -7

# This is a list of (table, time) tuples. 
# table = the table to clean in the metadata database
# time  = the column in the table associated to the timestamp of an entry
#         or None if not applicable.
TABLES_TO_CLEAN = [[Job, Job.latest_heartbeat],
    [TaskInstance, TaskInstance.execution_date],
    [TaskReschedule, TaskReschedule.execution_date],
    [DagTag, None], 
    [DagModel, DagModel.last_parsed_time], 
    [DagRun, DagRun.execution_date], 
    [ImportError, ImportError.timestamp],
    [Log, Log.dttm], 
    [SlaMiss, SlaMiss.execution_date], 
    [RenderedTaskInstanceFields, RenderedTaskInstanceFields.execution_date], 
    [XCom, XCom.execution_date],     
]

@task()
def cleanup_db_fn(x):
    session = settings.Session()

    if x[1]:
        for oldest_days_ago in range(MAX_AGE_IN_DAYS, MIN_AGE_IN_DAYS, DECREMENT):
            earliest_days_ago = max(oldest_days_ago + DECREMENT, MIN_AGE_IN_DAYS)
            print(f"deleting {str(x[0])} entries between {earliest_days_ago} and {oldest_days_ago} days old...")
            earliest_date = days_ago(earliest_days_ago)
            oldest_date = days_ago(oldest_days_ago)
            query = session.query(x[0]).filter(x[1] >= earliest_date).filter(x[1] <= oldest_date)
            query.delete(synchronize_session= False)
            session.commit()
            sleep(5)
    else:
        # No time column specified for the table. Delete all entries
        print("deleting", str(x[0]), "...")
        query = session.query(x[0])
        query.delete(synchronize_session= False)
        session.commit()
    
    session.close()

 
@dag(
    dag_id="cleanup_db",
    schedule_interval="@weekly",
    start_date=days_ago(7),
    catchup=False,
    is_paused_upon_creation=False
)

def clean_db_dag_fn():
    t_last=None
    for x in TABLES_TO_CLEAN:
        t=cleanup_db_fn(x)
        if t_last:
            t_last >> t
        t_last = t

clean_db_dag = clean_db_dag_fn()

总结

通过综合应用这些优化策略,您可以显著提高 MWAA 环境的性能、可靠性和可维护性。优化是一个持续的过程,需要根据实际工作负载和需求不断调整和改进,定期回顾这些策略,并结合 MWAA 的新特性和最佳实践,以确保您的 Airflow 环境始终保持最佳状态。

附录

https://docs.aws.amazon.com/mwaa/latest/userguide/best-practices-tuning.html

https://docs.aws.amazon.com/mwaa/latest/userguide/mwaa-faqs.html#q-supported-features

https://docs.aws.amazon.com/mwaa/latest/userguide/samples-dag-run-info-to-csv.html

https://docs.aws.amazon.com/mwaa/latest/userguide/samples-database-cleanup.html

本篇作者

陈汉卿

亚马逊云科技解决方案架构师,负责基于亚马逊云科技云计算方案的咨询、架构设计及落地,拥有多年移动互联网研发经验,在云原生微服务以及云迁移等方向有丰富的实践经验。

邢倩

亚马逊云科技资深解决方案架构师,具有丰富的互联网头部企业技术团队管理、产研体系建设实践经验,对于各行业商业、产品、运营、技术架构有综合和深入的理解,擅于将云服务和 GenAI 能力与客户业务成长深度结合,创造多赢机会。