赞
踩
本篇主要阐述 DAGSchedule 划分 Stage 的过程,其主要目的是为了了解 Stage 划分的原理;同时对源码分析更能清楚过程,当某个任务出现运行时间较长时;如果可以清楚其 Stage 划分的过程,就可以大概清楚是什么算子导致运行时间较长,从而可以进行优化提高 Spark 任务运行的效率。
首先 Stage 基本划分原则是由后往前推,遇到宽依赖就会形成一个 Stage;
由 RDD G 往上找对应的父 RDD 为 RDD B 与 RDD F,其中 RDD G 与父 RDD F 为宽依赖(会产生 Shuffle)关系,与父 RDD B为窄依赖关系;所以 RDD G 会与 RDD B 在同一个 Stage中,而与 RDD F 就不会再同一个 Stage 中;
再由 RDD B 往上推,父 RDD A 与 RDD B 为宽依赖关系,因此 RDD A 就为单独一个 Stage;
RDD F 往上推,与 RDD D 、RDD E 都是窄依赖关系,因此就会划分在一个 Stage中;又因为 RDD D 与其父 RDD C 为窄依赖关系,至此 RDD F、RDD D、RDD E、RDD C就会划分在同一个 Stage中;
补充说明
org.apache.spark.scheduler.DAGScheduler // 第一步:创建 finalStage(RDD 链条中最后一个 RDD,也是 sc.runJOb 执行的 RDD;换句话来说就是该 RDD 也一定会调用了一个 Action 算子) var finalStage: ResultStage = null try { // New stage creation may throw an exception if, for example, jobs are run on a // HadoopRDD whose underlying HDFS files have been deleted. // 返回的 ResultStage 对象中,基本上会包含他的所有的父 Stage(也会存在为找到的父 Stage) // 后面的 getMissingParentStages() 方法会找寻缺失的父 Stage finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) } catch { case e: Exception => logWarning("Creating new stage failed due to exception - job: " + jobId, e) listener.jobFailed(e) return }
org.apache.spark.scheduler.DAGScheduler private def createResultStage( rdd: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], jobId: Int, callSite: CallSite): ResultStage = { // 获取当前 Stage 的父 Stage,如果没有 Shuffle 则返回空 val parents = getOrCreateParentStages(rdd, jobId) // finalStage=resultStage 的 stageID val id = nextStageId.getAndIncrement() // 创建 ResultStage val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite) // 将 ResultStage 与 StageId 相关联, 保存在 map 中 stageIdToStage(id) = stage // 更新该 Job 中包含的 Stage updateJobIdStageIdMaps(jobId, stage) // 返回ResultStage stage }
org.apache.spark.scheduler.DAGScheduler
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
/**
* @Author: Small_Ran
* @Date: 2022/6/16
* @Description:
* 1、这里的 RDD 指的是 FinalRDD,getShuffleDependencies 会获取当前 Job 或者说是 FinalRDD 往上的所有 shuffleDependencies
* 2、Map 函数会为每一个 shuffleDependencies 构建 ShuffleMapStage
*/
getShuffleDependencies(rdd).map { shuffleDep =>
getOrCreateShuffleMapStage(shuffleDep, firstJobId)
}.toList
}
org.apache.spark.scheduler.DAGScheduler private[scheduler] def getShuffleDependencies( rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = { // 用来存放依赖 val parents = new HashSet[ShuffleDependency[_, _, _]] // 存放遍历过的RDD val visited = new HashSet[RDD[_]] // 创建一个堆(先进后出) val waitingForVisit = new Stack[RDD[_]] // 将RDD放入堆中 waitingForVisit.push(rdd) // 遍历循环堆 while (waitingForVisit.nonEmpty) { val toVisit = waitingForVisit.pop() if (!visited(toVisit)) { visited += toVisit // 通过 rdd.dependencies 获取 RDD 的依赖 toVisit.dependencies.foreach { // 如果是 Shuffle 依赖则放入返回的 parents 中 case shuffleDep: ShuffleDependency[_, _, _] => parents += shuffleDep // 如果不是 Shuffle 依赖则获取该 RDD 的上一个父 RDD放入堆中,并重新开始循环(一个一个的往上找 Shuffle,直到最后一个 RDD) case dependency => waitingForVisit.push(dependency.rdd) } } } parents }
在 getOrCreateParentStages 方法
中,会基于 Stage 的 RDD 的依赖关系向上查找其父 RDD ,如果窄依赖则继续向上查找;如果宽依赖(Shuffle 依赖)则会调用 getOrCreateShuffleMapStage 方法
来建立一个ShuffleMapStage。
org.apache.spark.scheduler.DAGScheduler
/**
* @Author: Small_Ran
* @Date: 2022/6/11
* @Description: 第二步:使用 finalStage 创建一个 Job
* 1、一个 Spark 任务可以包含一个或者多个 Job(遇到 行动算子就会提交一个 Job)
* 2、一个 Job 可以包含一个或者多个 Stage(遇到 Shuffle 算子就会形成一个 Stage;例如:collect为行动算子但不是 Shuffle 算子,groupBy 是行动算子中的 Shuffle 算子)
*/
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
clearCacheLocs()
org.apache.spark.scheduler.DAGScheduler
// 第三步:将 Job 加入内存缓存中
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.setActiveJob(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
// 第四步: 使用 submitStage 提交 finalStage
submitStage(finalStage)
org.apache.spark.scheduler.DAGScheduler
// 获取该 Stage 未提交的父 Stages,并按 Stage id 从小到大排序
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
org.apache.spark.scheduler.DAGScheduler /** * @Author: Small_Ran * @Date: 2022/6/13 * @param stage * @Description: 获取未找到(缺失)父 Stage * 1、如果一个 Stage 的最后一个 RDD 所对应的父 RDD 都是窄依赖,那么就不会创建新的 Stage(直接返回空) * 2、如果一个 Stage 的最后一个 RDD 所对应的父 RDD, 其中有一个为宽依赖;那么就用宽依赖 RDD 创建一个新的 Stage * 3、最后一个 Stage 叫做 ResultStage ,前面的叫做 ShuffleMapStage * 4、ResultStage 执行完成之后就会按照程序进行数据持久化(出打印外),ShuffleMapStage 执行完成之后数据可能会被持久化或者传递给下一个 Stage */ private def getMissingParentStages(stage: Stage): List[Stage] = { val missing = new HashSet[Stage] val visited = new HashSet[RDD[_]] // We are manually maintaining a stack here to prevent StackOverflowError // caused by recursively visiting val waitingForVisit = new Stack[RDD[_]] // 下面就为 Stage 划分的算法 def visit(rdd: RDD[_]) { if (!visited(rdd)) { visited += rdd val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil) if (rddHasUncachedPartitions) { // 开始遍历 RDD 的依赖 for (dep <- rdd.dependencies) { // 对 RDD 中的依赖关系进行模式匹配 dep match { // 若为宽依赖,生成新的 Stage case shufDep: ShuffleDependency[_, _, _] => val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId) if (!mapStage.isAvailable) { missing += mapStage } // 若为窄依赖,那就属于同一个stage;并将该 RDD 的父 RDD 放入堆中; case narrowDep: NarrowDependency[_] => waitingForVisit.push(narrowDep.rdd) } } } } }
org.apache.spark.scheduler.DAGScheduler private def submitStage(stage: Stage) { val jobId = activeJobForStage(stage) if (jobId.isDefined) { logDebug("submitStage(" + stage + ")") if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) { // 获取 当前 Stage 的父 Stage val missing = getMissingParentStages(stage).sortBy(_.id) logDebug("missing: " + missing) // 如果返回的结果为空,那么直接提交 Stage if (missing.isEmpty) { logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents") // 提交 submitMissingTasks(stage, jobId.get) } else { // 如果返回的是多个,那么就开始一个一个的遍历;并进行递归调用 for (parent <- missing) { // 首先创建新的 Stage 是从最后一个 RDD开始,所以一直递归最终是从创建最后一个 Stage 开始提交 submitStage(parent) } waitingStages += stage } } } else { abortStage(stage, "No active job for stage " + stage.id, None) } }
Stage的提交时经过submitStage
函数来实现的。主要实现逻辑以下图所示:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。