
Spark Source Code Reading 2: Stage Division


I. Why Stages Need to Be Divided

        Because a Spark program runs on a distributed cluster, the job has to be divided into stages according to the two kinds of dependencies between RDDs: narrow and wide. With a narrow dependency, a task needs only the corresponding partition(s) of its parent RDD, so the child computation can be pipelined with the parent inside the same task and both belong to the same stage. Operations such as reduce or group, however, need data from many (or all) partitions of the parent RDD; they cannot start until all upstream partitions have been computed, which imposes an execution order. That ordering is exactly what stages capture, and it is why the DAG must be split.

        The stages produced by this division run on the cluster, and each stage contains multiple tasks. Every task in a stage executes the same logic, just over a different partition, and the tasks run concurrently on different machines. The generated DAG is identical regardless of the resource scheduler (YARN, Mesos, local, and so on), which is what makes the computation portable across platforms with the same result.

        A stage consists of multiple tasks that share the same task logic but operate on different partitions. The number of partitions determines the number of tasks for that logic: each partition is processed by exactly one task of the corresponding stage, and the tasks are distributed across the available executors. If compute resources are sufficient, each executor runs one task at a time; if not, an executor works through several tasks one after another, as in the small example below.
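A minimal, runnable sketch of the partition-to-task relationship (the object name and app name are purely illustrative, not from the article): an RDD with 8 partitions produces a stage with exactly 8 tasks, which you can confirm in the Spark UI.

```scala
import org.apache.spark.sql.SparkSession

object PartitionTaskDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("partition-task-demo")
      .master("local[4]")            // at most 4 tasks run concurrently
      .getOrCreate()
    val sc = spark.sparkContext

    // 8 partitions => the stage computing this RDD is split into 8 tasks,
    // one task per partition, all running the same map/count logic.
    val rdd = sc.parallelize(1 to 1000, numSlices = 8)
    println(rdd.getNumPartitions)    // 8
    rdd.map(_ * 2).count()           // Spark UI: one job, one stage, 8 tasks

    spark.stop()
  }
}
```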

A wide dependency (shuffle) is the dividing line in the DAG, and therefore the dividing line between stages.

[Figure: stage-division example with RDDs A through G, split into Stage 1, Stage 2, and Stage 3]

handleJobSubmitted starts the stage division, working backwards from the final RDD G. G depends on B and F, and whether B or F is handled first is arbitrary. B is a narrow dependency of G, so B and G land in the same stage. F, on the other hand, is a wide dependency of G: there is a shuffle in between, so G can only run after F has finished, which puts F one stage ahead of G. The dependency between A and B is also wide, so B and A cannot share a stage and A forms a stage of its own. The stage containing B and G (Stage 3) therefore has two parent stages. Stage 1 and Stage 2 are independent of each other and can run concurrently, but Stage 3 has to wait for both of their results.
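To make the boundary concrete, here is a small, self-contained word-count style job (illustrative names only, not code from the article): the map is a narrow dependency and stays in the first stage, while reduceByKey introduces a ShuffleDependency, so the collect action runs one ShuffleMapStage plus one ResultStage.

```scala
import org.apache.spark.sql.SparkSession

object StageBoundaryDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("stage-boundary-demo")
      .master("local[2]")
      .getOrCreate()
    val sc = spark.sparkContext

    val words  = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"), 3)
    val pairs  = words.map(w => (w, 1))      // narrow dependency: same stage as `words`
    val counts = pairs.reduceByKey(_ + _)    // wide dependency: shuffle => new stage
    counts.collect()                         // action: ShuffleMapStage + ResultStage

    // toDebugString prints the lineage; the extra indentation level marks the shuffle.
    println(counts.toDebugString)

    spark.stop()
  }
}
```

In the Spark UI this job shows up as two stages, a smaller-scale version of the Stage 1 / Stage 2 / Stage 3 picture above.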

The DAGScheduler is created together with the SparkContext.

II. Wide and Narrow Dependencies

  • A narrow dependency means each partition of the parent RDD is used by at most one partition of the child RDD; a child partition depends on a constant number of parent partitions (O(1), independent of data size). Roughly one-to-one.
  • Correspondingly, a wide dependency means each partition of the parent RDD may be used by multiple child partitions; a child partition typically depends on all parent partitions (O(n), dependent on data size). Roughly many-to-many. (See the sketch after this list.)
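The distinction is visible directly on an RDD's dependencies. A small sketch (object and variable names are illustrative): mapValues produces a OneToOneDependency (narrow), while reduceByKey on an un-partitioned RDD produces a ShuffleDependency (wide).

```scala
import org.apache.spark.ShuffleDependency
import org.apache.spark.sql.SparkSession

object DependencyKindDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("dependency-kind-demo")
      .master("local[2]")
      .getOrCreate()
    val sc = spark.sparkContext

    val base    = sc.parallelize(1 to 10, 2).map(i => (i % 3, i))
    val mapped  = base.mapValues(_ + 1)      // narrow
    val reduced = base.reduceByKey(_ + _)    // wide (shuffle)

    println(mapped.dependencies.map(_.getClass.getSimpleName).mkString(", "))    // OneToOneDependency
    println(reduced.dependencies.map(_.getClass.getSimpleName).mkString(", "))   // ShuffleDependency
    println(reduced.dependencies.head.isInstanceOf[ShuffleDependency[_, _, _]])  // true

    spark.stop()
  }
}
```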

III. Implementation Source Code

Entry point:

1. SparkContext.submitMapStage

```scala
/**
 * Submit a map stage for execution. This is currently an internal API only, but might be
 * promoted to DeveloperApi in the future.
 */
private[spark] def submitMapStage[K, V, C](dependency: ShuffleDependency[K, V, C])
    : SimpleFutureAction[MapOutputStatistics] = {
  assertNotStopped()
  val callSite = getCallSite()
  var result: MapOutputStatistics = null
  val waiter = dagScheduler.submitMapStage(
    dependency,
    (r: MapOutputStatistics) => { result = r },
    callSite,
    localProperties.get)
  new SimpleFutureAction[MapOutputStatistics](waiter, result)
}
```

2. DAGScheduler.submitMapStage

```scala
/**
 * Submit a shuffle map stage to run independently and get a JobWaiter object back. The waiter
 * can be used to block until the job finishes executing or can be used to cancel the job.
 * This method is used for adaptive query planning, to run map stages and look at statistics
 * about their outputs before submitting downstream stages.
 *
 * @param dependency the ShuffleDependency to run a map stage for
 * @param callback function called with the result of the job, which in this case will be a
 *                 single MapOutputStatistics object showing how much data was produced for each partition
 * @param callSite where in the user program this job was submitted
 * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
 */
def submitMapStage[K, V, C](
    dependency: ShuffleDependency[K, V, C],
    callback: MapOutputStatistics => Unit,
    callSite: CallSite,
    properties: Properties): JobWaiter[MapOutputStatistics] = {
  val rdd = dependency.rdd
  val jobId = nextJobId.getAndIncrement()
  if (rdd.partitions.length == 0) {
    throw new SparkException("Can't run submitMapStage on RDD with 0 partitions")
  }
  // We create a JobWaiter with only one "task", which will be marked as complete when the whole
  // map stage has completed, and will be passed the MapOutputStatistics for that stage.
  // This makes it easier to avoid race conditions between the user code and the map output
  // tracker that might result if we told the user the stage had finished, but then they queries
  // the map output tracker and some node failures had caused the output statistics to be lost.
  val waiter = new JobWaiter[MapOutputStatistics](
    this, jobId, 1,
    (_: Int, r: MapOutputStatistics) => callback(r))
  eventProcessLoop.post(MapStageSubmitted(
    jobId, dependency, callSite, waiter, Utils.cloneProperties(properties)))
  waiter
}
```

3. eventProcessLoop.post (a textbook event-loop implementation that is worth borrowing in your own projects). As in the previous post, the event is eventually dispatched by doOnReceive in DAGSchedulerEventProcessLoop:

```scala
case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
  dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
```
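eventProcessLoop here is a DAGSchedulerEventProcessLoop, a subclass of Spark's generic EventLoop: post() only enqueues the event, and a single daemon thread drains the queue and hands each event to onReceive, so all DAG events are handled serially on one thread. Below is a stripped-down sketch of that pattern, not Spark's actual class; every name in it is made up for illustration.

```scala
import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.control.NonFatal

// Minimal event-loop sketch: producers call post(), a single daemon thread
// consumes the queue and dispatches events serially through onReceive().
abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  private val stopped    = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      while (!stopped.get) {
        try {
          val event = eventQueue.take()   // blocks until an event is posted
          onReceive(event)
        } catch {
          case _: InterruptedException => // woken up by stop(); loop condition exits
          case NonFatal(t)             => onError(t)
        }
      }
    }
  }

  def start(): Unit        = eventThread.start()
  def stop(): Unit         = { stopped.set(true); eventThread.interrupt() }
  def post(event: E): Unit = eventQueue.put(event)

  protected def onReceive(event: E): Unit
  protected def onError(t: Throwable): Unit = t.printStackTrace()
}

// Usage sketch: a loop that just prints the events it receives.
object EventLoopDemo extends App {
  val loop = new SimpleEventLoop[String]("demo-event-loop") {
    override protected def onReceive(event: String): Unit = println(s"handling $event")
  }
  loop.start()
  loop.post("MapStageSubmitted")
  Thread.sleep(200)
  loop.stop()
}
```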

4. DAGScheduler.handleMapStageSubmitted

```scala
private[scheduler] def handleMapStageSubmitted(jobId: Int,
    dependency: ShuffleDependency[_, _, _],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties): Unit = {
  // Submitting this map stage might still require the creation of some parent stages, so make
  // sure that happens.
  var finalStage: ShuffleMapStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    finalStage = getOrCreateShuffleMapStage(dependency, jobId)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }

  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  clearCacheLocs()
  log.info("Got map stage job %s (%s) with %d output partitions".format(
    jobId, callSite.shortForm, dependency.rdd.partitions.length))
  log.info("Final stage: " + finalStage + " (" + finalStage.name + ")")
  log.info("Parents of final stage: " + finalStage.parents)
  log.info("Missing parents: " + getMissingParentStages(finalStage))

  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.addActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos,
      Utils.cloneProperties(properties)))
  submitStage(finalStage)

  // If the whole stage has already finished, tell the listener and remove it
  if (finalStage.isAvailable) {
    markMapStageJobAsFinished(job, mapOutputTracker.getStatistics(dependency))
  }
}
```

5. DAGScheduler.getOrCreateShuffleMapStage, which finds all missing ancestor stages and creates them via createShuffleMapStage

```scala
// New stage creation may throw an exception if, for example, jobs are run on a
// HadoopRDD whose underlying HDFS files have been deleted.
finalStage = getOrCreateShuffleMapStage(dependency, jobId)

/**
 * Gets a shuffle map stage if one exists in shuffleIdToMapStage. Otherwise, if the
 * shuffle map stage doesn't already exist, this method will create the shuffle map stage in
 * addition to any missing ancestor shuffle map stages.
 */
private def getOrCreateShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
    case Some(stage) =>
      stage
    case None =>
      // Create stages for all missing ancestor shuffle dependencies.
      getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
        // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
        // that were not already in shuffleIdToMapStage, it's possible that by the time we
        // get to a particular dependency in the foreach loop, it's been added to
        // shuffleIdToMapStage by the stage creation process for an earlier dependency. See
        // SPARK-13902 for more information.
        if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
          createShuffleMapStage(dep, firstJobId)
        }
      }
      // Finally, create a stage for the given shuffle dependency.
      createShuffleMapStage(shuffleDep, firstJobId)
  }
}
```
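The interesting part is getMissingAncestorShuffleDependencies: it walks the RDD lineage backwards over rdd.dependencies and collects every ShuffleDependency whose shuffleId does not yet have a registered ShuffleMapStage. Below is a simplified, standalone sketch of that traversal; the function name, the `registered` parameter, and the exact visiting order are illustrative (the real method consults the DAGScheduler's shuffleIdToMapStage map).

```scala
import scala.collection.mutable
import org.apache.spark.{Dependency, ShuffleDependency}
import org.apache.spark.rdd.RDD

object LineageWalkSketch {
  // Simplified sketch: a graph traversal over rdd.dependencies that records
  // shuffle dependencies which do not have a registered map stage yet.
  def missingAncestorShuffleDeps(
      rdd: RDD[_],
      registered: Set[Int]): Seq[ShuffleDependency[_, _, _]] = {
    val missing         = mutable.ListBuffer[ShuffleDependency[_, _, _]]()
    val visited         = mutable.HashSet[RDD[_]]()
    val waitingForVisit = mutable.ListBuffer[RDD[_]](rdd)

    while (waitingForVisit.nonEmpty) {
      val current = waitingForVisit.remove(0)
      if (!visited(current)) {
        visited += current
        current.dependencies.foreach {
          case shuffleDep: ShuffleDependency[_, _, _] =>
            if (!registered.contains(shuffleDep.shuffleId)) {
              missing += shuffleDep
              // Keep walking past the shuffle boundary to find older missing ancestors.
              waitingForVisit.prepend(shuffleDep.rdd)
            }
          case narrowDep: Dependency[_] =>
            // Narrow dependency: same stage, just keep walking the lineage.
            waitingForVisit.prepend(narrowDep.rdd)
        }
      }
    }
    missing.toList
  }
}
```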

6. Next comes DAGScheduler.submitStage

Submitting a stage: first submit every parent stage that has not been submitted yet, then submit the current stage.

```scala
/** Submits stage, but first recursively submits any missing parents. */
private def submitStage(stage: Stage): Unit = {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    log.debug(s"submitStage($stage (name=${stage.name};" +
      s"jobs=${stage.jobIds.toSeq.sorted.mkString(",")}))")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      log.debug("missing: " + missing)
      if (missing.isEmpty) {
        // No missing parents: every parent stage is already available (or this is the
        // very first stage), so this stage's tasks can be submitted right away.
        log.info("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
      } else {
        // Submit the missing parent stages first; this is what enforces the stage order.
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}
```

7. DAGScheduler.submitMissingTasks. Tasks come in two flavors, ShuffleMapTask and ResultTask, and the two are handled differently.

```scala
/** Called when stage's parents are available and we can now do its task. */
private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
  log.debug("submitMissingTasks(" + stage + ")")

  // Before find missing partition, do the intermediate state clean work first.
  // The operation here can make sure for the partially completed intermediate stage,
  // `findMissingPartitions()` returns all partitions every time.
  stage match {
    case sms: ShuffleMapStage if stage.isIndeterminate && !sms.isAvailable =>
      mapOutputTracker.unregisterAllMapOutput(sms.shuffleDep.shuffleId)
    case _ =>
  }

  // Figure out the indexes of partition ids to compute.
  val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

  // Use the scheduling pool, job group, description, etc. from an ActiveJob associated
  // with this Stage
  val properties = jobIdToActiveJob(jobId).properties
  addPySparkConfigsToProperties(stage, properties)

  runningStages += stage
  // SparkListenerStageSubmitted should be posted before testing whether tasks are
  // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
  // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
  // event.
  stage match {
    case s: ShuffleMapStage =>
      outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
      // Only generate merger location for a given shuffle dependency once. This way, even if
      // this stage gets retried, it would still be merging blocks using the same set of
      // shuffle services.
      if (pushBasedShuffleEnabled) {
        prepareShuffleServicesForShuffleMapStage(s)
      }
    case s: ResultStage =>
      outputCommitCoordinator.stageStart(
        stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
  }

  val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
    stage match {
      case s: ShuffleMapStage =>
        partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id)) }.toMap
      case s: ResultStage =>
        partitionsToCompute.map { id =>
          val p = s.partitions(id)
          (id, getPreferredLocs(stage.rdd, p))
        }.toMap
    }
  } catch {
    case NonFatal(e) =>
      stage.makeNewStageAttempt(partitionsToCompute.size)
      listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo,
        Utils.cloneProperties(properties)))
      abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }

  stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)

  // If there are tasks to execute, record the submission time of the stage. Otherwise,
  // post the even without the submission time, which indicates that this stage was
  // skipped.
  if (partitionsToCompute.nonEmpty) {
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
  }
  listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo,
    Utils.cloneProperties(properties)))

  // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
  // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
  // the serialized copy of the RDD and for each task we will deserialize it, which means each
  // task gets a different copy of the RDD. This provides stronger isolation between tasks that
  // might modify state of objects referenced in their closures. This is necessary in Hadoop
  // where the JobConf/Configuration object is not thread-safe.
  var taskBinary: Broadcast[Array[Byte]] = null
  var partitions: Array[Partition] = null
  try {
    // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
    // For ResultTask, serialize and broadcast (rdd, func).
    var taskBinaryBytes: Array[Byte] = null
    // taskBinaryBytes and partitions are both effected by the checkpoint status. We need
    // this synchronization in case another concurrent job is checkpointing this RDD, so we get a
    // consistent view of both variables.
    RDDCheckpointData.synchronized {
      taskBinaryBytes = stage match {
        case stage: ShuffleMapStage =>
          JavaUtils.bufferToArray(
            closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
        case stage: ResultStage =>
          JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
      }

      partitions = stage.rdd.partitions
    }

    if (taskBinaryBytes.length > TaskSetManager.TASK_SIZE_TO_WARN_KIB * 1024) {
      logWarning(s"Broadcasting large task binary with size " +
        s"${Utils.bytesToString(taskBinaryBytes.length)}")
    }
    taskBinary = sc.broadcast(taskBinaryBytes)
  } catch {
    // In the case of a failure during serialization, abort the stage.
    case e: NotSerializableException =>
      abortStage(stage, "Task not serializable: " + e.toString, Some(e))
      runningStages -= stage
      // Abort execution
      return
    case e: Throwable =>
      abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      // Abort execution
      return
  }

  val tasks: Seq[Task[_]] = try {
    val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
    stage match {
      case stage: ShuffleMapStage =>
        stage.pendingPartitions.clear()
        partitionsToCompute.map { id =>
          val locs = taskIdToLocations(id)
          val part = partitions(id)
          stage.pendingPartitions += id
          new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
            taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
            Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
        }
      case stage: ResultStage =>
        partitionsToCompute.map { id =>
          val p: Int = stage.partitions(id)
          val part = partitions(p)
          val locs = taskIdToLocations(id)
          new ResultTask(stage.id, stage.latestInfo.attemptNumber,
            taskBinary, part, locs, id, properties, serializedTaskMetrics,
            Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
            stage.rdd.isBarrier())
        }
    }
  } catch {
    case NonFatal(e) =>
      abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }

  if (tasks.nonEmpty) {
    log.info(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
      s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties,
      stage.resourceProfileId))
  } else {
    // Because we posted SparkListenerStageSubmitted earlier, we should mark
    // the stage as completed here in case there are no tasks to run
    markStageAsFinished(stage, None)
    stage match {
      case stage: ShuffleMapStage =>
        logDebug(s"Stage ${stage} is actually done; " +
          s"(available: ${stage.isAvailable}," +
          s"available outputs: ${stage.numAvailableOutputs}," +
          s"partitions: ${stage.numPartitions})")
        markMapStageJobsAsFinished(stage)
      case stage: ResultStage =>
        logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")
    }
    submitWaitingChildStages(stage)
  }
}
```
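This is also where the log line in the title comes from: taskBinaryBytes is the serialized (rdd, shuffleDep) or (rdd, func) pair, and once it exceeds TaskSetManager.TASK_SIZE_TO_WARN_KIB the scheduler logs "Broadcasting large task binary with size ...". In practice the usual cause is a large object captured by a closure or buried in the RDD lineage. A hedged, user-level illustration follows (the names, sizes, and exact threshold depend on the Spark version and are only indicative).

```scala
import org.apache.spark.sql.SparkSession

object LargeTaskBinaryDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("large-task-binary-demo")
      .master("local[2]")
      .getOrCreate()
    val sc = spark.sparkContext

    // The lookup table is captured by the map closure, so it gets serialized into
    // taskBinaryBytes along with the RDD; if the result exceeds the warning threshold,
    // the driver logs "Broadcasting large task binary with size ...", and every task
    // deserializes its own copy of the table.
    val bigTable: Map[Int, String] = (1 to 200000).map(i => i -> ("value-" + i)).toMap
    sc.parallelize(1 to 10, 2).map(i => bigTable.getOrElse(i, "")).count()

    // Broadcasting the table explicitly keeps the task binary small: tasks capture
    // only the lightweight Broadcast handle and read the value once per executor.
    val bcTable = sc.broadcast(bigTable)
    sc.parallelize(1 to 10, 2).map(i => bcTable.value.getOrElse(i, "")).count()

    spark.stop()
  }
}
```

Either way the job is correct; the difference is in how much data gets serialized into the task binary and re-deserialized by every task.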

Finally, execution reaches TaskScheduler.submitTasks. At this point the DAGScheduler has done its job: it hands the tasks over to the TaskScheduler to run.

IV. Scheduling Overview

The two most important components in task scheduling are the DAGScheduler and the TaskScheduler. Together they turn a user-submitted job into an execution plan (a directed acyclic graph, DAG), split it into stages, and submit those stages to the cluster for the actual computation.

Between the DAGScheduler and the TaskScheduler the unit of transfer is a TaskSet; between the TaskScheduler and the workers it is individual Tasks.

1. The user's operations build up the DAG, which is handed to the DAGScheduler. The DAGScheduler cuts the job into stages and sends each stage's TaskSet to the TaskScheduler.

2. The TaskScheduler, via the cluster manager, dispatches Tasks to the executors on the worker nodes. Each executor runs its task in a thread and either stores the result locally or sends it back to the driver.

TaskScheduler
Schedules tasks for the SparkContext. It receives the stages produced by the DAGScheduler for different jobs as TaskSets, submits them to the cluster, and handles speculative (backup) tasks. It relies on the SchedulerBackend when new tasks are submitted, when a task fails, when a compute node goes down, or when a task runs too slowly and needs to be re-assigned.

SchedulerBackend
Assigns Tasks to executors and has them executed there. Each TaskScheduler is paired one-to-one with a SchedulerBackend, and both are held by the SparkContext.

