亚马逊AWS官方博客

云原生编排数据分析管道初探

摘要

公有云是适合数据分析和大数据处理的天然平台。近年来,云服务和开源社区涌现出许多优秀的工作流编排工具,方便就数据分析中复杂的抽取转换加载 (ETL) 过程进行任务编排。亚马逊云科技(亚马逊)的 Step Functions 状态机和开源社区的 Airflow 是其中的典型代表。要成功运行数据分析管道,需要至少两个必要准备,一是搭建好支持运行数据管道的基础设施,例如部署好 Airflow 的调度器和执行器等。二是编排好数据管道的 ETL 任务顺序,例如状态机的 JSON 定义文件或者 Airflow 的有向无环图定义。前者涉及运维,后者事关业务。从数据分析的角度,则希望运维难度最小,业务易用度最大。本文从上述两个角度切入,就 Airflow 和状态机支持数据分析管道的情况进行分析,并初步探讨云原生编排数据管道的方法和意义。

Abstract

Public cloud is one of the most suitable platforms for data analysis and big data processing. In recent years, many excellent workflow orchestration tools have proliferated in cloud services and open source communities to facilitate orchestration of complex ETL jobs in data analysis. AWS Step Functions and Airflow from open source community are two typical examples. To run the data analysis pipelines successfully, at least two steps are necessary. Firstly, to build the infrastructure to support running the data pipelines, such as the deployment of Airflow’s schedulers and executors. Secondly, to orchestrate the ETL tasks in the data pipelines, such as the JSON definition of the state machine or Airflow’s directed acyclic graph definition. The former involves dev-ops, while the latter is related to application. From the perspective of data analysis, it is ideal that the difficulty of dev-ops is minimized, and the user-friendliness of application is maximized. This article analyzes how well Airflow and Step Functions support data analysis pipelines from the above two points of view, and preliminarily discusses the methodology and significance of cloud-native services for orchestrating data pipelines.

目标读者

本文预期读者需要掌握以下技术的基础知识:

  • Apache Airflow 及其相关概念
  • 亚马逊开发包:CDK, SDK
  • 亚马逊服务:CloudTrail, EventBridge, Glue, Lambda, MWAA, Redshift, S3, Step Functions, VPC, 密码管理器
  • 语言:Javascript, JSON, Python

开放源代码

本文所述解决方案源代码开放:

绪论

2020年11月,亚马逊发布了托管的 Airflow 服务,全称为 Managed Workflows for Apache Airflow (MWAA),支持 1.10.12 版。2021年5月开始支持 2.0.2 版。但截至目前(2021年8月),中国北京区、宁夏区和香港区皆未提供托管服务。如果要使用 Airflow,则需要自行搭建部署,例如利用 Elastic Container Service。或者换成其他云原生的编排服务,例如 Step Functions 状态机或 Simple Workflow Service。本文以某 MWAA 的数据分析指导教程为引子,介绍 MWAA 资源代码化的方法,并对如何在中国区使用云原生服务达到类似编排数据分析管道的目的,进行初探。

教程简介

MWAA 数据分析管道指导教程是指导利用 MWAA 搭建数据分析管道的教程。该分析管道较简单,线性分为五步,分别是监控文件,爬元数据,转换数据,整合数据,保存结果。简单起见,本文将第四步改造为 Glue 任务,不使用 Elastic MapReduce 集群。部署好的数据管道有向无环图如下:

以 MWAA 为基础的架构图如下所示。虚线和编号表示该数据管道运行时的数据流向和任务执行顺序。此处不同服务的调用是通过 Airflow 的操作符间接完成的,以 Python 定义在数据管道的有向无环图中。

根据该教程介绍,完成时间为一小时左右。抛开初学者的因素,所用时间较长主要是因为所涉及的资源及其配置都是在亚马逊控制台上手工完成,非常耗时,效率低下。如果利用资源代码化技术,则部署时间可以在数分钟内完成(排除 MWAA 自身启动需要的约半小时),提高工作效率,尤其在需要多环境部署测试的情况下。

资源代码化

利用亚马逊云开发包 (Cloud Development Kit),可快速构建云资源部署程序。结合相关代码仓库管理工具如 github,可代码化管理资源的创建和修改过程,方便协同工作和排错溯源。限于篇幅以下仅展示关键代码。完整代码请参阅开放源代码。首先建立 Redshift 集群。云开发包建立好集群后,会自动在密码管理器中新建一密码项,即为该集群的连接密码,全程密码无暴露。

createCluster(landingZone, subnetGroup) {
    return new redshift.Cluster(this, "MainCluster", {
        masterUser: { masterUsername: "admin" },
        vpc: landingZone.vpc,
        numberOfNodes: 2,
        publiclyAccessible: false,
        defaultDatabaseName: RedshiftStack.DB_NAME,
        securityGroups: [landingZone.securityGroup],
        nodeType: redshift.NodeType.DC2_LARGE,
        roles: [this.role],
        subnetGroup: subnetGroup
    })
}

其次是建立 MWAA 环境。简化起见配置为可以公网访问。实际生产中建议配置为私网访问。MWAA 对网络配置有特别要求,若不达标环境可能无法启动。子网建议配置为私有子网,即有路由到 NAT 网关。具体请参阅使用手册。

createEnvironment(envName, role, landingZone, bucket) {
    return new airflow.CfnEnvironment(this, "Lab", {
        name: envName,
        airflowVersion: "2.0.2",
        environmentClass: "mw1.small",
        executionRoleArn: role.roleArn,
        sourceBucketArn: bucket.bucketArn,
        webserverAccessMode: "PUBLIC_ONLY",
        dagS3Path:           "airflow/lab/dags",
        pluginsS3Path:       "airflow/lab/plugins/awsairflowlib_202.zip",
        requirementsS3Path:  "airflow/lab/requirements/requirements.txt",
        networkConfiguration: {
            securityGroupIds: [landingZone.securityGroup.securityGroupId],
            subnetIds: landingZone.vpc.privateSubnets.map(subnet => subnet.subnetId)
        }
    });
}

MWAA 配置好以后,还需要把有向无环图定义文件上传到存储桶指定位置。利用云开发包的 S3 部署模块完成:

uploadDagFile(bucket) {
    new deploy.BucketDeployment(this, "DagScript",
        sources: [deploy.Source.asset(path.join(__dirname, '../../scripts/airflow/lab/dags/'))],
        destinationBucket: bucket,
        destinationKeyPrefix: 'airflow/lab/dags/'
    });
}

至此就完成了 MWAA 相关资源的部署程序。部署上述 MWAA 大约耗时半小时,因为涉及服务器的启动连接等。部署耗时长也凸显出非无服务器服务的弊端。当环境状态变为“可用”后,点击“打开 Airflow UI”即可打开 Airflow 的控制台。

部署完成后,根据教程即可完成数据分析管道相关操作。Airflow 控制台可以显示数据管道完成时间甘特图,形象展示各任务耗时多寡。这是其亮点之一。

安全隐患

最后一步把存储桶文件复制到 Redshift,需要在 Airflow 控制台配置到 Redshift 的连接。连接属性包含用户名和密码明文。此处有暴露密码明文的安全隐患。下面介绍的云原生技术通过密码管理器实现无缝连接,可有效规避密码暴露风险,切实提高系统安全水平。

云原生编排

目前 MWAA 尚不支持中国区,故需要自行搭建并维护 Airflow 环境,例如在 ECS Fargate 上。但是运维难度不容小觑。亚马逊提供了其他编排服务,例如状态机,完全可以代替 Airflow 对数据管道进行编排。此外状态机是无服务器的,毋需关心并事先设定服务器数量性能等运维难题。使用云原生服务,和其他服务紧密集成,可最大化服务效益,增强安全性。和教程数据分析管道等价的状态机工作流如下所示。状态机跨服务集成支持直接启动 Glue 任务,较为简单。其他步骤需做些许变通。

以云原生服务为基础的架构图如下所示。状态机在各服务之间调度,其中 Glue 任务可以直接执行。爬虫任务需要通过 Lambda 函数辅助。把存储桶的数据载入到 Redshift 可以有多种方法完成,例如 Glue 任务可以直接连接 Redshift 并保存数据。为了和教程中的步骤一一对应,此处利用 Lambda 函数来辅助完成。存储桶文件监控则通过跟踪与规则联合完成。

爬元数据

状态机的跨服务集成目前暂不支持调用 Glue 爬虫,需替代方案。此处利用两个 Lambda 函数和轮询机制实现相同功能。启动爬虫很简单:

await glue.startCrawler({Name: crawlerName}).promise();

爬虫状态查询,如果不是“就绪”状态,则等待十秒后再次查询,直至成功或者超时。

exports.handler = async event => {
    response = await glue.getCrawler({Name: event.crawlerName}).promise();
    const state = response.Crawler.State;
    return state == "READY";
}

爬虫调用代替方案的其他两步可以利用状态机的“选择”和“等待”原生操作符完成。这里传入变量 next 作为爬虫轮询结束后的下一步操作。以下代码可作为爬虫调用的模版使用。若想提高响应速度可缩短等待时长。一个典型的爬虫任务需时数分钟。

createCrawlerStep(next) {
    const crawlTask = new task.LambdaInvoke(this, "Crawl", {
        lambdaFunction: this.lambda("CrawlLambda", role, "../lambda/crawler/crawl"),
        payload: sf.TaskInput.fromObject({CrawlerTagName: "NF1-Airflow-Lab-RawGreenCrawler"}),
        payloadResponseOnly: true,
        resultPath: "$.crawlerName",
    });
    const checkCrawled = new task.LambdaInvoke(this, "Check if crawled", {
        lambdaFunction: this.lambda("CheckCrawledLambda", role, "../../lambda/crawler/check"),
        payloadResponseOnly: true,
        resultPath: "$.crawled",
    });
    const wait = 10;
    crawlTask.next(checkCrawled)
    .next(new sf.Choice(this, "Is crawled?")
        .when(sf.Condition.booleanEquals("$.crawled", true), next)
        .otherwise(new sf.Wait(this, `Wait for ${wait} Seconds`, {
            time: sf.WaitTime.duration(core.Duration.seconds(wait)),
        }).next(checkCrawled)));
    return crawlTask;
}

保存结果

状态机的跨服务集成尚未对 Redshift 提供支持,对保存结果需要替代方案。Airflow 提供 S3ToRedshiftOperator 操作符将数据从存储桶复制到 Redshift 表中。本质上是通过 Redshift 的 COPY 命令实现的。替代方案亦遵循此思路。利用亚马逊软件开发包的 RedshiftData 模块,可以执行 SQL 语言和数据操作命令。此处通过密码 ARN 获取密码,完全规避密码明文暴露的问题。此外 Glue 也提供应用接口支持直接把数据保存到 Redshift。在保存数据到数据仓库之前可多做一步,将目标表做清空操作(truncate 或 delete),避免因原表中有数据导致冗余。

await data.executeStatement({
    ClusterIdentifier: event.ClusterIdentifier,
    Database: event.Database,
    SecretArn: event.SecretArn,
    Sql: event.Sql
}).promise();

状态机保存结果的替代方案可以一个 Lambda 函数实现。在生产环境,此处建议以更细粒度和最小化原则配置该函数所赋予的权限,夯实系统安全性。

createCopyS3ToRedshift(bucket, redshift) {
    const dir = bucket.s3UrlForObject("airflow/lab/data/aggregated");
    const sql = `copy nyc.green from '${dir}' iam_role '${redshift.role.roleArn}' format as parquet;`;

    return new task.LambdaInvoke(this, "Copy S3 to Redshift", {
        lambdaFunction: this.lambda("CopyToRedshiftLambda", role, "../lambda/redshift/execute"),
        payloadResponseOnly: true,
        payload: sf.TaskInput.fromObject({
            ClusterIdentifier: redshift.cluster.clusterName,
            Database: RedshiftStack.DB_NAME,
            SecretArn: redshift.cluster.secret.secretArn,
            Sql: sql,
        })
    });
}

监控文件

最后还需要处理对存储桶文件的监控任务。Airflow 提供 S3PrefixSensor 来监控文件上传到某个桶,进而触发数据管道进行数据处理。此处有多种方式监控存储桶里面的文件。例如 S3 自带的事件通知功能。不过事件通知的目标不支持启动状态机,还需要 Lambda 辅助。或者可以通过 CloudTrail 跟踪桶写入操作,然后通过 EventBridge 的个规则来捕获事件进而触发状态机执行。跟踪只需要关注特定桶特定目录的写入即可。

createTrail(logBucket, monitorBucket) {
    const trail = new cloudtrail.Trail(this, 'CloudTrail', {
        bucket: logBucket,
        s3KeyPrefix: "data-trail",
    });
    trail.addS3EventSelector(
        [{bucket: monitorBucket, objectPrefix: "airflow/lab/data/raw"}],
        {readWriteType: cloudtrail.ReadWriteType.WRITE_ONLY});
    return trail;
}

捕获规则和跟踪一样,只需要捕获特定桶特定目录的写入即可,此处利用 prefix 前缀操作符来限定。规则目标可以直接启动状态机,开启数据分析管道进程。

createS3Event(machine, bucket) {
    new events.Rule(this, "S3StepFunctions", {
        description: "S3 invoke StepFunctions",
        eventPattern: {
            source: [ "aws.s3" ],
            detailType: [ "AWS API Call via CloudTrail" ],
            detail: {
                "eventSource": [ "s3.amazonaws.com" ],
                "eventName": [ "PutObject" ],
                "requestParameters": {
                    "bucketName": [ bucket.bucketName ],
                    "key": [{ "prefix": "airflow/lab/data/raw" }]
                }}},
        targets: [ new targets.SfnStateMachine(machine) ]
    });
}

方案对比

至此云原生编排数据分析管道的改造宣告完成。回顾本文,重新审视和对比各项方案的利弊得失,可以更好的帮助读者选择更适合业务场景的方案。例如,如果在中国区新建数据分析管道项目,则建议使用状态机。如果从 Airflow 的老环境迁移上云,则建议使用 MWAA 或者自建 Airflow,这样可以复用积累的代码。亦可两者并行,对新的数据管道用云原生方案。

状态机 MWAA 自建 Airflow
编写语言 JSON 及云开发包支持的所有语言
(TypeScript, JavaScript, Python, Java, and C#)
Python Python
支持中国区
无服务器
托管服务
连接密码暴露
开源社区
部署时间(近似值) 开箱即用 一小时 数小时到数天
Airflow 最新版本
(2021年8月)
不适用 2.0.2 2.1.3
服务调度集成 云原生 通过操作符 通过操作符

结论

本文从 MWAA 的数据分析管道指导教程为引子,介绍利用云开发包快速搭建部署程序的方法,并以状态机为编排平台,尝试改造其为云原生编排的数据管道,辅以几个关键操作的替代方案介绍。本文初步探讨云原生编排数据分析管道的方法,借此抛砖引玉,供读者讨论。从“方案对比”一节可以看出,云原生的编排方案有诸多优势,尤其表现在零部署时间,无服务器化管理,多语言支持和增强安全性上。相信随着跨服务集成的深度和广度越来越高,云原生的编排优势会如虎添翼,成为数据分析管道编排平台的不二选择。

工作展望

有几个有趣的展开方向。首先就 Airflow 的诸多操作符,研究云原生改造方法,以期对所有 Airflow 有向无环图皆可支持改造,利于迁移。能自动化改造更佳。其次研究将 Airflow 的数据分析管道以 Glue 的蓝图和工作流为基础进行改造。Glue 蓝图的编排平台和状态机的编排平台是一对有趣的对比。再者就自建 Airflow 的方案如何高效搭建基础设施并降低运维难度亦值得关注。

参考资料

本篇作者

袁文俊

亚马逊云科技专业服务部顾问。曾在亚马逊美国西雅图总部工作多年,就职于 Amazon Relational Database Service (RDS) 关系型数据库服务开发团队。拥有丰富的后端开发及运维经验。现负责业务持续性及可扩展性运行、企业应用及数据库上云迁移、云上灾难恢复管理系统等架构咨询、方案设计及项目实施工作。他拥有复旦大学理学学士学位和香港大学哲学硕士学位。

赵鑫

亚马逊云科技专业服务团队数据架构师,专注于生命科学、自动驾驶领域的数据架构与数据分析