当前位置:   article > 正文

Spark之Stage的生成及任务的执行_spark stage

spark stage

每一个Spark应用都会创建一个sparksession,用来跟Spark集群交互,如果提交任务的模式为cluster模式,则Driver进程会被随机在某个worker结点上启动,然后真正执行用户提供的入口类,或是使用Spark内置的入口类,同时会在Driver进程创建SparkContext对象,提供Spark应用生命周期内所涉及到的各种系统角色。

client master worker driver dirver executor RequestSubmitDriver LaunchDriver new DriverRunner["deploy.worker.DriverWrapper"] ProcessBuilder.run() waiting driver subprocess to exit setup new RpcEndpoint workerWatcher invoke user defined main class or the internal class loop [WaitFor] LaunchExecutor ExecutorRunner.start() prepare and run subprocess ExecutorStateChanged handle executor state changed, ExecutorStateChanged try to clean finished executors from worker and app info cached ExecutorUpdated client master worker driver dirver executor

SparkContext

使用Spark功能的主入口,用户可以通过此实例在集群中创建RDD、求累加各以及广播变量。
由于Driver进程会真正执行Spark应用的入口程序,因此它只会在driver端被创建。
默认情况下,一个JVM环境只会创建一个SparkContext实例,但用户可以修改spark.driver.allowMultipleContexts的默认值,来启用多个实例。SparkContext内部会启动多个线程来完成不同的工作,比如分发事件到监听者、动态分配和回收executors、接收executor的心跳等,因此需要用户主动调用stop()接口来关闭此实例。

当用户通过Action函数触发RDD上的计算时,(通常指countsumtake等的返回结果为非RDD的函数),便会生成一个Job,被提交到Spark集群运行。而提交Job的入口便是此实例对象,比如RDD中的count()方法的定义如下:

  /**
   * Return the number of elements in the RDD.
   * sc 是SparkContext实例的变量名
   */
  def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
  • 1
  • 2
  • 3
  • 4
  • 5

可以看到当一个Action方法被调用时,会通过调用SparkContext的runJob方法,并将当前RDD作为函数参数,触发计算过程,最终通过一系列地包装,将Job任务的生成交由DAGScheduler对象。

DAGScheduler

面向Stage的调度器,它会为每一个Job计算相应的stage,并跟踪哪个RDD和Stage的输出结果被物化了,并找到一个最小的调度方案来跑Job。DAGScheduler会把所有的stage封装成一系列的TaskSet,提交给底层的一个TaskScheduler的实现类,由它来负责任务的执行。TaskSet包含了完整的依赖任务,这些任务都可以基于已经计算出现的数据结果,比如上游Stage的map任务的输出,完成自己的工作,即使这些任务会由于依赖数据不可用而失败。
Spark Stage的生成是通过在RDD图中划分shuffle边界而得到的。RDD的窄依赖,比如map()和filter()方法,会被以流水线的方式划分到一个stage的任务集中(TaskSet),但有shuffle行为的操作需要依赖多个其它的stage的完成(比如一个stage输出一组map数据文件,而另外一个stage会在被阻塞之后读取这些文件)。最终每一个stage都只会与其它stage有shuffle依赖,而且其可能会执行多个操作。
只有当各种类型的RDD之上的RDD.compute()方法被触发时,才会真正地开始执行整个流水线。

另外对于一个DAG图中的所有stage,DAGScheduler也会根据当前集群的状态来决定每一个任务应该在哪个地方执行,最终才会把这些任务发送到底层的TashScheduler。同时DAGScheduler会保证当一个stage的所需要的shuffle数据文件丢失时,同跑上游的stage,而TaskScheduler会保证当一个stage内部产生错误时(非数据文件丢失)会尝试重跑每个任务,在一定次数的尝试之后,取消整个stage的执行。

在尝试查看这部分的实现时,有如下一些概念需要明确:

  • Jobs (内部以ActiveJob表示,两类),上层提交给调度器的工作单元。比如调用count()方法时,一个新的Job就会生成,而这个Job会在执行了多个stage之后生成中间的数据。
  • Stages(内部以Stage表示),包含了一组合任务集(TaskSet),每一个stage会生成当前Job所需的中间结果,而每一个任务都会在同一个RDD数据集的所有Partition(分区)数据块上执行同一个方法。而由shuffle边界(Shuffle Boundries,或称之为Barrier屏障)所分隔的stage,会所有barrier之前的stage执行完成之后才开始执行。因此这里有两类的Stage,一个是ResultStage,表示最后一个stage,输出action方法的结果;另一个是ShuffleMapStage,为每一个shuffle操作输出数据文件。通常这些Stage产生的结果都是可以跨Job共享的,由于这不同的Job使用了同一份RDD数据。
    每一个Stage都有一个firstJobId域,用于标识第一次提交当前Stage的Job的ID,如果使用的是FIFO的调度方式,通过此域可以提供首先执行前置Job中的stage或是有失败时快速恢复的能力。
    最终一个Stage能够在失败时尝试恢复执行多次,因此Stage对象必须跟踪多个StageInfo对象,然后转发给listeners(监听器)或是WEB UI。
  • TaskSet,包含一组有依赖关系的任务,同时也表示这些任务都有共同的shuffle依赖,通常构成一组流水线操作,比如map().filter(),或表示一个特定stage丢失的分区。
  • Tasks,最小的执行单元,会被分发到每个机器上执行。
  • Cache tracking,记录了有哪个RDD已经被缓存了,以避免重复计算;同时也记录了哪些ShuffleMapStage已经完成并生成了数据文件,以避免重复执行shuffle阶段的map任务。
  • Preferred locations: DAGScheduler会根据底层RDD,或是cache或shuffle的数据的位置来计算一个Stage中每个任务最优地执行节点。
  • Cleanup:为了避免一个长周期运行的应用可能导致内存泄漏的问题,正在运行的Job完成时所依赖的所有数据结构都会被清理。
    为了失败时能够故障恢复,一个stage可能被提交多次,称为attempts。如是TaskScheduler报告某个任务由于FetchFailed或是ExeuctorLost事件而失败时,DAGScheduler会重新提交失败的stage。
  /**
   * SparkContext::runJob(..)方法会产生一个JobSubmitted消息,发送给DagScheduler的消息线程线程,
   * 最终调用如下的方法,开始从RDD依赖图生成Stage,然后再生成相应的ShuffleMapTask和ResultTask。
   */
  private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: ResultStage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }

    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    clearCacheLocs()
    logInfo("Got job %s (%s) with %d output partitions".format(
      job.jobId, callSite.shortForm, partitions.length))
    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))

    val jobSubmissionTime = clock.getTimeMillis()
    jobIdToActiveJob(jobId) = job
    activeJobs += job
    finalStage.setActiveJob(job)
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    submitStage(finalStage)
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

DAGScheduler接收到来自SparkContext对象提交任务的请求后,会根据传递过来的RDD的paritions数量、用户提供的方法函数、JobId等信息,通过逆拓扑序的方法,递归地构建StageTaskSet

提交Stage的过程描述如下:

  1. 首先从ResultStage开始,提交Stage执行,首先尝试创建其父依赖的ShuffleMapStage
  2. 遍历当前Stage的父依赖,如果是NarrowDependency类型的依赖,则将其绑定的RDD添加到待遍历队列,以便继续回溯查找其父依赖,直到找到一个最近的ShuffleMapStage,否则若为ShuffleDependency,则跳转到步骤3
  3. 调用getOrCreateShuffleMapStage()方法,先尝试从缓存的ShuffleMapStage中,按传递进来的ShuffleDependency的shuffleId字段,查找是否已经被创建过,有则返回,没有跳转到步骤4
  4. 找到当前父ShuffleDependency所包含的rdd的所有未创建过对应ShuffleMapStage的祖先ShuffleDenpendency,为这些祖先依赖创建ShuffleMapStage,递归重复步骤2,否则如果所有的祖先依赖都已经被创建,由执行步骤5
  5. 创建当前ShuffleDependency对应的ShuffleMapStage,并绑定所有的直接父Stage
  6. ResultStage对应的所有父ShuffleMapStage创建成功后,并将这些父依赖Stage作为参数创建最后的ResultStage

至此,所有可能的祖先ShuffleMapStage被依次创建完毕,每一个ShuffleMapStage只包含以下其直接父依赖所对应的ShuffleMapStage。

## rdd方法调用图
rdd1 -- map() --> rdd2 -- filter() -> rdd3 -- groupbykey() --> rdd3 
                   \                                               \
                    \                                               \
                      filter() --> rdd4 -- reducebykey() --> rdd5 -- join() --> rdd6 -- count() --> result
## rdd方法调用图转换成依赖图的结果如下
## ndep == narrow dependency
## sdep == shuffle dependency
rdd1 -- ndep --> rdd2 -- ndep -> rdd3 -- [sdep] --> rdd3
                   \                                    \
                    \                                    \
                      ndep --> rdd4 -- [sdep] --> rdd5 -- ndep --> rdd6 -- action --> result
## 如果result stage所对应的RDD的直接父依赖,不是ShuffleDependency,则继续回溯父依赖(NarrowDependency)的父依赖,
## 直到找到了每个直接父依赖可能存在的、离result stage最近的依赖,如stage1, stage2,则算完成第一轮创建。
## 然后再递归创建stage1、stage2所对应的父ShuffleDependencty。
## 依赖图换成stage图的结果如下
stage1 --> stage1  -> stage1 --> stage1 
                                       \
                                          --> stage0 --> stage0
                                       /
stage2 --> stage2 --> stage2 --> stage2
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

SchedulerBackend

Driver进程在初始化SparkContext的实例时,会创建任务调度相关的组件,它包括集群管理器SchedulerBackend、任务调度器TaskScheduler的实例,分别用来管理Spark应用关联的集群和任务分发和执行。根据用户的执行环境不同,一共有三种类型的SchedulerBackend(Cluster Manager)可能被创建:

  • LocalSchedulerBackend
    当用户提交的任务以本地方式执行时,创建此类的实例,它会在当前的JVM环境中创建executor线程,执行任务。
  • StandaloneSchedulerBackend
    当用户提交的任务以本地集群的方式执行或是提交到standalone集群执行时,会创建此类的实例,它继承自CoarseGrainedSchedulerBackend类,除了基本的分配和回收Executor功能外,它提供了用于和standalone集群交互的接口,不论是用户使用的是Spark开发模式(用户本地会启动一个常驻的子进程来交互式地提交任务)还是提交模式(spark-submit),都需要通过此实例与集群间接交互。
  • ExternalClusterManager
    外部集群管理器,

核心的源码摘取如下:

  private def createTaskScheduler(
      sc: SparkContext,
      master: String,
      deployMode: String): (SchedulerBackend, TaskScheduler) = {
    import SparkMasterRegex._

    master match {
      case "local" =>
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
        scheduler.initialize(backend)
        (backend, scheduler)

      case LOCAL_N_REGEX(threads) =>
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
        scheduler.initialize(backend)
        (backend, scheduler)

      case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
        val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
        scheduler.initialize(backend)
        (backend, scheduler)

      case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)

      case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val localCluster = new LocalSparkCluster(
          numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
        val masterUrls = localCluster.start()
        val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
          localCluster.stop()
        }
        (backend, scheduler)

      case masterUrl =>
        val cm = getClusterManager(masterUrl) match {
          case Some(clusterMgr) => clusterMgr
          case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
        }
        try {
          val scheduler = cm.createTaskScheduler(sc, masterUrl)
          val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
          cm.initialize(scheduler, backend)
          (backend, scheduler)
        } catch {
          case se: SparkException => throw se
          case NonFatal(e) =>
            throw new SparkException("External scheduler cannot be instantiated", e)
        }
    }
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61

CoarseGrainedSchedulerBackend(通常所说的driver,StandaloneSchedulerBackend但是它的一个子类)为该类的一个实现类,用于管理所有的executors(这里的executor指的是一个在worker节点启动的后端进程CoarseGrainedExecutorBackend,管理真正的Executor实例),这些executor的生命周期是与Spark Job的绑定的,而非每个Task,这样就避免了过多的资源的申请、创建、回收等的过程,提高整个集群的性能。

当一个Spark应用被提交到集群时,会先传递给DAGScheduler进行Stage的划分及TaskSet的生成,而后续任务的调度和执行由DAGScheduler通过RPCJobSubmitted消息转发给SchedulerBackend对象。

TaskScheduler

其实现类为TaskSchedulerImpl,它会接收来自DAGScheduler生成的一系列TaskSet,然后创建对应的TaskSetManager(实际负责调度TaskSet中的每一个任务),并将TaskSetManager添加到自己的等待资源池里,等待后续的调度(调度方式目前有两种,一是先进选出调度,一种是公平调度)。

  def initialize(backend: SchedulerBackend) {
    // TaskScheduler内部使用的调度线程池,负责选择适合的TaskSetManager执行
    this.backend = backend
    schedulableBuilder = {
      schedulingMode match {
        case SchedulingMode.FIFO =>
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
          new FairSchedulableBuilder(rootPool, conf)
        case _ =>
          throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
          s"$schedulingMode")
      }
    }
    schedulableBuilder.buildPools()
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

当TaskSchedulerImpl接收到可调度的任务集后,就通知driver进程,尝试调度Task执行。

  override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    this.synchronized {
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      val stage = taskSet.stageId
      val stageTaskSets =
        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])

      stageTaskSets.foreach { case (_, ts) =>
        ts.isZombie = true
      }
      stageTaskSets(taskSet.stageAttemptId) = manager
      // 将task set manager添加到调度线程池中
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            if (!hasLaunchedTask) {
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
      }
      hasReceivedTask = true
    }
    // 向SchedulerBackend实例,发送ReviveOffers消息,通知它有新的作业生成了,可以分配这些任务
    // 到合适的Executor上执行了
    backend.reviveOffers()
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

TaskSchedulerCoarseGrainedSchedulerBackend接收已经注册的、可用的空闲executors信息,从自己的等待池中遍历所有的TaskSet,尝试调度每一个TaskSet执行,但很多情况下,并不是每一个TaskSet中的所有Task被一次分配资源并执行,因此TaskScheduler完成为某一个等待中的TaskSet找到一组可用的executors,至于如何在这些executors上分配自己管理的Task,则将由TaskSetManager完成。TaskSetManager尝试分配任务到时某个executor上的核心代码如下:

  @throws[TaskNotSerializableException]
  def resourceOffer(
      execId: String,
      host: String,
      maxLocality: TaskLocality.TaskLocality)
    : Option[TaskDescription] =
  {
    val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist =>
      blacklist.isNodeBlacklistedForTaskSet(host) ||
        blacklist.isExecutorBlacklistedForTaskSet(execId)
    }
    if (!isZombie && !offerBlacklisted) {
      var allowedLocality = maxLocality

      if (maxLocality != TaskLocality.NO_PREF) {
        allowedLocality = getAllowedLocalityLevel(curTime)
        if (allowedLocality > maxLocality) {
          // We're not allowed to search for farther-away tasks
          allowedLocality = maxLocality
        }
      }

      dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
        // Found a task; do some bookkeeping and return a task description
        val task = tasks(index)
        val taskId = sched.newTaskId()
        // Do various bookkeeping
        copiesRunning(index) += 1
        val attemptNum = taskAttempts(index).size
        val info = new TaskInfo(taskId, index, attemptNum, curTime,
          execId, host, taskLocality, speculative)
        taskInfos(taskId) = info
        taskAttempts(index) = info :: taskAttempts(index)
        // Update our locality level for delay scheduling
        // NO_PREF will not affect the variables related to delay scheduling
        if (maxLocality != TaskLocality.NO_PREF) {
          currentLocalityIndex = getLocalityIndex(taskLocality)
          lastLaunchTime = curTime
        }
        // Serialize and return the task
        val serializedTask: ByteBuffer = try {
          ser.serialize(task)
        } catch {

          case NonFatal(e) =>
            throw new TaskNotSerializableException(e)
        }
        if (serializedTask.limit() > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
          !emittedTaskSizeWarning) {
          emittedTaskSizeWarning = true

        addRunningTask(taskId)

        sched.dagScheduler.taskStarted(task, info)
        new TaskDescription(
          taskId,
          attemptNum,
          execId,
          taskName,
          index,
          task.partitionId,
          addedFiles,
          addedJars,
          task.localProperties,
          serializedTask)
      }
    } else {
      None
    }
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70

通过上面过程生成的TaskDescriptions的信息,都是可以执行的,最终通过CoarseGrainedSchedulerBackend::launchTasks方法,将这些任务分发到绑定的executor上执行:

class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
  extends ExecutorAllocationClient with SchedulerBackend with Logging
{
    // Launch tasks returned by a set of resource offers
    private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      for (task <- tasks.flatten) {
        val serializedTask = TaskDescription.encode(task)
        if (serializedTask.limit >= maxRpcMessageSize) {
          scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
            try {
              var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
                "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
                "spark.rpc.message.maxSize or using broadcast variables for large values."
              msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)
              taskSetMgr.abort(msg)
            } catch {
              case e: Exception => logError("Exception in error callback", e)
            }
          }
        }
        else {
          val executorData = executorDataMap(task.executorId)
          executorData.freeCores -= scheduler.CPUS_PER_TASK

          logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
            s"${executorData.executorHost}.")
          // Worker结点上的CoarseGrainedExecutorBackend实例(实际上就是Executor)会接收到
          // LaunchTask消息,然后将其扔到自己的工作线程池中执行。
          executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
        }
      }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
TaskSetManager

每一个该类的实例,都对应于一个TaskSet,它负责调度TaskSet中的任务,并跟踪、记录每一个任务的状态和行为。

TaskSetManager内部使用本地化可感知的延迟算法,为自己管理的TaskSet中的每一个任务Task(主要是ShuffleMapTaskResultTask类型的任务)分配executor,同时创建相应的TaskDescription集体,返回给上层角色。

ExecutorAllocationManager

Executor执行器分配管理器,(当用户开启了动态资源分配策略时spark.dynamicAllocation.enabled时会在SparkContext内部创建此对象),基于工作负载动态分配和回收executors,它内部维护了一个可变的目标数据变量,表示当前应用需要多少个活动的executor才能解决任务的积压问题,它的最小值为配置的初始化值,并跟随堆积和正在运行的任务的数量而变化。manager会周期性地与cluster manager(SchedulerBackend后端)同步executor的目标数量的值。

如果当前executor的数量大于目前的负载,会被减少至能够容纳当前正在运行和堆积的任务的数量。而需要增加executor数量的情况发生在有任务堆积待执行的时候,并且在N秒内不能够处理完等待队列中的任务;或者之前增加executor数量的操作不能够在M秒内消费完队列中的任务时候,继续增加executor数量的操作,然后继续下一轮判定。在每一轮的判定中,executor数量的增加是指数级的,直到达到一个上界值,而这个上界是基于配置的spark属性及当前正在运行和等待的任务数量共同决定的。

至于为何以指数的递增方式增加executor,这里有两个方面的合理性:

  1. 添加executor的行为在起始阶段应当是缓慢的,以防当前应用只需求很小的数量便能够完成工作,否则才需要增加更多的executor;
  2. 在一定时间以后,增加executor的动作应该是快速的,以花费更多的时间才能达到目标数量的executor。

移除executor的逻辑很简单,如果某个executor在K秒内没有运行过任务,那么就应该移除它。ExecutorAllocationManager没有重试的逻辑,以增加或是减少executor,因此如何才能保证达到请求的executor数量是由ClusterManager保证的。

与此管理器相关的、可配置的属性有以下几个:

 *   spark.dynamicAllocation.enabled - 是否开启动态分配功能
 *   spark.dynamicAllocation.minExecutors - 活动executor的最小数量
 *   spark.dynamicAllocation.maxExecutors - 活动executor的最大数量
 *   spark.dynamicAllocation.initialExecutors - 应用启动时请求的executor数量
 *
 *   spark.dynamicAllocation.executorAllocationRatio - 控制executor数量的因子,实际的最大executor数量=(任务运行数+等待任务数)* ration / executor.cores
 *
 *   spark.dynamicAllocation.schedulerBacklogTimeout (M) - 如果在timeout时间后依然有堆积的任务,则尝试增加executor的数量
 *
 *   spark.dynamicAllocation.sustainedSchedulerBacklogTimeout (N) - 如果
 *
 *   spark.dynamicAllocation.executorIdleTimeout (K) - timeout时间后移除executor
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

Job / Stage / Task间的关系

  1. 当前用户调用一个action方法触发执行时,就会创建一个Job,(比如rdd.take(10)方法),就开始在Spark集群中计算数据。
  2. 一个Job开始以后,会先经由DAGScheduler进行依赖分析,从最后一个方法调用开始,回溯RDD的依赖图(当调用一个RDD的方法时,就会新建一个新的RDD实例,如MapPartitionsRDD或是ShuffledRDD等,并封装调用的RDD为其父依赖,MapPartitionsRDD对应于OneToOneDependencyShuffledRDD对应于ShuffleDependency),创建Stage图,而后尝试提交最后一个Stage(即ResultStage)执行。
  3. 具体到如何计算Partition数据,需要创建具体的Task实例在集群中执行。因此DAGScheduler创建好Stage图之后,会采用逆拓扑排序的方法,依次创建为每一个Stage的每个Partition创建任务实例Task。当回溯到某个Stage没有父依赖时,意味着当前Stage应该被首先执行,且可以提交执行,因此将当前Stage添加到running队列中,并为这个Stage的每一个待生成的Partition创建对应的Task,而后提交执行。
  4. TaskSchedulerImpl接收到任务后,且有worker资源启动任务时,就会将任务分配到可用的worker执行,例如ShuffleMapTask会执行Reduce/Map操作,即先reduce之前依赖的Partition的数据,再Map输出reduce之后的Partition数据。
  5. 当前任务执行完成后,不论成功还是失败,都会返回一个MapStatus的实例,描述当前Task的状态信息,并序列化后发送给driver端。
  6. driver端收到任务的更新消息后,就会更新任务相关的所有对象,最终如果当前Stage包含的所有任务都已经正常完成,则会尝试启动所有的孩子Stage执行。如下图中表示的依赖关系,Stage_0Stage_1可以同时执行,但只有先完成Stage_0才会执行Stage_1
    关系图

Spark App提交过程

Spark应用提交到Standalone集群过程的时序图如下所示,一个App提交给master后,最终会在集群内启动一个driver进程,通过各种RPC交互与集群交互,完成应用的运行。

spark submit clientapp master worker driver sparksession sparkcontext 封装DriverDescription RequestSubmitDriver LaunchDriver 启动子进程 SparkSession.getOrCreate() SparkContext.getOrCreate() SparkContext集群交互的入口 spark submit clientapp master worker driver sparksession sparkcontext

Spark App执行过程

Spark App Cluster构建时序图

最终一个Spark应用从Driver进程到启动Executor执行的过程的时序图如下:

sparkcontext StandaloneSchedulerBackend StandaloneAppClient master worker driver dirver CoarseGrainedExecutorBackend 创建Executor调度器,即Driver RpcEndpoint 内部创建ApplicationDescription实例 创建App客户端,用于交互编程 即AppClient RpcEndpoint,提供访问Cluster Manager的接口 send RegisterApplication,并等待注册完成 注册App,生成ID并添加到等待队列,然后返回注册成功的消息给driver进程 receive RegisteredApplication,更新标记位,master地址,appId 通知SchedulerBackend完成注册 更新标记位并更新appId 调用schedule()方法,为app分配executors 为每一个executor构建ExecutorDesc,并更新缓存的ApplicationInfo send LaunchExecutor 默认情况下,master会把app所需要的executor分散到尽可能多的worker上面,用户可以修改spark.deploy.spreadOut的值为false来禁用这个策略 send ExecutorAdded 打印添加Executor成功的信息 ExecutorRunner.start(),内部启动cores个线程来跑Task 创建子进程调用java命令启动入口类CoarseGrainedExecutorBackend,同时更新资源变量并发送ExecutorStateChanged到master ask RegisterExecutor 更新本地缓存的executors信息,如果不存在,则构建ExecutorData response RegisteredExecutor send ExecutorStateChanged receive ExecutorStateChanged,schedule() again 更新缓存的ApplicationInfo的对象中相应executor的状态为最新状态,如果完成状态,则从worker和appInfo中移除 send ExecutorUpdated 打印状态更新日志,如果executor完成,则从driver端移除并更新各类变量 sparkcontext StandaloneSchedulerBackend StandaloneAppClient master worker driver dirver CoarseGrainedExecutorBackend

Spark App 执行

生成RDD

one
createDataFrame
rdd
SparkPlan.execute
doExecute
DataFrame
sparksession
QueryExecutor
HiveTableScanExec
RDD

执行Shuffle Map任务的时序图

RDD sparkcontext dagscheduler mapoutputtrackermaster taskschedulerimpl schedulerbackend driver tasksetmanager executorbackend executor Task blockmanager ShuffleMapTask SortShuffleWriter runJob 执行collect(),结果写到results数组,对应每个分区的结果 runJob submitJob() to send JobSubmitted 生成Job id,创建JobWaiter对象,异步等待Job完成 handleJobSubmitted() to create shuffle map stages 从最后一个rdd向上回溯,构建ShuffleMapStage,如果其有父依赖是Shuffle依赖,则以深度拓扑排序的方法尝试构建所有的祖先ShuffleMapStage,最后构建ResultStage submitStage() to submit stage recursively numAvailableOutputs == numPartitions 先递归提交所有的没有完成的父stage,如果不存在未完成的,则提交当前stage。 submitMissingTasks() to submit stages findMissingPartitions() check partitions of the Job for ResultStage, another of MapOutputTracker make new Broadcast for stage.rdd Broadcast with shuffle dependencies for ShuffuleMapStage, or with func for ResultStage submitTasks() with a TaskSet for the stage new ShuffleMapTask for each partition of SMS or new ResultTask for each partition of RS submitTasks() with a TaskSet createTaskSetManager() for the TaskSet schedulerBuilder.addTaskSetManager() schedulerBuilder is FIFOSchedulableBuilder or FairSchedulableBuilder reviveOffers() send ReviveOffers with driver endpoint ref makeOffers() make new WorkerOffer for each alive executor resourceOffers with IndexedSeq[WorkerOffer] 尝试分析等待任务到时可用的worker中的executor resourceOfferSingleTaskSet() resourceOffer(execId, host, maxLocality) update cache and mark launchedTask as true loop [ShuffledWorkerOffers] loop [SortedTaskSets] loop [submitStage()] launchTasks(tasks: Seq[Seq[TaskDescription]]) serialize each launched TaskDescription update executor free cores in ExecutorDataMap send LaunchTask(new SerializableBuffer(serializedTask)) loop [LaunchedTasks] deserialize TaskDescription launchTask(this, taskDesc) make new TaskRunner with TaskDescription Executors.newCachedThreadPool.execute(task) run() ShuffleMapTask or ResultTask registerTask(taskAttemptId) make new TaskContextImpl() runTask(context) runTask(context) deserialize RDD & ShuffleDependency get writer to write partition data getOrCompute(partition, context) 如果没有对应RDDBlock,则创建对应的RDDBlock,并以迭代器类型存入BlockManager write(records: Iterator[Product2[K, V]]) 上一步得到的是RDD的全量数据,在写出Shuffle数时,会创建ExternalSorter,以遍历迭代器的方式应用用户定义的聚合函数Aggregator和关键字排序函数Ordering make new ShuffleBlockId write data file and index file for this Partition or Task serializedResult,MapStatus including BlockManager adress & block size statusUpdate(taskId, TaskState.FINISHED, serializedResult) send StatusUpdate(executorId, taskId, state, data) statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]) taskEnded(tasks(index), Success...) send CompletionEvent handleTaskCompletion(CompletionEvent) RDD sparkcontext dagscheduler mapoutputtrackermaster taskschedulerimpl schedulerbackend driver tasksetmanager executorbackend executor Task blockmanager ShuffleMapTask SortShuffleWriter
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/655827
推荐阅读
相关标签
  

闽ICP备14008679号