当前位置:   article > 正文

spark源码(五)--job触发流程以及stage划分算法_spark stage任务划分算法

spark stage任务划分算法

master在完成资源分配后,运行一个application的条件就成熟了,下面需要解析的就是如何运行一个app。我们之前提到过,在初始化sparkContext中会初始化一个taskScheduler和DAGScheduler,taskScheduler的主要任务是创建task调度池实现task级别的任务调度和联系master进行app注册;DAGScheduler则是面向stage的高层次的调度,负责application的job的划分。下面我们就看下DAGScheduler的源码。
路径是 core\src\main\scala\org\apache\spark\scheduler\DAGScheduler.scala
但是我们这次不直接找这个类,因为刚刚说了,sparkContext初始化的时候会初始化DAGScheduler,我们先找到sparkContext中的runJob()方法

/**
   * Run a function on a given set of partitions in an RDD and pass the results to the given handler function. This is the main entry point for all actions in Spark.
   * 在RDD一系列分区上运行一个函数并且将结果传递给下一个处理函数。这是spark中action算子的主要入口
   * 
   * @param rdd target RDD to run tasks on
   * @param func a function to run on each partition of the RDD
   * @param partitions set of partitions to run on; some jobs may not want to compute on all  partitions of the target RDD, e.g. for operations like `first()`
   * @param resultHandler callback to pass each result to
   */
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    // 直接调用了dagScheduler的runJob()方法
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

在我们去看dagScheduler的runJob()方法之前,我们先看下DAGScheler的官方定义,先对它有个整体认识,再具体方法

**
 * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
 * stages for each job, keeps track of which RDDs and stage outputs are materialize
 * d, and finds a  minimal schedule to run the job. It then submits stages as TaskSets to an underlying
 * TaskScheduler implementation that runs them on the cluster. A TaskSet contains fully independent
 * tasks that can run right away based on the data that's already on the cluster (e.g. map output
 * files from previous stages), though it may fail if this data becomes unavailable.
   面向stage的高层次的调度。它会给每个job都计算一个DAG有向无环图 追踪每个rdd的输出是否被物化了,并且寻找执行job的一个最小调度
   以taskset的方式提交一个stage并且以TaskScheduler的实现类在集群上执行。

 * In addition to coming up with a DAG of stages, the DAGScheduler also determines the preferred
 * locations to run each task on, based on the current cache status, and passes these to the
 * low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being
 * lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are
 * not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task
 * a small number of times before cancelling the whole stage.
   除了处理stage的DAG,DAGScheduler还决定运行每个task的最佳位置,基于当前的缓存状态,将这些最佳位置提供给底层的TaskSchedulerImpl去运行
   此外,他还会处理由于shuffle输出文件丢失导致的失败,这种情况下,旧的stage需要重新被提交。在一个stage内,不是由于shuffle输出文件丢失导致的失败,
   会被TaskScheduler去处理,他会多次运行task,直到取消整个stage

 */
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

接下来进入DAGScheduler的runJob()方法

/**
   * Run an action job on the given RDD and pass all the results to the resultHandler function as
   * they arrive.
   *在给定的RDD上运行一个action job 并且把结果传递给resultHandler 
   */
  def runJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): Unit = {
    val start = System.nanoTime
    // 核心是调用了submitJob()方法
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
    ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

submitJob()方法

def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {
    // 确认不会在不存在的partition上启动task
    val maxPartitions = rdd.partitions.length
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
      throw new IllegalArgumentException()
    }
    assert(partitions.size > 0)
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    // 在这里调用了JobSubmitted()方法
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))
    waiter
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

JobSubmitted()实际上是一个接口,在DAGScheduler()方法中有对应的方法,叫做handleJobSubmitted()

 private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: ResultStage = null
    try {
      // 首先用触发job的最后一个RDD创建一个resultStage
      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
       // 进行异常捕捉,因为直接创建一个finalStage 可能会报错。比如job可能会运行一个hadoopDD但是HDFS文件却被删了。
    }
    // Job submitted, clear internal data.
    barrierJobIdToNumTasksCheckFailures.remove(jobId)

    // 用finalstage创建一个job
    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    /**
    * cacheLocas:缓存的每个RDD的所有分区的位置信息,最终建立已经缓存的分区号和位置信息序列映射。为什么分区号和位置还会是序列呢?
    * 因为每一个分区可能存在多个副本机制,因此RDD的每一个分区的BLock可能存在多个节点的BlockManager上,因此是序列
    */
    // 这里将rdd缓存的位置信息全部清除
    clearCacheLocs()

    // job添加到内存
    val jobSubmissionTime = clock.getTimeMillis()
    jobIdToActiveJob(jobId) = job
    activeJobs += job
    finalStage.setActiveJob(job)
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    // 重要!!!stage提交算法
    submitStage(finalStage)
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

handleJobSubmitted()方法是DAGScheduler调度的开始。提交Job总结为4步:
1.用触发job的最后一个RDD创建一个finalstage(stage分为两种,除了job的最后一个stage即resultStage外其他都是shuffleMapStage)
2.用finalstage创建一个job (job以action算子划分)
3.将job加入到DAGScheduler缓存
4.递归提交stage

下面就是spark中stage的划分算法submitStage()

private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)   // 获取finalstage对应的jobid
    if (jobId.isDefined) {
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        // 获取当前stage的父stage
        val missing = getMissingParentStages(stage).sortBy(_.id)
        if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          // 如果没有父stage说明整个job就只有一个satge 直接提交这个stage中的task
          submitMissingTasks(stage, jobId.get)
        } else {
          // 核心!!!如果有父stage,遍历所有的父stage,递归寻找stage的父stage
          for (parent <- missing) {
            submitStage(parent)
          }
          waitingStages += stage    // 并把当前stage加入到等待调度的队列中
        }
      }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

看一下是怎么实现stage的划分的,也就是getMissingParentStages()方法

/**
*这是一个宽度优先遍历的算法(sparkstreaming中划分stage也是这种算法)。通过递归调用遍历上游依赖,直到找到需要进行实际计算的最小集合
*/
// 接收一个stage,返回的是一个stage的集合,也就是该stage所有的父stage
private def getMissingParentStages(stage: Stage): List[Stage] = {
    val missing = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
    val waitingForVisit = new ArrayStack[RDD[_]]    // 手动维护一个栈 避免因为递归调用导致内存溢出
  
    def visit(rdd: RDD[_]) {     // 定义一个遍历rdd的方法,接收一个RDD
      if (!visited(rdd)) {
        visited += rdd
        val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
        if (rddHasUncachedPartitions) {  // 如果这个RDD中有没有缓存的partition就遍历这个RDD的依赖  这是为啥?
          for (dep <- rdd.dependencies) {
            dep match {
              case shufDep: ShuffleDependency[_, _, _] =>       // 遍历这个RDD的依赖,如果是宽依赖,那就说明是2个stage 就创建一个shuffleMapStage。除了最后一个stage是resultStage,其他都是shuffleMapStage
                val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
                if (!mapStage.isAvailable) {
                  missing += mapStage  // 把这个shuffleMapStage加入父stage集合返回
                }
              case narrowDep: NarrowDependency[_] =>  
                waitingForVisit.push(narrowDep.rdd)   // 如果是窄依赖,那么就把窄依赖的这个rdd推入栈中
            }
          }
        }
      }
    }
   
    waitingForVisit.push(stage.rdd)    // 首先往栈中推入了该stage的rdd
    // 只要还有rdd没有遍历到,就循环遍历这个rdd
    while (waitingForVisit.nonEmpty) {
      visit(waitingForVisit.pop())  // 依次遍历该栈中所有的rdd,这里注意,如果从waitingForVisit中pop()出来了一个rdd,遍历他的依赖发现是个窄依赖,那么会将他窄依赖的那个rdd也推入到waitingForVisit这个栈中
    }
    missing.toList   // 将父stage返回
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

小结:submitStage()的核心思想是宽度优先遍历(BFS)。首先提交一个resultStage,然后遍历resultStage的rdd的依赖,有宽依赖就是父stage,否则就是同一个stage。然后再递归调用submitStage遍历他的父stage

job触发流程以及stage划分算法重要方法梳理(从上到下按顺序执行):

-> DAGScheduler.runJob->submitJob(rdd, func, partitions, callSite, resultHandler,
       properties)
    
-> DAGScheduler.eventProcessLoop.post(JobSubmitted(
		   jobId, rdd, func2, partitions.toArray, callSite, waiter,
		   SerializationUtils.clone(properties))) 

-> DAGSchedulerEventProcessLoop-> case JobSubmitted(jobId, dependency, callSite, listener, properties)->

-> DAGScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

-> DAGScheduler.createResultStage(finalRDD, func, partitions, jobId, callSite)  (创建resultStage)

-> DAGScheduler.submitStage(finalStage)  (广度优先算法,建立无环图)

-> DAGScheduler.getMissingParentStages(stage)  

-> DAGScheduler.submitMissingTasks(stage, jobId.get)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/655815
推荐阅读
相关标签
  

闽ICP备14008679号