亚马逊AWS官方博客

用 Airflow 实现 EMR 集群的动态启停并通过 Livy 远程提交任务

互联网行业每天都有大量的日志生成,需要在固定时间段对数据进行ETL工作。用户常规的做法是启动一组长期运行的EMR集群,配置远程提交任务的服务器,结合自身的任务调度系统定期提交任务,但集群执行完成任务之后会闲置,造成不必要的开销。另一种方法是在需要执行任务的时候启动集群,任务完成之后关闭集群,但因为每次启动集群后,主节点与核心节点的IP都会发生分变化,导致每次都需要重新配置提交任务的服务器,造成额外的工作负担。本文介绍了一种通过Apache Airflow任务调度系统动态启停Amazon EMR集群的方法,并通过EMR内置的Livy远程提交作业,这样可以节省大量的成本并且无需进行过多的额外配置。

1. 相关技术介绍

在开始之前,请先对以下技术进行简单了解。

1.1 Apache Airflow

Apache Airflow是一款开源的任务调度系统,用户通过创建DAG(有向无环图)来定义任务的流程,图中的每个节点就是需要执行的任务,不同DAG之间的任务可以相互依赖。通过Airflow我们可以定时执行脚本,并且它提供了web界面供用户可视化地查看任务执行情况。

1.2 Apache Livy

Apache Livy是Hadoop生态圈中提供远程提交任务功能的应用程序。它以Rest API的方式提供了Session与Batches两种集群执行任务的方法。Session指的是将集群需要执行的代码写在对Livy请求中,目前支持spark、pyspark、sparkr与sql等四种方式与集群交互。Bathches指的是将代码存放在指定位置,在请求中提供路径,让集群执行代码。例如将jar包存放在S3上,在请求Livy的时候提供jar包的路径,从而让集群直接执行jar包,好处是无需在集群上配置执行代码所需的依赖。

2. 演练

通过本文示例,我将向您展示如何实现以下方案:
基于开源调度工具Airflow编排提交Spark Jobs到EMR做批处理,Job开始之前启动EMR集群,对集群节点采用Spot实例,所有Job结束后关闭EMR集群。

2.1 流程架构图与过程简介

(1)在一台EC2上配置Airflow;
(2)定义Airflow工作流,其中包含创建集群,Spark任务步骤与终止集群等任务;
(3)向Livy提交任务;
(4)EMR从S3中读取数据,对数据进行处理完成之后重新写入S3;
(5)工作完成,终止集群。

2.2 前提条件

(1)本文示例所使用的区域us-east-1;
(2)在该区域创建一台EC2,并确保与EC2绑定的IAM Role有EMR集群的Full Access;
(3)拥有创建EMR集群所需的默认角色:EMR_DefaultRole与EMR_EC2_DefaultRole;
(4)创建S3桶,下载jar包spark-examples_2.11-2.4.4和数据集emrdata.txt,并上传至s3。

2.3 实现过程

2.3.1 在EC2上配置Airflow

(1)登陆EC2,安装Airflow已经相关依赖

sudo yum update -y
sudo yum install -y python-pip gcc mysql-devel python-devel mysql
sudo pip install mysql-python
sudo yum install -y python3
sudo pip3 install boto3
sudo pip3 install requests

# 安装Airflow
sudo pip install apache-airflow
sudo pip install 'apache-airflow[celery]'
airflow initdb

 

(2)创建RDS for Mysql数据库供Airflow使用,对数据库性能要求不高,因此使用默认配置即可

(3)更改airflow.cfg配置文件,并测试是否能打开Airflow的web页面

cd airflow
vim airflow.cfg

# 找到sql_alchemy_conn等参数所在位置,替换为创建的数据库信息
sql_alchemy_conn = mysql://admin:12345678@database-for-airflow.cdtwa5j4xten.us-east-1.rds.amazonaws.com/airflowdb

# Exit vim, Update Airflow Database
airflow initdb

# 配置celery相关参数
vim airflow.cfg

# 找到executor位置,将执行器设置为celery,可保证不相互依赖的任务可以并行执行
executor = CeleryExecutor

# 找到broker_url与result_backend参数的位置
broker_url = sqla+mysql://admin:12345678@database-for-airflow.cdtwa5j4xten.us-east-1.rds.amazonaws.com:3306/airflowdb
result_backend = db+mysql://admin:12345678@database-for-airflow.cdtwa5j4xten.us-east-1.rds.amazonaws.com:3306/airflowdb

# 开启airflow的webserver,在网页上输入EC2的DNS,查看是否能打开网页(注意打开安全组,并且如果本地连上的是公司的vpn,可能会出现无法打开网页的情况)
airflow webserver -p 8080 &

# 启动worker
airflow worker &

# 启动flower,可对worker中的任务进行可视化(要看到网页注意打开5555端口)
airflow flower &
 
        

2.3.2 定义工作流

现定义如下两个Airflow的DAG:
dag_transform_calpi

(1)create_emr_cluster:创建EMR集群;

# -*- coding: UTF-8 -*-

import boto3
import time

emr_client = boto3.client('emr', region_name='us-east-1')

# 定义集群名称,集群名称不要与当前运行的集群重名
name = 'emr-cluster'

# 定义instance,可自定义实例的数量与类型
intances = {
    'InstanceGroups': [
        {
            'Market': 'SPOT',
            'InstanceRole': 'MASTER',
            'InstanceType': 'm4.xlarge',
            'InstanceCount': 1,
        },
        {
            'Market': 'SPOT',
            'InstanceRole': 'CORE',
            'InstanceType': 'm4.xlarge',
            'InstanceCount': 2,
        }
    ],
    'KeepJobFlowAliveWhenNoSteps': True
}

# 定义集群中的应用
applications = [
    {
        'Name': 'Hadoop'
    },
    {
        'Name': 'Pig'
    },
    {
        'Name': 'Livy'
    },
    {
        'Name': 'Hive'
    },
    {
        'Name': 'Hue'
    },
    {
        'Name': 'Spark'
    }
]

if __name__ == '__main__':

    # 创建emr集群
    emr_client.run_job_flow(
        Name=name, ReleaseLabel='emr-5.12.0', Instances=intances, Applications=applications,
        JobFlowRole='EMR_EC2_DefaultRole', ServiceRole='EMR_DefaultRole')
        
    # 持续发送请求,直到创建的集群状态处于Waiting为止
    flag = True
    while flag:
        time.sleep(20)
        r = emr_client.list_clusters(ClusterStates=['WAITING'])
        for i in r['Clusters']:
            if i['Name'] == name:
                flag = False

 

(2)create_livy_session:创建Livy会话;

# -*- coding: UTF-8 -*-
import requests
import json
import pprint
import boto3

# 获取集群的DNS,其中name为你的集群名称
name = 'emr-cluster'
emr_client = boto3.client('emr', region_name='us-east-1')
r = emr_client.list_clusters(ClusterStates=['WAITING'])
for i in r['Clusters']:
    if i['Name'] == name:
        cluster_id = i['Id']
r = emr_client.describe_cluster(ClusterId=cluster_id)
emr_dns = r['Cluster']['MasterPublicDnsName']

# livy_host为配置在emr集群上livy的url,无需修改代码
livy_host = 'http://' + emr_dns + ':8998'
data = {'kind': 'pyspark'}
headers = {'Content-Type': 'application/json'}
r = requests.post(livy_host + '/sessions',
                  data=json.dumps(data), headers=headers)
pprint.pprint(r.json())

 

(3)sleep:等待会话创建完成;

(4)calpi:以batches的方式执行spark任务计算pi值;

# -*- coding: UTF-8 -*-
import requests
import json
import textwrap
import pprint
import boto3

# 获取执行jar包任务的livy batch的url,其中name为你的集群名称
name = 'emr-cluster'
emr_client = boto3.client('emr', region_name='us-east-1')
r = emr_client.list_clusters(ClusterStates=['WAITING'])
for i in r['Clusters']:
    if i['Name'] == name:
        cluster_id = i['Id']
r = emr_client.describe_cluster(ClusterId=cluster_id)
emr_dns = r['Cluster']['MasterPublicDnsName']
batch_url = 'http://' + emr_dns + ':8998/batches'
headers = {'Content-Type': 'application/json'}

# 提交任务
data = {"file": "s3://xiaoyj/emr/spark-examples_2.11-2.4.4.jar",
        "className": "org.apache.spark.examples.SparkPi"}
r = requests.post(batch_url, data=json.dumps(data), headers=headers)
pprint.pprint(r.json())

 

(5)query_completed:外部任务,依赖于第二个DAG(dag_query),即等待查询完成之后,执行下一个任务;

(6)终止集群。

# -*- coding: UTF-8 -*-
import boto3
import time

# 终止集群,其中name为你的集群名称
name = 'emr-cluster'
emr_client = boto3.client('emr', region_name='us-east-1')
flag = True
while flag:
    time.sleep(120)
    r = emr_client.list_clusters(ClusterStates=['WAITING'])
    for i in r['Clusters']:
        if i['Name'] == name:
            emr_client.terminate_job_flows(JobFlowIds=[i['Id']])
            flag = False

 

dag_query


(1)sleep_completed:外部任务,依赖于第一个DAG(dag_transform_calpi),即等待Livy会话执行下一个任务;

(2)transform:对之前上传到S3上的文本文件进行聚合、转换;

# -*- coding: UTF-8 -*-

import requests
import json
import textwrap
import pprint
import boto3


# 获取提交任务的livy_url,其中name为你的集群名称
name = 'emr-cluster'
emr_client = boto3.client('emr', region_name='us-east-1')
r = emr_client.list_clusters(ClusterStates=['WAITING'])
for i in r['Clusters']:
    if i['Name'] == name:
        cluster_id = i['Id']
r = emr_client.describe_cluster(ClusterId=cluster_id)
emr_dns = r['Cluster']['MasterPublicDnsName']
livy_url = 'http://' + emr_dns + ':8998/sessions/0/statements'
headers = {'Content-Type': 'application/json'}

# 提交任务,data中的code为在emr中执行的代码,对s3中的文件进行转化操作,完成后将结果存放回s3作为中间结果
data = {
    'code': textwrap.dedent("""
    import json
    sc._jsc.hadoopConfiguration().set('fs.s3a.endpoint', 's3-us-east-2.amazonaws.com')
    text_file = sc.textFile("s3a://xiaoyj/emr/emrdata.txt")
    text_file = text_file.map(lambda x: x.split('::'))
    text_file = text_file.map(lambda x: (int(x[0]), x[1:]))
    text_file = text_file.groupByKey().map(lambda x: (x[0], list(x[1])))
    text_file = text_file.sortByKey()
    text_file = text_file.map(lambda x: {x[0]: x[1]})
    text_file = text_file.map(lambda x: json.dumps(x))
    text_file.coalesce(1).saveAsTextFile("s3a://xiaoyj/emr/middle_result")
    print("Transform Complete!")
    """)
}
r = requests.post(livy_url, data=json.dumps(data), headers=headers)
pprint.pprint(r.json())

 

(3)check_s3:检查S3中是否有上一步生成的中间结果;

# -*- coding: UTF-8 -*-

import boto3
import time

# 轮询s3,确认transform任务是否执行完成(即s3中是否有middle_result文件生成),name为你的s3桶名称
name = 'xiaoyj'
s3_client = boto3.client('s3', region_name='us-east-1')
flag = True
while flag:
    time.sleep(60)
    r = s3_client.list_objects(Bucket=name)
    for i in r['Contents']:
        if i['Key'] == 'emr/middle_result/part-00000':
            flag = False

 

(4)query:对上一步生成的中间结果进行查询。

# -*- coding: UTF-8 -*-

import requests
import json
import textwrap
import pprint
import boto3

# 获取提交任务的livy_url,其中name为你的集群名称
name = 'emr-cluster'
emr_client = boto3.client('emr', region_name='us-east-1')
r = emr_client.list_clusters(ClusterStates=['WAITING'])
for i in r['Clusters']:
    if i['Name'] == name:
        cluster_id = i['Id']
r = emr_client.describe_cluster(ClusterId=cluster_id)
emr_dns = r['Cluster']['MasterPublicDnsName']
livy_url = 'http://' + emr_dns + ':8998/sessions/0/statements'
headers = {'Content-Type': 'application/json'}

# 提交任务,data中的code为在emr中执行的代码,对s3中的文件进行转化操作,完成后将结果存放回s3作为中间结果
data = {
    'code': textwrap.dedent("""
    import json
    from pyspark.sql import HiveContext, Row
    hiveCtx = HiveContext(sc)
    input = hiveCtx.read.json("s3a://xiaoyj/emr/middle_result/part-00000")
    input.registe rTempTable("tbn")
    result = hiveCtx.sql("SELECT size(`9`) from tbn")
    result = result.rdd.map(lambda row: row[0])
    result.coalesce(1).saveAsTextFile("s3a://xiaoyj/emr/result")
    print("Search Complete!")
    """)
}
r = requests.post(livy_url, data=json.dumps(data), headers=headers)
pprint.pprint(r.json())

 

2.3.3 创建Airflow工作流

(1)在airflow文件夹中创建dags文件夹,并进入到文件夹中;
(2)定义工作流(注意开头的# — coding: UTF-8 –不要省略,并且bash_command需替换为自己任务所在的路径);

vim dag_transform_calpi.py

# -*- coding: UTF-8 -*-

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.sensors.external_task_sensor import ExternalTaskSensor

default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': datetime.now().replace(microsecond=0),
    'email': ['756044579@qq.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('dag_transform_calpi', default_args=default_args,
          schedule_interval=timedelta(days=1))
          
# 创建emr集群
t0 = BashOperator(
    task_id='create_emr_cluster',
    bash_command='python3 /Users/xiaoyj/Desktop/emr_poc/create_emr_cluster.py',
    dag=dag)
    
# 创建livy的会话
t1 = BashOperator(
    task_id='create_livy_session',
    bash_command='python3 /Users/xiaoyj/Desktop/emr_poc/create_session.py',
    dag=dag)
    
# 等待会话创建完成
t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 20',
    dag=dag)
    
# 计算pi值
t3 = BashOperator(
    task_id='calpi',
    bash_command='python3 /Users/xiaoyj/Desktop/emr_poc/calpi.py',
    dag=dag)
    
# 终止emr集群
t4 = BashOperator(
    task_id='terminate_cluster',
    bash_command='python3 /Users/xiaoyj/Desktop/emr_poc/terminate_cluster.py',
    dag=dag)
    
# dag_query中的spark sql任务
external_task = ExternalTaskSensor(
    external_task_id='query', task_id='query_completed', external_dag_id='dag_query', dag=dag)
    
# 定义airflow的有向无环图
t0 >> t1
t1 >> t2
t2 >> t3
external_task >> t4

 


vim dag_query.py 
# -*- coding: UTF-8 -*-
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.sensors.external_task_sensor import ExternalTaskSensor

default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': datetime.now().replace(microsecond=0),
    'email': ['756044579@qq.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('dag_query', default_args=default_args,
          schedule_interval=timedelta(days=1))
          
# 对s3上的文本文件进行转化操作
t0 = BashOperator(
    task_id='transform',
    bash_command='python3 /Users/xiaoyj/Desktop/emr_poc/transform.py',
    dag=dag)
    
# 轮询s3,查看中间结果是否生成
t1 = BashOperator(
    task_id='check_s3',
    bash_command='python3 /Users/xiaoyj/Desktop/emr_poc/check_s3.py',
    dag=dag)
    
# spark sql任务
t2 = BashOperator(
    task_id='query',
    bash_command='python3 /Users/xiaoyj/Desktop/emr_poc/query.py',
    dag=dag)
    
# dag_transform_calpi中的sleep任务
external_task = ExternalTaskSensor(
    external_task_id='sleep', task_id='sleep_completed', external_dag_id='dag_transform_calpi', dag=dag)
external_task >> t0
t0 >> t1
t1 >> t2

 

(3)重制Airflow数据库;

airflow resetdb

 

(4)启动Airflow,-s为当前日期,-e是结束日期,均设置为当日的日期(若工作流执行失败并想重头开始执行工作,需要先执行airflow resetdb)

airflow backfill dag_transform_calpi -s 2019-12-02 -e 2019-12-02 & airflow backfill dag_query -s 2019-12-02 -e 2019-12-02

 

3. 展示

(1)打开AWS EMR控制台,可以观察到集群正在创建;

(2)待集群创建完成后,获取主节点DNS,并打开网页;

(3)观察到Livy上并行提交了两个任务分别是spark对文本的tansform操作和jar包计算pi值的任务;

(4)pi值计算完成;

(5)待Transform任务完成,Spark SQL任务开始执行;

(6)执行完成后可以在s3上可以看到Transform任务生成的middle result和Spark SQL任务生成的最终结果;

(7)下载middle_result中的文件,可以看到聚合结果;

(8)下载result中的文件,可以查看到最终结果(统计编号为9的列表中包含53组数据,-1表示其他json文件没有编号为9的;

(9)任务执行完毕,发现集群自动终止;

(10)再查看远程服务器上Airflow的web界面,发现两个dag已经执行完毕。

4. 总结

本文展现了如何使用Airflow启动EMR集群,并通过Livy远程提交任务,在任务完成后终止集群。成本节省主要体现在两个方面:1)每天在需要执行ETL工作时启动集群,任务执行完成后终止集群,因此不会出现空闲的集群;2)EMR可以配合Spot实例使用,从而节省更多的成本。另一个好处是使用Livy无需额外配置远程提交任务的服务器,并且EMR集成了Livy的一键安装,造成了极大的方便。

 

本篇作者

肖元君

AWS解决方案架构师,负责基于AWS云计算方案的架构咨询和设计实现,同时致力于数据分析与AI的研究与应用。

本篇作者

翟羽翔

AWS解决方案架构师,负责基于AWS云计算方案的架构咨询和设计实现,同时致力于数据湖的应用和推广。