当前位置:   article > 正文

大数据(078)Spark【Spark 源码分析----划分Stage】_spark源码解析

spark源码解析

作者:博弈史密斯
链接:https://www.jianshu.com/p/9f74e7f5e913
来源:简书

 

概要

介绍Stage的定义,DAGScheduler划分Stage流程。

Stage

查看Stage定义

 

Stage中有两个重要属性,rddparents,分别记录的是切分处的RDD和父Stage信息,这一点结合我后面的例子更好理解。Stage有两个子类,ShuffleMapStage、ResultStage,两者分别增加了一个重要属性信息,如下

stage差异属性作用
ShuffleMapStageshuffleDep: ShuffleDependency保存Dependency信息
ResultStagefunc: (TaskContext, Iterator[_]) => _保存action对应的处理函数

处理JobSubmitted事件

上一篇最后讲到调用DAGScheduler的handleJobSubmitted方法处理JobSubmitted事件,查看该方法

 

  1. private[scheduler] def handleJobSubmitted(jobId: Int,
  2. finalRDD: RDD[_],
  3. func: (TaskContext, Iterator[_]) => _,
  4. partitions: Array[Int],
  5. callSite: CallSite,
  6. listener: JobListener,
  7. properties: Properties) {
  8. var finalStage: ResultStage = null
  9. //划分Stage,返回ResultStage,Stage使用 parents 属性保存父 Stage
  10. finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  11. //创建ActiveJob,并添加到对应集合管理
  12. val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  13. jobIdToActiveJob(jobId) = job
  14. activeJobs += job
  15. finalStage.setActiveJob(job)
  16. val stageIds = jobIdToStageIds(jobId).toArray
  17. val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  18. //提交 Stage
  19. submitStage(finalStage)
  20. }

如上处,handleJobSubmitted方法主要职责如下:

  1. 调用 createResultStage 方法,划分DAG生成Stage。
  2. 创建ActiveJob,并添加到对应集合管理。
  3. 调用submitStage 提交Stage。

划分Stage

DAGScheduler的 createResultStage方法负责划分DAG生成Stage,createResultStage方法:1 调用 getOrCreateParentStages方法创建父Stage,2 创建 ResultStage。
getOrCreateParentStages :

 

  1. private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  2. getShuffleDependencies(rdd).map { shuffleDep =>
  3. getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  4. }.toList
  5. }

首先 getShuffleDependencies 获取 所有的 ShuffleDependency

 

  1. private[scheduler] def getShuffleDependencies(
  2. rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
  3. //记录 所有的 ShuffleDependency
  4. val parents = new HashSet[ShuffleDependency[_, _, _]]
  5. //记录所有已经处理的 RDD
  6. val visited = new HashSet[RDD[_]]
  7. //记录所有待处理的 RDD
  8. val waitingForVisit = new Stack[RDD[_]]
  9. //把当前的 ResultRdd,也就是最后一个RDD,放到 waitingForVisit
  10. waitingForVisit.push(rdd)
  11. while (waitingForVisit.nonEmpty) {
  12. //从 waitingForVisit 取出一个 RDD 去处理
  13. val toVisit = waitingForVisit.pop()
  14. //已经处理的RDD列表中 不包含 要处理的这个RDD
  15. //保证下面的流程都是针对要处理的RDD
  16. if (!visited(toVisit)) {
  17. visited += toVisit
  18. //RDD的 dependencies 方法,保存了所有RDD的 dependency
  19. toVisit.dependencies.foreach {
  20. //如果是宽依赖,则添加到 parents
  21. case shuffleDep: ShuffleDependency[_, _, _] =>
  22. parents += shuffleDep
  23. //如果是窄依赖,则把这个依赖的 RDD,添加到 waitingForVisit
  24. //一直往上找,直到找到下一个宽依赖
  25. case dependency =>
  26. waitingForVisit.push(dependency.rdd)
  27. }
  28. }
  29. }
  30. parents
  31. }

如上面代码注释,getShuffleDependencies里主要逻辑为:通过action操作后的RDD,往上遍历所有RDD,寻找所有的 ShuffleDependency 列表,并返回

然后根据 每个 Shuffle 划分 Stage,看下 getOrCreateShuffleMapStage 代码:

 

  1. private def getOrCreateShuffleMapStage(
  2. shuffleDep: ShuffleDependency[_, _, _],
  3. firstJobId: Int): ShuffleMapStage = {
  4. createShuffleMapStage(shuffleDep, firstJobId)
  5. }
  6. def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
  7. val rdd = shuffleDep.rdd
  8. val numTasks = rdd.partitions.length
  9. //再次调用 getOrCreateParentStages 创建 parents
  10. val parents = getOrCreateParentStages(rdd, jobId)
  11. val id = nextStageId.getAndIncrement()
  12. //根据 parents 创建 ShuffleMapStage
  13. val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)
  14. //添加到 Map
  15. stageIdToStage(id) = stage
  16. shuffleIdToMapStage(shuffleDep.shuffleId) = stage
  17. //返回创建的 stage
  18. stage
  19. }

例子

 

  1. val sc = new SparkContext("local","wordcount")
  2. val data = sc.parallelize(List("a c", "a b", "b c", "b d", "c d"), 2)
  3. val wordcount = data.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).map(x => (x._2, x._1)).reduceByKey(_ + _)
  4. val data2 = sc.parallelize(List("a c", "a b", "b c", "b d", "c d"), 2)
  5. val wordcount2 = data2.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).map(x => (x._2, x._1)).reduceByKey(_ + _)
  6. wordcount.join(wordcount2).collect()

RDD的依赖关系:

 

  1. 最左一列的parallelize、map等表示实例代码中的transformation。
  2. 圆角矩形表示transformation操作生成的RDD和该RDD的Dependency,其中ShuffleDependency使用蓝色标注。

在上图ShuffleDependency处切分DAG生成Stage,结果如下 :

 

  1. 圆角矩形代表Stage,结果为四个ShuffleMapStage ,一个ResultStage。
  2. 圆角矩形内为Stage的两个属性。ShuffleMapStage和ResultStage有差别。

到这里,Stage就划分完成了,最后贴张spark webUI的图片

 

总结

会在 Shuffle 处划分 Stage。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/80234
推荐阅读
相关标签
  

闽ICP备14008679号