亚马逊AWS官方博客

基于 Cosmos,dbt 和 Redshift 快速构建无服务器的现代数据管道

关于 AWS Redshift + dbt 的详情可以参考以下内容:

使用 dbt 进行数据建模,可以显著提升数据可靠性、可溯源性和可审查性,这些都对构建高质量的现代数据分析系统至关重要,总结下来主要有以下几大优势:

  • 清晰的数据血缘。dbt 通过模型依赖关系,可以非常清晰地表达出数据流转的血缘。数据从原始表到中间层再到应用层的转换链路一目了然。
  • 一致的文档。dbt 可以自动生成数据字典,对所有模型进行统一的文档化。文档中包含字段信息、描述、血缘等 Metadata。
  • 通过测试驱动的数据质量。dbt 内置了测试框架,可以在模型级编写数据测试案例,持续验证数据质量。例如唯一性、完整性、精确性等测试。
  • 问题排查更加高效。若出现数据问题,可以通过血缘链快速定位问题出在哪个转换步骤。文档也有助于理解字段含义。
  • 自动化部署和测试 dbt 支持 CI/CD。可以通过自动化流程来确保任何代码变更都经过验证,避免引入问题。
  • 可 audit 的变更。历史 dbt 项目本身是代码,所有的变更都可存储在 Git 等版本控制工具中,可完整地复核和回溯变更历史。

当使用 Airflow 的 DAG 去调度 dbt Core 项目连接 Redshift 时,引入 Cosmos 作为中间层的效果如下:

Without Cosmos

With Cosmos

从图上可以看出来,差别较为明显,简单总结如下:

  • 利用 Airflow 的数据感知调度功能,在上游数据接入后立即运行模型 ,将每个 dbt 模型转变成带重试、告警等功能的任务或任务组,血缘关系上移到 airflow 可以清晰地排查到具体是模型的哪个部分出现问题并再次重试。
  • 使用 Airflow 连接而不是 dbt profile 来运行 dbt 项目,可以更好地利用 Airflow 的连接管理功能,Airflow 本身支持容错和重运行失败任务,使得 dbt 任务执行更加可靠。
  • 原生支持在虚拟环境中安装和运行 dbt, Cosmos 目前支持本地、Kubernetes 和 Docker 三种运行模式,避免与 Airflow 的依赖冲突。

更多细节见 https://www.astronomer.io/Cosmos/

本篇主要探讨使用 Cosmos+dbt+Redshift 以及整个DevOps CICD 的数据管道的具体实现,架构图如下:

Continuous Integration 阶段:每次触发合并请求时,GitLab Webhook 激活 API Gateway,然后调用 Lambda 函数启动 CodeBuild 作业。CodeBuild 将 DAG 中的代码编译为 Docker 镜像并推送到 Amazon Elastic Container Registry。更多信息见 https://github.com/aws-samples/gitlab-codebuild-quickstart。通过重写 API Gateway 在 Git 更改时调用的 Lambda 函数,并修改 CodeBuild 的编译过程,我们实现了 dbt 部署的 CI/CD 流水线自动化。这样每次 Git 仓库中的 DAG 定义文件发生变更时,都会触发对 dbt 的重新编译和部署到 Redshift,实现分析工作负载中 dbt 模型的持续交付。

Continuous Deployment 阶段:CodeBuild 作业会将 DAG 文件上传到 S3 桶。然后,它会在 Amazon Elastic Kubernetes Service 上触发 Kubernetes Pod 来运行 dbt 编译。编译后的 dbt 代码随后会部署到 Redshift 执行。此外,一旦上传到 S3,与 AWS Glue 等其他服务相关的 DAG 也可以用类似的方式触发。监控:每个 DAG 运行完成后,会触发回调,通过 SNS 通知将运行结果发送到 Lambda 函数。然后 Lambda 函数会将运行结果发布到用户常用的企业微信或 Microsoft Teams 等企业协作工具。

接下来,开始进入部署阶段。

1. 准备 GitLab 环境

GitLab 的具体安装方式可以根据需求选择容器化部署或者虚拟机部署。本文档的重点是设计 GitLab 的持续集成和持续交付工作流程。关于安装指南可以参考:https://docs.gitlab.com/ee/install/aws/

2. 在 GitLab 中配置 Webhook

GitLab Webhook 允许外部系统在 GitLab 中的某些事件发生时得到通知。它可以集成 CI/CD 流水线,以便在代码推送到 GitLab 时自动触发构建。

2.1 在 GitLab 的左侧导航栏点击设置,然后点击辅助菜单中的 Webhooks。

2.2 在 Webhook 配置页面填写在 4.1.1 中创建的 API Gateway 的端点,填写 Secret token,选择 Merge request events,然后点击添加 Webhook。

2.3 记录您的 Secret token,它会在下一部分中使用。可以先任意填一个网页 url,之后用 API Gateway 生成的 url 替换它。

注意:如果选择不同的 GitLab 事件需要相应地修改 Lambda 函数代码以处理替代请求。本篇的 Lambda 主要是针对 merge request events 的处理。

3. 在 GitLab 中配置 API 访问权限

GitLab API 访问令牌用于认证 API 请求并授权访问 GitLab API。在本例中,Lambda 可以使用该令牌与 GitLab 服务进行交互。

– 在令牌名称字段中输入一个可识别的名称

– 在选择角色时,角色级别不能低于 Reporter

– 选择 read_api read_repository 选项。

– 点击创建项目访问令牌。

– 妥善保存好项目访问令牌。编写 CodeBuild 脚本时会使用在这里生成的令牌。

4. 用 ARM 架构在 AWS 上实现无服务器部署 API Gateway、Lambda 和 CodeBuild

由于 ARM 架构 CPU 与传统的 x86 CPU 相比,为云中的虚拟化工作负载提供了明显的性价比优势,我们在本示例中选择了 Graviton 类型实例来构建代码。

4.1 在 ECR 中创建存储库

使用 AWS 控制台创建一个新的私有 ECR 存储库来存储容器镜像。

4.2 在 arm 的机器上创建一个 dockerfile 以及 requirment.txt 文件,参考 https://github.com/ziling777/cicd/tree/main/base。构建镜像并推送到 ECR 的私有镜像仓库并记录镜像的 URI 到记录该镜像的 URI 供后续使用。

sudo systemctl start docker
aws ecr get-login-password --region us-east-2 |sudo docker login --username AWS --password-stdin AWS-ACCOUNT.dkr.ecr.us-east-2.amazonaws.com
sudo docker build -t cicd .
sudo  docker tag cicd:latest AWS-ACCOUNT.dkr.ecr.us-east-2.amazonaws.com/cicd:latest
sudo docker tag cicd:latest AWS-ACCOUNT.dkr.ecr.us-east-2.amazonaws.com/cicd:latest
sudo docker push AWS-ACCOUNT.dkr.ecr.us-east-2.amazonaws.com/cicd:latest

4.3 创建 CodeCommit 仓库来保存 build.yaml、dockerfile 和 kubectl.config 文件。前两个文件的内容,可参考 https://github.com/ziling777/cicd/tree/main/codecommit

Replace AWS_CCOUNT into your own account id in build.yaml file.
Replace AWS_REGION into your own region to be provisioned.

a)build.yaml:这是CodeBuild构建项目的配置文件,定义了构建环境、构建命令等信息。通过这个文件可以指定构建过程中的各种配置,比如构建的运行环境、需要安装的依赖、构建和测试命令等。

b)dockerfile:这是一个Docker镜像构建文件的配置清单,定义了如何将应用程序打包成Docker镜像。dockerfile中会指定基础镜像、需要安装的依赖、应用程序代码的复制、启动命令等步骤。通过dockerfile可以将应用程序及其运行环境打包为一个镜像。

c)kubectl.config 文件生成请参考 https://docs.aws.amazon.com/mwaa/latest/userguide/mwaa-eks-example.html#eksctl-kube-config,该文件将在代码构建期间自动复制到 S3,用于 airflow 和 kubernetes 集群之间的交互。

在首次连接 AWS CodeCommit 之前,您必须完成以下初始配置步骤:https://docs.aws.amazon.com/codecommit/latest/userguide/setting-up-https-unixes.html

git init
git remote add origin git-codecommit.us-east-2.amazonaws.com/v1/repos/pipeline-code-prod
git branch -M main
git add .
git commit -m 'first cicd'
git push -u origin main

4.4 创建 codebuild 并记下 project 的名字

添加环境变量:

– GIT_BRANCH: The Git branch name to pull the code from, such as main or master

– GIT_PROJECT_NAME: The Git project name, such as airflow or dbt-project

– GIT_ACCESS_TOKEN: The Personal Access Token used to access the Git project

– GIT_PROJECT_ID: The ID of the Git project, used to build CI/CD pipelines

– GIT_PROJECT_IID: The IID of the Git merge request id.

– GIT_PROJECT_URL: The HTTP/SSH URL of the Git project

– REPOSITORY_IMAGE_PREI: The ECR image Project name

– REPOSITORY_URI: The ECR URI

– REPOSITORY_TAG: The ECR repository tag

– MWAA_DAGS_DIR: The directory of DAG files in MWAA.

4.5 从本地机器下载 CloudFormation 模板文件 https://github.com/ziling777/cicd/blob/main/cloudformation/gitlab-apigw-lambda-codebuild3.yaml ,然后使用它创建基于 ARM 的 Lambda函数、APIGateway 并调用上一步创建的 codebuild project。

GitLab 用户名:访问 GitLab 的用户名。

GitLab 密码:访问 GitLab 的密码。将存储在 AWS Secrets Manager 中。

Webhook token:Webhook_token 和 git 访问令牌在第一部分中生成,格式如下:

{“webhook_token”:”xxxxx”,”git_access_token”:”xxxxx”}

Lambda 层 S3 桶:下载 https://github.com/ziling777/cicd/blob/main/cloudformation/boto3.zip 到任意部署区域的 S3 桶然后填入桶的名字。

其余选项默认。

当 CloudFormation stack 部署完成之后,output 列复制 WebhookURL,并在 GitLab Webhook 页面中更新它。在更新 URL 后可能需要重新填写密钥令牌,因为在重新编辑时它会为空。

4.6 创建 Amazon MWAA 环境,版本为 2.5.1,步骤可参见 https://docs.aws.amazon.com/mwaa/latest/userguide/what-is-mwaa.html

创建 requirement.txt 文件如下,让 MWAA 可以自动安装相关插件,然后将其上传到 MWAA 的桶中,并在配置 MWAA 时指定它:

#--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.5.1/constraints-3.10.txt"
#kubernetes==23.6.0
#apache-airflow[cncf.kubernetes]
airflow-dbt-python[Redshift,s3]
astronomer-Cosmos[dbt-Redshift,kubernetes]==0.7.3
apache-airflow-providers-cncf-kubernetes==7.2.0 
  1. 在 Amazon Secrets Manager 创建一个 key 存储数据库的 connection string
  2. 填入以下值:
    {"username":"admin",
    "password":"xxxxx",
    "schema":"dbt",
    "host":"xxxxxx.us-east-2.Redshift-serverless.amazonaws.com",
    "port":"5439",
    "dbname":"dev"}
    
  3. 给 Amazon MWAA 的 service role 添加 Amazon Redshift, Amazon secret manager, Amazon S3 的权限。
  1. dbt project workshop 参考 https://catalog.workshops.aws/dbt-cli-and-amazon-Redshift/en-US/introduction。Lab3 完成之后最终生成的 project 文件夹目录参考 https://github.com/ziling777/cicd/tree/main/gitlab_dbt。这里需要确保这个 dbt project 的名字和在 gitlab 提交的 project 名字相同。
  1. 接下是最重要的部分,通过 Amazon MWAA 的 DAG 在 Amazon Elastic Kubernetes Service 上创建 pod 部署 dbt/Cosmos 生成 SQL 模型,在 Redshift 执行,原始 DAG 文件可以参考 https://github.com/ziling777/cicd/blob/main/gitlab_dbt/dags/eks.py。以下是关于 Cosmos 的参数的解释说明:
    import os
    import boto3
    import json
    from datetime import datetime
    from pathlib import Path
    from airflow.decorators import dag
    from airflow.operators.empty import EmptyOperator
    from Cosmos.providers.dbt import DbtTaskGroup
    from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
    #from kubernetes.client import V1EnvVar
    #from kubernetes.client import V1ResourceRequirements
    
    # Cosmos 使用 Airflow 驱动调度,你可以利用 Airflow 的调度能力来调度你的 dbt 项目。
    @dag(
        schedule_interval="@daily",
        start_date=datetime(2023, 7, 24),
        catchup=False,
    )
    def basic_eks_Cosmos_task_group() -> None:
    # 从 secret manager 读取 Redshift 的连接信息,关于 Redshift 的连接参考 https://astronomer.github.io/astronomer-Cosmos/profiles/RedshiftUserPassword.html。
    
        def get_prod_Redshift_bi_conn_json():
            secrets = boto3.client('secretsmanager')
            secretValues = secrets.get_secret_value(SecretId="Redshiftsecretname")['SecretString']
            dic_secrets_json = json.loads(secretValues)
           # prod_Redshift_bi_str = dic_secrets_json['prod_Redshift_bi']
           # prod_Redshift_bi_json = json.loads(dic_secrets_json)
            return dic_secrets_json
    prod_Redshift_bi_conn=get_prod_Redshift_bi_conn_json()
    
    # 这个 operator 不会执行动作,它可以用来在 DAG 中对任务进行分组。任务会被调度程序评估但从不会被执行程序处理。
    
      pre_dbt = EmptyOperator(task_id="pre_dbt")
    
    # 注意:您的 dbt 项目可以放在 Airflow 可以访问的任何位置。默认情况下,Cosmos 会在 /usr/local/airflow/dags/dbt 目录中查找,但是您可以通过在创建 DAG 实例时设置 dbt_project_dir 参数来更改此位置。
    
    _project_dir= "/usr/local/airflow/dags/dbt_projects_dir/"
    
    # 这个 DAG 使用 Cosmos 包中的 DbtTaskGroup 类来从 dbt 项目中的模型创建一个任务组。dbt 模型之间的依赖关系会自动变成 Airflow 任务之间的依赖关系。请确保为 YOUR_NAME、DB_NAME 和 SCHEMA_NAME 添加自己的值。 
    
    # Cosmos 可以使用四种不同的方法(称为执行模式)来运行 dbt 命令,目前我们使用的是 kubernetes 模式。这四种模式的区别和优势可以参考 https://astronomer.github.io/astronomer-Cosmos/getting_started/execution-modes.html。
    
    local: 使用本地 dbt 安装运行 dbt 命令(默认)
    virtualenv: 从 Cosmos 管理的 Python 虚拟环境中运行 dbt 命令 
    docker: 从 Cosmos 管理的Docker容器中运行 dbt 命令
    kubernetes: 从 Cosmos 管理的 Kubernetes Pod 中运行 dbt 命令
    
        Redshift_dbt_group = DbtTaskGroup(
            dbt_root_path=_project_dir,
            dbt_project_name="dbtcicd",
            execution_mode="kubernetes",
            conn_id="bi-poc-Redshift",
    # Cosmos 允许你在每个 DbtDag / DbtTaskGroup 中使用 RenderConfig 类中的 select 和 exclude 参数过滤 dbt 项目的一个子集
            select={"configs": ["tags:finance"]},
    # Cosmos 的 DBTRunkubernetesOperator 和 DbtTestKubernetesOperator 都继承了 Airflow 的 KubernetesPodOperator(请见https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/_api/airflow/providers/cncf/kubernetes/operators/pod/index.html)。 Cosmos 将组装参数传递给 KubernatesPodOperator,具体传递了哪些参数请见 Cosmos 源码。
            operator_args={
                 # task 之间是否有消息交互
                "do_xcom_push": False,
                 # 在基础镜像的 dockerfile 中 dbt 项目文件会复制到容器的/app 目录下
                "project_dir":"/app",
                "namespace":"eks namespace",
                "image": "ecr image uri",
                "get_logs": True,
                "is_delete_operator_pod": True,
               # "labels": {"app": "bigdata-dw"},
                "name": "mwaa-Cosmos-pod-dbt",
                "config_file": "/usr/local/airflow/dags/kubeconfig file name",
                "in_cluster": False,
    # 通过在提供给 operator_args 参数的字典中使用 vars 关键字,可以向 dbt 项目中注入变量。如果 dbt 项目中包含 dbt 测试,它们将在模型完成后直接运行。请注意,最佳实践是为运行 dbt 模型的所有任务将重试次数设置为至少 2。
                "vars": '{"my_car": "val1"}',
                "env_vars": {"TARGT": "prod_password",
                             "DB_HOST": prod_Redshift_bi_conn['host'], 
                             "DB_PORT": str(prod_Redshift_bi_conn['port']), 
                             "DB_USER": prod_Redshift_bi_conn['username'],
                             "DB_PASSWORD": prod_Redshift_bi_conn['password'],
                             "DB_NAME": prod_Redshift_bi_conn['dbname'],
                             "DB_SCHEMA": prod_Redshift_bi_conn['schema']
                            },
                # service_account_name当 IAM 连接 Redshift 时使用
                #"service_account_name": "prod-bigdata-bi-Redshift-iam-sa",        
                #"cluster_context": "aws",
                "image_pull_policy": "Always",
               # "tolerations": tolerations,
                #"node_selector": nodeSelector
                #"resources": resources
            },
        )
    
        post_dbt = EmptyOperator(task_id="post_dbt")
    
        pre_dbt >> Redshift_dbt_group >> post_dbt
    
    
    
    basic_eks_Cosmos_task_group()
    

4.7 在 CodeBuild 项目中配置通知

  1. 配置要用于通知的 SNS 主题 注意:SNS 主题的名称必须以”codestar-notifications-“开头,否则它可以关联但 CodeBuild 无法连接到配置的主题。
  1. 创建一个 Amazon Lambda 订阅 Amazon SNS 话题并发送消息到企微。Lambda 代码参考如下,需要替换 Webhook 的 URL:https://github.com/ziling777/cicd/blob/main/lambda_function_alarm_wechat.py

    Amazon Lambda 建立之后,将 Amazon SNS 主题加为触发器:

总结

通过结合 GitLab、Amazon  API Gateway、Amazon  Lambda 和 Amazon  CodeBuild,可以实现一个高度可扩展、自动化程度高、Serverless 架构的 CI/CD 工作流,具有快速迭代、高效协作、安全可控、低成本和高可用等优势,能够大幅提升研发交付效率。

使用 Amazon MWAA,Cosmos 插件,dbt 和 Amazon Redshift 可以轻松设置和管理 ELT 数据转换的流程,大大简化了数据分析的流程,快速构建可扩展的分析管道。

本篇作者

谢紫玲

亚马逊云科技解决方案架构师,主要负责 Auto 行业客户解决方案设计,比较擅长云原生数据库以及大数据方案设计和实践。

王新博

极氪大数据架构师,有多年的互联网软件研发、系统架构设计及大数据产品开发经验。

杨文辉

AWS Redshift Specialist 解决方案架构师,负责基于 Redshift 等数据分析领域的解决方案咨询与设计, Analytics TFC 成员。在分布式系统、计算引擎、存储引擎、调度引擎、数据架构、性能优化、数据工程、数学优化、数据科学与机器学习等领域有着丰富的理论基础与实践经验。

王佳佳

极氪智能科技大数据开发工程师,从事大数据开发多年,有设计和搭建高效、可伸缩的大数据解决方案的相关经验。