赞
踩
调度系统,是贯穿整个Spark应用的主心骨,从调度系统开始入手了解Spark Core,比较容易理清头绪。
Spark的资源调度采用的是常见的两层调度,底层资源的管理和分配是第一层调度,交给YARN、Mesos或者Spark的Standalone集群处理,Application从第一层调度拿到资源后,还要进行内部的任务和资源调度,将任务和资源进行匹配,这是第二层调度,本文讲的就是这第二层调度。
Spark的调度体系涉及的任务包括3个粒度,分别是Job、Stage、Task。Job代表用户提交的一系列操作的总体,一个具体的计算任务,有明确的输入输出,一个Job由多个Stage组成;一个Stage代表Job计算流程的一个组成部分,一个阶段,包含多个Task;一个Task代表对一个分区的数据进行计算的具体任务。
层级关系:Job > Stage > Task
在Spark Core 解析:RDD 弹性分布式数据集中,已经解释了RDD之间的依赖,以及如何组成RDD血缘图。
所以本文主要目的就是解释清楚:Scheduler将RDD血缘图转变成Stage DAG,然后生成Task,最后提交给Executor去执行的过程。
Job的不同分区的计算通常可以并行,但是有些计算需要将数据进行重新分区,这个过程称作shuffle(混洗)。Shuffle的过程是没法完全并行的,这时候就会出现task之间的等待,task的数量也可能发生变化,所以Spark中以shuffle为边界,对task进行划分,划分出来的每段称为Stage。
Stage代表一组可以并行的执行相同计算的task,每个任务必须有相同的分区规则,这样一个stage中是没有shuffle的。
在一个Spark App中,stage有一个全局唯一ID,stage id是自增的。
Stage分为两种:
stage创建流程:
val rg=sc.parallelize(List((1,10),(2,20)))
rg.reduceByKey(_ _).collect
这里reduceByKey操作引起了一次shuffle,所以job被切分成了2个stage。
val rddA=sc.parallelize(List((1,"a"),(2,"b"),(3,"c")))
val rddB=sc.parallelize(List((1,"A"),(2,"B"),(3,"C")))
rddA.join(rddB).collect
join操作导致rddA和rddB都进行了一次shuffle,所以有3个stage。
import org.apache.spark.HashPartitioner
val rddA=sc.parallelize(List((1,"a"),(2,"b"),(3,"c"))).partitionBy(new HashPartitioner(3))
val rddB=sc.parallelize(List((1,"A"),(2,"B"),(3,"C")))
rddA.join(rddB).collect
WHAT ?
因为rddA已经定义了Partitioner,这里join操作会保留rddA的分区方式,所以对rddA的依赖是OneToOneDepenency,而对于rddB则是ShuffleDependency。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。