(word,1)) val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_+_) _task划分依据">
当前位置:   article > 正文

spark源码解读-stage,task等划分_task划分依据

task划分依据

如何划分stage

def dataAnalysis() = {

  val lines = wordCountDao.readFile("datas/word.txt")
  val words: RDD[String] = lines.flatMap(_.split(" "))
  val wordToOne = words.map(word=>(word,1))
  val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_+_)
  val array: Array[(String, Int)] = wordToSum.collect()
  array
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

例如上面的代码,action算子collect触发计算,所以跟踪collect:

def collect(): Array[T] = withScope {
  
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}
  • 1
  • 2
  • 3
  • 4
  • 5

会发现有一个runJob的方法,注意这里传入了一个this,就是当前调用collect算子的rdd,后面划分阶段的时候会用到一路跟踪下去:

runJob

  • submitJob (DAGScheduler.scala)

    eventProcessLoop.post(JobSubmitted(
          jobId, rdd, func2, partitions.toArray, callSite, waiter,
          Utils.cloneProperties(properties)))
    
    • 1
    • 2
    • 3

    看到JobSubmitted (DAGScheduler.scala),搜索handleJobSubmitted会找到一个方法,这个方法中的

     try {
          // New stage creation may throw an exception if, for example, jobs are run on a
          // HadoopRDD whose underlying HDFS files have been deleted.
       		// 首先创建一个ResultStage
          finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
        } catch {
       ...
     }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    由上面的代码可知,一个作业,首先会创建一个ResultStage,点进去这个方法,可以看到如下代码

      private def createResultStage(
          rdd: RDD[_],
          func: (TaskContext, Iterator[_]) => _,
          partitions: Array[Int],
          jobId: Int,
          callSite: CallSite): ResultStage = {
        checkBarrierStageWithDynamicAllocation(rdd)
        checkBarrierStageWithNumSlots(rdd)
        checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
        // 在创建ResultStage前获取父阶段的血缘信息,因为可能有依赖关系
        val parents = getOrCreateParentStages(rdd, jobId)
        val id = nextStageId.getAndIncrement()
        // 真正创建ResultStage
        val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
        stageIdToStage(id) = stage
        updateJobIdStageIdMaps(jobId, stage)
        stage
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    点进去getOrCreateParentStages方法,可以看到:

    private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
      // 获取到父阶段的依赖,并依次创建stage
      getShuffleDependencies(rdd).map { shuffleDep =>
        getOrCreateShuffleMapStage(shuffleDep, firstJobId)
      }.toList
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    getOrCreateParentStages 方法会递归调用,一直到没有找到父依赖为止

    这里是如何获取的呢?点进去getShuffleDependencies方法:

    private[scheduler] def getShuffleDependencies(
        rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
      val parents = new HashSet[ShuffleDependency[_, _, _]]
      val visited = new HashSet[RDD[_]]
      val waitingForVisit = new ListBuffer[RDD[_]]
      // 将rdd,添加进list中,这里的rdd就是上文说的调用collect的rdd
      waitingForVisit += rdd
      while (waitingForVisit.nonEmpty) { // 这里必然不为空
        val toVisit = waitingForVisit.remove(0) // 将list中的元素移除并获取
        if (!visited(toVisit)) { // 这个rdd还没有被添加进visited这个hashSet,所以会进入if分值
          visited += toVisit
          toVisit.dependencies.foreach { // 这里判断是否为shffle依赖,根据我们的代码,可以发现,reduceByKey是一个shuffle依赖
            case shuffleDep: ShuffleDependency[_, _, _] =>  // 模式匹配进入这个分支,然后结束
              parents += shuffleDep
            case dependency =>
              waitingForVisit.prepend(dependency.rdd)
          }
        }
      }
      // 返回父依赖关系
      parents
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    在ResultStage的相关代码中,有如下方法

     override def findMissingPartitions(): Seq[Int] = {
        val job = activeJob.get
     (0 until job.numPartitions).filter(id => !job.finished(id)) // 这里可以看到是根据job的分区数来创建的
      }
    
    • 1
    • 2
    • 3
    • 4

    总结

    当RDD中存在shuffle依赖时,阶段会自动增加一个

    阶段的数量 = shuffle依赖的数量 + 1

    Ga2vml

如何划分Task

还是从上面的wordcount的列子来看,具体从handleJobSubmitted方法开始:

在该方法的最后一行 有一个 submitStage(finalStage)方法,也就是一个ResultStage,

submtStage(finalStage)源码如下:

private def submitStage(stage: Stage): Unit = {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug(s"submitStage($stage (name=${stage.name};" +
      s"jobs=${stage.jobIds.toSeq.sorted.mkString(",")}))")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      // 判断是否有缺失的父stage,因为要执行当前stage必须保证父stage执行完
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) { // 如果没有父stage,则进入该分支
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

判断是否有缺失的父stage,因为要执行当前stage必须保证父stage执行完,没有则执行submitMissingTasks(stage, jobId.get)方法。点进去看,列出关键代码:

 val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
      stage match {
        case s: ShuffleMapStage =>
          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
        case s: ResultStage =>
          partitionsToCompute.map { id =>
            val p = s.partitions(id)
            (id, getPreferredLocs(stage.rdd, p))
          }.toMap
      }
    } catch {
      case NonFatal(e) =>
        stage.makeNewStageAttempt(partitionsToCompute.size)
        listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

因为最后执行的肯定是ResultStage,所以我们具体看ResultStage分支

partitionsToCompute.map { id =>
  val p = s.partitions(id)
  (id, getPreferredLocs(stage.rdd, p))
}.toMap

...
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

这里的代码去计算task的数量,点击findMissingPartitions()方法,进入是一个抽象类,找到具体ResultStage的实现子类,发现方法:

  override def findMissingPartitions(): Seq[Int] = {
    val job = activeJob.get
    (0 until job.numPartitions).filter(id => !job.finished(id))
  }
  • 1
  • 2
  • 3
  • 4

由上述代码可以知道 task的数量由当前stage的最后一个RDD的分区个数来决定

同理ShuffleMapStage也是相同的原理

总结

ShuffleMapStage 对应的任务是ShuffleMapTask。

ResultStage对应的任务是ResultTask。

val tasks: Seq[Task[_]] = try {
  val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
  stage match {
    case stage: ShuffleMapStage =>
    stage.pendingPartitions.clear()
    partitionsToCompute.map { id =>
      val locs = taskIdToLocations(id)
      val part = partitions(id)
      stage.pendingPartitions += id
      new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
                         taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
                         Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
    }

    case stage: ResultStage =>
    partitionsToCompute.map { id =>
      val p: Int = stage.partitions(id)
      val part = partitions(p)
      val locs = taskIdToLocations(id)
      new ResultTask(stage.id, stage.latestInfo.attemptNumber,
                     taskBinary, part, locs, id, properties, serializedTaskMetrics,
                     Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
                     stage.rdd.isBarrier())
    }
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

一个stage阶段中,最后一个RDD的分区个数就是task的个数

RDD 任务切分中间分为:Application、Job、Stage 和 Task

⚫ Application:初始化一个 SparkContext 即生成一个 Application;

⚫ Job:一个 Action 算子就会生成一个 Job;

⚫ Stage:Stage 等于宽依赖(ShuffleDependency)的个数加 1;

⚫ Task:一个 Stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数。 注意:Application->Job->Stage->Task 每一层都是 1 对 n 的关系。

解释:

一个applicaiton会有多个job:只要有一个action算子就会submitjob一次

一个job中有多个stage:只要在job中存在shuffle依赖关系就会划分出stage

一个stage中有多个task:task的数量主要决定于该stage阶段中,最有一个RDD的分区个数。

image-20210519235522054

一个 Action 算子就会生成一个 Job,绿色和紫色是一个job,RDD有3个task 黄色是一个job,RDD有3个task 所以整个Job一共6个task

– by 俩只猴

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/655834
推荐阅读
相关标签
  

闽ICP备14008679号