Author: 博弈史密斯
Link: https://www.jianshu.com/p/9f74e7f5e913
Source: 简书 (Jianshu)
This article introduces what a Stage is and walks through how DAGScheduler splits the DAG into Stages.
The definition of Stage
Stage has two important fields, rdd and parents, which record the RDD at the cut point and the parent Stages, respectively; this is easier to understand with the example later in this article. Stage has two subclasses, ShuffleMapStage and ResultStage, and each adds one important field of its own, as follows:

Stage subclass | Distinguishing field | Purpose |
---|---|---|
ShuffleMapStage | shuffleDep: ShuffleDependency | Holds the ShuffleDependency it computes |
ResultStage | func: (TaskContext, Iterator[_]) => _ | Holds the function applied by the action |
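This hierarchy can be sketched with a toy model. The `Simple*` names below are illustrative stand-ins, not Spark's real classes (which carry many more parameters), and the `func` field stands in for Spark's `(TaskContext, Iterator[_]) => _`:

```scala
// Toy model of the Stage hierarchy -- only the fields discussed above.
// None of these are Spark's real types.
case class SimpleRDD(name: String)
case class SimpleShuffleDep(rdd: SimpleRDD)

sealed abstract class SimpleStage {
  def id: Int
  def rdd: SimpleRDD             // the RDD at the cut point
  def parents: List[SimpleStage] // the parent Stages this Stage depends on
}

// ShuffleMapStage additionally records the ShuffleDependency it computes
final case class SimpleShuffleMapStage(
    id: Int,
    rdd: SimpleRDD,
    parents: List[SimpleStage],
    shuffleDep: SimpleShuffleDep) extends SimpleStage

// ResultStage additionally records the function applied by the action
final case class SimpleResultStage(
    id: Int,
    rdd: SimpleRDD,
    parents: List[SimpleStage],
    func: Iterator[Any] => Any) extends SimpleStage

// A ResultStage whose single parent is a ShuffleMapStage
val mapRdd = SimpleRDD("mapSide")
val dep    = SimpleShuffleDep(mapRdd)
val parent = SimpleShuffleMapStage(0, mapRdd, Nil, dep)
val result = SimpleResultStage(1, SimpleRDD("final"), List(parent), it => it.size)
```

Walking `result.parents` recursively reaches every ancestor Stage, which mirrors how submitStage later submits missing parent Stages before the final one.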
The previous post ended with DAGScheduler's handleJobSubmitted method being called to handle the JobSubmitted event. Let's look at that method:

```scala
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  // Split the DAG into Stages and return the ResultStage;
  // each Stage records its parent Stages in the `parents` field
  finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)

  // Create an ActiveJob and register it in the bookkeeping collections
  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)

  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  // Submit the final Stage
  submitStage(finalStage)
}
```
As the code above shows, the main responsibility of handleJobSubmitted is to split the DAG into Stages and then submit them.

DAGScheduler's createResultStage method performs the split. It does two things: 1) it calls getOrCreateParentStages to create the parent Stages, and 2) it creates the ResultStage itself.
getOrCreateParentStages:

```scala
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  getShuffleDependencies(rdd).map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}
```
First, getShuffleDependencies collects the ShuffleDependency instances:
```scala
private[scheduler] def getShuffleDependencies(
    rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {

  // Collects all ShuffleDependency instances found
  val parents = new HashSet[ShuffleDependency[_, _, _]]

  // Records the RDDs that have already been visited
  val visited = new HashSet[RDD[_]]

  // Holds the RDDs still waiting to be visited
  val waitingForVisit = new Stack[RDD[_]]

  // Push the starting RDD (the job's final RDD) onto waitingForVisit
  waitingForVisit.push(rdd)

  while (waitingForVisit.nonEmpty) {
    // Pop one RDD from waitingForVisit to process
    val toVisit = waitingForVisit.pop()

    // Skip RDDs that have already been visited,
    // so each RDD is processed exactly once
    if (!visited(toVisit)) {
      visited += toVisit

      // An RDD's dependencies method returns its direct dependencies
      toVisit.dependencies.foreach {
        // A wide (shuffle) dependency: add it to parents
        case shuffleDep: ShuffleDependency[_, _, _] =>
          parents += shuffleDep

        // A narrow dependency: push its parent RDD onto waitingForVisit
        // and keep climbing until the next wide dependency
        case dependency =>
          waitingForVisit.push(dependency.rdd)
      }
    }
  }
  parents
}
```
As the comments above indicate, getShuffleDependencies starts from the RDD produced just before the action and walks up the lineage, returning the set of ShuffleDependency instances it reaches. Note that the walk stops at the first shuffle on each path, so it returns only the nearest ancestor shuffle dependencies; shuffles further up the lineage are discovered later, through recursion.
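The walk can be reproduced on a self-contained toy lineage. MiniRDD, NarrowDep and ShuffleDep below are hypothetical stand-ins for Spark's RDD and Dependency types, not the real classes:

```scala
import scala.collection.mutable

// Hypothetical stand-ins for RDD and its two kinds of dependency
sealed trait MiniDep { def rdd: MiniRDD }
case class NarrowDep(rdd: MiniRDD) extends MiniDep
case class ShuffleDep(rdd: MiniRDD) extends MiniDep
case class MiniRDD(name: String, dependencies: List[MiniDep] = Nil)

// The same stack-based walk as getShuffleDependencies: record a wide
// dependency and stop there; keep climbing through narrow dependencies
def shuffleDependencies(rdd: MiniRDD): Set[ShuffleDep] = {
  val parents         = mutable.Set[ShuffleDep]()
  val visited         = mutable.Set[MiniRDD]()
  val waitingForVisit = mutable.Stack[MiniRDD](rdd)
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    if (!visited(toVisit)) {
      visited += toVisit
      toVisit.dependencies.foreach {
        case s: ShuffleDep => parents += s                // wide: record, stop
        case n: NarrowDep  => waitingForVisit.push(n.rdd) // narrow: climb on
      }
    }
  }
  parents.toSet
}

// Lineage with two shuffles:
// data -> map1 ==shuffle==> reduced -> map2 ==shuffle==> last
val data    = MiniRDD("data")
val map1    = MiniRDD("map1", List(NarrowDep(data)))
val reduced = MiniRDD("reduced", List(ShuffleDep(map1)))
val map2    = MiniRDD("map2", List(NarrowDep(reduced)))
val last    = MiniRDD("last", List(ShuffleDep(map2)))
```

`shuffleDependencies(last)` yields only the shuffle over map2; the earlier shuffle over map1 is found later, when the parent Stage is built recursively.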
Then a Stage is created for each shuffle. Here is the getOrCreateShuffleMapStage code:
```scala
private def getOrCreateShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  createShuffleMapStage(shuffleDep, firstJobId)
}

def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
  val rdd = shuffleDep.rdd
  val numTasks = rdd.partitions.length

  // Call getOrCreateParentStages again, recursively, to create the parent Stages
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  // Create the ShuffleMapStage with its parents
  val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)

  // Register the new stage in the bookkeeping maps
  stageIdToStage(id) = stage
  shuffleIdToMapStage(shuffleDep.shuffleId) = stage

  // Return the newly created stage
  stage
}
```
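Because createShuffleMapStage calls getOrCreateParentStages again, the split is recursive: Stages for upstream shuffles are created before the Stage that depends on them, so ids grow from the top of the lineage downward. A self-contained sketch of this recursion (Node, Narrow, Shuffle and Stage here are toy types, not Spark's; deduplication via shuffleIdToMapStage is omitted):

```scala
// Toy lineage types; real Spark also dedupes stages via shuffleIdToMapStage
sealed trait Dep { def rdd: Node }
case class Narrow(rdd: Node) extends Dep
case class Shuffle(rdd: Node) extends Dep
case class Node(name: String, deps: List[Dep] = Nil)
case class Stage(id: Int, rdd: Node, parents: List[Stage])

var nextStageId = 0

// Nearest shuffle on each upstream path (cf. getShuffleDependencies)
def nearestShuffles(rdd: Node): List[Shuffle] =
  rdd.deps.flatMap {
    case s: Shuffle => List(s)
    case n: Narrow  => nearestShuffles(n.rdd)
  }

// cf. createShuffleMapStage / createResultStage: build the parent
// Stages first, then this Stage
def createStage(rdd: Node): Stage = {
  val parents = nearestShuffles(rdd).map(s => createStage(s.rdd))
  val id = nextStageId; nextStageId += 1
  Stage(id, rdd, parents)
}

// data -> map1 ==shuffle==> reduced -> map2 ==shuffle==> last
val data    = Node("data")
val map1    = Node("map1", List(Narrow(data)))
val reduced = Node("reduced", List(Shuffle(map1)))
val map2    = Node("map2", List(Narrow(reduced)))
val last    = Node("last", List(Shuffle(map2)))
val finalStage = createStage(last)
```

`createStage(last)` produces three Stages linked through `parents`: stage 0 ending at map1, stage 1 ending at map2, and the final stage 2.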
Take the following word-count job as an example:

```scala
val sc = new SparkContext("local", "wordcount")
val data = sc.parallelize(List("a c", "a b", "b c", "b d", "c d"), 2)
val wordcount = data.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
  .map(x => (x._2, x._1)).reduceByKey(_ + _)

val data2 = sc.parallelize(List("a c", "a b", "b c", "b d", "c d"), 2)
val wordcount2 = data2.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
  .map(x => (x._2, x._1)).reduceByKey(_ + _)

wordcount.join(wordcount2).collect()
```
The RDD dependency graph:

Cutting the DAG at each ShuffleDependency in the graph above produces the Stages, with the following result:

At this point the Stage splitting is complete. Finally, a screenshot from the Spark web UI confirms that Stages are split at the shuffle boundaries.