The previous post was the first in this Spark introduction series; it covered some basic RDD operations in Spark programming so that you can get started quickly. From here on, we turn to Spark's internals.
For Spark, task scheduling and execution are arguably the core of how it runs, so this post analyzes that process in detail from the source-code perspective.
【Spark Task Execution Flow】
From the basic introduction above we know that Spark's programming model is built on RDDs: data is wrapped as RDDs and then passed through a series of transformations. So how does Spark actually process these RDDs?
Internally, Spark organizes the RDD lineage further. First, each action operator marks the boundary of a Job. Each Job is then divided into Stages according to the dependencies between RDDs (narrow vs. wide). Each Stage contains multiple Tasks that together form a TaskSet; the Tasks can run in parallel and execute the same logic, differing only in the data they process, namely different partitions. All of this splitting happens on the Driver; a Task is what gets shipped to an Executor, and it is the basic unit of actual execution in Spark. A minimal example of this decomposition is sketched below.
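As a concrete illustration, here is a minimal, hypothetical word-count-style program (not taken from the original post; the app name, master URL, and partition count are arbitrary choices). Under these assumptions, the single collect() action produces one Job, the reduceByKey shuffle splits that Job into a ShuffleMapStage and a final ResultStage, and each Stage runs one Task per partition (two here):
import org.apache.spark.{SparkConf, SparkContext}

object StageDemo {
  def main(args: Array[String]): Unit = {
    // local[2] and 2 partitions are illustrative values, not requirements
    val sc = new SparkContext(new SparkConf().setAppName("StageDemo").setMaster("local[2]"))
    val words  = sc.parallelize(Seq("a", "b", "a", "c"), 2)  // 2 partitions -> 2 Tasks per Stage
    val pairs  = words.map(w => (w, 1))                      // narrow dependency, stays in the same Stage
    val counts = pairs.reduceByKey(_ + _)                    // wide (shuffle) dependency -> Stage boundary
    counts.collect()                                         // action -> submits one Job via sc.runJob
    sc.stop()
  }
}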
In the code, this process is implemented mainly by the following classes, and almost all of the source-code analysis below jumps between them:
org.apache.spark.scheduler.DAGScheduler
Responsible for analyzing the user-submitted application:
① building the DAG according to the dependencies between tasks;
② dividing the DAG into different Stages
org.apache.spark.scheduler.TaskScheduler
Responsible for scheduling tasks for the SparkContext that created it:
① receiving the tasks of each stage from the DAGScheduler;
② submitting these tasks to the cluster;
③ launching speculative backup tasks for slow-running tasks (a configuration sketch follows further below)
org.apache.spark.scheduler.SchedulerBackend
Responsible for allocating the currently available resources:
① assigning Executors to Tasks that are waiting for one;
② launching Tasks on the assigned Executors, completing the scheduling of the computation
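As a side note on item ③ of the TaskScheduler's responsibilities: speculative backup tasks are controlled by Spark's speculation settings. The snippet below is only an illustrative, spark-shell-style configuration sketch; the property names are standard Spark configs, but the application name and the values are arbitrary examples:
import org.apache.spark.SparkConf

// Illustrative only: enable speculative execution so slow tasks get backup copies
val conf = new SparkConf()
  .setAppName("speculation-demo")
  .set("spark.speculation", "true")            // re-launch suspiciously slow tasks on other executors
  .set("spark.speculation.multiplier", "1.5")  // "slow" means longer than 1.5x the median task runtime
  .set("spark.speculation.quantile", "0.75")   // start checking once 75% of the tasks have finished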
【SparkContext, TaskScheduler, DAGScheduler】
Both the TaskScheduler and the DAGScheduler are created while the SparkContext is being constructed:
The TaskScheduler is created by calling the createTaskScheduler method:
org.apache.spark.SparkContext#createTaskScheduler
The DAGScheduler is created by calling its constructor directly:
_dagScheduler = new DAGScheduler(this)
Submitting a Job then follows this call chain:
org.apache.spark.SparkContext#runJob
org.apache.spark.scheduler.DAGScheduler#runJob
org.apache.spark.scheduler.DAGScheduler#submitJob
One thing to note here:
runJob calls submitJob and then blocks until the job completes or fails.
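To make that blocking behavior concrete, here is a small self-contained sketch of the waiter pattern. The Toy* names are made up and this is not Spark's actual JobWaiter class: a submitJob-style call hands back a waiter immediately, and the runJob-style caller parks on it until the scheduler signals success or failure.
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

// Toy stand-in for JobWaiter: completed by the scheduler, awaited by the caller
class ToyJobWaiter {
  private val promise = Promise[Unit]()
  def jobFinished(): Unit = promise.trySuccess(())                      // scheduler: job succeeded
  def jobFailed(e: Throwable): Unit = promise.tryFailure(e)             // scheduler: job failed
  def awaitResult(): Unit = Await.result(promise.future, Duration.Inf)  // caller blocks here
}

object ToyRunJob {
  def main(args: Array[String]): Unit = {
    val waiter = new ToyJobWaiter()          // what a submitJob-style call would return
    new Thread(new Runnable {                // stand-in for the scheduler's event loop
      override def run(): Unit = { Thread.sleep(100); waiter.jobFinished() }
    }).start()
    waiter.awaitResult()                     // the runJob-style call blocks until the job is done
    println("job done")
  }
}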
submitJob works as follows:
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Parameter checks and the jobId / func2 definitions are omitted here
  ......
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))
  waiter
}
submitJob creates a JobWaiter object and, via the internal event loop, posts a JobSubmitted event carrying that JobWaiter to the DAGSchedulerEventProcessLoop, whose onReceive method picks it up.
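The event-loop pattern itself is simple to model. The following is a self-contained toy sketch of it (loosely modeled on Spark's EventLoop base class; the Toy* names are made up and this is not the real DAGSchedulerEventProcessLoop): post() enqueues an event, and a single daemon thread dequeues events and dispatches each one through onReceive.
import java.util.concurrent.LinkedBlockingQueue

sealed trait ToyEvent
case class ToyJobSubmitted(jobId: Int) extends ToyEvent

class ToyEventLoop {
  private val queue = new LinkedBlockingQueue[ToyEvent]()
  private val thread = new Thread(new Runnable {
    override def run(): Unit = while (true) onReceive(queue.take()) // blocks until an event arrives
  })
  thread.setDaemon(true)

  def start(): Unit = thread.start()
  def post(event: ToyEvent): Unit = queue.put(event) // what submitJob does with JobSubmitted

  private def onReceive(event: ToyEvent): Unit = event match {
    case ToyJobSubmitted(jobId) => println(s"handleJobSubmitted for job $jobId")
  }
}
In the real scheduler, onReceive forwards each event to doOnReceive, which pattern-matches on the event type, as shown next.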
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop#onReceive
The onReceive method delegates to doOnReceive, shown below:
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
  ......
}
If the event matches the JobSubmitted pattern, org.apache.spark.scheduler.DAGScheduler#handleJobSubmitted is called.
From handleJobSubmitted onward the work is stage division, which we analyze in the next part.
The main logic for dividing stages is in org.apache.spark.scheduler.DAGScheduler#handleJobSubmitted:
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // Build the final Stage (the ResultStage)
    finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    ......
  }
  // The rest of the code is omitted here; it is covered in the next part
  ......
  // Submit the stage for scheduling (covered in part four; flagged here for later)
  submitStage(finalStage)
}
This method first builds the ResultStage from the final RDD. Inside newResultStage(), org.apache.spark.scheduler.DAGScheduler#getParentStagesAndId is called, which in turn calls org.apache.spark.scheduler.DAGScheduler#getParentStages to obtain the parent stages.
getParentStages() is where the key stage-division logic lives; the dividing criterion is whether a shuffle is involved.
The gist is that every ShuffleDependency encountered produces one parent Stage:
private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  // The parent stages to return
  val parents = new HashSet[Stage]
  // RDDs that have already been visited
  val visited = new HashSet[RDD[_]]
  // RDDs still waiting to be processed, i.e. those reached via non-shuffle dependencies
  val waitingForVisit = new Stack[RDD[_]]
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    visit(waitingForVisit.pop())
  }
  parents.toList
}
The visit() method does the traversal: it first marks the current RDD as visited, then examines each of the RDD's dependencies. If a dependency is a ShuffleDependency, it calls getShuffleMapStage() to create a ShuffleMap stage (dividing stages while walking backwards through the lineage); otherwise, the dependency's RDD is pushed onto the waitingForVisit stack.
def visit(r: RDD[_]) {
  if (!visited(r)) {
    visited += r
    for (dep <- r.dependencies) {
      dep match {
        case shufDep: ShuffleDependency[_, _, _] =>
          parents += getShuffleMapStage(shufDep, firstJobId)
        case _ =>
          waitingForVisit.push(dep.rdd)
      }
    }
  }
}
The method that creates the shuffle-map stages is org.apache.spark.scheduler.DAGScheduler#getShuffleMapStage.
Its main logic is to first find all wide-dependency RDDs further up that branch and then generate the ShuffleMapStage:
private def getShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  shuffleToMapStage.get(shuffleDep.shuffleId) match {
    case Some(stage) => stage
    case None =>
      // Look for other wide (shuffle) dependencies further up this branch
      getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
        if (!shuffleToMapStage.contains(dep.shuffleId)) {
          shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId)
        }
      }
      // Create the ShuffleMapStage for this shuffle dependency
      val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)
      shuffleToMapStage(shuffleDep.shuffleId) = stage
      stage
  }
}
Remember the submitStage(finalStage) call we flagged in step two, inside org.apache.spark.scheduler.DAGScheduler#handleJobSubmitted? We pick it up here. (For readability, some of the original code below has been removed.)
In short, submitStage calls getMissingParentStages() to look up missing parent stages. If there are none, it calls submitMissingTasks(stage, jobId.get); otherwise it recursively calls submitStage on each parent, until no missing parent stage remains.
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  // Get the missing parent stages of this stage
  val missing = getMissingParentStages(stage).sortBy(_.id)
  // No missing parent stages: submit this stage's tasks
  if (missing.isEmpty) {
    submitMissingTasks(stage, jobId.get)
  } else {
    for (parent <- missing) {
      // Recursively call submitStage on the parents first
      submitStage(parent)
    }
    waitingStages += stage
  }
}
There is a pitfall here worth pointing out:
Before a Stage is submitted, the scheduler must check whether the parent stages it depends on ran successfully; the Stage is submitted only if they did, otherwise the parent stages are resubmitted.
This check is tied to ShuffleMapTask completion and works as follows:
when a task finishes in Executor.run(), a message is sent to notify the DAGScheduler and the other schedulers to update their state, and handleTaskCompletion() processes the resulting event.
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  ......
  case completion: CompletionEvent =>
    dagScheduler.handleTaskCompletion(completion)
  ......
}
At this point the DAGScheduler checks whether all tasks of the stage have completed; if a Stage has failed, it is resubmitted. The relevant logic is in the case smt: ShuffleMapTask branch of the code below.
private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
  val task = event.task
  val taskId = event.taskInfo.id
  val stageId = task.stageId
  val taskType = Utils.getFormattedClassName(task)
  ......
  val stage = stageIdToStage(task.stageId)
  event.reason match {
    case Success =>
      stage.pendingPartitions -= task.partitionId
      task match {
        case rt: ResultTask[_, _] =>
          val resultStage = stage.asInstanceOf[ResultStage]
          resultStage.activeJob match {
            case Some(job) =>
            case None =>
          }
        case smt: ShuffleMapTask =>
          val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
          updateAccumulators(event)
          val status = event.result.asInstanceOf[MapStatus]
          val execId = status.location.executorId
          logDebug("ShuffleMapTask finished on " + execId)
          if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
            logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
          } else {
            shuffleStage.addOutputLoc(smt.partitionId, status)
          }
      }
    case Resubmitted =>
      logInfo("Resubmitted " + task + ", so marking it as still running")
      stage.pendingPartitions += task.partitionId
    case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) =>
      val failedStage = stageIdToStage(task.stageId)
      val mapStage = shuffleToMapStage(shuffleId)
      ......
  }
  submitWaitingStages()
}
Next, the stage is split into one Task per partition, and the Tasks are packaged into a TaskSet that is handed to the TaskScheduler.
The main logic:
For a ShuffleMapStage, ShuffleMapTasks are generated;
For a ResultStage, ResultTasks are generated.
Each TaskSet contains all the Tasks of the corresponding Stage, split by data Partition.
private def submitMissingTasks(stage: Stage, jobId: Int) {
  ......
  val tasks: Seq[Task[_]] = try {
    stage match {
      case stage: ShuffleMapStage =>
        partitionsToCompute.map { id =>
          val locs = taskIdToLocations(id)
          val part = stage.rdd.partitions(id)
          new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
            taskBinary, part, locs, stage.latestInfo.taskMetrics, properties)
        }
      case stage: ResultStage =>
        val job = stage.activeJob.get
        partitionsToCompute.map { id =>
          val p: Int = stage.partitions(id)
          val part = stage.rdd.partitions(p)
          val locs = taskIdToLocations(id)
          new ResultTask(stage.id, stage.latestInfo.attemptId,
            taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics)
        }
    }
  } catch {
    case NonFatal(e) =>
      abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }
  // Submit the TaskSet to the TaskScheduler
  if (tasks.size > 0) {
    stage.pendingPartitions ++= tasks.map(_.partitionId)
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
  } else {
    markStageAsFinished(stage, None)
  }
}
The tasks are then submitted as follows (org.apache.spark.scheduler.TaskSchedulerImpl#submitTasks):
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  this.synchronized {
    // Create a TaskSetManager for this TaskSet
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
      ts.taskSet != taskSet && !ts.isZombie
    }
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
    ......
  }
  backend.reviveOffers()
}
backend.reviveOffers() sends a ReviveOffers message to the driver endpoint of CoarseGrainedSchedulerBackend, which responds by calling makeOffers(): it builds a WorkerOffer for each alive executor and passes them to the TaskScheduler:
private def makeOffers() {
  val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq
  launchTasks(scheduler.resourceOffers(workOffers))
}
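scheduler.resourceOffers(workOffers) is where pending TaskSets are matched against the offered executor cores (the real implementation also takes locality and scheduling mode into account). The sketch below is a heavily simplified, locality-free model of that matching step, not the real TaskSchedulerImpl.resourceOffers; ToyOffer, ToyTaskDesc, and assign are made-up names:
import scala.collection.mutable

object ToyResourceOffers {
  case class ToyOffer(executorId: String, host: String, freeCores: Int)
  case class ToyTaskDesc(taskId: Int, executorId: String)

  // Assign pending task ids to offers, consuming cpusPerTask cores per task
  def assign(offers: Seq[ToyOffer], pendingTasks: Seq[Int], cpusPerTask: Int = 1): Seq[ToyTaskDesc] = {
    val remaining = mutable.Queue(pendingTasks: _*)
    offers.flatMap { offer =>
      val slots = offer.freeCores / cpusPerTask                        // tasks this executor can take
      Seq.fill(math.min(slots, remaining.size))(remaining.dequeue())   // grab that many pending tasks
        .map(taskId => ToyTaskDesc(taskId, offer.executorId))
    }
  }

  def main(args: Array[String]): Unit = {
    val offers = Seq(ToyOffer("exec-1", "hostA", 2), ToyOffer("exec-2", "hostB", 1))
    println(assign(offers, Seq(0, 1, 2)))  // tasks 0,1 -> exec-1; task 2 -> exec-2
  }
}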
launchTasks then serializes each resulting task description and sends it from org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend to the corresponding executor as a LaunchTask message. On the executor side, CoarseGrainedExecutorBackend receives the LaunchTask message and calls org.apache.spark.executor.Executor#launchTask, which creates a TaskRunner and puts it into a thread pool for execution (a simplified thread-pool sketch follows the launchTask code below).
def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer): Unit = {
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
    serializedTask)
  runningTasks.put(taskId, tr)
  threadPool.execute(tr)
}
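In the Spark source, threadPool here is a cached daemon thread pool dedicated to launching tasks. As a rough, self-contained model of "put the TaskRunner into a thread pool" (the Toy* names are made up and nothing is assumed about the real pool's configuration):
import java.util.concurrent.{Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

object ToyTaskPool {
  private val counter = new AtomicInteger(0)
  private val factory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"toy-task-launch-worker-${counter.getAndIncrement()}")
      t.setDaemon(true) // task threads should not keep the JVM alive on their own
      t
    }
  }
  val pool = Executors.newCachedThreadPool(factory)

  def main(args: Array[String]): Unit = {
    pool.execute(new Runnable {
      override def run(): Unit = println(s"running on ${Thread.currentThread().getName}")
    })
    Thread.sleep(200) // give the daemon worker time to print before the JVM exits
  }
}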
org.apache.spark.executor.Executor.TaskRunner#run
Some code is omitted below, including deserializing the Task and downloading the jar files it depends on:
override def run(): Unit = {
  ......
  var taskStart = System.currentTimeMillis()
  val value = try {
    val res = task.run(
      taskAttemptId = taskId,
      attemptNumber = attemptNumber,
      metricsSystem = env.metricsSystem)
    threwException = false
    res
  } finally {
    ......
  }
  val taskFinish = System.currentTimeMillis()
}
task.run() in turn calls the org.apache.spark.scheduler.Task#runTask method. Task is an abstract class with two concrete implementations:
org.apache.spark.scheduler.ShuffleMapTask
org.apache.spark.scheduler.ResultTask
For a ResultTask, the computed result is returned directly:
override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the func using the broadcast variables.
  val deserializeStartTime = System.currentTimeMillis()
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
  func(context, rdd.iterator(partition, context))
}
For a ShuffleMapTask, the results are written to the BlockManager and a MapStatus object is returned. The MapStatus records where the results were stored in the BlockManager, so that tasks in the next stage can find their input data.
override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD using the broadcast variable.
  val deserializeStartTime = System.currentTimeMillis()
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
  var writer: ShuffleWriter[Any, Any] = null
  try {
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    writer.stop(success = true).get
  } catch {
    ......
  }
}
That is the overall flow of Spark's task-scheduling source code. How the nodes of a Spark cluster communicate with each other, and how Spark stores data, will be analyzed in follow-up posts.