34 Spark中任务处理的Stage划分和Task最佳位置算法_划分stage的算子



1.     Job Stage的划分算法

2.     Task最佳计算位置算法






1.      Stage划分算法思想




       SparkApplication 中可以因为不同的Action触发众多的Job,也就是说一个Application中可以有很多的Job,每个Job是有一个或者多个Stage构成,后面的Stage依赖前面的Stage,也就是说只有前面的Stage计算完后,后面的Stage才会运行。




   * Return an array that contains all of the elements in this RDD.
  def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)

   * Run a job on all partitions in an RDD and return the results in an array.
  def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
    runJob(rdd, func, 0 until rdd.partitions.length)

   * Run a job on a given set of partitions of an RDD, but take a function of type
   * `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`.
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: Iterator[T] => U,
      partitions: Seq[Int]): Array[U] = {
    val cleanedFunc = clean(func)
    runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)

   * Run a function on a given set of partitions in an RDD and return the results as an array.
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int]): Array[U] = {
    val results = new Array[U](partitions.size)
    runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)

   * Run a function on a given set of partitions in an RDD and pass the results to the given
   * handler function. This is the main entry point for all actions in Spark.
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
      throw new IllegalStateException("SparkContext has been shutdown")
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)

def runJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): Unit = {
    val start = System.nanoTime
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
    waiter.awaitResult() match {
      case JobSucceeded =>
        logInfo("Job %d finished: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      case JobFailed(exception: Exception) =>
        logInfo("Job %d failed: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
        val callerStackTrace = Thread.currentThread().getStackTrace.tail
        exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
        throw exception
  def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {
    // Check to make sure we are not launching a task on a partition that does not exist.
    val maxPartitions = rdd.partitions.length
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
      throw new IllegalArgumentException(
        "Attempting to access a non-existent partition: " + p + ". " +
          "Total number of partitions: " + maxPartitions)

    val jobId = nextJobId.getAndIncrement()
    if (partitions.size == 0) {
      // Return immediately if the job is running 0 tasks
      return new JobWaiter[U](this, jobId, 0, resultHandler)

    assert(partitions.size > 0)
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
      jobId, rdd, func2, partitions.toArray, callSite, waiter,

/** A result-yielding job was submitted on a target RDD */
private[scheduler] case class JobSubmitted(//这里面封装了哪些Partition要进行计算,
    jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties = null)
  extends DAGSchedulerEvent

例如 collect 会导致SparkContext中的runJob方法的执行,最终会导致DAGScheduler中的submit的执行。这其中最为核心的是通过发送一个名为JobSubmit的case class对象给eventProcessLoop。


private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)

private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)<pre name="code" class="plain">  extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging
private[spark] abstract class EventLoop[E](name: String) extends Logging {

   * Put the event into the event queue. The event thread will process it later.
  def post(event: E): Unit = {
def start(): Unit = { //启动一个
    if (stopped.get) {
      throw new IllegalStateException(name + " has already been stopped")
    // Call onStart before starting the event thread to make sure it happens before onReceive

private val eventThread = new Thread(name) {
    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()//取出放入的消息
          try {

  protected def onReceive(event: E): Unit//抽象方法调用子类的实现。

/** DAGSchedulerEventProcessLoop
   * The main event loop of the DAG scheduler.
  override def onReceive(event: DAGSchedulerEvent): Unit = {
    val timerContext = timer.time()
    try {
    } finally {
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {//处理消息
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)






  private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: ResultStage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite) //5
    } catch {
    jobIdToActiveJob(jobId) = job
    activeJobs += job
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))

   * Create a ResultStage associated with the provided jobId.  5处被调用
  private def newResultStage(
      rdd: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      jobId: Int,
      callSite: CallSite): ResultStage = {
    val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
    val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite)
    stageIdToStage(id) = stage //将Stage的id放入stageIdToStage结构中。
    updateJobIdStageIdMaps(jobId, stage) //更新JobIdStageIdMaps


   * Helper function to eliminate some code re-use when creating new stages.
  private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = {
val parentStages = getParentStages(rdd, firstJobId)
	val id = nextStageId.getAndIncrement()
    (parentStages, id)


   * Get or create the list of parent stages for a given RDD.  The new Stages will be created with
   * the provided firstJobId.
  private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
	val parents = new HashSet[Stage]
	val visited = new HashSet[RDD[_]]
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    val waitingForVisit = new Stack[RDD[_]]
    def visit(r: RDD[_]) {
      if (!visited(r)) {
        visited += r
        // Kind of ugly: need to register RDDs with the cache here since
        // we can't do it in its constructor because # of partitions is unknown
        for (dep <- r.dependencies) {
          dep match {
            case shufDep: ShuffleDependency[_, _, _] =>
              parents += getShuffleMapStage(shufDep, firstJobId)  //6
            case _ =>
	//以resultRDD作为第一个需要处理的RDD,然后从该rdd开始,顺序处理其parent rdd
    while (waitingForVisit.nonEmpty) {

可以看出,我们将Result RDD和firstJobId(由于有些Job在执行中会触发其他Job)作为出入参数,调用getParentStages方法,来创建我们Job的DAG。从源码上来看,是将我们前面的第二十三讲的思想,具体实现为代码。



   * Get or create a shuffle map stage for the given shuffle dependency's map side.
  private def getShuffleMapStage(
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int): ShuffleMapStage = {
    shuffleToMapStage.get(shuffleDep.shuffleId) match {
      //如果这个Shuffle Stage已存在,则返回该Stage
	case Some(stage) => stage
	case None =>
        // We are going to register ancestor shuffle dependencies
	//注册以前的shuffle dependency,并将shuffle dependency转换为新的Stage或已存在
        getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>    //7
          shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId)   //8
        // Then register current shuffleDep
		//注册当前的shuffle dependency
        val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)   //8
        shuffleToMapStage(shuffleDep.shuffleId) = stage

//寻找DAG中当前shuffle dependency的rdd树中以前的shuffle dependency  7处被调用
private def getAncestorShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
	//存储前面的shuffle dependency
    val parents = new Stack[ShuffleDependency[_, _, _]]
    val visited = new HashSet[RDD[_]]
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    val waitingForVisit = new Stack[RDD[_]]
    def visit(r: RDD[_]) {
      if (!visited(r)) {
        visited += r
        for (dep <- r.dependencies) {
          dep match {
            case shufDep: ShuffleDependency[_, _, _] =>
              if (!shuffleToMapStage.contains(shufDep.shuffleId)) {
            case _ =>

    while (waitingForVisit.nonEmpty) {

<span style="font-family: Arial, Helvetica, sans-serif;">//8处被调用</span>
private def newOrUsedShuffleStage(
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int): ShuffleMapStage = {
    val rdd = shuffleDep.rdd
    val numTasks = rdd.partitions.length
    val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite)  //9
    if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
	val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
      val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
      (0 until locs.length).foreach { i =>
        // locs(i)等于空表示
        if (locs(i) ne null) {
          stage.addOutputLoc(i, locs(i))
    } else {
      // 由于具体的partition我们并不知道,所以我们只能在内存中记录rdd以及映射output 
      logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
      mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)


private def newShuffleMapStage(
      rdd: RDD[_],
      numTasks: Int,
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int,
      callSite: CallSite): ShuffleMapStage = {
    val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, firstJobId)
    val stage: ShuffleMapStage = new ShuffleMapStage(id, rdd, numTasks, parentStages,
      firstJobId, callSite, shuffleDep)

    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(firstJobId, stage)


由于创建DAG的过程是递归进行的,所以在创建后一个Stage时,必然保证其直接父Stage已经创建(未创建,便会递归调用getParentStage来创建父Stage),知道递归的最底层,也就是遇到了DAG中的最左边的RDD,此时会为这个时候的ShuffleDependency创建Stage0,并跳出底层递归,然后逐层创建stage,并返回给上一层,知道最后来到顶层也就是Result Stage,创建Result Stage,完成DAG的创建。





       在上一节,我们介绍了Job Stage划分算法,并最终得到了DAG图中的Result Stage(final Stage)。接下来我们通过查看Task任务本地性(为了保证Data Locality)的运用场景----Task的运行调度处理,来引入Task任务本地性算法。

       在得到逻辑上Result Stage,Spark为了进行计算就必须先报任务以一定的集群可识别形式提交给集群进行计算。Spark的任务提交过程如下:


private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    logInfo("Got job %s (%s) with %d output partitions".format(
      job.jobId, callSite.shortForm, partitions.length))
    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))

    val jobSubmissionTime = clock.getTimeMillis()
    jobIdToActiveJob(jobId) = job
    activeJobs += job
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    submitStage(finalStage) //9


/**提交Stage,如果有未提交的ParentStage,则会递归提交这些ParentStage,只有所有ParentStage都计算完了,才能提交当前Stage */  
  private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")
	//如果当前Stage不再等待parent Stage的返回,也不是正在运行的Stage,并且也没有提
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          submitMissingTasks(stage, jobId.get) //10
        } else {
          for (parent <- missing) {
          waitingStages += stage
    } else {//无效作业,停止它。
      abortStage(stage, "No active job for stage " + stage.id, None)


private def submitMissingTasks(stage: Stage, jobId: Int) {
    logDebug("submitMissingTasks(" + stage + ")")
    // Get our pending tasks and remember them in our pendingTasks entry
    //找到需要计算的Partition id
    val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
    if (stage.internalAccumulators.isEmpty || stage.numPartitions == partitionsToCompute.size) {
    val properties = jobIdToActiveJob(jobId).properties
    runningStages += stage
    val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
      stage match {
        case s: ShuffleMapStage =>
          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
        case s: ResultStage =>
          val job = s.activeJob.get
          partitionsToCompute.map { id =>
            val p = s.partitions(id)
            (id, getPreferredLocs(stage.rdd, p))
    } catch {
      case NonFatal(e) =>
        listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
        abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))
        runningStages -= stage

    stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
    listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
    val tasks: Seq[Task[_]] = try {
      stage match {
        case stage: ShuffleMapStage =>
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            val part = stage.rdd.partitions(id)
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, stage.internalAccumulators)

        case stage: ResultStage =>
          val job = stage.activeJob.get
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = stage.rdd.partitions(p)
            val locs = taskIdToLocations(id)
            new ResultTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, id, stage.internalAccumulators)
    } catch {
      case NonFatal(e) =>
        abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))
        runningStages -= stage
    if (tasks.size > 0) {
      logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
      stage.pendingPartitions ++= tasks.map(_.partitionId)
      logDebug("New pending partitions: " + stage.pendingPartitions)
      taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
    } else {
      // 如果Tasks执行完了,表示该Stage执行完成
      markStageAsFinished(stage, None)

      val debugString = stage match {
        case stage: ShuffleMapStage =>
          s"Stage ${stage} is actually done; " +
            s"(available: ${stage.isAvailable}," +
            s"available outputs: ${stage.numAvailableOutputs}," +
            s"partitions: ${stage.numPartitions})"
        case stage : ResultStage =>
          s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"




val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
      stage match {
        case s: ShuffleMapStage =>
          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap  //11
        case s: ResultStage =>
          val job = s.activeJob.get
          partitionsToCompute.map { id =>
            val p = s.partitions(id)
            (id, getPreferredLocs(stage.rdd, p))
    } catch {
      case NonFatal(e) =>
        listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
        abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))
        runningStages -= stage

这里会将要计算的分片(Partition)转换为(id, getPreferredLocs(stage.rdd, id)) 类型的truple,进而由truple转换未一个Map映射,在Task构造时需要一个locs参数,便可以利用这个映射由id得到相应Partition的本地性级别。


private[spark] def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
    getPreferredLocsInternal(rdd, partition, new HashSet)
private def getPreferredLocsInternal(
      rdd: RDD[_],
      partition: Int,
      visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
    // 如果已访问过RDD,即以获得RDD的TaskLocation则不需再次获得
    if (!visited.add((rdd, partition))) {
      // Nil has already been returned for previously visited partitions.
      return Nil
    // 如果RDD缓存在内存中,我们访问RDD实例化时的信息便可知道RDD在那个节点上
    val cached = getCacheLocs(rdd)(partition)
    if (cached.nonEmpty) {
      return cached
    // 利用RDD在创建时重写的preferredLocations获得数据Location,从而确定Task的本地性
    val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
    if (rddPrefs.nonEmpty) {
      return rddPrefs.map(TaskLocation(_))

    // 如果RDD是窄依赖,则递归查找窄依赖链条上的第一个RDD的第一个Partition的
    rdd.dependencies.foreach {
      case n: NarrowDependency[_] =>
        for (inPart <- n.getParents(partition)) {
          val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
          if (locs != Nil) {
            return locs
      case _ =>





