当前位置:   article > 正文

Spark调度系统——Stage详解_spark rdd stage

spark rdd stage

DAGScheduler会将Job的RDD划分到不同的Stage,并构建这些Stage的依赖关系。这样可以使得没有依赖关系的Stage并行执行,并保证有依赖关系的Stage顺序执行。并行执行能够有效利用集群资源,提升运行效率,而串行执行则适用于那些在时间和数据资源上存在强制依赖的场景。Stage分为需要处理Shuffle的ShuffleMapStage和最下游的ResultStage。上游Stage先于下游Stage执行,ResultStage是最后执行的Stage。Stage的属性如下:

  • id:Stage的身份标识
  • rdd:当前Stage包含的RDD
  • numTasks:当前Stage的Task数量
  • parents:当前Stage的父Stage列表。说明一个Stage可以有一个到多个父Stage
  • firstJobId:第一个提交当前Stage的Job的身份标识(即Job的id)。当使用FIFO调度时,通过firstJobId首先计算来自较早Job的Stage,或者在发生故障时更快地恢复
  • callSite:应用程序中与当前Stage相关联的调用栈信息
  • numPartitions:当前Stage的分区数量。实际为rdd的分区的数量
  • jobIds:当前Stage所属的Job的身份标识集合。说明一个Stage可以属于一到多个Job
  • pendingPartitions:存储待处理分区的索引的集合
  • nextAttemptId:用于生成Stage下一次尝试的身份标识
  • _latestInfo:Stage最近一次尝试的信息,即StageInfo
  • fetchFailedAttemptIds:发生过FetchFailure的Stage尝试的身份标识的集合。此属性用于避免在发生FetchFailure后无止境的重试

下面来学习一下Stage的方法

  • clearFailures:清空fetchFialedAttemptIds
  • failedOnFetchAndShouldAbort:用于将发生FetchFailure的Stage尝试的身份标识添加到fetchFailedAttemptIds中,并返回发生FetchFailure的次数是否已经超过了允许发生FetchFailure的次数的状态。允许发生FetchFailure的次数固定为4
  • latestInfo:返回最近一次Stage尝试的StageInfo,即返回_latestInfo
  • findMissingPartitions:找到还未执行完成的分区
  • mekeNewStageAttempt:用于创建新的Stage尝试
  1. //org.apache.spark.scheduler.Stage
  2. def makeNewStageAttempt(
  3. numPartitionsToCompute: Int,
  4. taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
  5. val metrics = new TaskMetrics
  6. metrics.register(rdd.sparkContext)
  7. _latestInfo = StageInfo.fromStage(
  8. this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences)
  9. nextAttemptId += 1
  10. }

其执行步骤如下:

  • 1)调用StageInfo的fromStage方法创建新的StageInfo
  • 2)增加nextAttemptId

抽象类Stage有两个实现子类,分别为ShuffleMapStage和ResultStage

1 ResultStage的实现

ResultStage可以使用指定的函数对RDD中的分区进行计算并得出最终结果。ResultStage是最后执行的Stage,此阶段主要进行作业的收尾工作(例如,对各个分区的数据收拢、打印到控制台或写入到HDFS)。ResultStage除继承自父类Stage的属性外,还包括以下属性:

  • func:即对RDD的分区进行计算的函数。func是ResultStage的构造器参数,指定了函数的形式必须满足:
(TaskContext, Iterator[_]) => _
  • partitions:由RDD的各个分区的索引组成的数组
  • _activeJob:ResultStage处理的ActiveJob

还提供了一些方法

  1. //org.apache.spark.scheduler.ResultStage
  2. def activeJob: Option[ActiveJob] = _activeJob
  3. def setActiveJob(job: ActiveJob): Unit = {
  4. _activeJob = Option(job)
  5. }
  6. def removeActiveJob(): Unit = {
  7. _activeJob = None
  8. }
  9. override def findMissingPartitions(): Seq[Int] = {
  10. val job = activeJob.get
  11. (0 until job.numPartitions).filter(id => !job.finished(id))
  12. }

findMissingPartitions用于找出当前Job的所有分区中还没有完成的分区的索引。ResultStage判断一个分区是否完成,是通过AcitveJob的Boolean类型数组finished,因为finished记录了每个分区是否完成。

2 ShuffleMapStage的实现

ShuffleMapStage是DAG调度流程的中间Stage,它可以包括一到多个ShuffleMapTask,这些ShuffleMapTask将生成用于Shuffle的数据。ShuffleMapStage一般是ResultStage或者其它ShuffleMapStage的前置Stage,ShuffleMapTask则通过Shuffle与下游Stage中的Task串联起来。从ShuffleMapStage的命名可以看出,它将对Shuffle的数据映射到下游Stage的各个分区中。ShuffleMapStage除继承自父类Stage的属性外,还包括以下属性:

  • shuffleDep:与ShuffleMapStage相关联的ActiveJob的列表
  • _mapStageJobs:与ShuffleMapStage相关联的ActiveJob的列表
  • _numAvailableOutputs:ShuffleMapStage可用的map任务的输出数量,这也代表了执行成功的map任务数
  • outputLocs:ShuffleMapStage的各个map任务与其对应的MapStatus列表的映射关系。由于map任务可能会运行多次,因而可能会有多个MapStatus

ShuffleMapStage还提供了一些方法:

  • mapStageJobs:即读取_mapStageJobs的方法
  • addActiveJob与removeActiveJob:向ShuffleMapStage相关联的ActiveJob的列表中添加或删除ActiveJob。
  1. //org.apache.spark.scheduler.ShuffleMapStage
  2. def addActiveJob(job: ActiveJob): Unit = {
  3. _mapStageJobs = job :: _mapStageJobs
  4. }
  5. def removeActiveJob(job: ActiveJob): Unit = {
  6. _mapStageJobs = _mapStageJobs.filter(_ != job)
  7. }
  • numAvailableOutputs:即读取_numAvailableOutputs的方法
  • isAvailable:当_numAvailableOutputs与numPartitions相等时为true。也就是说,ShuffleMapStage的所有分区的map任务都执行成功后,ShuffleMapStage才是可用的
  • findMissingPartitions:找到所有还未执行成功而需要计算的分区
  1. //org.apache.spark.scheduler.ShuffleMapStage
  2. override def findMissingPartitions(): Seq[Int] = {
  3. val missing = (0 until numPartitions).filter(id => outputLocs(id).isEmpty)
  4. assert(missing.size == numPartitions - _numAvailableOutputs,
  5. s"${missing.size} missing, expected ${numPartitions - _numAvailableOutputs}")
  6. missing
  7. }
  • addOutputLoc:当某一分区的任务执行完成后,首先将分区与MapStatus的对应关系添加到outputLocs中,然后将可用的输出数加一
  1. //org.apache.spark.scheduler.ShuffleMapStage
  2. def addOutputLoc(partition: Int, status: MapStatus): Unit = {
  3. val prevList = outputLocs(partition)
  4. outputLocs(partition) = status :: prevList
  5. if (prevList == Nil) {
  6. _numAvailableOutputs += 1
  7. }
  8. }

3 StageInfo

StageInfo用于描述Stage信息,并可以传递给SparkListener。StageInfo包括以下属性:

  • stageId:Stage的id
  • attemptId:当前Stage尝试的id
  • name:当前Stage的名称
  • numTasks:当前Stage的Task数量
  • rddInfos:RDD信息(即RDDInfo)的序列
  • parentIds:当前Stage的父亲Stage的身份标识序列
  • details:详细的线程栈信息
  • taskMetrics:Task的度量信息
  • taskLocalityPreferences:类型为Seq[Seq[TaskLocation]],用于存储任务的本地性偏好
  • submissionTime:DAGScheduler将当前Stage提交给TaskScheduler的时间
  • completionTime:当前Stage中的所有Task完成的时间(即Stage完成的时间)或者Stage被取消的时间
  • failureReason:如果Stage失败了,用于记录失败的原因
  • accumulables:存储了所有聚合器计算的最终值

在StageInfo的伴生对象中还提供了构建StageInfo的方法

  1. //org.apache.spark.scheduler.ShuffleMapStage
  2. def fromStage(
  3. stage: Stage,
  4. attemptId: Int,
  5. numTasks: Option[Int] = None,
  6. taskMetrics: TaskMetrics = null,
  7. taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty
  8. ): StageInfo = {
  9. val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
  10. val rddInfos = Seq(RDDInfo.fromRdd(stage.rdd)) ++ ancestorRddInfos
  11. new StageInfo(
  12. stage.id,
  13. attemptId,
  14. stage.name,
  15. numTasks.getOrElse(stage.numTasks),
  16. rddInfos,
  17. stage.parents.map(_.id),
  18. stage.details,
  19. taskMetrics,
  20. taskLocalityPreferences)
  21. }

根据代码,其执行步骤如下:

  • 1)调用当前Stage的RDD的getNarrowAncesstors方法,获取RDD的祖先依赖中属于窄依赖的RDD序列
  • 2)对上一步中获得的RDD序列中的每个RDD,调用RDDInfo伴生对象的fromRdd方法创建RDDInfo对象
  • 3)给当前Stage的RDD创建对应的RDDInfo对象,将上一步中创建的所有RDDInfo对象与此RDDInfo对象放入序列rddInfos中
  • 4)创建StageInfo
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/运维做开发/article/detail/776449
推荐阅读
相关标签
  

闽ICP备14008679号