
Spark Learning Part 7: Job Triggering and Stage Division


1. Job Submission Trigger

Flow diagram: (figure omitted)

The job submission flow is triggered by an RDD action, which in turn calls SparkContext.runJob.
An RDD action may pass through several overloads of SparkContext.runJob, but they all eventually call the runJob shown in 1.1.
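As a minimal, runnable illustration (a sketch written for this post, not Spark source; the app name and data are arbitrary), transformations stay lazy and only the action submits a job:

  import org.apache.spark.{SparkConf, SparkContext}

  object JobTriggerExample {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("job-trigger").setMaster("local[2]"))
      val doubled = sc.parallelize(1 to 100, 4).map(_ * 2)  // transformation: lazy, no job submitted
      val total = doubled.sum()                             // action: triggers SparkContext.runJob
      println(s"sum = $total")
      sc.stop()
    }
  }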

1.1. SparkContext.runJob

  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      allowLocal: Boolean,
      resultHandler: (Int, U) => Unit) {
    if (stopped) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
      resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }
Parameter notes:
(1) func: the function to run on each partition;
(2) partitions: the partition indices, starting from 0;
(3) resultHandler: the function that aggregates the per-partition results.
After the job finishes, RDD.doCheckpoint is called to check whether the RDD needs to be checkpointed.
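A hedged sketch of how these parameters fit together, calling the overload above directly (this mirrors what RDD.collect does internally; the object and app names are made up):

  import org.apache.spark.{SparkConf, SparkContext, TaskContext}

  object RunJobExample {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("run-job").setMaster("local[2]"))
      val rdd = sc.parallelize(1 to 10, 2)
      val results = new Array[Array[Int]](rdd.partitions.length)
      sc.runJob(
        rdd,
        (ctx: TaskContext, it: Iterator[Int]) => it.toArray,      // func: runs on every partition
        0 until rdd.partitions.length,                            // partitions: indices from 0
        false,                                                    // allowLocal
        (index: Int, part: Array[Int]) => results(index) = part)  // resultHandler: gathers per-partition results
      println(results.flatten.mkString(","))
      sc.stop()
    }
  }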

1.2. DAGScheduler.runJob

  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      allowLocal: Boolean,
      resultHandler: (Int, U) => Unit,
      properties: Properties = null)
  {
    val start = System.nanoTime
    val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
    waiter.awaitResult() match {
      case JobSucceeded => {
        logInfo("Job %d finished: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      }
      case JobFailed(exception: Exception) =>
        logInfo("Job %d failed: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        throw exception
    }
  }
(1) Calls submitJob, which returns a JobWaiter object;
(2) the JobWaiter then waits for the job to succeed or fail (a simplified sketch of this waiting pattern follows below).
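The original post does not quote JobWaiter; below is a simplified, hypothetical sketch of its waiting pattern (the real class in org.apache.spark.scheduler differs in detail):

  // Hypothetical simplification: count finished tasks and wake the waiting
  // caller once every task has succeeded or the job has failed.
  class SimpleJobWaiter[U](totalTasks: Int, resultHandler: (Int, U) => Unit) {
    private var finishedTasks = 0
    private var failureCause: Option[Exception] = None
    private var done = totalTasks == 0

    def taskSucceeded(index: Int, result: U): Unit = synchronized {
      resultHandler(index, result)
      finishedTasks += 1
      if (finishedTasks == totalTasks) { done = true; notifyAll() }
    }

    def jobFailed(cause: Exception): Unit = synchronized {
      failureCause = Some(cause); done = true; notifyAll()
    }

    // Blocks the caller (cf. DAGScheduler.runJob) until the job finishes;
    // returns the failure cause, if any.
    def awaitResult(): Option[Exception] = synchronized {
      while (!done) wait()
      failureCause
    }
  }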

1.3. DAGScheduler.submitJob

  def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      allowLocal: Boolean,
      resultHandler: (Int, U) => Unit,
      properties: Properties = null): JobWaiter[U] =
  {
    // Check to make sure we are not launching a task on a partition that does not exist.
    val maxPartitions = rdd.partitions.length
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
      throw new IllegalArgumentException(
        "Attempting to access a non-existent partition: " + p + ". " +
          "Total number of partitions: " + maxPartitions)
    }

    val jobId = nextJobId.getAndIncrement()
    if (partitions.size == 0) {
      return new JobWaiter[U](this, jobId, 0, resultHandler)
    }

    assert(partitions.size > 0)
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties))
    waiter
  }
(1) Create a jobId;
(2) create a JobWaiter object;
(3) wrap the jobId, the JobWaiter, and the other arguments into a JobSubmitted message and post it to the message queue of DAGSchedulerEventProcessLoop. A DAGSchedulerEventProcessLoop holds a message queue plus a thread that reads from it; depending on the message type, the thread invokes different DAGScheduler methods (see DAGSchedulerEventProcessLoop.onReceive for details). The JobSubmitted message triggers a call to DAGScheduler.handleJobSubmitted. A minimal sketch of this event-loop pattern follows below.
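The event-processing loop itself is not quoted here; the following is a minimal, hypothetical sketch of the pattern it uses (a blocking queue plus one dispatcher thread), not the actual DAGSchedulerEventProcessLoop source:

  import java.util.concurrent.LinkedBlockingQueue

  // Hypothetical sketch: post() enqueues an event; a daemon thread dequeues
  // events one by one and dispatches them through onReceive.
  abstract class SimpleEventLoop[E](name: String) {
    private val queue = new LinkedBlockingQueue[E]()
    private val thread = new Thread(name) {
      setDaemon(true)
      override def run(): Unit =
        try {
          while (true) onReceive(queue.take())  // e.g. JobSubmitted => handleJobSubmitted
        } catch {
          case _: InterruptedException => // stop() interrupts the thread
        }
    }

    def start(): Unit = thread.start()
    def stop(): Unit = thread.interrupt()
    def post(event: E): Unit = queue.put(event)
    protected def onReceive(event: E): Unit
  }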

2. Stage Division of RDDs

2.1. DAGScheduler.handleJobSubmitted

  var finalStage: Stage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)
  } catch {
    ......
  }
  ......
newStage is called to create the finalStage for the final RDD, i.e., the RDD on which the action was invoked.

2.2. Stage Division Flow

(Flow diagram omitted.)

2.2.1. Stage

  private[spark] class Stage(
      val id: Int,
      val rdd: RDD[_],
      val numTasks: Int,
      val shuffleDep: Option[ShuffleDependency[_, _, _]],  // Output shuffle if stage is a map stage
      val parents: List[Stage],
      val jobId: Int,
      val callSite: CallSite)
Stages come in two kinds:
(1) shuffle map stage;
(2) result stage.
(In Spark 1.x the class derives the kind from shuffleDep: val isShuffleMap = shuffleDep.isDefined.)
Field notes:
(1) rdd: the last RDD in the stage;
(2) numTasks: simply the number of partitions;
(3) shuffleDep: as the comment says, when the stage is a shuffle map stage this field is its output shuffle, i.e., some RDD depends on this stage's result through a shuffle;
(4) parents: this stage's parent stages.

2.2.2. DAGScheduler.newStage

  private def newStage(
      rdd: RDD[_],
      numTasks: Int,
      shuffleDep: Option[ShuffleDependency[_, _, _]],
      jobId: Int,
      callSite: CallSite)
    : Stage =
  {
    val parentStages = getParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    val stage = new Stage(id, rdd, numTasks, shuffleDep, parentStages, jobId, callSite)
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }
Creates a new Stage.
As the Stage definition shows, creating a stage requires knowing its parent stages, so:
(1) first obtain the parent stages of the RDD (i.e., of the stage the RDD belongs to);
(2) allocate a stage id;
(3) construct the Stage.
As the flow diagram shows, newStage can become a recursive process: obtaining the parent stages may in turn require obtaining their ancestor stages.

2.2.3. Example

(RDD dependency graph figure omitted.)

Consider an RDD graph in which red arrows mark shuffle (wide) dependencies between RDDs and arrows of other colors mark narrow dependencies.
An action is invoked on RDD9, so we want to build the stage for RDD9.
Stages are divided along ShuffleDependency boundaries; the runnable sketch below shows a job with exactly one such boundary.
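To make the boundary concrete, here is a minimal, hypothetical job (object and app names are made up) whose lineage contains a single shuffle, so the DAGScheduler splits it into one shuffle map stage and one result stage:

  import org.apache.spark.{SparkConf, SparkContext}

  object StageBoundaryExample {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("stage-boundary").setMaster("local[2]"))
      val counts = sc.parallelize(Seq("a b", "b c"), 2)
        .flatMap(_.split(" "))   // narrow dependency: same stage
        .map(w => (w, 1))        // narrow dependency: same stage
        .reduceByKey(_ + _)      // ShuffleDependency: stage boundary
        .collect()               // action: submits the job
      counts.foreach(println)
      sc.stop()
    }
  }
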
2.2.3.1. DAGScheduler.getParentStages
Gets the parent stages of an RDD.
  private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
    val parents = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    val waitingForVisit = new Stack[RDD[_]]
    def visit(r: RDD[_]) {
      if (!visited(r)) {
        visited += r
        // Kind of ugly: need to register RDDs with the cache here since
        // we can't do it in its constructor because # of partitions is unknown
        for (dep <- r.dependencies) {
          dep match {
            case shufDep: ShuffleDependency[_, _, _] =>
              parents += getShuffleMapStage(shufDep, jobId)
            case _ =>
              waitingForVisit.push(dep.rdd)
          }
        }
      }
    }
    waitingForVisit.push(rdd)
    while (!waitingForVisit.isEmpty) {
      visit(waitingForVisit.pop())
    }
    parents.toList
  }
(1) Push RDD9 onto the waitingForVisit stack and start draining the stack;
(2) pop an RDD (RDD9): it has two dependencies, both narrow, so push RDD4 and RDD7;
(3) pop RDD7: it has two wide dependencies, so fetch the shuffle map stage for each of them;
(4) pop RDD4: it has a single narrow dependency, so push RDD3;
(5) pop RDD3: it has one wide and one narrow dependency, so push RDD5 and compute the shuffle map stage for RDD3's wide dependency.
An RDD's parent stage is therefore either nonexistent (None) or a shuffle map stage.

Note:
For a transformation that creates a ShuffleDependency between RDDs, the ShuffledRDD is constructed with deps initialized to Nil; no ShuffleDependency object is actually created at that point, whereas narrow dependencies are created right at transformation time.
The ShuffleDependency objects between RDDs are created by calling RDD.dependencies (as in the r.dependencies call inside the method above).
Because the RDDs are traversed from larger numbers to smaller ones, the RDDs traversed earlier (larger numbers) get ShuffleDependencies with smaller shuffleIds.
Likewise, ancestor stages get smaller stage ids. A paraphrased sketch of this lazy dependency creation follows below.
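The laziness can be seen in RDD.dependencies; the fragment below is a paraphrased sketch of the Spark 1.x behavior (checkpoint handling omitted), not verbatim source:

  // Paraphrased sketch: dependencies are materialized lazily and then cached.
  // ShuffledRDD passes Nil to the RDD constructor, so its ShuffleDependency
  // (and hence its shuffleId) is created only on the first dependencies call.
  final def dependencies: Seq[Dependency[_]] = {
    if (dependencies_ == null) {
      dependencies_ = getDependencies  // for ShuffledRDD: List(new ShuffleDependency(...))
    }
    dependencies_
  }
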
2.2.3.2. DAGScheduler.getShuffleMapStage
  private def getShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): Stage = {
    shuffleToMapStage.get(shuffleDep.shuffleId) match {
      case Some(stage) => stage
      case None =>
        // We are going to register ancestor shuffle dependencies
        registerShuffleDependencies(shuffleDep, jobId)
        // Then register current shuffleDep
        val stage =
          newOrUsedStage(
            shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, jobId,
            shuffleDep.rdd.creationSite)
        shuffleToMapStage(shuffleDep.shuffleId) = stage
        stage
    }
  }
To reuse stages that have already been divided and avoid the cost of dividing them again, every shuffle map stage is cached.
(1) Check whether a stage already exists for this shuffle dependency;
(2) if it exists, return it directly;
(3) if not, first register the stages of all ancestor shuffle dependencies of this shuffle dependency, then create the stage for the current one.
2.2.3.3. DAGScheduler.registerShuffleDependencies
  private def registerShuffleDependencies(shuffleDep: ShuffleDependency[_, _, _], jobId: Int) = {
    val parentsWithNoMapStage = getAncestorShuffleDependencies(shuffleDep.rdd)
    while (!parentsWithNoMapStage.isEmpty) {
      val currentShufDep = parentsWithNoMapStage.pop()
      val stage =
        newOrUsedStage(
          currentShufDep.rdd, currentShufDep.rdd.partitions.size, currentShufDep, jobId,
          currentShufDep.rdd.creationSite)
      shuffleToMapStage(currentShufDep.shuffleId) = stage
    }
  }
(1) Get all ShuffleDependencies found among the ancestor RDDs of the shuffle dependency's RDD;
(2) getAncestorShuffleDependencies returns a stack. Because RDDs are traversed from larger numbers toward smaller ones, ShuffleDependencies with smaller ids sit at the bottom of the stack and those with larger ids at the top (if RDD.dependencies was already called before stage division, the ShuffleDependency ids may be out of this order).
(3) The ShuffleDependency popped from the top of the stack has the larger shuffleId; newOrUsedStage is called to create its stage. As the flow diagram shows, newOrUsedStage in turn calls newStage. An RDD at the leaf end of the RDD graph has no parent stage, so its stage can be created directly.
(4) Save the newly created stage.
Ancestor stages therefore end up with smaller stage ids.
Once all ancestor stages have been created, and every shuffle map stage is stored in a HashMap keyed by shuffleId, the parent stages can be fetched from that map.
