赞
踩
DAGScheduler会将Job的RDD划分到不同的Stage,并构建这些Stage的依赖关系。这样可以使得没有依赖关系的Stage并行执行,并保证有依赖关系的Stage顺序执行。并行执行能够有效利用集群资源,提升运行效率,而串行执行则适用于那些在时间和数据资源上存在强制依赖的场景。Stage分为需要处理Shuffle的ShuffleMapStage和最下游的ResultStage。上游Stage先于下游Stage执行,ResultStage是最后执行的Stage。Stage的属性如下:
下面来学习一下Stage的方法
- //org.apache.spark.scheduler.Stage
- def makeNewStageAttempt(
- numPartitionsToCompute: Int,
- taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
- val metrics = new TaskMetrics
- metrics.register(rdd.sparkContext)
- _latestInfo = StageInfo.fromStage(
- this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences)
- nextAttemptId += 1
- }
其执行步骤如下:
抽象类Stage有两个实现子类,分别为ShuffleMapStage和ResultStage
ResultStage可以使用指定的函数对RDD中的分区进行计算并得出最终结果。ResultStage是最后执行的Stage,此阶段主要进行作业的收尾工作(例如,对各个分区的数据收拢、打印到控制台或写入到HDFS)。ResultStage除继承自父类Stage的属性外,还包括以下属性:
(TaskContext, Iterator[_]) => _
还提供了一些方法
- //org.apache.spark.scheduler.ResultStage
- def activeJob: Option[ActiveJob] = _activeJob
- def setActiveJob(job: ActiveJob): Unit = {
- _activeJob = Option(job)
- }
- def removeActiveJob(): Unit = {
- _activeJob = None
- }
- override def findMissingPartitions(): Seq[Int] = {
- val job = activeJob.get
- (0 until job.numPartitions).filter(id => !job.finished(id))
- }
findMissingPartitions用于找出当前Job的所有分区中还没有完成的分区的索引。ResultStage判断一个分区是否完成,是通过AcitveJob的Boolean类型数组finished,因为finished记录了每个分区是否完成。
ShuffleMapStage是DAG调度流程的中间Stage,它可以包括一到多个ShuffleMapTask,这些ShuffleMapTask将生成用于Shuffle的数据。ShuffleMapStage一般是ResultStage或者其它ShuffleMapStage的前置Stage,ShuffleMapTask则通过Shuffle与下游Stage中的Task串联起来。从ShuffleMapStage的命名可以看出,它将对Shuffle的数据映射到下游Stage的各个分区中。ShuffleMapStage除继承自父类Stage的属性外,还包括以下属性:
ShuffleMapStage还提供了一些方法:
- //org.apache.spark.scheduler.ShuffleMapStage
- def addActiveJob(job: ActiveJob): Unit = {
- _mapStageJobs = job :: _mapStageJobs
- }
- def removeActiveJob(job: ActiveJob): Unit = {
- _mapStageJobs = _mapStageJobs.filter(_ != job)
- }
- //org.apache.spark.scheduler.ShuffleMapStage
- override def findMissingPartitions(): Seq[Int] = {
- val missing = (0 until numPartitions).filter(id => outputLocs(id).isEmpty)
- assert(missing.size == numPartitions - _numAvailableOutputs,
- s"${missing.size} missing, expected ${numPartitions - _numAvailableOutputs}")
- missing
- }
- //org.apache.spark.scheduler.ShuffleMapStage
- def addOutputLoc(partition: Int, status: MapStatus): Unit = {
- val prevList = outputLocs(partition)
- outputLocs(partition) = status :: prevList
- if (prevList == Nil) {
- _numAvailableOutputs += 1
- }
- }
StageInfo用于描述Stage信息,并可以传递给SparkListener。StageInfo包括以下属性:
在StageInfo的伴生对象中还提供了构建StageInfo的方法
- //org.apache.spark.scheduler.ShuffleMapStage
- def fromStage(
- stage: Stage,
- attemptId: Int,
- numTasks: Option[Int] = None,
- taskMetrics: TaskMetrics = null,
- taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty
- ): StageInfo = {
- val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
- val rddInfos = Seq(RDDInfo.fromRdd(stage.rdd)) ++ ancestorRddInfos
- new StageInfo(
- stage.id,
- attemptId,
- stage.name,
- numTasks.getOrElse(stage.numTasks),
- rddInfos,
- stage.parents.map(_.id),
- stage.details,
- taskMetrics,
- taskLocalityPreferences)
- }

根据代码,其执行步骤如下:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。