Spark是用于大规模数据处理的统一分析引擎,它提供了Scala、Java、Python和R语言的高级Api,基于Spark Core构建了Spark SQL、Spark Streaming、MLlib和Graphx四个主要编程库,分别用于离线ETL(Extract-Transform-Load)、在线数据分析、流计算、机器学习和图计算等场景,在分析领域可谓大放异彩,成为目前最流行的计算框架之一。
Kubernetes是一套流行的开源容器管理系统,负责为用户提供用于应用程序部署、维护以及扩展的基础解决方案。Amazon EKS是AWS上的一项托管Kubernetes服务,提供高可用性的控制平面,能够运行生产级工作负载。客户可以在EKS上运行各类常见工作负载,例如微服务、批处理以及机器学习等。
从Spark 2.3开始,支持使用Kubernetes运行并管理各类Spark资源,在Kubernetes上运行Spark,能够大大减少实验时间,也可以通过多种优化技术降低运营复杂性。因此托管的Amazon EKS也成为AWS客户在Kubernetes上调度Spark应用程序的热门选择。
AWS Graviton2是AWS设计的一款处理器,它使用64位Arm Neoverse核心进行了定制,为客户在AWS中的工作负载提供最佳的性价比。利用该架构,与同类第5代x86实例相比,在面对广泛的工作负载时,如应用程序服务器、微服务、视频编码、高性能计算、游戏、开源数据库、内存缓存和基于CPU的机器学习推理,c6g、m6g和r6g等基于Graviton2的EC2实例可以提供高达40%的性价比。
在大规模离线分析场景中,Spark on EKS任务可能会调度数千个实例、运行数小时,如果采用Graviton2实例类型,将能显著提升任务运行效率,节省成本。本文将基于客户真实场景、真实任务进行测试,探索在Spark on EKS中运行计算任务时,x86机型和Graviton2机型在任务配置、执行时间、资源消耗、成本方面的差异。
EKS集群配置两个Nodegroup,一个启动Graviton2实例,另一个启动x86实例,根据Spark job传入的参数,自动将任务启动到对应的Nodegroup,部署Cluster Autoscaler进行节点自动伸缩。
- 任务节点:x86机型:m5.2xlarge,Graviton2机型:m6g.2xlarge,这两种机型都是8核cpu,32G内存;
- 测试机及网络:测试机为Job提交客户端,m6i.xlarge,与EKS集群执行任务的节点在同一子网;
- Java:版本为Openjdk-11.0.14.1;
- Spark:版本为3.2.1,镜像为官方Docker镜像;
- EKS:1.21;
- 数据:包括100个文件,每个文件4万行记录,128个字段,详细数据结构参考这里;
- 任务:Sql交叉查询任务,详细任务参考这里;
- 创建EKS集群
通过以下命令创建集群,集群中默认配置两个节点组,通过arch标签进行匹配,配置Kubeconfig
wget https://raw.githubusercontent.com/pyun/spark-on-eks-graviton/main/eksctl.yaml
eksctl create cluster -f eksctl.yaml
eksctl utils associate-iam-oidc-provider --region=us-east-1 --cluster=spark-eks-test1 --approve
aws eks --region us-east-1 update-kubeconfig --name spark-eks-test1
- 部署集群自动伸缩
通过以下命令,配置集群自动伸缩CA
wget https://raw.githubusercontent.com/pyun/spark-on-eks-graviton/main/cluster-autoscaler.yml
kubectl create -f cluster_autoscaler.yml
- 新建账号策略
通过控制台或命令行创建以下账号策略,策略名:spark_eks_policy
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "SourcePermissions",
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::nyc-tlc/*",
"arn:aws:s3:::nyc-tlc"
]
},
{
"Sid": "TargetPermissions",
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:GetObject",
"s3:AbortMultipartUpload",
"s3:DeleteObject",
"s3:ListMultipartUploadParts",
"s3:listBucketMultipartUploads",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::<output_bucket_name>/*",
"arn:aws:s3:::<output_bucket_name>"
]
}
]
}
kubectl create namespace spark
eksctl create iamserviceaccount \
--name spark \
--namespace spark \
--cluster spark-eks-test1 \
--attach-policy-arn arn:aws:iam::<ACCOUNT_ID>:policy/spark_eks_policy \
--approve --override-existing-serviceaccounts
kubectl create clusterrolebinding spark-role \
--clusterrole=edit \
--serviceaccount=spark:spark
wget https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
- 下载以下jar包,复制到spark-3.2.1-bin-hadoop3.2/jars目录
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-common/3.3.1/hadoop-common-3.3.1.jar
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.1/hadoop-aws-3.3.1.jar
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.235/aws-java-sdk-bundle-1.12.235.jar
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-s3/1.12.235/aws-java-sdk-s3-1.12.235.jar
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk/1.12.235/aws-java-sdk-1.12.235.jar
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-core/1.12.235/aws-java-sdk-core-1.12.235.jar
- 通过控制台,创建ECR镜像库
- 使用以下Dockerfile文件,构建Spark x86和arm镜像,并上传到ECR
aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin <ACCOUNT_ID>.dkr.ecr.us-east-1.amazonaws.com
docker build -t <ACCOUNT_ID>.dkr.ecr.us-east-1.amazonaws.com/spark:3.2.1-arm-test-v0.1 .
docker push <ACCOUNT_ID>.dkr.ecr.us-east-1.amazonaws.com/spark:3.2.1-arm-test-v0.1
- Dockerfile内容如下,基于官方镜像,加入aws相关jar包。
#Use Spark base image built for AWS compatiblity arm/x86 image
#FROM --platform=linux/arm64 apache/spark:v3.2.1
FROM --platform=linux/amd64 apache/spark:v3.2.1
COPY ./spark-3.2.1-bin-hadoop3.2/jars/aws-java-sdk-1.12.190.jar /opt/spark/jars
COPY ./spark-3.2.1-bin-hadoop3.2/jars/aws-java-sdk-bundle-1.11.375.jar /opt/spark/jars
COPY ./spark-3.2.1-bin-hadoop3.2/jars/aws-java-sdk-s3-1.12.200.jar /opt/spark/jars
COPY ./spark-3.2.1-bin-hadoop3.2/jars/hadoop-aws-3.2.0.jar /opt/spark/jars
ENV SPARK_HOME /opt/spark
WORKDIR /opt/spark/work-dir
ENTRYPOINT [ "/opt/entrypoint.sh" ]
- 通过Glue 搭建Spark history server,方便查看任务执行情况
从这里下载数据文件,下载后上传到s3,包括100个csv文件,每个文件4万行记录,128个字段,20MB;
任务是一个交叉查询任务,属于计算密集型任务,详细任务Sql见这里, 任务Java代码见这里
代码片段如下:
public class SparkArmTest {
public static void main(String[] args) {
if (args.length < 1) return;
String arch = args[0];
testSparkTask(arch);
}
public static void testSparkTask(String arch) {
//初始化SparkSession
SparkSession spark = SparkSession
.builder()
.appName("spark_on_eks_" + taskType + "_" + arch)
.getOrCreate();
try {
//从s3读取数据
JavaRDD<Row> source = spark.read()
.text("s3a://pyunspark/x86_arm_test_1")
.javaRDD();
JavaRDD<Row> rowRDD = schemaRDD(source);
//创建临时表
Dataset<Row> df = spark.createDataFrame(rowRDD, generateSchema(rowRDD));
df.createTempView("test");
//执行任务Sql,从临时表查询数据并写回s3
Dataset<Row> ds1 = spark.sql(getTaskString(taskType));
ds1.write().format("orc").save("s3a://pyunspark/x86_arm_test_1_target1/" + taskType + "/" + arch + "/" + LocalDateTime.now().toString());
} catch (Exception e) {
e.printStackTrace();
}
spark.stop();
}
}
编译Java任务代码,将jar包上传到测试客户机,执行spark-submit命令提交任务到EKS集群。通过spark.kubernetes.node.selector.arch参数匹配x86或Graviton2节点组。
完整命令如下:
/home/ec2-user/spark/spark-3.2.1-bin-hadoop3.2/bin/spark-submit \
--master k8s://<eks cluster endpoint> \
--deploy-mode cluster \
--name java_spark_test \
--class mydemo.spark.SparkArmTest \
--conf spark.executor.instances=250 \
--conf spark.kubernetes.allocation.batch.size=250 \
--conf spark.default.parallelism=1000 \
--conf spark.sql.shuffle.partitions=1000 \
--conf spark.executor.memoryOverhead=1G \
--conf spark.executor.memory=8G \
--conf spark.kubernetes.executor.request.cores=0.5 \
--conf spark.executor.cores=4 \
--conf spark.driver.cores=2 \
--conf spark.kubernetes.node.selector.arch=<arm/x86> \
--conf spark.kubernetes.container.image=<images> \
--conf spark.kubernetes.file.upload.path=s3a://<data> \
--conf spark.hadoop.fs.s3a.access.key=xxxxxxx \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.hadoop.fs.s3a.fast.upload=true \
--conf spark.hadoop.fs.s3a.secret.key=xxxxxx \
--conf spark.history.provider=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.eventLog.enabled=true \
--conf spark.eventLog.dir=s3a://<sparkhistory> \
--conf spark.history.fs.logDirectory=s3a://<sparkhistory> \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.namespace=spark \
./java_spark_test.jar \
根据Graviton2的硬件设计理念,在高并发时相对于x86芯片更具有优势,所以,我们在提交Spark job时,参数设置考虑尽量提高任务执行的并行度,提高cpu的利用率,因此主要通过调整如下参数对比x86和graviton2机型的执行情况
--conf spark.executor.instances=250 #表示要启动的executor个数
--conf spark.default.parallelism=1000 #task并行度
--conf spark.sql.shuffle.partitions=1000 #task并行度
--conf spark.executor.memory=8G #表示每个executor分配的内存
--conf spark.kubernetes.executor.request.cores=0.5 #表示executor的pod分配的vCPU数
--conf spark.executor.cores=4 #表示每个executor分配的内核数
可以理解为spark.executor.cores*spark.kubernetes.executor.request.cores为每个executor实际要占用的内核数,由此我们可以计算出需要的节点数,根据每个2xlarge的节点是8核32G内存,可以得到节点数计算公式如下:
根据Spark官网建议,并行度设置为当前Spark job的总core数量的2~3倍,可以更加充分的利用CPU资源。理论上,当执行任务时,需要的Node数量越少,执行时间越短,就越具有成本和性能效益,所以,我们通过调整executor数、每executor分配资源量、任务并行度来寻找最优解。
- 情况一:
配置executor为100,每executor分配12G内存4vCPU时,并行度设置为800,需要100个Node运行Job,每节点1个executor,cpu利用率(50%),此时,Graviton2实例没有性能优势,Job耗时比x86长,总节点耗时更多,但因为单价比较低,所以总体费用也有16.3%的节省。
- 情况二:
配置executor为250,每executor分配12G内存2vCPU时,并行度设置为800,需要125个node运行Job,每节点2个executor,此时cpu利用率保持不变时(50%),Job耗时和总节点耗时都有所减少,但Graviton2实例性能优势仍然没有体现。
- 情况三:
配置executor为250,每executor分配8G内存2vCPU时,并行度设置为800,需要84个node运行Job,每节点3个executor,此时,cpu利用率有所提高(75%),Job耗时和总节点耗时都减少明显,Graviton2机型无论从性能还是成本都体现出明显的优势。
- 情况四:
配置executor为150,每executor分配8G内存4vCPU,但每任务配置0.5vCPU,并行度设置为800,需要50个node运行Job,每节点3个executor,此时,cpu利用率保持(75%),由于使用了更少的节点,每节点运行任务增加,所以Job耗时明显增加,但总节点耗时进一步减少,Graviton2相对x86实例运行速度提升22.86%,费用节省38.13%,优势更加明显。
- 情况五:
配置executor为120,每executor分配6G内存4vCPU,但每任务配置0.4vCPU,并行度设置为800,需要30个node运行Job,每节点4个executor,此时,cpu利用率保持(80%),Job耗时进一步增加,但总节点耗时进一步减少,只需17.5节点时,Graviton2相对x86实例运行速度提升到25.53%,费用节省高达40.27%。
如继续提升利用率将出现退变,Job无法正常完成,出现大量任务OOM。
从以上测试情况,我们可以发现,无论是x86实例还是Graviton2实例,当提高任务并行度、提升节点资源利用率后,整体Job的耗时都会减少。但当资源利用率太高,导致每任务分配资源太少时,Job耗时又会增加,但对于总节点耗时还是会减少,更加节约成本。总之cpu利用率高时,Graviton2实例相对于x86实例无论在性能还是成本上都有明显的优势。
通常情况下,在相同的软件环境中,如果任务高度并行执行,充分利用cpu资源,Graviton2的性能提升可高达25%,成本节省可高达40%;
因为Graviton2的硬件设计是从高并发互联网应用出发的,所以其设计重点在于大L1.L2 Cache、无超线程资源争抢、大内存带宽以及核与核之间的一致性访问延迟。所以,对高并发业务,CPU利用率高的时候,Graviton2减少了线程上下文切换的资源竞争,并且充分利用了Cache和内存的特性,相对于x86实例其性能表现会更加突出。
本篇作者