当前位置:   article > 正文

spark最新版本3.0.0中job划分、调度以及任务的提交执行流程源码分析_jobqueue:3.0.0

jobqueue:3.0.0

前言

上文我们对spark整个应用运行流程进行了整体的分析,但是后面最后一步关于任务的划分、调度和执行就一笔带过了,今天我们就来对spark任务的执行流程的底层源码分析,搞懂这个会对我们理解spark几个术语有比较清楚的理解了,如:

术语描述关系
job工作调用一个行动算子就会有一个job生成
stage阶段有一个shuffle算子就会多一个阶段,最少一个
Task任务一个stage可以有多个Task,Task和并行度(分区数)有关,一般是有多少个分区数,就有多少个Task

一 整个任务的流程图示

这是我自己画的,就是一个大概的流程

在这里插入图片描述

关于shuffle过程的原理,可以参考我写的这篇文章 【spark中shuffle操作底层原理】

我在网上找到一个具体,非常仔细的流程图,具体到每个方法

在这里插入图片描述

大家可以跟着图示来看下面的源码分析,可以更加清晰和方便的理解整个流程了

二 任务的划分、调度和执行

1 调用行动算子生成job

用wordcount来说明,我们进入collect方法

//SparkContext调用runjob方法
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  • 1
  • 2

进入runJob方法

//看名字就知道这是调用有向无环图的runjob方法
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler,localProperties.get)
  • 1
  • 2

继续进入

//提交job
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
//阻塞线程直到任务划分好
ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
  • 1
  • 2
  • 3
  • 4

进入submitJob方法

//eventProcessLoop事件进程池进行操作
eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      Utils.cloneProperties(properties)))
  • 1
  • 2
  • 3
  • 4

进入JobSubmitted

/** A result-yielding job was submitted on a target RDD */
private[scheduler] case class JobSubmitted(
    jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties = null)
  extends DAGSchedulerEvent
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

英语好的大佬估计一眼就翻译出来这个类上面的注释了,小弟不行,谷歌翻译一波

已按照目标RDD提交了一项要提高结果的工作

表明了调用行动算子后会提交一个job

2 划分阶段并发送Task给Executor

进入post方法

 //一个阻塞队列
private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

  def post(event: E): Unit = {
    if (!stopped.get) {
      if (eventThread.isAlive) {
          //将提交的job放在这个阻塞队列中
        eventQueue.put(event)
      } else {
        onError(new IllegalStateException(s"$name has already been stopped accidentally."))
      }
    }
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

我们可以看到当eventThread线程是存活时,才会往eventQueue队列里面put事件

进入eventThread中,查看run方法

       while (!stopped.get) {
           //从阻塞队列中获取job
          val event = eventQueue.take()
          try {
              //发送job
            onReceive(event)
          } catch {
            case NonFatal(e) =>
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
          }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

进入onReceive方法

doOnReceive(event)
  • 1
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    //模式匹配JobSubmitted,并调用有向无环图的处理JobSubmitted方法
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
  • 1
  • 2
  • 3
  • 4

进入handleJobSubmitted方法

//创建阶段
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  • 1
  • 2
//获取父阶段
val parents = getOrCreateParentStages(rdd, jobId)
val id = nextStageId.getAndIncrement()
//创建一个ResultStage阶段
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  • 1
  • 2
  • 3
  • 4
  • 5

创建一个ResultStage阶段,最后返回的就是这个阶段,只不过这个阶段里面还封装了0个或多个ShuffleMapStage阶段

进入getOrCreateParentStages方法

//获取rdd的依赖,并调用getOrCreateShuffleMapStage方法
getShuffleDependencies(rdd).map { shuffleDep =>
      getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  • 1
  • 2
  • 3

进入getShuffleDependencies方法

 private[scheduler] def getShuffleDependencies(
      rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
    val parents = new HashSet[ShuffleDependency[_, _, _]]
    val visited = new HashSet[RDD[_]]
    val waitingForVisit = new ListBuffer[RDD[_]]
    waitingForVisit += rdd
    while (waitingForVisit.nonEmpty) {
      val toVisit = waitingForVisit.remove(0)
      if (!visited(toVisit)) {
        visited += toVisit
        toVisit.dependencies.foreach {
          case shuffleDep: ShuffleDependency[_, _, _] =>
            parents += shuffleDep
          case dependency =>
            waitingForVisit.prepend(dependency.rdd)
        }
      }
    }
    parents
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

该方法主要就是用来获取所有的ShuffleDependency依赖,主要逻辑是这样的

创建两个hashSet

parents:用来存放有ShuffleDependency依赖

visited:用来做标记,标记rdd是否访问过

val waitingForVisit = new ListBuffer[RDD[_]]用来存放传进来的rdd

然后判断这个list是否为空,获取这个rdd的所有依赖,如果有shuffleDep就添加进parents,否则就将该dependency的rdd添加进waitingForVisit ,然后继续进行判断,知道找出所有的ShuffleDependency,然后返回

parents

回退到getOrCreateParentStages方法,进入到getOrCreateShuffleMapStage方法

//创建ShuffleMapStage阶段
createShuffleMapStage(shuffleDep, firstJobId)
  • 1
  • 2

进入createShuffleMapStage方法

//创建ShuffleMapStage阶段
val stage = new ShuffleMapStage(
      id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)
  • 1
  • 2
  • 3

因为getShuffleDependencies返回的是HashSet并且调用的map映射方法,所以我们可以知道,每有一个ShuffleDependency就会生成一个ShuffleMapStage阶段

回退到handleJobSubmitted方法

//提交阶段
submitStage(finalStage)
  • 1
  • 2

我们看看阶段具体是怎么提交的,进入到submitStage方法

val missing = getMissingParentStages(stage).sortBy(_.id)

if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          submitMissingTasks(stage, jobId.get)
} 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
private def getMissingParentStages(stage: Stage): List[Stage] = {
    val missing = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    val waitingForVisit = new ListBuffer[RDD[_]]
    waitingForVisit += stage.rdd
    def visit(rdd: RDD[_]): Unit = {
      if (!visited(rdd)) {
        visited += rdd
        val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
        if (rddHasUncachedPartitions) {
          for (dep <- rdd.dependencies) {
            dep match {
              case shufDep: ShuffleDependency[_, _, _] =>
                val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
                if (!mapStage.isAvailable) {
                  missing += mapStage
                }
              case narrowDep: NarrowDependency[_] =>
                waitingForVisit.prepend(narrowDep.rdd)
            }
          }
        }
      }
    }
    while (waitingForVisit.nonEmpty) {
      visit(waitingForVisit.remove(0))
    }
    missing.toList
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

我们看看就知道了和之前获取ShuffleDependency依赖的方法是何其相似,具体的逻辑都是一样的

missing中存放的是每个ShuffleMapStage阶段

回退到submitStage方法

if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          submitMissingTasks(stage, jobId.get)
} else {
      for (parent <- missing) {
            submitStage(parent)
      }
      waitingStages += stage
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

如果missing不为空,则会递归调用submitStage方法,即提交每个ShuffleMapStage阶段,最后如果为空了就调用submitMissingTasks方法,也就是递归跳出的条件

进入submitMissingTasks方法

val tasks: Seq[Task[_]] = try {
      val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
      stage match {
        case stage: ShuffleMapStage =>
          stage.pendingPartitions.clear()
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            val part = partitions(id)
            stage.pendingPartitions += id
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
              Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
          }

        case stage: ResultStage =>
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = partitions(p)
            val locs = taskIdToLocations(id)
            new ResultTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, locs, id, properties, serializedTaskMetrics,
              Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
              stage.rdd.isBarrier())
          }
      }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

我们可以看出每一个ShuffleMapStage阶段都会变成ShuffleMapTask任务,而ShuffleMapTask任务的多少取决于

partitionsToCompute这个分区计算器,所以也就是说一个分区对应一个Task

 if (tasks.nonEmpty) {
      logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
        s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
     //如果Task不为空则提交Task
      taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
    } 

else{
    //提交等待的子阶段
    submitWaitingChildStages(stage)
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

我们可以分析,当Tasks为空时,会调用submitWaitingChildStages方法,看名字就知道是提交子阶段,即我们提交阶段是从最前面一个阶段开始依次往后面阶段提交

我们可以看看submitWaitingChildStages方法**

  private def submitWaitingChildStages(parent: Stage): Unit = {
    logTrace(s"Checking if any dependencies of $parent are now runnable")
    logTrace("running: " + runningStages)
    logTrace("waiting: " + waitingStages)
    logTrace("failed: " + failedStages)
    val childStages = waitingStages.filter(_.parents.contains(parent)).toArray
    waitingStages --= childStages
    for (stage <- childStages.sortBy(_.firstJobId)) {
      submitStage(stage)
    }
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

从最后一个循环我们就可以看出它是按firstJobId进行排序后的顺序就行提交的,也就是最前面的阶段开始依次往后面提交

进入submitTasks方法

stageTaskSets(taskSet.stageAttemptId) = manager
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
  • 1
  • 2

将任务封装好,并调用schedulableBuilder的addTaskSetManager方法

进入addTaskSetManager方法

我们可以查看schedulableBuilder是特质,所以肯定有实现的类,查看继承树

子类分别是FIFOSchedulableBuilder和FairSchedulableBuilder,看名字就能知道分别是先进先出调度器和公平调度器

override def addTaskSetManager(manager: Schedulable, properties: Properties): Unit = {
    rootPool.addSchedulable(manager)
  }
  • 1
  • 2
  • 3

我们可以看到是将我们的封装好的任务放到一个任务池当中存放,至于为什么要创建一个任务池来存放我们的任务是因为我们任务创建好之后如果Executor端还没有申请好资源,那么任务就算发送过去也执行不了,并且会丢失任务,所以我们需要创建一个任务池来临时存放任务,等待Executor端所有资源申请好之后再发送Task

回退到submitTasks方法中

我们看最后一行代码

backend.reviveOffers()

override def reviveOffers(): Unit = {
    driverEndpoint.send(ReviveOffers)
}
  • 1
  • 2
  • 3
  • 4
  • 5
case ReviveOffers =>
   makeOffers()
  • 1
  • 2

ReviveOffers会作为一条消息被接收

进入makeOffers方法

if (taskDescs.nonEmpty) {
    //启动任务
    launchTasks(taskDescs)
}
  • 1
  • 2
  • 3
  • 4

进入launchTasks方法

for (task <- tasks.flatten) {
   val serializedTask = TaskDescription.encode(task)
  • 1
  • 2

先将任务进行编码序列化

executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
  • 1

调用executor终端的发送方法,发送我们已经序列化好的Task(LaunchTask)

3 Executor执行Task

driver将job划分好阶段,将阶段提交到executor交给计算对象进行执行

进入CoarseGrainedExecutorBackend类中的receive方法

private[executor] val taskResources = new mutable.HashMap[Long, Map[String, ResourceInformation]]   

case LaunchTask(data) =>
      if (executor == null) {
        exitExecutor(1, "Received LaunchTask command but executor was null")
      } else {
          //将发送过来的任务进行反编码
        val taskDesc = TaskDescription.decode(data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
        taskResources(taskDesc.taskId) = taskDesc.resources
          //executor启动任务进行计算
        executor.launchTask(this, taskDesc)
      }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

先将发送过来的任务进行反编码,然后将我们的任务进行一些封装后存放在taskResourcesmap中然后通过executor计算对象启动Task

进入launchTask方法

private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]  

//是一个缓存的线程池,里面是ThreadPoolExecutor
 private val threadPool =    Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]

def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
    val tr = new TaskRunner(context, taskDescription)
    runningTasks.put(taskDescription.taskId, tr)
    //线程池执行计算任务
    threadPool.execute(tr)
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

对计算任务进行封装成TaskRunner,通过ThreadPoolExecutor执行TaskRunner

自此job的划分、调度以及执行就此全部结束了,关于spark整个应用提交运行以及任务的提交执行就有了非常清楚的认识了,再去学习怎么使用spark框架就会更加得心应手了。

如果觉得有帮助的小伙伴点个赞再走呗!!!
  • 1
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/655875
推荐阅读
相关标签
  

闽ICP备14008679号