
RDD Task Splitting: Stage Division (Diagrams and Source Code)


RDD task splitting is organized into four levels: Application, Job, Stage, and Task.

(1) Application: initializing a SparkContext creates one Application.

(2) Job: each action operator triggers one Job.

(3) Stage: the number of Stages equals the number of wide (shuffle) dependencies plus 1.

(4) Task: within a Stage, the number of partitions of the last RDD is the number of Tasks.

Note: every level of Application -> Job -> Stage -> Task is a one-to-many relationship (the sketch below makes these rules concrete).
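
The following is a minimal sketch (hypothetical object name, local mode, in-memory data instead of a file) of how these counts show up in code: one SparkContext is one Application, the shuffle introduced by reduceByKey splits the DAG into 1 + 1 = 2 stages, and the single action call is one Job. toDebugString prints the lineage, and the indented ShuffledRDD marks the stage boundary.

  import org.apache.spark.{SparkConf, SparkContext}

  object LevelsSketch {
    def main(args: Array[String]): Unit = {
      // One SparkContext = one Application.
      val sc = new SparkContext(new SparkConf().setAppName("LevelsSketch").setMaster("local[*]"))

      val wordCounts = sc.parallelize(Seq("a b", "b c"))
        .flatMap(_.split(" "))
        .map((_, 1))
        .reduceByKey(_ + _)          // wide (shuffle) dependency -> stage boundary

      // The lineage below shows the ShuffledRDD, i.e. where the DAG splits into 2 stages.
      println(wordCounts.toDebugString)

      wordCounts.collect()           // 1 action = 1 Job
      sc.stop()
    }
  }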

Main steps

  // Sample code (imports and an enclosing object added so the snippet compiles as-is)
  import org.apache.spark.rdd.RDD
  import org.apache.spark.{SparkConf, SparkContext}

  object SparkCoreTest {
    def main(args: Array[String]): Unit = {
      // 1. Create a SparkConf and set the application name
      val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
      // 2. Create a SparkContext, the entry point for submitting a Spark application
      val sc: SparkContext = new SparkContext(conf)
      val rdd: RDD[String] = sc.textFile("input/1.txt")
      val mapRdd = rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
      mapRdd.saveAsTextFile("outpath")
      // 3. Close the connection
      sc.stop()
    }
  }

First, look at the execution flow diagram (Yarn-Cluster mode).

Now let's walk through it step by step.

Step 1

  • Execute the main method

  • Initialize the SparkContext (sc)

  • Run until an action operator is reached

At this point the lineage (dependency chain) has been built, but none of the actual computation has executed yet; the sketch below demonstrates this laziness.
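
A minimal sketch of lazy evaluation (hypothetical names, local mode): a LongAccumulator incremented inside map stays at zero until the action runs, showing that transformations only record lineage and the work happens only when the action triggers a job.

  import org.apache.spark.{SparkConf, SparkContext}

  object LazyLineageSketch {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("LazyLineageSketch").setMaster("local[*]"))
      val touched = sc.longAccumulator("touched")

      val mapped = sc.parallelize(1 to 4).map { x =>
        touched.add(1)   // side effect runs only when a task actually executes
        x * 2
      }

      println(touched.value)   // 0: only the lineage has been built so far
      mapped.collect()         // the action triggers the job and runs the map tasks
      println(touched.value)   // 4: the map function ran once per element

      sc.stop()
    }
  }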

Step 2: the DAGScheduler splits the job above into stages, and each stage produces tasks

DAGScheduler: stages are divided first, then tasks.

The number of stages for this job = number of wide dependencies + 1 = 2 (there is exactly one wide dependency here, i.e. the place where the shuffle happens).

The number of tasks for the job: in each stage, the number of partitions of the last RDD is the number of tasks (2 + 2 = 4). The ShuffleMapStage before the shuffle produces two tasks, and the ResultStage after the shuffle produces two tasks.
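
A small sketch of the partition-to-task mapping (hypothetical names, local mode, in-memory data instead of input/1.txt): with 2 partitions on both sides of the shuffle, the ShuffleMapStage gets 2 tasks and the ResultStage gets 2 tasks, 4 in total.

  import org.apache.spark.{SparkConf, SparkContext}

  object TaskCountSketch {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("TaskCountSketch").setMaster("local[*]"))

      // 2 partitions on the map side: the last RDD of the ShuffleMapStage has 2 partitions -> 2 ShuffleMapTasks.
      val rdd = sc.parallelize(Seq("a b", "b c", "a a", "c c"), numSlices = 2)
      // 2 partitions on the reduce side: the last RDD of the ResultStage has 2 partitions -> 2 ResultTasks.
      val mapRdd = rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _, numPartitions = 2)

      println(rdd.getNumPartitions)     // 2
      println(mapRdd.getNumPartitions)  // 2 -> 2 + 2 = 4 tasks for the whole job

      mapRdd.collect()
      sc.stop()
    }
  }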

Step 3: the TaskScheduler takes all the tasks of the job from the TaskSet, serializes them, and distributes them to the Executors.

The number of jobs = the number of action operators (the sample above has a single action; the source trace below starts from collect(), but saveAsTextFile goes down the same path).
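
As a rough way to verify the job count, one could register a SparkListener and count SparkListenerJobStart events. The sketch below (hypothetical names, local mode) runs two actions and should report two jobs; listener events are delivered asynchronously and are drained when the context stops.

  import java.util.concurrent.atomic.AtomicInteger
  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

  object JobCountSketch {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("JobCountSketch").setMaster("local[*]"))
      val jobCount = new AtomicInteger(0)

      // Count every job submitted to the DAGScheduler.
      sc.addSparkListener(new SparkListener {
        override def onJobStart(jobStart: SparkListenerJobStart): Unit = jobCount.incrementAndGet()
      })

      val words = sc.parallelize(Seq("a b", "b c")).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
      words.collect()   // action #1 -> job #1
      words.count()     // action #2 -> job #2

      sc.stop()         // stop() drains the listener bus before shutdown
      println(s"jobs started: ${jobCount.get()}")   // expected: 2
    }
  }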

Source code analysis

Tracing down step by step from the collect() method leads to the following key code.

————————————————————— 1 —————————————————————

  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }

The key line is finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite).
Following the flow in the diagram above, the program first locates the last RDD and creates the ResultStage for it.
Continue down.

————————————————————— 2: creating the ResultStage —————————————————————

  private def createResultStage(
      rdd: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      jobId: Int,
      callSite: CallSite): ResultStage = {
    val parents = getOrCreateParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }

val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite) -- here the ResultStage is created.
val parents = getOrCreateParentStages(rdd, jobId) -- continue down.

————————————————————— 3 —————————————————————

  private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
    getShuffleDependencies(rdd).map { shuffleDep =>
      getOrCreateShuffleMapStage(shuffleDep, firstJobId)
    }.toList
  }

getShuffleDependencies -- continue down.

————————————————————— 4 —————————————————————

  private[scheduler] def getShuffleDependencies(
      rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
    val parents = new HashSet[ShuffleDependency[_, _, _]]
    val visited = new HashSet[RDD[_]]
    val waitingForVisit = new Stack[RDD[_]]
    waitingForVisit.push(rdd)
    while (waitingForVisit.nonEmpty) {
      val toVisit = waitingForVisit.pop()
      if (!visited(toVisit)) {
        visited += toVisit
        toVisit.dependencies.foreach {
          case shuffleDep: ShuffleDependency[_, _, _] =>
            parents += shuffleDep
          case dependency =>
            waitingForVisit.push(dependency.rdd)
        }
      }
    }
    parents
  }

toVisit.dependencies traverses the dependency chain looking for ShuffleDependency, i.e. it walks upward RDD by RDD as in the diagram; for each RDD it only checks whether a dependency is a shuffle dependency.
Every ShuffleDependency found is collected (parents += shuffleDep), and control then returns to step 3: getShuffleDependencies(rdd).map { ... }.
Continue down.

————————————————————— 5 —————————————————————

  private def getOrCreateShuffleMapStage(
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int): ShuffleMapStage = {
    shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
      case Some(stage) =>
        stage
      case None =>
        // Create stages for all missing ancestor shuffle dependencies.
        getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
          if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
            createShuffleMapStage(dep, firstJobId)
          }
        }
        // Finally, create a stage for the given shuffle dependency.
        createShuffleMapStage(shuffleDep, firstJobId)
    }
  }

createShuffleMapStage(shuffleDep, firstJobId) creates a ShuffleMapStage for the shuffle dependency.
Continue down.

————————————————————— 6: creating the ShuffleMapStage —————————————————————

  def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
    val rdd = shuffleDep.rdd
    val numTasks = rdd.partitions.length
    val parents = getOrCreateParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)
    ...

The last line shown creates the ShuffleMapStage.

At this point both the ResultStage and the ShuffleMapStage have been created (the process is reflected in the diagram above).
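
For illustration only, the traversal performed by getShuffleDependencies can be re-implemented against the public RDD.dependencies API (this sketch is not Spark's code; object and method names are made up): it walks up through narrow dependencies and stops at shuffle dependencies, and the number of shuffle dependencies it finds plus one is the stage count.

  import scala.collection.mutable
  import org.apache.spark.{ShuffleDependency, SparkConf, SparkContext}
  import org.apache.spark.rdd.RDD

  object ShuffleDepSketch {
    // Walk the dependency graph from the final RDD and collect the nearest shuffle dependencies.
    def directShuffleDependencies(rdd: RDD[_]): Set[ShuffleDependency[_, _, _]] = {
      val found = mutable.HashSet[ShuffleDependency[_, _, _]]()
      val visited = mutable.HashSet[RDD[_]]()
      val waitingForVisit = mutable.Stack[RDD[_]](rdd)
      while (waitingForVisit.nonEmpty) {
        val toVisit = waitingForVisit.pop()
        if (!visited(toVisit)) {
          visited += toVisit
          toVisit.dependencies.foreach {
            case shuffleDep: ShuffleDependency[_, _, _] => found += shuffleDep  // stop at the shuffle boundary
            case narrowDep => waitingForVisit.push(narrowDep.rdd)               // keep walking through narrow deps
          }
        }
      }
      found.toSet
    }

    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("ShuffleDepSketch").setMaster("local[*]"))
      val wordCounts = sc.parallelize(Seq("a b", "b c")).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
      val shuffles = directShuffleDependencies(wordCounts)
      println(s"shuffle dependencies: ${shuffles.size}, stages: ${shuffles.size + 1}")  // 1 shuffle -> 2 stages
      sc.stop()
    }
  }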

————————————————————— 1: the execution code —————————————————————

  private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: ResultStage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }
    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    clearCacheLocs()
    logInfo("Got job %s (%s) with %d output partitions".format(
      job.jobId, callSite.shortForm, partitions.length))
    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))
    val jobSubmissionTime = clock.getTimeMillis()
    jobIdToActiveJob(jobId) = job
    activeJobs += job
    finalStage.setActiveJob(job)
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    submitStage(finalStage)
  }

-------
The key lines are:

  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  finalStage.setActiveJob(job)

After the finalStage (the ResultStage from the source analysis above) has been found, this final stage is passed in and tied to the job.
submitStage(finalStage) -- continue down.

————————————————————— 2 —————————————————————

  private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        val missing = getMissingParentStages(stage).sortBy(_.id)

The finalStage (the ResultStage) is handed to getMissingParentStages; its main purpose is to find the stages that come before it.
Continue down.

————————————————————— 3 —————————————————————

  private def getMissingParentStages(stage: Stage): List[Stage] = {
    val missing = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    val waitingForVisit = new Stack[RDD[_]]
    def visit(rdd: RDD[_]) {
      if (!visited(rdd)) {
        visited += rdd
        val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
        if (rddHasUncachedPartitions) {
          for (dep <- rdd.dependencies) {
            dep match {
              case shufDep: ShuffleDependency[_, _, _] =>
                val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
                if (!mapStage.isAvailable) {
                  missing += mapStage
                }
              case narrowDep: NarrowDependency[_] =>
                waitingForVisit.push(narrowDep.rdd)
            }
          }
        }
      }
    }
    waitingForVisit.push(stage.rdd)
    while (waitingForVisit.nonEmpty) {
      visit(waitingForVisit.pop())
    }
    missing.toList
  }

The part to focus on is def visit(rdd: RDD[_]).
for (dep <- rdd.dependencies) again searches for ShuffleDependency until none are left, adding the corresponding map stages to missing (this is what determines how many shuffles there are).

————————————————————— 4 (back to step 2) —————————————————————

  private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          submitMissingTasks(stage, jobId.get)
        } else {
          for (parent <- missing) {
            submitStage(parent)
          }
          waitingStages += stage
        }
      }
    } else {
      abortStage(stage, "No active job for stage " + stage.id, None)
    }
  }

The key code is:

  for (parent <- missing) {
    submitStage(parent)
  }

It recursively submits the missing parent stages found in step 3 until missing is empty (there is no stage further upstream), at which point submitMissingTasks(stage, jobId.get) is called.
Execution therefore starts with the ShuffleMapStage, and when a stage is executed the scheduler works out how many tasks it has.
Continue down.

————————————————————— 5 —————————————————————

  private def submitMissingTasks(stage: Stage, jobId: Int) {
    ...
    val tasks: Seq[Task[_]] = try {
      stage match {
        case stage: ShuffleMapStage =>
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            val part = stage.rdd.partitions(id)
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),
              Option(sc.applicationId), sc.applicationAttemptId)
          }
        case stage: ResultStage =>
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = stage.rdd.partitions(p)
            val locs = taskIdToLocations(id)
            new ResultTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
              Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
          }
      }
    }

partitionsToCompute -- continue down.

————————————————————— 6 —————————————————————

  override def findMissingPartitions(): Seq[Int] = {
    val missing = (0 until numPartitions).filter(id => outputLocs(id).isEmpty)
    assert(missing.size == numPartitions - _numAvailableOutputs,
      s"${missing.size} missing, expected ${numPartitions - _numAvailableOutputs}")
    missing
  }

If the last RDD of the ShuffleMapStage has two partitions, missing returns 0 and 1.
Going back to:

  partitionsToCompute.map { id =>
    val locs = taskIdToLocations(id)
    val part = stage.rdd.partitions(id)
    new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
      taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),
      Option(sc.applicationId), sc.applicationAttemptId)
  }

With two partitions, two ShuffleMapTask instances are created, i.e. two tasks.
The ResultStage branch works on the same principle, so it is not repeated here.

————————————————————— 7 (in the same method as step 5) —————————————————————

  if (tasks.size > 0) {
    logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
    stage.pendingPartitions ++= tasks.map(_.partitionId)
    logDebug("New pending partitions: " + stage.pendingPartitions)
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
  }

taskScheduler.submitTasks submits the tasks.
Continue down.

————————————————————— 8 —————————————————————

  override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      val stage = taskSet.stageId
      val stageTaskSets =
        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
      stageTaskSets(taskSet.stageAttemptId) = manager
      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
        ts.taskSet != taskSet && !ts.isZombie
      }
      ......

And that completes the walkthrough.
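
To summarize the submission order, here is a toy model (made-up classes, not Spark's) of the submitStage recursion traced above: a stage is submitted only after all of its missing parent stages have been submitted. Unlike Spark, which parks the child in waitingStages and resubmits it when its parents finish, this sketch simply recurses, which is enough to show the ordering.

  object SubmitOrderSketch {
    final case class ToyStage(id: Int, parents: List[ToyStage])

    def submissionOrder(finalStage: ToyStage): List[Int] = {
      val submitted = scala.collection.mutable.LinkedHashSet[Int]()
      def submit(stage: ToyStage): Unit = {
        val missing = stage.parents.filterNot(p => submitted.contains(p.id))
        if (missing.isEmpty) {
          submitted += stage.id          // no missing parents -> submitMissingTasks
        } else {
          missing.foreach(submit)        // submitStage(parent) for every missing parent
          submit(stage)                  // then this stage itself can be submitted
        }
      }
      submit(finalStage)
      submitted.toList
    }

    def main(args: Array[String]): Unit = {
      // Word-count example: ShuffleMapStage 0 is the parent of ResultStage 1.
      val shuffleMapStage = ToyStage(0, Nil)
      val resultStage = ToyStage(1, List(shuffleMapStage))
      println(submissionOrder(resultStage))   // List(0, 1): the map stage is submitted first
    }
  }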

 
