亚马逊AWS官方博客

Spark on EKS在Graviton2实例上的性能调优实践

一、前言

Spark是用于大规模数据处理的统一分析引擎,它提供了Scala、Java、Python和R语言的高级Api,基于Spark Core构建了Spark SQL、Spark Streaming、MLlib和Graphx四个主要编程库,分别用于离线ETL(Extract-Transform-Load)、在线数据分析、流计算、机器学习和图计算等场景,在分析领域可谓大放异彩,成为目前最流行的计算框架之一。

Kubernetes是一套流行的开源容器管理系统,负责为用户提供用于应用程序部署、维护以及扩展的基础解决方案。Amazon EKS是AWS上的一项托管Kubernetes服务,提供高可用性的控制平面,能够运行生产级工作负载。客户可以在EKS上运行各类常见工作负载,例如微服务、批处理以及机器学习等。

从Spark 2.3开始,支持使用Kubernetes运行并管理各类Spark资源,在Kubernetes上运行Spark,能够大大减少实验时间,也可以通过多种优化技术降低运营复杂性。因此托管的Amazon EKS也成为AWS客户在Kubernetes上调度Spark应用程序的热门选择。

AWS Graviton2是AWS设计的一款处理器,它使用64位Arm Neoverse核心进行了定制,为客户在AWS中的工作负载提供最佳的性价比。利用该架构,与同类第5代x86实例相比,在面对广泛的工作负载时,如应用程序服务器、微服务、视频编码、高性能计算、游戏、开源数据库、内存缓存和基于CPU的机器学习推理,c6g、m6g和r6g等基于Graviton2的EC2实例可以提供高达40%的性价比。

在大规模离线分析场景中,Spark on EKS任务可能会调度数千个实例、运行数小时,如果采用Graviton2实例类型,将能显著提升任务运行效率,节省成本。本文将基于客户真实场景、真实任务进行测试,探索在Spark on EKS中运行计算任务时,x86机型和Graviton2机型在任务配置、执行时间、资源消耗、成本方面的差异。

二、环境准备

1. 测试架构


EKS集群配置两个Nodegroup,一个启动Graviton2实例,另一个启动x86实例,根据Spark job传入的参数,自动将任务启动到对应的Nodegroup,部署Cluster Autoscaler进行节点自动伸缩。

2. 环境配置

  • 任务节点:x86机型:m5.2xlarge,Graviton2机型:m6g.2xlarge,这两种机型都是8核cpu,32G内存;
  • 测试机及网络:测试机为Job提交客户端,m6i.xlarge,与EKS集群执行任务的节点在同一子网;
  • Java:版本为Openjdk-11.0.14.1;
  • Spark:版本为3.2.1,镜像为官方Docker镜像;
  • EKS:1.21;
  • 数据:包括100个文件,每个文件4万行记录,128个字段,详细数据结构参考这里
  • 任务:Sql交叉查询任务,详细任务参考这里

3. 环境搭建

3.1 EKS集群创建

  • 创建EKS集群
    通过以下命令创建集群,集群中默认配置两个节点组,通过arch标签进行匹配,配置Kubeconfig
wget https://raw.githubusercontent.com/pyun/spark-on-eks-graviton/main/eksctl.yaml
eksctl create cluster -f eksctl.yaml
eksctl utils associate-iam-oidc-provider --region=us-east-1 --cluster=spark-eks-test1 --approve
aws eks --region us-east-1 update-kubeconfig --name spark-eks-test1
  • 部署集群自动伸缩
    通过以下命令,配置集群自动伸缩CA
wget https://raw.githubusercontent.com/pyun/spark-on-eks-graviton/main/cluster-autoscaler.yml
kubectl create -f cluster_autoscaler.yml
  • 新建账号策略
    通过控制台或命令行创建以下账号策略,策略名:spark_eks_policy
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "SourcePermissions",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::nyc-tlc/*",
                "arn:aws:s3:::nyc-tlc"
            ]
        },
        {
            "Sid": "TargetPermissions",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:AbortMultipartUpload",
                "s3:DeleteObject",
                "s3:ListMultipartUploadParts",
                "s3:listBucketMultipartUploads",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::<output_bucket_name>/*",
                "arn:aws:s3:::<output_bucket_name>"
            ]
        }
    ]
}
  • 创建命名空间
kubectl create namespace spark
  • 创建账号
eksctl create iamserviceaccount \
--name spark \
--namespace spark \
--cluster spark-eks-test1 \
--attach-policy-arn arn:aws:iam::<ACCOUNT_ID>:policy/spark_eks_policy \
--approve --override-existing-serviceaccounts
  • 绑定账号
kubectl create clusterrolebinding spark-role \
--clusterrole=edit \
--serviceaccount=spark:spark

3.2 spark镜像生成

  • 下载Spark安装包,解压
wget https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
  • 下载以下jar包,复制到spark-3.2.1-bin-hadoop3.2/jars目录
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-common/3.3.1/hadoop-common-3.3.1.jar
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.1/hadoop-aws-3.3.1.jar
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.235/aws-java-sdk-bundle-1.12.235.jar
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-s3/1.12.235/aws-java-sdk-s3-1.12.235.jar
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk/1.12.235/aws-java-sdk-1.12.235.jar
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-core/1.12.235/aws-java-sdk-core-1.12.235.jar
  • 通过控制台,创建ECR镜像库
  • 使用以下Dockerfile文件,构建Spark x86和arm镜像,并上传到ECR
aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin <ACCOUNT_ID>.dkr.ecr.us-east-1.amazonaws.com
docker build -t <ACCOUNT_ID>.dkr.ecr.us-east-1.amazonaws.com/spark:3.2.1-arm-test-v0.1 .
docker push <ACCOUNT_ID>.dkr.ecr.us-east-1.amazonaws.com/spark:3.2.1-arm-test-v0.1
  • Dockerfile内容如下,基于官方镜像,加入aws相关jar包。
#Use Spark base image built for AWS compatiblity arm/x86 image
#FROM --platform=linux/arm64 apache/spark:v3.2.1
FROM --platform=linux/amd64 apache/spark:v3.2.1

COPY ./spark-3.2.1-bin-hadoop3.2/jars/aws-java-sdk-1.12.190.jar  /opt/spark/jars
COPY ./spark-3.2.1-bin-hadoop3.2/jars/aws-java-sdk-bundle-1.11.375.jar  /opt/spark/jars
COPY ./spark-3.2.1-bin-hadoop3.2/jars/aws-java-sdk-s3-1.12.200.jar  /opt/spark/jars
COPY ./spark-3.2.1-bin-hadoop3.2/jars/hadoop-aws-3.2.0.jar /opt/spark/jars

ENV SPARK_HOME /opt/spark

WORKDIR /opt/spark/work-dir

ENTRYPOINT [ "/opt/entrypoint.sh" ]

3.3 其他环境(可选)

  • 通过Glue 搭建Spark history server,方便查看任务执行情况
  • 搭建Kubernetes Dashboard

三、数据和任务准备

1. 数据准备

这里下载数据文件,下载后上传到s3,包括100个csv文件,每个文件4万行记录,128个字段,20MB;

2. 任务准备

任务是一个交叉查询任务,属于计算密集型任务,详细任务Sql见这里, 任务Java代码见这里
代码片段如下:

public class SparkArmTest {
    public static void main(String[] args) {
        if (args.length < 1) return;
        String arch = args[0];
        testSparkTask(arch);
    }
    public static void testSparkTask(String arch) {
        //初始化SparkSession
        SparkSession spark = SparkSession
                .builder()
                .appName("spark_on_eks_" + taskType +   "_" + arch)
                .getOrCreate();
        try {
            //从s3读取数据
            JavaRDD<Row> source = spark.read()
                    .text("s3a://pyunspark/x86_arm_test_1")
                    .javaRDD();
            JavaRDD<Row> rowRDD = schemaRDD(source);
            //创建临时表
            Dataset<Row> df = spark.createDataFrame(rowRDD, generateSchema(rowRDD));
            df.createTempView("test");
            //执行任务Sql,从临时表查询数据并写回s3
            Dataset<Row> ds1 = spark.sql(getTaskString(taskType));
            ds1.write().format("orc").save("s3a://pyunspark/x86_arm_test_1_target1/" +   taskType + "/" + arch + "/" + LocalDateTime.now().toString());
        } catch (Exception e) {
            e.printStackTrace();
        }
        spark.stop();
    }
}

四、性能测试及调优

1. 任务执行流程

编译Java任务代码,将jar包上传到测试客户机,执行spark-submit命令提交任务到EKS集群。通过spark.kubernetes.node.selector.arch参数匹配x86或Graviton2节点组。
完整命令如下:

/home/ec2-user/spark/spark-3.2.1-bin-hadoop3.2/bin/spark-submit \
--master k8s://<eks cluster endpoint> \
--deploy-mode cluster \
--name java_spark_test \
--class mydemo.spark.SparkArmTest \
--conf spark.executor.instances=250 \
--conf spark.kubernetes.allocation.batch.size=250 \
--conf spark.default.parallelism=1000 \
--conf spark.sql.shuffle.partitions=1000 \
--conf spark.executor.memoryOverhead=1G \
--conf spark.executor.memory=8G \
--conf spark.kubernetes.executor.request.cores=0.5 \
--conf spark.executor.cores=4 \
--conf spark.driver.cores=2 \
--conf spark.kubernetes.node.selector.arch=<arm/x86> \
--conf spark.kubernetes.container.image=<images> \
--conf spark.kubernetes.file.upload.path=s3a://<data> \
--conf spark.hadoop.fs.s3a.access.key=xxxxxxx \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.hadoop.fs.s3a.fast.upload=true \
--conf spark.hadoop.fs.s3a.secret.key=xxxxxx \
--conf spark.history.provider=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.eventLog.enabled=true \
--conf spark.eventLog.dir=s3a://<sparkhistory> \
--conf spark.history.fs.logDirectory=s3a://<sparkhistory> \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.namespace=spark \
./java_spark_test.jar \

2. 参数调优

根据Graviton2的硬件设计理念,在高并发时相对于x86芯片更具有优势,所以,我们在提交Spark job时,参数设置考虑尽量提高任务执行的并行度,提高cpu的利用率,因此主要通过调整如下参数对比x86和graviton2机型的执行情况

--conf spark.executor.instances=250  #表示要启动的executor个数
--conf spark.default.parallelism=1000 #task并行度
--conf spark.sql.shuffle.partitions=1000 #task并行度
--conf spark.executor.memory=8G  #表示每个executor分配的内存
--conf spark.kubernetes.executor.request.cores=0.5 #表示executor的pod分配的vCPU数
--conf spark.executor.cores=4 #表示每个executor分配的内核数

可以理解为spark.executor.cores*spark.kubernetes.executor.request.cores为每个executor实际要占用的内核数,由此我们可以计算出需要的节点数,根据每个2xlarge的节点是8核32G内存,可以得到节点数计算公式如下:

根据Spark官网建议,并行度设置为当前Spark job的总core数量的2~3倍,可以更加充分的利用CPU资源。理论上,当执行任务时,需要的Node数量越少,执行时间越短,就越具有成本和性能效益,所以,我们通过调整executor数、每executor分配资源量、任务并行度来寻找最优解。

  • 情况一:

    配置executor为100,每executor分配12G内存4vCPU时,并行度设置为800,需要100个Node运行Job,每节点1个executor,cpu利用率(50%),此时,Graviton2实例没有性能优势,Job耗时比x86长,总节点耗时更多,但因为单价比较低,所以总体费用也有16.3%的节省。
  • 情况二:

    配置executor为250,每executor分配12G内存2vCPU时,并行度设置为800,需要125个node运行Job,每节点2个executor,此时cpu利用率保持不变时(50%),Job耗时和总节点耗时都有所减少,但Graviton2实例性能优势仍然没有体现。
  • 情况三:

    配置executor为250,每executor分配8G内存2vCPU时,并行度设置为800,需要84个node运行Job,每节点3个executor,此时,cpu利用率有所提高(75%),Job耗时和总节点耗时都减少明显,Graviton2机型无论从性能还是成本都体现出明显的优势。
  • 情况四:

    配置executor为150,每executor分配8G内存4vCPU,但每任务配置0.5vCPU,并行度设置为800,需要50个node运行Job,每节点3个executor,此时,cpu利用率保持(75%),由于使用了更少的节点,每节点运行任务增加,所以Job耗时明显增加,但总节点耗时进一步减少,Graviton2相对x86实例运行速度提升22.86%,费用节省38.13%,优势更加明显。
  • 情况五:

    配置executor为120,每executor分配6G内存4vCPU,但每任务配置0.4vCPU,并行度设置为800,需要30个node运行Job,每节点4个executor,此时,cpu利用率保持(80%),Job耗时进一步增加,但总节点耗时进一步减少,只需17.5节点时,Graviton2相对x86实例运行速度提升到25.53%,费用节省高达40.27%。

如继续提升利用率将出现退变,Job无法正常完成,出现大量任务OOM。

从以上测试情况,我们可以发现,无论是x86实例还是Graviton2实例,当提高任务并行度、提升节点资源利用率后,整体Job的耗时都会减少。但当资源利用率太高,导致每任务分配资源太少时,Job耗时又会增加,但对于总节点耗时还是会减少,更加节约成本。总之cpu利用率高时,Graviton2实例相对于x86实例无论在性能还是成本上都有明显的优势。

五、结论

通常情况下,在相同的软件环境中,如果任务高度并行执行,充分利用cpu资源,Graviton2的性能提升可高达25%,成本节省可高达40%;

因为Graviton2的硬件设计是从高并发互联网应用出发的,所以其设计重点在于大L1.L2 Cache、无超线程资源争抢、大内存带宽以及核与核之间的一致性访问延迟。所以,对高并发业务,CPU利用率高的时候,Graviton2减少了线程上下文切换的资源竞争,并且充分利用了Cache和内存的特性,相对于x86实例其性能表现会更加突出。

本篇作者

彭赟

AWS资深解决方案架构师,负责基于AWS的云计算方案架构咨询和设计,20多年软件架构、设计、开发、项目管理交付经验,擅长业务咨询、产品设计、软件架构,在大数据、区块链、容器化方向有较深入的研究,具有丰富的解决客户实际问题的经验。