(word,1)) val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_+_) _task划分依据">
赞
踩
def dataAnalysis() = {
val lines = wordCountDao.readFile("datas/word.txt")
val words: RDD[String] = lines.flatMap(_.split(" "))
val wordToOne = words.map(word=>(word,1))
val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_+_)
val array: Array[(String, Int)] = wordToSum.collect()
array
}
例如上面的代码,action算子collect触发计算,所以跟踪collect:
def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
会发现有一个runJob的方法,注意这里传入了一个this,就是当前调用collect算子的rdd,后面划分阶段的时候会用到一路跟踪下去:
runJob
submitJob (DAGScheduler.scala)
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
Utils.cloneProperties(properties)))
看到JobSubmitted (DAGScheduler.scala),搜索handleJobSubmitted会找到一个方法,这个方法中的
try {
// New stage creation may throw an exception if, for example, jobs are run on a
// HadoopRDD whose underlying HDFS files have been deleted.
// 首先创建一个ResultStage
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
...
}
由上面的代码可知,一个作业,首先会创建一个ResultStage,点进去这个方法,可以看到如下代码
private def createResultStage( rdd: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], jobId: Int, callSite: CallSite): ResultStage = { checkBarrierStageWithDynamicAllocation(rdd) checkBarrierStageWithNumSlots(rdd) checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size) // 在创建ResultStage前获取父阶段的血缘信息,因为可能有依赖关系 val parents = getOrCreateParentStages(rdd, jobId) val id = nextStageId.getAndIncrement() // 真正创建ResultStage val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite) stageIdToStage(id) = stage updateJobIdStageIdMaps(jobId, stage) stage }
点进去getOrCreateParentStages方法,可以看到:
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
// 获取到父阶段的依赖,并依次创建stage
getShuffleDependencies(rdd).map { shuffleDep =>
getOrCreateShuffleMapStage(shuffleDep, firstJobId)
}.toList
}
getOrCreateParentStages
方法会递归调用,一直到没有找到父依赖为止
这里是如何获取的呢?点进去getShuffleDependencies方法:
private[scheduler] def getShuffleDependencies( rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = { val parents = new HashSet[ShuffleDependency[_, _, _]] val visited = new HashSet[RDD[_]] val waitingForVisit = new ListBuffer[RDD[_]] // 将rdd,添加进list中,这里的rdd就是上文说的调用collect的rdd waitingForVisit += rdd while (waitingForVisit.nonEmpty) { // 这里必然不为空 val toVisit = waitingForVisit.remove(0) // 将list中的元素移除并获取 if (!visited(toVisit)) { // 这个rdd还没有被添加进visited这个hashSet,所以会进入if分值 visited += toVisit toVisit.dependencies.foreach { // 这里判断是否为shffle依赖,根据我们的代码,可以发现,reduceByKey是一个shuffle依赖 case shuffleDep: ShuffleDependency[_, _, _] => // 模式匹配进入这个分支,然后结束 parents += shuffleDep case dependency => waitingForVisit.prepend(dependency.rdd) } } } // 返回父依赖关系 parents }
在ResultStage的相关代码中,有如下方法
override def findMissingPartitions(): Seq[Int] = {
val job = activeJob.get
(0 until job.numPartitions).filter(id => !job.finished(id)) // 这里可以看到是根据job的分区数来创建的
}
当RDD中存在shuffle依赖时,阶段会自动增加一个
阶段的数量 = shuffle依赖的数量 + 1
还是从上面的wordcount的列子来看,具体从handleJobSubmitted方法开始:
在该方法的最后一行 有一个 submitStage(finalStage)方法,也就是一个ResultStage,
submtStage(finalStage)源码如下:
private def submitStage(stage: Stage): Unit = { val jobId = activeJobForStage(stage) if (jobId.isDefined) { logDebug(s"submitStage($stage (name=${stage.name};" + s"jobs=${stage.jobIds.toSeq.sorted.mkString(",")}))") if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) { // 判断是否有缺失的父stage,因为要执行当前stage必须保证父stage执行完 val missing = getMissingParentStages(stage).sortBy(_.id) logDebug("missing: " + missing) if (missing.isEmpty) { // 如果没有父stage,则进入该分支 logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents") submitMissingTasks(stage, jobId.get) } else { for (parent <- missing) { submitStage(parent) } waitingStages += stage } } } else { abortStage(stage, "No active job for stage " + stage.id, None) } }
判断是否有缺失的父stage,因为要执行当前stage必须保证父stage执行完,没有则执行submitMissingTasks(stage, jobId.get)方法。点进去看,列出关键代码:
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try { stage match { case s: ShuffleMapStage => partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap case s: ResultStage => partitionsToCompute.map { id => val p = s.partitions(id) (id, getPreferredLocs(stage.rdd, p)) }.toMap } } catch { case NonFatal(e) => stage.makeNewStageAttempt(partitionsToCompute.size) listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties)) abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e)) runningStages -= stage return }
因为最后执行的肯定是ResultStage,所以我们具体看ResultStage分支
partitionsToCompute.map { id =>
val p = s.partitions(id)
(id, getPreferredLocs(stage.rdd, p))
}.toMap
...
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
这里的代码去计算task的数量,点击findMissingPartitions()方法,进入是一个抽象类,找到具体ResultStage的实现子类,发现方法:
override def findMissingPartitions(): Seq[Int] = {
val job = activeJob.get
(0 until job.numPartitions).filter(id => !job.finished(id))
}
由上述代码可以知道 task的数量由当前stage的最后一个RDD的分区个数来决定。
同理ShuffleMapStage也是相同的原理
ShuffleMapStage 对应的任务是ShuffleMapTask。
ResultStage对应的任务是ResultTask。
val tasks: Seq[Task[_]] = try { val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array() stage match { case stage: ShuffleMapStage => stage.pendingPartitions.clear() partitionsToCompute.map { id => val locs = taskIdToLocations(id) val part = partitions(id) stage.pendingPartitions += id new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber, taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier()) } case stage: ResultStage => partitionsToCompute.map { id => val p: Int = stage.partitions(id) val part = partitions(p) val locs = taskIdToLocations(id) new ResultTask(stage.id, stage.latestInfo.attemptNumber, taskBinary, part, locs, id, properties, serializedTaskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier()) } }
一个stage阶段中,最后一个RDD的分区个数就是task的个数
RDD 任务切分中间分为:Application、Job、Stage 和 Task
⚫ Application:初始化一个 SparkContext 即生成一个 Application;
⚫ Job:一个 Action 算子就会生成一个 Job;
⚫ Stage:Stage 等于宽依赖(ShuffleDependency)的个数加 1;
⚫ Task:一个 Stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数。 注意:Application->Job->Stage->Task 每一层都是 1 对 n 的关系。
解释:
一个applicaiton会有多个job:只要有一个action算子就会submitjob一次
一个job中有多个stage:只要在job中存在shuffle依赖关系就会划分出stage
一个stage中有多个task:task的数量主要决定于该stage阶段中,最有一个RDD的分区个数。
一个 Action 算子就会生成一个 Job,绿色和紫色是一个job,RDD有3个task 黄色是一个job,RDD有3个task 所以整个Job一共6个task
– by 俩只猴
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。