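1. RDD: Resilient Distributed Dataset.
2. Operation: the operations applied to an RDD, divided into transformations and actions.
3. Job: a job contains multiple RDDs and the operations applied to those RDDs.
4. Stage: a job is divided into multiple stages.
5. Partition: a data partition; the data in an RDD can be split into multiple partitions.
6. DAG: Directed Acyclic Graph, which reflects the dependencies between RDDs.
7. Narrow dependency: the child RDD depends on a fixed set of data partitions of the parent RDD.
8. Wide dependency: the child RDD depends on all data partitions of the parent RDD.
9. Caching management: caches the intermediate results of RDD computations to speed up overall processing.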
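1) application_jar.jar is invoked (the application's entry function). The application depends on SparkContext while it runs and must initialize a SparkContext sc; RDDs can then be created through sc (RDDs are created by calling sc);
2) Initializing SparkContext also initializes DAGScheduler and calls SparkContext.createTaskScheduler(this, master, deployMode) to initialize TaskScheduler and SchedulerBackend;
When using the TaskSchedulerImpl class, you must first call TaskSchedulerImpl#initialize(backend: SchedulerBackend) and TaskSchedulerImpl#start(); only then can you call TaskSchedulerImpl#submitTasks(taskSet: TaskSet) to submit tasks. As the parameter of initialize(backend: SchedulerBackend) shows, TaskSchedulerImpl depends on SchedulerBackend.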
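The required call order (initialize, then start, then submitTasks) can be sketched as a small state machine. This is only an illustration: ToyBackend and ToyTaskScheduler are hypothetical stand-ins, not Spark classes, and a task set is reduced to a plain string.

```scala
object SchedulerLifecycleSketch {
  // Hypothetical stand-in for SchedulerBackend: it only records that it was started.
  final class ToyBackend {
    var started = false
    def start(): Unit = { started = true }
  }

  // Hypothetical stand-in for TaskSchedulerImpl that enforces the call order
  // initialize -> start -> submitTasks.
  final class ToyTaskScheduler {
    private var backend: Option[ToyBackend] = None
    private var running = false
    val submitted = scala.collection.mutable.ArrayBuffer.empty[String]

    def initialize(b: ToyBackend): Unit = { backend = Some(b) }

    def start(): Unit = {
      val b = backend.getOrElse(
        throw new IllegalStateException("initialize(backend) must be called before start()"))
      b.start() // the scheduler starts its backend, which is why it depends on it
      running = true
    }

    def submitTasks(taskSet: String): Unit = {
      if (!running) {
        throw new IllegalStateException("start() must be called before submitTasks()")
      }
      submitted += taskSet
    }
  }
}
```

Calling start() before initialize(), or submitTasks() before start(), fails fast in this sketch, which mirrors the ordering constraint described above.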
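2) Spark stages are created by breaking the RDD graph at shuffle boundaries. RDD operations with "narrow" dependencies, such as map() and filter(), are pipelined together into one set of tasks in each stage, but operations with shuffle dependencies require multiple stages (one stage writes a set of map output files, and another reads those files after a barrier). In the end, every stage has only shuffle dependencies on other stages and may compute multiple operations inside it. The actual pipelining of these operations happens in the rdd.compute() functions of the various RDDs.
4) To recover from failures, the same stage may need to run multiple times; these runs are called "attempts". If the TaskScheduler reports that a task failed because a map output file from a previous stage was lost, the DAGScheduler resubmits that lost stage. This is detected through a CompletionEvent with FetchFailed, or through an ExecutorLost event. The DAGScheduler waits a short time to see whether other nodes or tasks fail, then resubmits TaskSets for any lost stages that compute the missing tasks. As part of this process, we may also have to create Stage objects for old (finished) stages whose Stage objects were cleaned up earlier. Because tasks from an old attempt of a stage may still be running, care must be taken to map any received event to the correct Stage object.
- Cache tracking: the DAGScheduler figures out which RDDs are cached to avoid recomputing them, and likewise remembers which shuffle map stages have already produced output files to avoid redoing the map side of a shuffle.
- Preferred locations: the DAGScheduler also computes where to run each task in a stage, based on the preferred locations of its underlying RDDs or on the location of cached or shuffle data.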
private[spark] class DAGScheduler(
    private[scheduler] val sc: SparkContext,
    private[scheduler] val taskScheduler: TaskScheduler,
    listenerBus: LiveListenerBus,
    mapOutputTracker: MapOutputTrackerMaster,
    blockManagerMaster: BlockManagerMaster,
    env: SparkEnv,
    clock: Clock = new SystemClock())
  extends Logging {

  def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
    this(
      sc,
      taskScheduler,
      sc.listenerBus,
      sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
      sc.env.blockManager.master,
      sc.env)
  }

  def this(sc: SparkContext) = this(sc, sc.taskScheduler)
√) sc: SparkContext: the current SparkContext object, i.e. the SparkContext initialized when the main function of application_jar.jar is called. DAGScheduler itself is a SparkContext attribute that is initialized during SparkContext initialization.
√) taskScheduler: TaskScheduler: like DAGScheduler and SchedulerBackend, it is a SparkContext attribute initialized during SparkContext initialization, so this parameter is taken from the taskScheduler held by the current sc.
√) listenerBus: LiveListenerBus: the object that processes events asynchronously, obtained from sc. https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
√) mapOutputTracker: MapOutputTrackerMaster: runs on the driver and manages the output of shuffle map tasks; obtained from the mapOutputTracker attribute of sc's env: SparkEnv. https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
√) blockManagerMaster: BlockManagerMaster: runs on the driver and manages the block information of the whole job; obtained from env.blockManager.master in sc. https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
√) env: SparkEnv: Spark's runtime environment, obtained from the env attribute of sc.
  private[spark] val metricsSource: DAGSchedulerSource = new DAGSchedulerSource(this)

  private[scheduler] val nextJobId = new AtomicInteger(0)
  private[scheduler] def numTotalJobs: Int = nextJobId.get()
  private val nextStageId = new AtomicInteger(0)

  private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
  private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
  /**
   * Mapping from shuffle dependency ID to the ShuffleMapStage that will generate the data for
   * that dependency. Only includes stages that are part of currently running job (when the job(s)
   * that require the shuffle stage complete, the mapping will be removed, and the only record of
   * the shuffle data will be in the MapOutputTracker).
   */
  private[scheduler] val shuffleIdToMapStage = new HashMap[Int, ShuffleMapStage]
  private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]

  // Stages we need to run whose parents aren't done
  private[scheduler] val waitingStages = new HashSet[Stage]

  // Stages we are running right now
  private[scheduler] val runningStages = new HashSet[Stage]

  // Stages that must be resubmitted due to fetch failures
  private[scheduler] val failedStages = new HashSet[Stage]

  private[scheduler] val activeJobs = new HashSet[ActiveJob]

  /**
   * Contains the locations that each RDD's partitions are cached on. This map's keys are RDD ids
   * and its values are arrays indexed by partition numbers. Each array value is the set of
   * locations where that RDD partition is cached.
   *
   * All accesses to this map should be guarded by synchronizing on it (see SPARK-4454).
   */
  private val cacheLocs = new HashMap[Int, IndexedSeq[Seq[TaskLocation]]]

  // For tracking failed nodes, we use the MapOutputTracker's epoch number, which is sent with
  // every task. When we detect a node failing, we note the current epoch number and failed
  // executor, increment it for new tasks, and use this to ignore stray ShuffleMapTask results.
  //
  // TODO: Garbage collect information about failure epochs when we know there are no more
  //       stray messages to detect.
  private val failedEpoch = new HashMap[String, Long]

  private [scheduler] val outputCommitCoordinator = env.outputCommitCoordinator

  // A closure serializer that we reuse.
  // This is only safe because DAGScheduler runs in a single thread.
  private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()

  /** If enabled, FetchFailed will not cause stage retry, in order to surface the problem. */
  private val disallowStageRetryForTest = sc.getConf.getBoolean("spark.test.noStageRetry", false)

  /**
   * Whether to unregister all the outputs on the host in condition that we receive a FetchFailure,
   * this is set default to false, which means, we only unregister the outputs related to the exact
   * executor(instead of the host) on a FetchFailure.
   */
  private[scheduler] val unRegisterOutputOnHostOnFetchFailure =
    sc.getConf.get(config.UNREGISTER_OUTPUT_ON_HOST_ON_FETCH_FAILURE)

  /**
   * Number of consecutive stage attempts allowed before a stage is aborted.
   */
  private[scheduler] val maxConsecutiveStageAttempts =
    sc.getConf.getInt("spark.stage.maxConsecutiveAttempts",
      DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS)

  /**
   * Number of max concurrent tasks check failures for each barrier job.
   */
  private[scheduler] val barrierJobIdToNumTasksCheckFailures = new ConcurrentHashMap[Int, Int]

  /**
   * Time in seconds to wait between a max concurrent tasks check failure and the next check.
   */
  private val timeIntervalNumTasksCheck = sc.getConf
    .get(config.BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL)

  /**
   * Max number of max concurrent tasks check failures allowed for a job before fail the job
   * submission.
   */
  private val maxFailureNumTasksCheck = sc.getConf
    .get(config.BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES)

  private val messageScheduler =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("dag-scheduler-message")

  private[spark] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
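√) metricsSource: DAGSchedulerSource: the Source role of the metrics system; it registers the failedStages, runningStages, waitingStages, allJobs, and activeJobs gauges. https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
√) numTotalJobs: Int: the total number of jobs.
√) jobIdToStageIds: HashMap[Int, HashSet[Int]]: maps a job to the ids of all stages it contains.
√) stageIdToStage: HashMap[Int, Stage]: maps a stageId to its Stage.
√) shuffleIdToMapStage: HashMap[Int, ShuffleMapStage]: records the ShuffleMapStage for each shuffle; the key is the shuffleId.
√) jobIdToActiveJob: HashMap[Int, ActiveJob]: records jobs in the Active state; the key is the jobId and the value is an ActiveJob object.
√) waitingStages: HashSet[Stage]: stages waiting to run; typically they can start only after their parent stages have finished.
√) failedStages: HashSet[Stage]: stages that failed because of fetch failures and are waiting to be resubmitted.
√) cacheLocs: HashMap[Int, IndexedSeq[Seq[TaskLocation]]]: maintains the locations where each RDD's partitions are cached. The map's keys are RDD ids and its values are arrays indexed by partition number. Each array element is the set of locations where that RDD partition is cached.
√) failedEpoch: HashMap[String, Long]: used to track failed nodes with the MapOutputTracker's epoch number, which is sent with every task. When a node failure is detected, the current epoch number and the failed executor are recorded, the epoch is incremented for new tasks, and it is then used to ignore stray ShuffleMapTask results.
√) barrierJobIdToNumTasksCheckFailures: ConcurrentHashMap[Int, Int]: the number of max-concurrent-tasks check failures for each barrier job.
√) messageScheduler: ScheduledExecutorService: the dag-scheduler-message single-thread scheduled executor (the scheduled work goes through eventProcessLoop, e.g. ResubmitFailedStages/JobSubmitted).
√) eventProcessLoop: DAGSchedulerEventProcessLoop: the DAGSchedulerEventProcessLoop class is defined in the same file as the DAGScheduler class and extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop").
- EventLoop holds an internal queue for storing messages, exposes a post method that accepts messages and puts them into the queue, and owns a thread that consumes the queue. The consumer thread fetches messages in an endless loop and calls onReceive(msg) to handle each one.
- The constructor of DAGSchedulerEventProcessLoop takes dagScheduler: DAGScheduler; its onReceive method calls different dagScheduler methods depending on the message type.
- Diagram of how DAGScheduler and EventLoop work together:
The DAGScheduler class defines the member variable eventProcessLoop: DAGSchedulerEventProcessLoop. During DAGScheduler initialization, eventProcessLoop = new DAGSchedulerEventProcessLoop(this) is initialized, and the last initialization step calls eventProcessLoop.start() to start the event loop.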
/**
 * An event loop to receive events from the caller and process all events in the event thread. It
 * will start an exclusive event thread to process all events.
 *
 * Note: The event queue will grow indefinitely. So subclasses should make sure `onReceive` can
 * handle events in time to avoid the potential OOM.
 */
private[spark] abstract class EventLoop[E](name: String) extends Logging {

  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

  private val stopped = new AtomicBoolean(false)

  // Exposed for testing.
  private[spark] val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) =>
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }

  }

  def start(): Unit = {
    if (stopped.get) {
      throw new IllegalStateException(name + " has already been stopped")
    }
    // Call onStart before starting the event thread to make sure it happens before onReceive
    onStart()
    eventThread.start()
  }

  def stop(): Unit = {
    if (stopped.compareAndSet(false, true)) {
      eventThread.interrupt()
      var onStopCalled = false
      try {
        eventThread.join()
        // Call onStop after the event thread exits to make sure onReceive happens before onStop
        onStopCalled = true
        onStop()
      } catch {
        case ie: InterruptedException =>
          Thread.currentThread().interrupt()
          if (!onStopCalled) {
            // ie is thrown from `eventThread.join()`. Otherwise, we should not call `onStop` since
            // it's already called.
            onStop()
          }
      }
    } else {
      // Keep quiet to allow calling `stop` multiple times.
    }
  }

  /**
   * Put the event into the event queue. The event thread will process it later.
   */
  def post(event: E): Unit = {
    eventQueue.put(event)
  }

  /**
   * Return if the event thread has already been started but not yet stopped.
   */
  def isActive: Boolean = eventThread.isAlive

  /**
   * Invoked when `start()` is called but before the event thread starts.
   */
  protected def onStart(): Unit = {}

  /**
   * Invoked when `stop()` is called and the event thread exits.
   */
  protected def onStop(): Unit = {}

  /**
   * Invoked in the event thread when polling events from the event queue.
   *
   * Note: Should avoid calling blocking actions in `onReceive`, or the event thread will be blocked
   * and cannot process events in time. If you want to call some blocking actions, run them in
   * another thread.
   */
  protected def onReceive(event: E): Unit

  /**
   * Invoked if `onReceive` throws any non fatal error. Any non fatal error thrown from `onError`
   * will be ignored.
   */
  protected def onError(e: Throwable): Unit

}
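1) It holds an internal message queue, eventQueue: BlockingQueue[E], which is used to store messages and hand them to the consumer;
3) It holds an internal consumer thread, eventThread, which consumes messages from the queue in a blocking endless loop. The handler hook is onReceive(event: E), and the error hook is onError(e: Throwable);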
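The queue-plus-consumer-thread pattern described above can be tried outside Spark with a stripped-down loop. The sketch below is an assumption-laden simplification: ToyEventLoop, ToyEvent, and demo are hypothetical names, onStart/onStop hooks are omitted, and errors are caught as plain Exception rather than with NonFatal.

```scala
import java.util.concurrent.{BlockingQueue, CountDownLatch, LinkedBlockingDeque, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ArrayBuffer

object EventLoopSketch {
  // Simplified event loop: one blocking queue and one daemon consumer thread.
  abstract class ToyEventLoop[E](name: String) {
    private val queue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
    private val stopped = new AtomicBoolean(false)

    private val thread = new Thread(name) {
      setDaemon(true)
      override def run(): Unit = {
        try {
          while (!stopped.get) {
            val event = queue.take() // blocks until an event is available
            try onReceive(event) catch { case e: Exception => onError(e) }
          }
        } catch {
          case _: InterruptedException => // stop() interrupts the blocking take()
        }
      }
    }

    def start(): Unit = thread.start()

    def stop(): Unit = {
      if (stopped.compareAndSet(false, true)) {
        thread.interrupt()
        thread.join()
      }
    }

    def post(event: E): Unit = queue.put(event)

    protected def onReceive(event: E): Unit
    protected def onError(e: Throwable): Unit
  }

  // Hypothetical events, loosely modelled on two DAGSchedulerEvent cases.
  sealed trait ToyEvent
  final case class JobSubmitted(jobId: Int) extends ToyEvent
  case object ResubmitFailedStages extends ToyEvent

  // Posts two events and returns the order in which the consumer thread handled them.
  def demo(): List[String] = {
    val handled = ArrayBuffer.empty[String]
    val done = new CountDownLatch(2)
    val loop = new ToyEventLoop[ToyEvent]("toy-event-loop") {
      override protected def onReceive(event: ToyEvent): Unit = {
        event match {
          case JobSubmitted(id) => handled += s"job-$id"
          case ResubmitFailedStages => handled += "resubmit"
        }
        done.countDown()
      }
      override protected def onError(e: Throwable): Unit = ()
    }
    loop.start()
    loop.post(JobSubmitted(0))
    loop.post(ResubmitFailedStages)
    done.await(5, TimeUnit.SECONDS)
    loop.stop()
    handled.toList
  }
}
```

Because a single thread drains the queue, events are handled in the order they were posted, which is the property the DAGScheduler relies on when it treats its state as single-threaded.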
private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
  extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {

  private[this] val timer = dagScheduler.metricsSource.messageProcessingTimer

  /**
   * The main event loop of the DAG scheduler.
   */
  override def onReceive(event: DAGSchedulerEvent): Unit = {
    val timerContext = timer.time()
    try {
      doOnReceive(event)
    } finally {
      timerContext.stop()
    }
  }

  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

    case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
      dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

    case StageCancelled(stageId, reason) =>
      dagScheduler.handleStageCancellation(stageId, reason)

    case JobCancelled(jobId, reason) =>
      dagScheduler.handleJobCancellation(jobId, reason)

    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)

    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()

    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)

    case ExecutorLost(execId, reason) =>
      val workerLost = reason match {
        case SlaveLost(_, true) => true
        case _ => false
      }
      dagScheduler.handleExecutorLost(execId, workerLost)

    case WorkerRemoved(workerId, host, message) =>
      dagScheduler.handleWorkerRemoved(workerId, host, message)

    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)

    case SpeculativeTaskSubmitted(task) =>
      dagScheduler.handleSpeculativeTaskSubmitted(task)

    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)

    case completion: CompletionEvent =>
      dagScheduler.handleTaskCompletion(completion)

    case TaskSetFailed(taskSet, reason, exception) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  }

  override def onError(e: Throwable): Unit = {
    logError("DAGSchedulerEventProcessLoop failed; shutting down SparkContext", e)
    try {
      dagScheduler.doCancelAllJobs()
    } catch {
      case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
    }
    dagScheduler.sc.stopInNewThread()
  }

  override def onStop(): Unit = {
    // Cancel any active jobs in postStop hook
    dagScheduler.cleanUpAfterSchedulerStop()
  }
}
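3) Internally, a blocking endless loop takes DAGSchedulerEvent events from the queue and handles each one as it arrives [by calling onReceive(event: DAGSchedulerEvent)];
2) Dividing stages according to the RDD DAG
When a Spark application is submitted to YARN, for example in yarn-cluster mode, SparkSubmit -> YarnClusterApplication runs the run method of Client. Client#run() -> ApplicationMaster#userClassThread is the thread that executes the application's main. When the application's main function runs, it first initializes the SparkContext object, and SparkContext initialization in turn initializes DAGScheduler: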
@volatile private var _dagScheduler: DAGScheduler = _
...
private[spark] def dagScheduler: DAGScheduler = _dagScheduler
private[spark] def dagScheduler_=(ds: DAGScheduler): Unit = {
  _dagScheduler = ds
}
...
// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
_taskScheduler.start()
During DAGScheduler initialization, the DAGScheduler variable eventProcessLoop = new DAGSchedulerEventProcessLoop(this) is initialized, and the last initialization step calls eventProcessLoop.start() to start the event loop.
2) DAGScheduler: dividing stages according to the RDD DAG
The code of the application jar [RDD (Spark Core); note that Dataset, DataFrame, and sparkSession.sql("select ...") code is parsed by Catalyst and converted into RDDs,
Each RDD has 2 sets of parallel operations: transformation and action. (1) Transformation: Return a MappedRDD[U] by applying function f to each element
sample(withReplacement, fraction, seed)
reduceByKey(func, [numTasks])
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
sortByKey([ascending], [numTasks])
join(otherDataset, [numTasks])
cogroup(otherDataset, [numTasks])
pipe(command, [envVars])
(2) Action: Return T by reducing the elements using specified commutative and associative binary operator
takeOrdered(n, [ordering])
While the application jar code runs, each action it encounters calls sc.runJob(...). The WordCount example below is used to walk through how the RDD DAG is divided into stages:
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val wordFile = "E:\\personOrder.csv"
    val conf = new SparkConf().setMaster("local[1,1]").setAppName("wordcount")
    val sc = new SparkContext(conf)
    val input = sc.textFile(wordFile, 2).cache()
    val lines = input.flatMap(line => line.split("[,|-]"))
    val count = lines.map(word => (word, 1)).reduceByKey { case (x, y) => x + y }
    count.foreach(println)
  }
}
When the application jar's main is called and execution reaches count.foreach(println), the underlying implementation of RDD#foreach is as follows:
/**
 * Applies a function f to all elements of this RDD.
 */
def foreach(f: T => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
/**
 * Run a function on a given set of partitions in an RDD and pass the results to the given
 * handler function. This is the main entry point for all actions in Spark.
 *
 * @param rdd target RDD to run tasks on
 * @param func a function to run on each partition of the RDD
 * @param partitions set of partitions to run on; some jobs may not want to compute on all
 * partitions of the target RDD, e.g. for operations like `first()`
 * @param resultHandler callback to pass each result to
 */
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}
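3) partitions: the set of partitions to run on; some jobs may not want to compute on all partitions of the target RDD, e.g. for operations like first(). When the caller does not specify it, it is generated as 0 until rdd.partitions.length.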
/**
 * Return the first element in this RDD.
 */
def first(): T = withScope {
  take(1) match {
    case Array(t) => t
    case _ => throw new UnsupportedOperationException("empty collection")
  }
}
/**
 * Take the first num elements of the RDD. It works by first scanning one partition, and use the
 * results from that partition to estimate the number of additional partitions needed to satisfy
 * the limit.
 *
 * @note This method should only be used if the resulting array is expected to be small, as
 * all the data is loaded into the driver's memory.
 *
 * @note Due to complications in the internal implementation, this method will raise
 * an exception if called on an RDD of `Nothing` or `Null`.
 */
def take(num: Int): Array[T] = withScope {
  val scaleUpFactor = Math.max(conf.getInt("spark.rdd.limit.scaleUpFactor", 4), 2)
  if (num == 0) {
    new Array[T](0)
  } else {
    val buf = new ArrayBuffer[T]
    val totalParts = this.partitions.length
    var partsScanned = 0
    while (buf.size < num && partsScanned < totalParts) {
      // The number of partitions to try in this iteration. It is ok for this number to be
      // greater than totalParts because we actually cap it at totalParts in runJob.
      var numPartsToTry = 1L
      val left = num - buf.size
      if (partsScanned > 0) {
        // If we didn't find any rows after the previous iteration, quadruple and retry.
        // Otherwise, interpolate the number of partitions we need to try, but overestimate
        // it by 50%. We also cap the estimation in the end.
        if (buf.isEmpty) {
          numPartsToTry = partsScanned * scaleUpFactor
        } else {
          // As left > 0, numPartsToTry is always >= 1
          numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt
          numPartsToTry = Math.min(numPartsToTry, partsScanned * scaleUpFactor)
        }
      }

      val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
      val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)

      res.foreach(buf ++= _.take(num - buf.size))
      partsScanned += p.size
    }

    buf.toArray
  }
}
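To make the partition estimate inside take() easier to follow, the arithmetic can be pulled out into a pure function. This is a sketch, not Spark code: numPartsToTry here is a hypothetical helper that reproduces only the estimate (before the cap at totalParts), and the default scaleUpFactor of 4 is taken from the default of spark.rdd.limit.scaleUpFactor shown above.

```scala
object TakeEstimateSketch {
  // Returns how many partitions the next iteration of take() would try,
  // before the result is capped at the total number of partitions.
  def numPartsToTry(num: Int, bufSize: Int, partsScanned: Int, scaleUpFactor: Int = 4): Long = {
    if (partsScanned == 0) {
      1L // first iteration: scan a single partition
    } else if (bufSize == 0) {
      // nothing found so far: grow the scan by the scale-up factor
      partsScanned.toLong * scaleUpFactor
    } else {
      // interpolate from the rows found so far, overestimating by 50%
      val left = num - bufSize
      val estimate = Math.ceil(1.5 * left * partsScanned / bufSize).toLong
      Math.min(estimate, partsScanned.toLong * scaleUpFactor)
    }
  }
}
```

For example, with num = 10, 8 rows already collected from 4 scanned partitions, the estimate is ceil(1.5 * 2 * 4 / 8) = 2 partitions; with only 2 rows collected from 1 partition, the raw estimate of 6 is capped at 1 * 4 = 4.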
/**
 * Run a function on a given set of partitions in an RDD and return the results as an array.
 * The function that is run against each partition additionally takes `TaskContext` argument.
 *
 * @param rdd target RDD to run tasks on
 * @param func a function to run on each partition of the RDD
 * @param partitions set of partitions to run on; some jobs may not want to compute on all
 * partitions of the target RDD, e.g. for operations like `first()`
 * @return in-memory collection with a result of the job (each collection element will contain
 * a result from one partition)
 */
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int]): Array[U] = {
  val results = new Array[U](partitions.size)
  runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
  results
}
In the code above, resultHandler is (index, res) => results(index) = res. It assigns the result computed for each partition to results(index), and the results array is returned at the end.
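The way the handler fills the results array can be reproduced without Spark. In the sketch below an "RDD" is modelled as a plain Seq of partitions and the TaskContext argument is dropped; both simplifications, and the name RunJobSketch, are assumptions made for illustration.

```scala
import scala.reflect.ClassTag

object RunJobSketch {
  // Toy model: `data` holds one inner Seq per partition.
  def runJob[T, U](
      data: Seq[Seq[T]],
      func: Iterator[T] => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    // The handler receives the position within `partitions`, not the partition id itself,
    // so the results array can be sized by partitions.size.
    partitions.zipWithIndex.foreach { case (partitionId, index) =>
      resultHandler(index, func(data(partitionId).iterator))
    }
  }

  // Same shape as the three-argument overload above: collect the results into an array.
  def runJob[T, U: ClassTag](
      data: Seq[Seq[T]],
      func: Iterator[T] => U,
      partitions: Seq[Int]): Array[U] = {
    val results = new Array[U](partitions.size)
    runJob[T, U](data, func, partitions, (index: Int, res: U) => results(index) = res)
    results
  }
}
```

Running a sum over partitions 0 and 2 of Seq(Seq(1, 2), Seq(3, 4), Seq(5, 6)) yields an array of length 2, with the result for partition 2 stored at index 1.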
/**
 * Run an action job on the given RDD and pass all the results to the resultHandler function as
 * they arrive.
 *
 * @param rdd target RDD to run tasks on
 * @param func a function to run on each partition of the RDD
 * @param partitions set of partitions to run on; some jobs may not want to compute on all
 *   partitions of the target RDD, e.g. for operations like first()
 * @param callSite where in the user program this job was called
 * @param resultHandler callback to pass each result to
 * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
 *
 * @note Throws `Exception` when the job fails
 */
def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
  waiter.completionFuture.value.get match {
    case scala.util.Success(_) =>
      logInfo("Job %d finished: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    case scala.util.Failure(exception) =>
      logInfo("Job %d failed: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
      val callerStackTrace = Thread.currentThread().getStackTrace.tail
      exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
      throw exception
  }
}
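6) properties: scheduler properties to attach to this job, e.g. the fair scheduler pool name
3) Respond according to the value the waiter completes with: on Success, log 'Job x finished: xxx'; on Failure, log 'Job x failed: xxx' and throw the exception.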
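The blocking behaviour of runJob can be sketched with a Promise. ToyJobWaiter below is a hypothetical, simplified waiter written for this example, not Spark's JobWaiter, and it uses scala.concurrent.Await.ready in place of ThreadUtils.awaitReady.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success}

object JobWaiterSketch {
  // Completes successfully once every task has reported success,
  // or fails as soon as the job is marked as failed.
  final class ToyJobWaiter(val jobId: Int, totalTasks: Int) {
    private val promise = Promise[Unit]()
    private val finished = new AtomicInteger(0)

    // A job with zero tasks is complete immediately.
    if (totalTasks == 0) { promise.success(()) }

    def completionFuture: Future[Unit] = promise.future

    def taskSucceeded(): Unit = {
      if (finished.incrementAndGet() == totalTasks) { promise.success(()) }
    }

    def jobFailed(e: Exception): Unit = { promise.tryFailure(e) }
  }

  // Blocks the calling thread until the waiter completes, then reports or rethrows.
  def runJob(waiter: ToyJobWaiter): String = {
    Await.ready(waiter.completionFuture, Duration.Inf)
    waiter.completionFuture.value.get match {
      case Success(_) => s"Job ${waiter.jobId} finished"
      case Failure(e) => throw e
    }
  }
}
```

The caller thread stays blocked in runJob while another thread (in Spark, the scheduler's event thread) reports task completions, which is why an action appears synchronous to user code.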
/**
 * Submit an action job to the scheduler.
 *
 * @param rdd target RDD to run tasks on
 * @param func a function to run on each partition of the RDD
 * @param partitions set of partitions to run on; some jobs may not want to compute on all
 *   partitions of the target RDD, e.g. for operations like first()
 * @param callSite where in the user program this job was called
 * @param resultHandler callback to pass each result to
 * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
 *
 * @return a JobWaiter object that can be used to block until the job finishes executing
 *         or can be used to cancel the job.
 *
 * @throws IllegalArgumentException when partitions ids are illegal
 */
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }

  val jobId = nextJobId.getAndIncrement()
  if (partitions.size == 0) {
    // Return immediately if the job is running 0 tasks
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }

  assert(partitions.size > 0)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))
  waiter
}
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: BarrierJobSlotsNumberCheckFailed =>
      logWarning(s"The job $jobId requires to run a barrier stage that requires more slots " +
        "than the total number of slots in the cluster currently.")
      // If jobId doesn't exist in the map, Scala coverts its value null to 0: Int automatically.
      val numCheckFailures = barrierJobIdToNumTasksCheckFailures.compute(jobId,
        new BiFunction[Int, Int, Int] {
          override def apply(key: Int, value: Int): Int = value + 1
        })
      if (numCheckFailures <= maxFailureNumTasksCheck) {
        messageScheduler.schedule(
          new Runnable {
            override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
              partitions, callSite, listener, properties))
          },
          timeIntervalNumTasksCheck,
          TimeUnit.SECONDS
        )
        return
      } else {
        // Job failed, clear internal data.
        barrierJobIdToNumTasksCheckFailures.remove(jobId)
        listener.jobFailed(e)
        return
      }

    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }
  // Job submitted, clear internal data.
  barrierJobIdToNumTasksCheckFailures.remove(jobId)

  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  clearCacheLocs()
  logInfo("Got job %s (%s) with %d output partitions".format(
    job.jobId, callSite.shortForm, partitions.length))
  logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
  logInfo("Parents of final stage: " + finalStage.parents)
  logInfo("Missing parents: " + getMissingParentStages(finalStage))

  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  submitStage(finalStage)
}
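For example, in the figure above an action is called on RDD G. Stage division analyzes the graph backward starting from G. G depends on B and F: its dependency on B is narrow and its dependency on F is wide, so F and G cannot belong to the same stage, which means there is a stage boundary between F and G. The figure has another wide dependency between A and B, so one more stage is split off there. The result is 3 stages. Because Stage1 and Stage2 are independent of each other, they can run concurrently; Stage3 can start only after both Stage1 and Stage2 are ready.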
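The backward walk can be reproduced on a toy graph. The text only states the edges A to B (wide), B to G (narrow), and F to G (wide); the nodes C, D, E and their narrow edges into F are assumptions added here so that the second stage has some content. Node, Narrow, Wide, stageOf, and allStages are hypothetical names for this sketch.

```scala
object StageSplitSketch {
  sealed trait Dep { def parent: Node }
  final case class Narrow(parent: Node) extends Dep
  final case class Wide(parent: Node) extends Dep
  final case class Node(name: String, deps: List[Dep] = Nil)

  // Collects the RDDs of the stage that ends at `last` by following narrow
  // dependencies only. Parents behind a wide dependency end another stage.
  def stageOf(last: Node): (Set[String], List[Node]) = {
    var members = Set.empty[String]
    var boundaries = List.empty[Node]
    var stack = List(last)
    while (stack.nonEmpty) {
      val n = stack.head
      stack = stack.tail
      if (!members(n.name)) {
        members += n.name
        n.deps.foreach {
          case Narrow(p) => stack = p :: stack
          case Wide(p) => boundaries = p :: boundaries
        }
      }
    }
    (members, boundaries.distinct)
  }

  // Walks backward from the final RDD and returns every stage as a set of RDD names.
  def allStages(finalRdd: Node): Set[Set[String]] = {
    var result = Set.empty[Set[String]]
    var pending = List(finalRdd)
    while (pending.nonEmpty) {
      val (members, parents) = stageOf(pending.head)
      pending = pending.tail ++ parents
      result += members
    }
    result
  }

  val a = Node("A")
  val b = Node("B", List(Wide(a)))
  val c = Node("C")                             // assumed node
  val d = Node("D", List(Narrow(c)))            // assumed node
  val e = Node("E")                             // assumed node
  val f = Node("F", List(Narrow(d), Narrow(e))) // assumed edges
  val g = Node("G", List(Narrow(b), Wide(f)))
}
```

Under these assumptions allStages(g) yields three groups: {A}, {C, D, E, F}, and {B, G}, matching the three stages described above.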
private[scheduler] abstract class Stage(
    val id: Int,
    val rdd: RDD[_],
    val numTasks: Int,
    val parents: List[Stage],
    val firstJobId: Int,
    val callSite: CallSite)
  extends Logging {

  val numPartitions = rdd.partitions.length

  /** Set of jobs that this stage belongs to. */
  val jobIds = new HashSet[Int]

  /** The ID to use for the next new attempt for this stage. */
  private var nextAttemptId: Int = 0

  val name: String = callSite.shortForm
  val details: String = callSite.longForm

  /**
   * Pointer to the [[StageInfo]] object for the most recent attempt. This needs to be initialized
   * here, before any attempts have actually been created, because the DAGScheduler uses this
   * StageInfo to tell SparkListeners when a job starts (which happens before any stage attempts
   * have been created).
   */
  private var _latestInfo: StageInfo = StageInfo.fromStage(this, nextAttemptId)

  /**
   * Set of stage attempt IDs that have failed. We keep track of these failures in order to avoid
   * endless retries if a stage keeps failing.
   * We keep track of each attempt ID that has failed to avoid recording duplicate failures if
   * multiple tasks from the same stage attempt fail (SPARK-5945).
   */
  val failedAttemptIds = new HashSet[Int]

  private[scheduler] def clearFailures() : Unit = {
    failedAttemptIds.clear()
  }

  /** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */
  def makeNewStageAttempt(
      numPartitionsToCompute: Int,
      taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
    val metrics = new TaskMetrics
    metrics.register(rdd.sparkContext)
    _latestInfo = StageInfo.fromStage(
      this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences)
    nextAttemptId += 1
  }

  /** Returns the StageInfo for the most recent attempt for this stage. */
  def latestInfo: StageInfo = _latestInfo

  override final def hashCode(): Int = id

  override final def equals(other: Any): Boolean = other match {
    case stage: Stage => stage != null && stage.id == id
    case _ => false
  }

  /** Returns the sequence of partition ids that are missing (i.e. needs to be computed). */
  def findMissingPartitions(): Seq[Int]
}
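The Stage's rdd parameter is a single RDD, the final RDD of the stage, rather than a series of RDDs.
This is because all RDDs within one stage are connected by map-like operations: the partitions do not change, and different map functions are simply applied to the data one after another. For the TaskScheduler, the state of a single RDD is therefore enough to represent the stage.
A stage is a set of tasks within a Spark job that all have the same shuffle dependencies. Each DAG of tasks run by the scheduler is split into stages at the boundaries where a shuffle occurs, and the DAGScheduler then runs these stages in topological order.
Each stage is either a shuffle map stage, in which case the results of its tasks are the input of other stages, or a result stage, in which case its tasks directly compute a Spark action (e.g. count(), save(), etc.) by running a function on an RDD. For shuffle map stages, we also track the nodes that each output partition is on.
Finally, because of failure recovery, a stage can be re-executed in multiple attempts. In that case, the Stage object tracks multiple StageInfo objects to pass to listeners or the Web UI.
@id unique stage ID
@rdd the RDD this stage runs on: for a shuffle map stage, it is the RDD we run map tasks on; for a result stage, it is the target RDD that we ran an action on
@numTasks total number of tasks in the stage; result stages may not need to compute all partitions, e.g. for first(), lookup(), and take().
@parents list of stages that this stage depends on (through shuffle dependencies).
@firstJobId ID of the first job this stage was part of, used for FIFO scheduling.
@callSite location in the user program associated with this stage: either where the target RDD was created, for a shuffle map stage, or where the action was called, for a result stage.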
var finalStage: ResultStage = null
try {
  // New stage creation may throw an exception if, for example, jobs are run on a
  // HadoopRDD whose underlying HDFS files have been deleted.
  finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
  case e: BarrierJobSlotsNumberCheckFailed =>
    logWarning(s"The job $jobId requires to run a barrier stage that requires more slots " +
      "than the total number of slots in the cluster currently.")
    // If jobId doesn't exist in the map, Scala coverts its value null to 0: Int automatically.
    val numCheckFailures = barrierJobIdToNumTasksCheckFailures.compute(jobId,
      new BiFunction[Int, Int, Int] {
        override def apply(key: Int, value: Int): Int = value + 1
      })
    if (numCheckFailures <= maxFailureNumTasksCheck) {
      messageScheduler.schedule(
        new Runnable {
          override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
            partitions, callSite, listener, properties))
        },
        timeIntervalNumTasksCheck,
        TimeUnit.SECONDS
      )
      return
    } else {
      // Job failed, clear internal data.
      barrierJobIdToNumTasksCheckFailures.remove(jobId)
      listener.jobFailed(e)
      return
    }

  case e: Exception =>
    logWarning("Creating new stage failed due to exception - job: " + jobId, e)
    listener.jobFailed(e)
    return
}
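The code above is the main part of DAGScheduler#handleJobSubmitted that divides stages. As mentioned earlier, stage division works backward from the last stage: every time a wide dependency is encountered, another stage is split off, and so on until all stages have been divided. Only the last stage is of type ResultStage.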
/**
 * Create a ResultStage associated with the provided jobId.
 */
private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  checkBarrierStageWithDynamicAllocation(rdd)
  checkBarrierStageWithNumSlots(rdd)
  checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
  // Get the parent stages of the current stage; this method is the core of stage division
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  // Create the final ResultStage
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  // Associate the ResultStage with its stageId
  stageIdToStage(id) = stage
  // Update the set of stages contained in this job
  updateJobIdStageIdMaps(jobId, stage)
  // Return the ResultStage
  stage
}
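3) checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size): checks that we do not launch a barrier stage with an unsupported RDD chain pattern. According to the Spark source comments, two patterns are unsupported: ancestor RDDs whose number of partitions differs from that of the resulting RDD (e.g. union()/coalesce()/first()/take()/PartitionPruningRDD), and an RDD that depends on multiple barrier RDDs (e.g. barrierRdd1.zip(barrierRdd2)).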
/**
 * Get or create the list of parent stages for a given RDD. The new Stages will be created with
 * the provided firstJobId.
 */
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  getShuffleDependencies(rdd).map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}
/**
 * Returns shuffle dependencies that are immediate parents of the given RDD.
 *
 * This function will not return more distant ancestors.
 * For example, if C has a shuffle dependency on B which has a shuffle dependency on A:
 * A <-- B <-- C
 * calling this function with rdd C will only return the B <-- C dependency.
 *
 * This function is scheduler-visible for the purpose of unit testing.
 */
private[scheduler] def getShuffleDependencies(
    rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
  // Holds the shuffle dependencies found so far.
  val parents = new HashSet[ShuffleDependency[_, _, _]]
  // RDDs that have already been visited.
  val visited = new HashSet[RDD[_]]
  // Stack of RDDs still waiting to be visited.
  val waitingForVisit = new ArrayStack[RDD[_]]
  // Push the final RDD (RDD9 in the logical diagram).
  waitingForVisit.push(rdd)
  // Keep going until the stack is empty.
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    // Only process RDDs that have not been visited yet.
    if (!visited(toVisit)) {
      // Mark the RDD as visited.
      visited += toVisit
      // Each dependency holds a reference to a parent RDD.
      toVisit.dependencies.foreach {
        // A shuffle dependency goes into the result set.
        case shuffleDep: ShuffleDependency[_, _, _] =>
          parents += shuffleDep
        // Otherwise push the parent RDD onto the stack and keep traversing.
        case dependency =>
          waitingForVisit.push(dependency.rdd)
      }
    }
  }
  parents
}
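The "immediate parents only" behaviour is easy to check on a miniature lineage. The following is a sketch under stated assumptions rather than Spark code: `MiniRdd`, `NarrowDep`, `ShuffleDep` and `immediateShuffleDeps` are hypothetical names, and an immutable `List` is used as the stack so that the example does not depend on a particular Scala collections version.

```scala
import scala.collection.mutable

object ShuffleDepsSketch {
  // Hypothetical miniature of Spark's lineage classes.
  final class MiniRdd(val name: String, val deps: List[MiniDep] = Nil)
  sealed trait MiniDep { def rdd: MiniRdd }
  final case class NarrowDep(rdd: MiniRdd) extends MiniDep
  final case class ShuffleDep(shuffleId: Int, rdd: MiniRdd) extends MiniDep

  /** Shuffle dependencies reachable from `rdd` through narrow dependencies only. */
  def immediateShuffleDeps(rdd: MiniRdd): Set[ShuffleDep] = {
    val parents = mutable.HashSet.empty[ShuffleDep]
    val visited = mutable.HashSet.empty[MiniRdd]
    var waiting = List(rdd) // used as a stack
    while (waiting.nonEmpty) {
      val toVisit = waiting.head
      waiting = waiting.tail
      // `add` returns true only the first time an element is inserted.
      if (visited.add(toVisit)) {
        toVisit.deps.foreach {
          case s: ShuffleDep => parents += s               // stop here: stage boundary
          case n: NarrowDep  => waiting = n.rdd :: waiting // keep walking upward
        }
      }
    }
    parents.toSet
  }

  def main(args: Array[String]): Unit = {
    // A <-shuffle 0- B <-shuffle 1- C <-narrow- D
    val a = new MiniRdd("A")
    val b = new MiniRdd("B", List(ShuffleDep(0, a)))
    val c = new MiniRdd("C", List(ShuffleDep(1, b)))
    val d = new MiniRdd("D", List(NarrowDep(c)))
    println(immediateShuffleDeps(d).map(_.shuffleId))
  }
}
```

Starting from D, the traversal passes through the narrow dependency to C and reports only shuffle 1; shuffle 0 lies behind another boundary and is left for the recursive stage-creation step to discover.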
If the shuffle is already registered in shuffleIdToMapStage, the existing shuffle map stage is returned. Otherwise, this method creates the shuffle map stage, together with any missing ancestor shuffle map stages.
/**
 * Gets a shuffle map stage if one exists in shuffleIdToMapStage. Otherwise, if the
 * shuffle map stage doesn't already exist, this method will create the shuffle map stage in
 * addition to any missing ancestor shuffle map stages.
 */
private def getOrCreateShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
    case Some(stage) =>
      stage

    case None =>
      // Create stages for all missing ancestor shuffle dependencies.
      getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
        // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
        // that were not already in shuffleIdToMapStage, it's possible that by the time we
        // get to a particular dependency in the foreach loop, it's been added to
        // shuffleIdToMapStage by the stage creation process for an earlier dependency. See
        // SPARK-13902 for more information.
        if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
          createShuffleMapStage(dep, firstJobId)
        }
      }
      // Finally, create a stage for the given shuffle dependency.
      createShuffleMapStage(shuffleDep, firstJobId)
  }
}
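The get-or-create pattern can be sketched as a registry keyed by shuffle id. Everything here is hypothetical (`Shuffle`, `MapStage`, `Registry`), and the sketch simply recurses over direct parents instead of reproducing getMissingAncestorShuffleDependencies; it is meant only to show that ancestors are created first and that a later job reuses a stage that already exists.

```scala
import scala.collection.mutable

object StageCacheSketch {
  // Hypothetical model: a shuffle knows the shuffles its map side depends on.
  final case class Shuffle(id: Int, parents: List[Shuffle] = Nil)
  final case class MapStage(stageId: Int, shuffleId: Int, firstJobId: Int)

  final class Registry {
    private val shuffleIdToMapStage = mutable.HashMap.empty[Int, MapStage]
    private var nextStageId = 0

    def getOrCreate(shuffle: Shuffle, jobId: Int): MapStage =
      shuffleIdToMapStage.get(shuffle.id) match {
        case Some(stage) => stage // already known: reuse it
        case None =>
          // Create missing ancestors first, so they receive smaller stage ids.
          shuffle.parents.foreach(p => getOrCreate(p, jobId))
          val stage = MapStage(nextStageId, shuffle.id, jobId)
          nextStageId += 1
          shuffleIdToMapStage(shuffle.id) = stage
          stage
      }

    def stageCount: Int = shuffleIdToMapStage.size
  }

  def main(args: Array[String]): Unit = {
    val s0 = Shuffle(0)
    val s1 = Shuffle(1, List(s0))
    val registry = new Registry
    println(registry.getOrCreate(s1, jobId = 0)) // also creates the stage for s0
    println(registry.getOrCreate(s0, jobId = 1)) // reused, keeps firstJobId = 0
    println(registry.stageCount)
  }
}
```

The second call returns the stage created while job 0 was being set up, so its `firstJobId` stays 0. This is the same reason the parameter in the Spark method is called firstJobId and not jobId.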
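createShuffleMapStage creates a ShuffleMapStage that generates the partitions of the given shuffle dependency. If a previously run stage already produced the same shuffle data, the function copies the output locations that are still available from that earlier shuffle, so the data is not regenerated unnecessarily.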
/**
 * Creates a ShuffleMapStage that generates the given shuffle dependency's partitions. If a
 * previously run stage generated the same shuffle data, this function will copy the output
 * locations that are still available from the previous shuffle to avoid unnecessarily
 * regenerating data.
 */
def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
  val rdd = shuffleDep.rdd
  checkBarrierStageWithDynamicAllocation(rdd)
  checkBarrierStageWithNumSlots(rdd)
  checkBarrierStageWithRDDChainPattern(rdd, rdd.getNumPartitions)
  val numTasks = rdd.partitions.length
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new ShuffleMapStage(
    id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)

  stageIdToStage(id) = stage
  shuffleIdToMapStage(shuffleDep.shuffleId) = stage
  updateJobIdStageIdMaps(jobId, stage)

  if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    // Kind of ugly: need to register RDDs with the cache and map output tracker here
    // since we can't do it in the RDD constructor because # of partitions is unknown
    logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
  }
  stage
}
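The `containsShuffle` guard at the end is what lets a re-created stage keep the map outputs that already exist. A rough sketch of that idea follows; `MiniTracker` and `createStage` are hypothetical, heavily simplified stand-ins and not the real MapOutputTrackerMaster API, and output locations are modelled as plain strings.

```scala
import scala.collection.mutable

object OutputReuseSketch {
  // Hypothetical stand-in for the map output tracker: one slot per map partition.
  final class MiniTracker {
    private val outputs = mutable.HashMap.empty[Int, Array[Option[String]]]

    def containsShuffle(shuffleId: Int): Boolean = outputs.contains(shuffleId)

    def registerShuffle(shuffleId: Int, numMaps: Int): Unit = {
      require(!containsShuffle(shuffleId), s"shuffle $shuffleId registered twice")
      outputs(shuffleId) = Array.fill[Option[String]](numMaps)(None)
    }

    def registerMapOutput(shuffleId: Int, mapIndex: Int, location: String): Unit =
      outputs(shuffleId)(mapIndex) = Some(location)

    /** Map partitions that still have no output location. */
    def findMissingPartitions(shuffleId: Int): Seq[Int] = {
      val slots = outputs(shuffleId)
      slots.indices.filter(i => slots(i).isEmpty)
    }
  }

  /** Register the shuffle only once, then report which map tasks still have to run. */
  def createStage(tracker: MiniTracker, shuffleId: Int, numMaps: Int): Seq[Int] = {
    if (!tracker.containsShuffle(shuffleId)) {
      tracker.registerShuffle(shuffleId, numMaps)
    }
    tracker.findMissingPartitions(shuffleId)
  }

  def main(args: Array[String]): Unit = {
    val tracker = new MiniTracker
    println(createStage(tracker, shuffleId = 7, numMaps = 3)) // nothing computed yet
    tracker.registerMapOutput(7, 0, "exec-1")
    tracker.registerMapOutput(7, 2, "exec-2")
    println(createStage(tracker, shuffleId = 7, numMaps = 3)) // only the gap remains
  }
}
```

Because the second `createStage` call skips registration, the two recorded locations survive and only the partition without an output is reported as missing.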