Because the program runs on a distributed cluster, the job has to be cut into stages, and the cut is driven by the two kinds of dependencies between RDDs: narrow and wide. An operation that depends only on a single parent RDD partition (a narrow dependency) can be pipelined into the same task as its parent, so parent and child end up in the same stage. Operations such as reduce or group, however, need global data: they depend on the results of many partitions and can only run after everything upstream has finished. That ordering requirement is exactly what a stage boundary expresses, and it is why stages must be split.
The resulting stages run on the cluster, and each stage contains multiple tasks. The tasks of a stage all run the same logic, only on different partitions, and they execute concurrently on different machines. The generated DAG is exactly the same regardless of the resource scheduling framework (YARN, Mesos, local, ...), so a job runs on any of these platforms with the same result.
Each stage consists of multiple tasks that share the same computation logic and differ only in the partition they process. The number of partitions equals the number of tasks with that logic: each partition is handled by exactly one task of the corresponding stage, and the tasks are competed for by the Executors. If compute resources are plentiful, an Executor runs one task at a time; if not, an Executor runs several.
A wide dependency is the dividing line of the DAG, and therefore also the boundary between stages.
handleJobSubmitted starts the stage division. Working backwards from G: G depends on B and F, and whether B or F is visited first is arbitrary. B is a narrow dependency of G, so G and B belong to the same stage. F is a wide dependency of G, meaning there is a shuffle in between, so G can only run after F has finished; F therefore sits one stage earlier than G. The dependency between A and B is also wide, so B and A cannot share a stage and A forms a stage of its own. The stage containing B and G (stage 3) thus has two parent stages. Stage 1 and stage 2 are independent and can run concurrently, but stage 3 has to wait for both of their results.
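To make the narrow/wide distinction concrete, here is a minimal, self-contained sketch written for this post (a local word-count job, not taken from Spark's sources): map is a narrow dependency and stays in the same stage as its parent, while reduceByKey introduces a ShuffleDependency and therefore a stage boundary, which rdd.toDebugString shows as a ShuffledRDD in the lineage.

import org.apache.spark.sql.SparkSession

object StageBoundaryDemo {
  def main(args: Array[String]): Unit = {
    // Any master works here; the DAG that DAGScheduler builds is the same on local, YARN or Mesos.
    val spark = SparkSession.builder().appName("stage-boundary-demo").master("local[2]").getOrCreate()
    val sc = spark.sparkContext

    val words  = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"), numSlices = 3)
    val pairs  = words.map(w => (w, 1))      // narrow dependency: pipelined into the same stage
    val counts = pairs.reduceByKey(_ + _)    // wide dependency: shuffle, hence a new stage

    // The lineage printout shows the ShuffledRDD that marks the stage boundary.
    println(counts.toDebugString)
    // One task per partition in each stage: 3 map-side tasks, then one task per reduce partition.
    println(s"result partitions: ${counts.getNumPartitions}")
    counts.collect().foreach(println)        // the action triggers job submission

    spark.stop()
  }
}

Running this on local[2], YARN or Mesos produces the same two-stage DAG, which is exactly the cross-platform point made above.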
The DAGScheduler is created along with the SparkContext.
Entry point:
1. SparkContext.submitMapStage
/**
 * Submit a map stage for execution. This is currently an internal API only, but might be
 * promoted to DeveloperApi in the future.
 */
private[spark] def submitMapStage[K, V, C](dependency: ShuffleDependency[K, V, C])
    : SimpleFutureAction[MapOutputStatistics] = {
  assertNotStopped()
  val callSite = getCallSite()
  var result: MapOutputStatistics = null
  val waiter = dagScheduler.submitMapStage(
    dependency,
    (r: MapOutputStatistics) => { result = r },
    callSite,
    localProperties.get)
  new SimpleFutureAction[MapOutputStatistics](waiter, result)
}
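Note that submitMapStage is an internal entry point (the comments below say it is used for adaptive query planning, to materialize a map stage and inspect its statistics). Ordinary actions reach the DAGScheduler through the sibling path SparkContext.runJob. A spark-shell one-liner to keep in mind while reading the rest of the trace (assuming the shell's sc):

// collect() -> SparkContext.runJob -> DAGScheduler.runJob -> submitJob, which posts a
// JobSubmitted event handled by handleJobSubmitted (the ResultStage counterpart of the
// handleMapStageSubmitted path traced below).
val doubled = sc.parallelize(1 to 10, 2).map(_ * 2)
doubled.collect()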

2. DAGScheduler.submitMapStage
/**
 * Submit a shuffle map stage to run independently and get a JobWaiter object back. The waiter
 * can be used to block until the job finishes executing or can be used to cancel the job.
 * This method is used for adaptive query planning, to run map stages and look at statistics
 * about their outputs before submitting downstream stages.
 *
 * @param dependency the ShuffleDependency to run a map stage for
 * @param callback function called with the result of the job, which in this case will be a
 *   single MapOutputStatistics object showing how much data was produced for each partition
 * @param callSite where in the user program this job was submitted
 * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
 */
def submitMapStage[K, V, C](
    dependency: ShuffleDependency[K, V, C],
    callback: MapOutputStatistics => Unit,
    callSite: CallSite,
    properties: Properties): JobWaiter[MapOutputStatistics] = {

  val rdd = dependency.rdd
  val jobId = nextJobId.getAndIncrement()
  if (rdd.partitions.length == 0) {
    throw new SparkException("Can't run submitMapStage on RDD with 0 partitions")
  }

  // We create a JobWaiter with only one "task", which will be marked as complete when the whole
  // map stage has completed, and will be passed the MapOutputStatistics for that stage.
  // This makes it easier to avoid race conditions between the user code and the map output
  // tracker that might result if we told the user the stage had finished, but then they queried
  // the map output tracker and some node failures had caused the output statistics to be lost.
  val waiter = new JobWaiter[MapOutputStatistics](
    this, jobId, 1,
    (_: Int, r: MapOutputStatistics) => callback(r))
  eventProcessLoop.post(MapStageSubmitted(
    jobId, dependency, callSite, waiter, Utils.cloneProperties(properties)))
  waiter
}

3. eventProcessLoop.post (a textbook event-loop implementation that is worth borrowing in your own projects). As in the previous article, the event eventually reaches DAGScheduler's doOnReceive method:
case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
  dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
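The eventProcessLoop used in step 3 is an instance of the event-loop pattern: a blocking queue plus a single dispatcher thread. The sketch below is a stripped-down illustration of that pattern written for this post (SimpleEventLoop and its members are made-up names, not Spark's actual class); it shows why post() returns immediately while the handler runs on the loop's own thread.

import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicBoolean

// Minimal illustration of the event-loop pattern: post() enqueues an event and returns,
// a dedicated daemon thread dequeues events one by one and dispatches them to onReceive().
abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()          // blocks until an event arrives
          try onReceive(event) catch { case t: Throwable => onError(t) }
        }
      } catch {
        case _: InterruptedException => // stop() was called, exit quietly
      }
    }
  }

  def start(): Unit = eventThread.start()
  def stop(): Unit = { stopped.set(true); eventThread.interrupt() }
  def post(event: E): Unit = eventQueue.put(event)

  protected def onReceive(event: E): Unit       // e.g. dispatch to handleMapStageSubmitted
  protected def onError(e: Throwable): Unit = e.printStackTrace()
}

Spark's real implementation lives in org.apache.spark.util.EventLoop; DAGSchedulerEventProcessLoop extends it and its onReceive delegates to doOnReceive.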
4. DAGScheduler.handleMapStageSubmitted
private[scheduler] def handleMapStageSubmitted(jobId: Int,
    dependency: ShuffleDependency[_, _, _],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties): Unit = {
  // Submitting this map stage might still require the creation of some parent stages, so make
  // sure that happens.
  var finalStage: ShuffleMapStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    finalStage = getOrCreateShuffleMapStage(dependency, jobId)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }

  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  clearCacheLocs()
  log.info("Got map stage job %s (%s) with %d output partitions".format(
    jobId, callSite.shortForm, dependency.rdd.partitions.length))
  log.info("Final stage: " + finalStage + " (" + finalStage.name + ")")
  log.info("Parents of final stage: " + finalStage.parents)
  log.info("Missing parents: " + getMissingParentStages(finalStage))

  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.addActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos,
      Utils.cloneProperties(properties)))
  submitStage(finalStage)

  // If the whole stage has already finished, tell the listener and remove it
  if (finalStage.isAvailable) {
    markMapStageJobAsFinished(job, mapOutputTracker.getStatistics(dependency))
  }
}

5. DAGScheduler.getOrCreateShuffleMapStage, which looks up the stage for this shuffle and, if it is missing, creates it together with any missing ancestor stages via createShuffleMapStage
// New stage creation may throw an exception if, for example, jobs are run on a
// HadoopRDD whose underlying HDFS files have been deleted.
finalStage = getOrCreateShuffleMapStage(dependency, jobId)

/**
 * Gets a shuffle map stage if one exists in shuffleIdToMapStage. Otherwise, if the
 * shuffle map stage doesn't already exist, this method will create the shuffle map stage in
 * addition to any missing ancestor shuffle map stages.
 */
private def getOrCreateShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
    case Some(stage) =>
      stage

    case None =>
      // Create stages for all missing ancestor shuffle dependencies.
      getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
        // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
        // that were not already in shuffleIdToMapStage, it's possible that by the time we
        // get to a particular dependency in the foreach loop, it's been added to
        // shuffleIdToMapStage by the stage creation process for an earlier dependency. See
        // SPARK-13902 for more information.
        if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
          createShuffleMapStage(dep, firstJobId)
        }
      }
      // Finally, create a stage for the given shuffle dependency.
      createShuffleMapStage(shuffleDep, firstJobId)
  }
}
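The ancestor lookup behind getMissingAncestorShuffleDependencies is essentially a traversal of the RDD lineage that records every shuffle dependency whose map stage has not been registered yet. The following is a rough, REPL-style sketch of that idea written for this post (missingAncestorShuffleDeps and the registered predicate are illustrative names, not Spark's actual code):

import scala.collection.mutable
import org.apache.spark.ShuffleDependency
import org.apache.spark.rdd.RDD

// Illustrative only: walk the lineage of `rdd` and collect every shuffle dependency whose
// map stage has not been registered yet (`registered` stands in for shuffleIdToMapStage.contains).
def missingAncestorShuffleDeps(
    rdd: RDD[_],
    registered: Int => Boolean): Seq[ShuffleDependency[_, _, _]] = {
  val found = mutable.ListBuffer[ShuffleDependency[_, _, _]]()
  val visited = mutable.HashSet[RDD[_]]()
  val toVisit = mutable.Stack[RDD[_]](rdd)

  while (toVisit.nonEmpty) {
    val current = toVisit.pop()
    if (!visited(current)) {
      visited += current
      current.dependencies.foreach {
        case shuffleDep: ShuffleDependency[_, _, _] =>
          if (!registered(shuffleDep.shuffleId)) {
            found += shuffleDep
          }
          toVisit.push(shuffleDep.rdd)   // keep walking to find the shuffle's own ancestors
        case narrowDep =>
          toVisit.push(narrowDep.rdd)    // narrow dependencies stay inside the same stage
      }
    }
  }
  found.toSeq
}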

6. Next comes DAGScheduler.submitStage
Submitting a stage: first recursively submit every parent stage that has not been submitted yet, then submit the current stage.
/** Submits stage, but first recursively submits any missing parents. */
private def submitStage(stage: Stage): Unit = {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    log.debug(s"submitStage($stage (name=${stage.name};" +
      s"jobs=${stage.jobIds.toSeq.sorted.mkString(",")}))")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      log.debug("missing: " + missing)
      if (missing.isEmpty) {
        // No missing parent stages: every input this stage needs is available, so run it now.
        log.info("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
      } else {
        // Submit the parent stages first; this is what enforces the execution order.
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}
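To see the parent-first recursion in isolation, here is a toy, REPL-style model written for this post (ToyStage and submit are made-up names and deliberately ignore task execution and failure handling). Applied to the A/B/F/G example above, it submits stage 1 and stage 2 and parks stage 3 in the waiting set until their outputs are available:

// Toy model of the parent-first recursion above; not Spark code.
// Using the earlier example: stage 3 (B and G) depends on stage 1 (A) and stage 2 (F's lineage).
case class ToyStage(id: Int, parents: Seq[ToyStage])

val waiting = scala.collection.mutable.LinkedHashSet[ToyStage]()
val running = scala.collection.mutable.LinkedHashSet[ToyStage]()

def submit(stage: ToyStage): Unit = {
  val missing = stage.parents.filterNot(p => running(p) || waiting(p)).sortBy(_.id)
  if (missing.isEmpty) {
    println(s"submitting tasks for stage ${stage.id}")   // corresponds to submitMissingTasks
    running += stage
  } else {
    missing.foreach(submit)                               // parents are submitted first
    waiting += stage                                      // revisited once the parents finish
    println(s"stage ${stage.id} is waiting for ${missing.map(_.id).mkString(",")}")
  }
}

val stage1 = ToyStage(1, Nil)                  // A's stage
val stage2 = ToyStage(2, Nil)                  // the stage ending at F
val stage3 = ToyStage(3, Seq(stage1, stage2))  // B and G

submit(stage3)
// prints: submitting tasks for stage 1, submitting tasks for stage 2, stage 3 is waiting for 1,2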

7. DAGScheduler.submitMissingTasks. The tasks are either ShuffleMapTasks or ResultTasks, and the two types are handled differently.
/** Called when stage's parents are available and we can now do its task. */
private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
  log.debug("submitMissingTasks(" + stage + ")")

  // Before find missing partition, do the intermediate state clean work first.
  // The operation here can make sure for the partially completed intermediate stage,
  // `findMissingPartitions()` returns all partitions every time.
  stage match {
    case sms: ShuffleMapStage if stage.isIndeterminate && !sms.isAvailable =>
      mapOutputTracker.unregisterAllMapOutput(sms.shuffleDep.shuffleId)
    case _ =>
  }

  // Figure out the indexes of partition ids to compute.
  val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

  // Use the scheduling pool, job group, description, etc. from an ActiveJob associated
  // with this Stage
  val properties = jobIdToActiveJob(jobId).properties
  addPySparkConfigsToProperties(stage, properties)

  runningStages += stage
  // SparkListenerStageSubmitted should be posted before testing whether tasks are
  // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
  // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
  // event.
  stage match {
    case s: ShuffleMapStage =>
      outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
      // Only generate merger location for a given shuffle dependency once. This way, even if
      // this stage gets retried, it would still be merging blocks using the same set of
      // shuffle services.
      if (pushBasedShuffleEnabled) {
        prepareShuffleServicesForShuffleMapStage(s)
      }
    case s: ResultStage =>
      outputCommitCoordinator.stageStart(
        stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
  }
  val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
    stage match {
      case s: ShuffleMapStage =>
        partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id)) }.toMap
      case s: ResultStage =>
        partitionsToCompute.map { id =>
          val p = s.partitions(id)
          (id, getPreferredLocs(stage.rdd, p))
        }.toMap
    }
  } catch {
    case NonFatal(e) =>
      stage.makeNewStageAttempt(partitionsToCompute.size)
      listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo,
        Utils.cloneProperties(properties)))
      abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }

  stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)

  // If there are tasks to execute, record the submission time of the stage. Otherwise,
  // post the event without the submission time, which indicates that this stage was
  // skipped.
  if (partitionsToCompute.nonEmpty) {
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
  }
  listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo,
    Utils.cloneProperties(properties)))

  // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
  // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
  // the serialized copy of the RDD and for each task we will deserialize it, which means each
  // task gets a different copy of the RDD. This provides stronger isolation between tasks that
  // might modify state of objects referenced in their closures. This is necessary in Hadoop
  // where the JobConf/Configuration object is not thread-safe.
  var taskBinary: Broadcast[Array[Byte]] = null
  var partitions: Array[Partition] = null
  try {
    // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
    // For ResultTask, serialize and broadcast (rdd, func).
    var taskBinaryBytes: Array[Byte] = null
    // taskBinaryBytes and partitions are both effected by the checkpoint status. We need
    // this synchronization in case another concurrent job is checkpointing this RDD, so we get a
    // consistent view of both variables.
    RDDCheckpointData.synchronized {
      taskBinaryBytes = stage match {
        case stage: ShuffleMapStage =>
          JavaUtils.bufferToArray(
            closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
        case stage: ResultStage =>
          JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
      }

      partitions = stage.rdd.partitions
    }

    if (taskBinaryBytes.length > TaskSetManager.TASK_SIZE_TO_WARN_KIB * 1024) {
      logWarning(s"Broadcasting large task binary with size " +
        s"${Utils.bytesToString(taskBinaryBytes.length)}")
    }
    taskBinary = sc.broadcast(taskBinaryBytes)
  } catch {
    // In the case of a failure during serialization, abort the stage.
    case e: NotSerializableException =>
      abortStage(stage, "Task not serializable: " + e.toString, Some(e))
      runningStages -= stage

      // Abort execution
      return
    case e: Throwable =>
      abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage

      // Abort execution
      return
  }

  val tasks: Seq[Task[_]] = try {
    val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
    stage match {
      case stage: ShuffleMapStage =>
        stage.pendingPartitions.clear()
        partitionsToCompute.map { id =>
          val locs = taskIdToLocations(id)
          val part = partitions(id)
          stage.pendingPartitions += id
          new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
            taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
            Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
        }

      case stage: ResultStage =>
        partitionsToCompute.map { id =>
          val p: Int = stage.partitions(id)
          val part = partitions(p)
          val locs = taskIdToLocations(id)
          new ResultTask(stage.id, stage.latestInfo.attemptNumber,
            taskBinary, part, locs, id, properties, serializedTaskMetrics,
            Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
            stage.rdd.isBarrier())
        }
    }
  } catch {
    case NonFatal(e) =>
      abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }

  if (tasks.nonEmpty) {
    log.info(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
      s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties,
      stage.resourceProfileId))
  } else {
    // Because we posted SparkListenerStageSubmitted earlier, we should mark
    // the stage as completed here in case there are no tasks to run
    markStageAsFinished(stage, None)

    stage match {
      case stage: ShuffleMapStage =>
        logDebug(s"Stage ${stage} is actually done; " +
          s"(available: ${stage.isAvailable}," +
          s"available outputs: ${stage.numAvailableOutputs}," +
          s"partitions: ${stage.numPartitions})")
        markMapStageJobsAsFinished(stage)
      case stage: ResultStage =>
        logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")
    }
    submitWaitingChildStages(stage)
  }
}

Finally we arrive at TaskScheduler's submitTasks method. At this point the DAGScheduler has completed its mission: the tasks have been handed over to the TaskScheduler to run.
The two most important parts of task scheduling are the DAGScheduler and the TaskScheduler. Together they turn the user's job into an execution graph (a directed acyclic graph, the DAG), split it into stages, and submit it to the cluster for the actual computation.
Between the DAGScheduler and the TaskScheduler the unit of transfer is a TaskSet; between the TaskScheduler and the Workers it is individual Tasks.
1. The user's operations build up the DAG, which is handed to the DAGScheduler. The DAGScheduler cuts the job into stages and sends each stage's TaskSet to the TaskScheduler.
2. The TaskScheduler, via the ClusterManager, sends Tasks to the Executors on the Worker nodes. Each Executor runs its task in a thread and either keeps the result locally or returns it to the Driver.
TaskScheduler
Schedules tasks for the SparkContext: it receives the stages produced by the DAGScheduler as TaskSets, submits them to the cluster, and launches backup (speculative) tasks. It drives the SchedulerBackend when new tasks are submitted, when a task fails, when a compute node goes down, or when a task runs too slowly and must be re-assigned.
SchedulerBackend
Allocates Tasks to Executors and has them executed there. TaskScheduler and SchedulerBackend come in different implementations for different cluster managers; they are paired one-to-one, and both are held by the SparkContext.
Finally, let's tie the scheduling flow together with an example.
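As a rough end-to-end illustration (the listener and the job below were written for this post and assume a local Spark build; they are not taken from Spark's sources), the following registers a SparkListener and runs a two-stage job, so the SparkListenerJobStart and SparkListenerStageSubmitted events posted by the DAGScheduler, and the per-partition tasks handed to the TaskScheduler, can be watched directly:

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerStageSubmitted, SparkListenerTaskEnd}
import org.apache.spark.sql.SparkSession

object SchedulingFlowDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("scheduling-flow-demo").master("local[2]").getOrCreate()
    val sc = spark.sparkContext

    // Watch the events the DAGScheduler posts while it splits the job into stages
    // and hands TaskSets over to the TaskScheduler.
    sc.addSparkListener(new SparkListener {
      override def onJobStart(jobStart: SparkListenerJobStart): Unit =
        println(s"job ${jobStart.jobId} started with ${jobStart.stageInfos.size} stages")
      override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit =
        println(s"stage ${stageSubmitted.stageInfo.stageId} submitted with " +
          s"${stageSubmitted.stageInfo.numTasks} tasks")
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit =
        println(s"task ${taskEnd.taskInfo.taskId} of stage ${taskEnd.stageId} finished")
    })

    // Two stages: the map side of reduceByKey runs ShuffleMapTasks, the collect() side
    // runs ResultTasks; one task per partition in each stage.
    sc.parallelize(Seq("a", "b", "a", "c"), 2)
      .map(w => (w, 1))
      .reduceByKey(_ + _)
      .collect()

    spark.stop()
  }
}

With two input partitions this prints one job with two stages and, for each stage, the tasks that the TaskScheduler dispatched to the local executor.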