路径是 core\src\main\scala\org\apache\spark\scheduler\DAGScheduler.scala
/** * Run a function on a given set of partitions in an RDD and pass the results to the given handler function. This is the main entry point for all actions in Spark. * 在RDD一系列分区上运行一个函数并且将结果传递给下一个处理函数。这是spark中action算子的主要入口 * * @param rdd target RDD to run tasks on * @param func a function to run on each partition of the RDD * @param partitions set of partitions to run on; some jobs may not want to compute on all partitions of the target RDD, e.g. for operations like `first()` * @param resultHandler callback to pass each result to */ def runJob[T, U: ClassTag]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], resultHandler: (Int, U) => Unit): Unit = { if (stopped.get()) { throw new IllegalStateException("SparkContext has been shutdown") } val callSite = getCallSite val cleanedFunc = clean(func) logInfo("Starting job: " + callSite.shortForm) if (conf.getBoolean("spark.logLineage", false)) { logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString) } // 直接调用了dagScheduler的runJob()方法 dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get) progressBar.foreach(_.finishAll()) rdd.doCheckpoint() }
** * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of * stages for each job, keeps track of which RDDs and stage outputs are materialize * d, and finds a minimal schedule to run the job. It then submits stages as TaskSets to an underlying * TaskScheduler implementation that runs them on the cluster. A TaskSet contains fully independent * tasks that can run right away based on the data that's already on the cluster (e.g. map output * files from previous stages), though it may fail if this data becomes unavailable. 面向stage的高层次的调度。它会给每个job都计算一个DAG有向无环图 追踪每个rdd的输出是否被物化了,并且寻找执行job的一个最小调度 以taskset的方式提交一个stage并且以TaskScheduler的实现类在集群上执行。 * In addition to coming up with a DAG of stages, the DAGScheduler also determines the preferred * locations to run each task on, based on the current cache status, and passes these to the * low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being * lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are * not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task * a small number of times before cancelling the whole stage. 除了处理stage的DAG,DAGScheduler还决定运行每个task的最佳位置,基于当前的缓存状态,将这些最佳位置提供给底层的TaskSchedulerImpl去运行 此外,他还会处理由于shuffle输出文件丢失导致的失败,这种情况下,旧的stage需要重新被提交。在一个stage内,不是由于shuffle输出文件丢失导致的失败, 会被TaskScheduler去处理,他会多次运行task,直到取消整个stage */
/** * Run an action job on the given RDD and pass all the results to the resultHandler function as * they arrive. *在给定的RDD上运行一个action job 并且把结果传递给resultHandler */ def runJob[T, U]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], callSite: CallSite, resultHandler: (Int, U) => Unit, properties: Properties): Unit = { val start = System.nanoTime // 核心是调用了submitJob()方法 val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties) ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf) }
def submitJob[T, U]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], callSite: CallSite, resultHandler: (Int, U) => Unit, properties: Properties): JobWaiter[U] = { // 确认不会在不存在的partition上启动task val maxPartitions = rdd.partitions.length partitions.find(p => p >= maxPartitions || p < 0).foreach { p => throw new IllegalArgumentException() } assert(partitions.size > 0) val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler) // 在这里调用了JobSubmitted()方法 eventProcessLoop.post(JobSubmitted( jobId, rdd, func2, partitions.toArray, callSite, waiter, SerializationUtils.clone(properties))) waiter }
private[scheduler] def handleJobSubmitted(jobId: Int, finalRDD: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], callSite: CallSite, listener: JobListener, properties: Properties) { var finalStage: ResultStage = null try { // 首先用触发job的最后一个RDD创建一个resultStage finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) } catch { // 进行异常捕捉,因为直接创建一个finalStage 可能会报错。比如job可能会运行一个hadoopDD但是HDFS文件却被删了。 } // Job submitted, clear internal data. barrierJobIdToNumTasksCheckFailures.remove(jobId) // 用finalstage创建一个job val job = new ActiveJob(jobId, finalStage, callSite, listener, properties) /** * cacheLocas:缓存的每个RDD的所有分区的位置信息,最终建立已经缓存的分区号和位置信息序列映射。为什么分区号和位置还会是序列呢? * 因为每一个分区可能存在多个副本机制,因此RDD的每一个分区的BLock可能存在多个节点的BlockManager上,因此是序列 */ // 这里将rdd缓存的位置信息全部清除 clearCacheLocs() // job添加到内存 val jobSubmissionTime = clock.getTimeMillis() jobIdToActiveJob(jobId) = job activeJobs += job finalStage.setActiveJob(job) val stageIds = jobIdToStageIds(jobId).toArray val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) listenerBus.post( SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) // 重要!!!stage提交算法 submitStage(finalStage) }
2.用finalstage创建一个job (job以action算子划分)
private def submitStage(stage: Stage) { val jobId = activeJobForStage(stage) // 获取finalstage对应的jobid if (jobId.isDefined) { if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) { // 获取当前stage的父stage val missing = getMissingParentStages(stage).sortBy(_.id) if (missing.isEmpty) { logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents") // 如果没有父stage说明整个job就只有一个satge 直接提交这个stage中的task submitMissingTasks(stage, jobId.get) } else { // 核心!!!如果有父stage,遍历所有的父stage,递归寻找stage的父stage for (parent <- missing) { submitStage(parent) } waitingStages += stage // 并把当前stage加入到等待调度的队列中 } } }
/** *这是一个宽度优先遍历的算法(sparkstreaming中划分stage也是这种算法)。通过递归调用遍历上游依赖,直到找到需要进行实际计算的最小集合 */ // 接收一个stage,返回的是一个stage的集合,也就是该stage所有的父stage private def getMissingParentStages(stage: Stage): List[Stage] = { val missing = new HashSet[Stage] val visited = new HashSet[RDD[_]] val waitingForVisit = new ArrayStack[RDD[_]] // 手动维护一个栈 避免因为递归调用导致内存溢出 def visit(rdd: RDD[_]) { // 定义一个遍历rdd的方法,接收一个RDD if (!visited(rdd)) { visited += rdd val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil) if (rddHasUncachedPartitions) { // 如果这个RDD中有没有缓存的partition就遍历这个RDD的依赖 这是为啥? for (dep <- rdd.dependencies) { dep match { case shufDep: ShuffleDependency[_, _, _] => // 遍历这个RDD的依赖,如果是宽依赖,那就说明是2个stage 就创建一个shuffleMapStage。除了最后一个stage是resultStage,其他都是shuffleMapStage val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId) if (!mapStage.isAvailable) { missing += mapStage // 把这个shuffleMapStage加入父stage集合返回 } case narrowDep: NarrowDependency[_] => waitingForVisit.push(narrowDep.rdd) // 如果是窄依赖,那么就把窄依赖的这个rdd推入栈中 } } } } } waitingForVisit.push(stage.rdd) // 首先往栈中推入了该stage的rdd // 只要还有rdd没有遍历到,就循环遍历这个rdd while (waitingForVisit.nonEmpty) { visit(waitingForVisit.pop()) // 依次遍历该栈中所有的rdd,这里注意,如果从waitingForVisit中pop()出来了一个rdd,遍历他的依赖发现是个窄依赖,那么会将他窄依赖的那个rdd也推入到waitingForVisit这个栈中 } missing.toList // 将父stage返回 }
-> DAGScheduler.runJob->submitJob(rdd, func, partitions, callSite, resultHandler, properties) -> DAGScheduler.eventProcessLoop.post(JobSubmitted( jobId, rdd, func2, partitions.toArray, callSite, waiter, SerializationUtils.clone(properties))) -> DAGSchedulerEventProcessLoop-> case JobSubmitted(jobId, dependency, callSite, listener, properties)-> -> DAGScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) -> DAGScheduler.createResultStage(finalRDD, func, partitions, jobId, callSite) (创建resultStage) -> DAGScheduler.submitStage(finalStage) (广度优先算法,建立无环图) -> DAGScheduler.getMissingParentStages(stage) -> DAGScheduler.submitMissingTasks(stage, jobId.get)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。