*sc.runJob(this, (iter: Iterator[T]) => iter.toArray) //passes in a function f that turns each partition's iterator into an array
*runJob(rdd, func, 0 until rdd.partitions.length) //additionally passes a Range of partition indices, 0 until the partition count
*val cleanedFunc = clean(func) //closure cleaning/check
*runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions) //keeps delegating to runJob
*dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
*eventProcessLoop.post(JobSubmitted()) //wraps the job in a JobSubmitted event and hands it to eventProcessLoop.post
*eventQueue.put(event) //post simply puts the event into an internal event queue
*val event = eventQueue.take() //a single dedicated thread inside the EventLoop class keeps taking events off the queue
*onReceive(event) //and passes each event it takes to onReceive for handling
//onReceive ultimately calls dagScheduler.handleJobSubmitted
*dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
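The post/take pair above is a classic single-consumer event loop: DAGSchedulerEventProcessLoop extends Spark's EventLoop, whose one dedicated thread drains the queue and dispatches each event to onReceive. A minimal self-contained sketch of that pattern (MiniEventLoop is a made-up name illustrating the shape, not Spark's actual class):

import java.util.concurrent.LinkedBlockingDeque

abstract class MiniEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  @volatile private var stopped = false

  // the single consumer thread: blocks on take() until an event arrives
  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped) onReceive(eventQueue.take())
      } catch {
        case _: InterruptedException => // woken up by stop(); exit the loop
      }
    }
  }

  def start(): Unit = eventThread.start()
  def stop(): Unit = { stopped = true; eventThread.interrupt() }
  def post(event: E): Unit = eventQueue.put(event) // producer side: enqueue and return immediately
  protected def onReceive(event: E): Unit          // subclasses handle events here
}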
*dagScheduler.handleJobSubmitted
//create the finalStage object
*finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
//internally this already walks the lineage recursively, obtaining every ShuffleMapStage and wiring up the dependencies between stages
*val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
//submit the finalStage object
*submitStage(finalStage)
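Everything above is set in motion by a single action on the driver. A minimal hypothetical program (the object name, app name, and master are made up; any action such as collect would do):

import org.apache.spark.{SparkConf, SparkContext}

object CollectDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("collect-demo").setMaster("local[2]"))
    val rdd = sc.parallelize(1 to 100, numSlices = 4)
    // collect() calls sc.runJob(this, (iter: Iterator[Int]) => iter.toArray),
    // which is exactly the entry point of the call chain traced above
    val result: Array[Int] = rdd.collect()
    println(result.length) // 100
    sc.stop()
  }
}

With the entry point clear, here is handleJobSubmitted in full: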
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  // create the finalStage object
  finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  // wrap the work in an ActiveJob; it is constructed with the jobId and finalStage,
  // so the job object holds a reference to finalStage
  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  clearCacheLocs()
  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  // and finalStage holds a reference back to the job
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  // what ultimately gets submitted is finalStage
  submitStage(finalStage)
}
private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  checkBarrierStageWithDynamicAllocation(rdd)
  checkBarrierStageWithNumSlots(rdd)
  checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
getShuffleDependencies(rdd).map { shuffleDep =>
getOrCreateShuffleMapStage(shuffleDep, firstJobId)
}.toList
}
/**
 * Returns shuffle dependencies that are immediate parents of the given RDD.
 * This function will not return more distant ancestors.
 */
private[scheduler] def getShuffleDependencies(
    rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
  // set that collects the shuffle (wide) dependencies found
  val parents = new HashSet[ShuffleDependency[_, _, _]]
  // set of RDDs already visited
  val visited = new HashSet[RDD[_]]
  // stack of RDDs waiting to be visited
  val waitingForVisit = new ArrayStack[RDD[_]]
  // push the starting RDD onto the stack
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    // pop the RDD on top of the stack
    val toVisit = waitingForVisit.pop()
    // if this RDD has not been visited yet
    if (!visited(toVisit)) {
      // mark it as visited
      visited += toVisit
      // walk its dependencies: a shuffle dependency goes into parents; for a narrow
      // dependency, push its parent RDD so it is traversed in a later iteration
      toVisit.dependencies.foreach {
        case shuffleDep: ShuffleDependency[_, _, _] =>
          parents += shuffleDep
        case dependency =>
          waitingForVisit.push(dependency.rdd)
      }
    }
  }
  parents
}
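The traversal is an iterative depth-first search that stops at the first shuffle boundary along every path. A self-contained sketch of the same idea on a toy lineage (ToyRdd, ToyDep, Narrow, and Shuffle are made-up types, not Spark's):

import scala.collection.mutable

sealed trait ToyDep { def rdd: ToyRdd }
case class Narrow(rdd: ToyRdd) extends ToyDep
case class Shuffle(rdd: ToyRdd) extends ToyDep
case class ToyRdd(name: String, deps: List[ToyDep])

def immediateShuffleDeps(start: ToyRdd): Set[Shuffle] = {
  val parents = mutable.Set[Shuffle]()
  val visited = mutable.Set[ToyRdd]()
  val waiting = mutable.Stack[ToyRdd](start)
  while (waiting.nonEmpty) {
    val toVisit = waiting.pop()
    if (!visited(toVisit)) {
      visited += toVisit
      toVisit.deps.foreach {
        case s: Shuffle  => parents += s       // stop here: nearest shuffle boundary
        case Narrow(rdd) => waiting.push(rdd)  // keep walking through narrow deps
      }
    }
  }
  parents.toSet
}

val a = ToyRdd("a", Nil)
val b = ToyRdd("b", List(Shuffle(a)))  // b is computed from a through a shuffle
val c = ToyRdd("c", List(Narrow(b)))   // c is a narrow child of b
immediateShuffleDeps(c)                // => Set(Shuffle(a)): only the nearest shuffle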
For every immediate shuffle dependency found this way, getOrCreateParentStages calls getOrCreateShuffleMapStage, which reuses an existing ShuffleMapStage when one is already registered for the shuffleId and otherwise creates it, along with any missing ancestor stages:
private def getOrCreateShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
    case Some(stage) =>
      stage
    case None =>
      // Create stages for all missing ancestor shuffle dependencies.
      getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
        // if this shuffle dependency does not have a stage yet, create one
        if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
          createShuffleMapStage(dep, firstJobId)
        }
      }
      // Finally, create a stage for the given shuffle dependency.
      createShuffleMapStage(shuffleDep, firstJobId)
  }
}
getMissingAncestorShuffleDependencies walks further back in the lineage to find every shuffle dependency that does not have a stage yet:

private def getMissingAncestorShuffleDependencies(
    rdd: RDD[_]): ArrayStack[ShuffleDependency[_, _, _]] = {
  // collects every shuffle (wide) dependency that still needs a stage
  val ancestors = new ArrayStack[ShuffleDependency[_, _, _]]
  // RDDs already visited
  val visited = new HashSet[RDD[_]]
  // RDDs waiting to be visited
  val waitingForVisit = new ArrayStack[RDD[_]]
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    if (!visited(toVisit)) {
      visited += toVisit
      getShuffleDependencies(toVisit).foreach { shuffleDep =>
        if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) {
          ancestors.push(shuffleDep)
          waitingForVisit.push(shuffleDep.rdd)
        }
        // Otherwise, the dependency and its ancestors have already been registered.
      }
    }
  }
  ancestors
}

Both paths end in createShuffleMapStage, which builds and registers the stage itself:
def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
  val rdd = shuffleDep.rdd
  checkBarrierStageWithDynamicAllocation(rdd)
  checkBarrierStageWithNumSlots(rdd)
  checkBarrierStageWithRDDChainPattern(rdd, rdd.getNumPartitions)
  val numTasks = rdd.partitions.length
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new ShuffleMapStage(
    id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)
  stageIdToStage(id) = stage
  shuffleIdToMapStage(shuffleDep.shuffleId) = stage
  updateJobIdStageIdMaps(jobId, stage)
  if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
  }
  stage
}
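Putting the pieces together: for a job whose lineage crosses two shuffles, createResultStage recursively produces two ShuffleMapStages plus the final ResultStage. A hypothetical example, assuming an existing SparkContext sc (the input path is made up and the stage ids are illustrative):

val counts = sc.textFile("hdfs://example/input.txt")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)             // shuffle 1 -> ShuffleMapStage 0
  .map(_.swap)
  .sortByKey(ascending = false)   // shuffle 2 -> ShuffleMapStage 1
counts.collect()                  // action -> ResultStage 2, submitted as finalStage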
Back in handleJobSubmitted: once finalStage and, recursively, all of its parent stages exist, the final step is submitStage(finalStage):
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug(s"submitStage($stage (name=${stage.name};" +
      s"jobs=${stage.jobIds.toSeq.sorted.mkString(",")}))")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      // find the parent stages whose results are not yet available
      val missing = getMissingParentStages(stage).sortBy(_.id)
      // if missing is empty, this stage has no unfinished parents
      if (missing.isEmpty) {
        // so its tasks can be submitted right away
        submitMissingTasks(stage, jobId.get)
      } else {
        // otherwise recurse into each missing parent first, all the way back to the
        // earliest stages, and park this stage in waitingStages until they finish
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}
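The effect of the recursion is ancestor-first submission. A toy re-creation of just that ordering (ToyStage is a made-up type; the real scheduler parks children in waitingStages and resubmits them on completion events rather than recursing inline like this):

case class ToyStage(id: Int, parents: List[ToyStage])

def submitOrder(stage: ToyStage): List[Int] =
  stage.parents.sortBy(_.id).flatMap(submitOrder) :+ stage.id

val shuffleStage0 = ToyStage(0, Nil)
val shuffleStage1 = ToyStage(1, List(shuffleStage0))
val resultStage   = ToyStage(2, List(shuffleStage1))
submitOrder(resultStage)   // => List(0, 1, 2): the earliest stage runs first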
submitMissingTasks turns the stage's missing partitions into one task each:

private def submitMissingTasks(stage: Stage, jobId: Int) {
  val tasks: Seq[Task[_]] = {
    val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
    // pattern-match on the stage type
    stage match {
      // for a ShuffleMapStage: get the missing partitions via partitionsToCompute
      // and map over them
      case stage: ShuffleMapStage =>
        stage.pendingPartitions.clear()
        partitionsToCompute.map { id =>
          val locs = taskIdToLocations(id)
          val part = partitions(id)
          stage.pendingPartitions += id
          // create one ShuffleMapTask per partition
          new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
            taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
            Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
        }
      // for a ResultStage: get the missing partitions via partitionsToCompute
      // and map over them
      case stage: ResultStage =>
        partitionsToCompute.map { id =>
          val p: Int = stage.partitions(id)
          val part = partitions(p)
          val locs = taskIdToLocations(id)
          // create one ResultTask per partition
          new ResultTask(stage.id, stage.latestInfo.attemptNumber,
            taskBinary, part, locs, id, properties, serializedTaskMetrics,
            Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
            stage.rdd.isBarrier())
        }
    }
  }
  if (tasks.size > 0) {
    logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
      s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
  } else {
    // Because we posted SparkListenerStageSubmitted earlier, we should mark
    // the stage as completed here in case there are no tasks to run
    markStageAsFinished(stage, None)
    stage match {
      case stage: ShuffleMapStage =>
        markMapStageJobsAsFinished(stage)
      case stage: ResultStage =>
        logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")
    }
    submitWaitingChildStages(stage)
  }
}

where partitionsToCompute is obtained from:
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
Tracing further: findMissingPartitions is declared abstract on the Stage class and has two implementations, one in ShuffleMapStage and one in ResultStage.
def findMissingPartitions(): Seq[Int]
Here is how each subclass implements it; in both cases the partitions in question are those of the current stage's last RDD.
//ResultStage
override def findMissingPartitions(): Seq[Int] = {
val job = activeJob.get
(0 until job.numPartitions).filter(id => !job.finished(id))
}
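So for a ResultStage the missing partitions are simply the job partitions that have not reported a result yet. A tiny run with hypothetical values:

// a job with 4 partitions where partition 1 has already finished
val finished = Array(false, true, false, false)
(0 until 4).filter(id => !finished(id))   // => Vector(0, 2, 3)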
//ShuffleMapStage
override def findMissingPartitions(): Seq[Int] = {
mapOutputTrackerMaster
.findMissingPartitions(shuffleDep.shuffleId)
.getOrElse(0 until numPartitions)
}
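For a ShuffleMapStage, the MapOutputTrackerMaster is asked which map outputs are still missing; if it has no record for the shuffle at all, every partition is missing. A hypothetical illustration of the getOrElse fallback:

// the tracker has no output registered for this shuffle yet
val fromTracker: Option[Seq[Int]] = None
fromTracker.getOrElse(0 until 8)   // => Range 0 until 8: all 8 tasks must run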
Back in submitMissingTasks: the tasks created for these missing partitions are wrapped in a TaskSet and handed to taskScheduler.submitTasks, at which point the work leaves the DAGScheduler and moves on to the TaskScheduler.