A TaskSet is created after stage division is complete: one task is created for each partition of a stage, and the task type depends on the stage type. These tasks, one per partition, make up a TaskSet, which is submitted through the TaskScheduler.
For the submitted TaskSet, the TaskScheduler uses its CoarseGrainedSchedulerBackend to assign tasks to Executors according to their locality levels; each Executor then puts the task into its thread pool for execution.
Once the stages have been divided and each stage's parents are known, all tasks can be executed, starting from the earliest stage.
An RDD action triggers sc.runJob -> DAGScheduler event loop -> submitStage() -> submitMissingTasks()
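For orientation, here is a minimal driver sketch (hypothetical app name, local master) showing that it is the count() action which triggers sc.runJob and the chain above:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("task-demo").setMaster("local[2]"))

// 4 partitions, so submitMissingTasks will create 4 tasks for the stage
val result = sc.parallelize(1 to 100, numSlices = 4)
  .map(_ * 2)
  .count()   // action: triggers sc.runJob -> DAGScheduler -> ... -> submitMissingTasks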
submitMissingTasks creates the tasks for a stage: it reads numPartitions from the stage and creates one task per partition.
// Compute the preferred locations for each partition/task
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
  stage match {
    case s: ShuffleMapStage =>
      partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id)) }.toMap
    case s: ResultStage =>
      partitionsToCompute.map { id =>
        val p = s.partitions(id)
        (id, getPreferredLocs(stage.rdd, p))
      }.toMap
  }
} catch {
  // error handling elided in this excerpt
}

// Create one task per partition: a ResultTask if this is the finalStage,
// otherwise a ShuffleMapTask for each partition.
val tasks: Seq[Task[_]] = try {
  val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
  stage match {
    case stage: ShuffleMapStage =>
      stage.pendingPartitions.clear()
      partitionsToCompute.map { id =>
        val locs = taskIdToLocations(id)
        val part = partitions(id)
        stage.pendingPartitions += id
        new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber, taskBinary,
          part, locs, properties, serializedTaskMetrics, Option(jobId),
          Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
      }

    case stage: ResultStage =>
      partitionsToCompute.map { id =>
        val p: Int = stage.partitions(id)
        val part = partitions(p)
        val locs = taskIdToLocations(id)
        new ResultTask(stage.id, stage.latestInfo.attemptNumber, taskBinary,
          part, locs, id, properties, serializedTaskMetrics, Option(jobId),
          Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
      }
  }
} catch {
  case NonFatal(e) =>
    abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
    runningStages -= stage
    return
}
Preferred locations of partitions
A preferred location means the task should, whenever possible, run on the node where its data already resides, so the data does not have to be moved.
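As an illustration, a custom RDD can declare preferred locations by overriding getPreferredLocations; DAGScheduler.getPreferredLocs ultimately consults this when building taskIdToLocations above. A minimal sketch (HostAwareRDD and its hosts array are made up for the example):

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

class HostAwareRDD(sc: SparkContext, hosts: Array[String])
    extends RDD[Int](sc, Nil) {

  // one partition per host in this example
  override def getPartitions: Array[Partition] =
    hosts.indices.map(i => new Partition { override def index: Int = i }).toArray

  // tell the scheduler which host already holds this partition's data,
  // so the task is preferably launched there instead of moving the data
  override def getPreferredLocations(split: Partition): Seq[String] =
    Seq(hosts(split.index))

  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    Iterator(split.index)
}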
After one task per partition has been created for the current stage, tasks.toArray together with the stage and job information is wrapped into a TaskSet and submitted to the TaskScheduler for execution.
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
When the TaskScheduler submits the TaskSet:
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    // stageTaskSets.foreach { case (_, ts) => ts.isZombie = true }
    stageTaskSets(taskSet.stageAttemptId) = manager
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  backend.reviveOffers()
}
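As an aside, the schedulableBuilder used above arranges TaskSetManagers into pools according to spark.scheduler.mode (FIFO by default, or FAIR). A minimal configuration sketch; the pool file path is hypothetical:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.scheduler.mode", "FAIR")                                   // put TaskSetManagers into fair-scheduling pools
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")  // hypothetical path, only needed for custom pools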
TaskSetManager
TaskSetManager schedules a single TaskSet within TaskSchedulerImpl.
This class keeps track of each task, retries tasks if they fail, and handles locality-aware scheduling for this TaskSet via delay scheduling.
Its main interfaces are resourceOffer(), which asks whether this TaskSet wants to run a task on a given node, and the status-update callbacks, which tell it that one of its tasks has changed state.
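For reference, in Spark 2.x the resourceOffer() signature looks roughly like this (a sketch from the source, not an exact copy):

// Called once per (executor, host, locality level); returns a TaskDescription
// if this TaskSet has a runnable task that satisfies maxLocality on that executor.
def resourceOffer(
    execId: String,
    host: String,
    maxLocality: TaskLocality.TaskLocality): Option[TaskDescription]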
locality-aware
Task locality levels, from best to worst, are PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY (how long the scheduler waits before falling back from one level to the next is configurable; see the sketch after this list):
- PROCESS_LOCAL: process-local; the RDD partition and the task are inside the same Executor process, which is the fastest case;
- NODE_LOCAL: the RDD partition and the task are not in the same process, but are on the same worker node;
- NO_PREF: the task has no locality preference and runs equally well anywhere;
- RACK_LOCAL: the RDD partition and the task are on the same rack;
- ANY: the task may run anywhere in the cluster, including on a different rack.
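Delay scheduling waits a configurable amount of time at a better level before falling back to a worse one. A minimal sketch of the relevant settings (the 3s values shown are the defaults):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.locality.wait", "3s")          // base wait before downgrading a locality level
  .set("spark.locality.wait.process", "3s")  // PROCESS_LOCAL -> NODE_LOCAL
  .set("spark.locality.wait.node", "3s")     // NODE_LOCAL -> RACK_LOCAL
  .set("spark.locality.wait.rack", "3s")     // RACK_LOCAL -> ANY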
reviveOffers()
The backend here is the one created in SparkContext.createTaskScheduler, initialized according to the master URL. In Standalone mode:
case SPARK_REGEX(sparkUrl) =>
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
StandaloneSchedulerBackend itself does not implement reviveOffers(); the implementation is inherited from CoarseGrainedSchedulerBackend, which simply sends a ReviveOffers message to its own internal driverEndpoint.
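The sending side in CoarseGrainedSchedulerBackend is roughly the following (a short sketch of the Spark 2.x source):

override def reviveOffers() {
  driverEndpoint.send(ReviveOffers)
}

The same endpoint then receives the message it just sent: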
case ReviveOffers =>
makeOffers()
What finally executes is CoarseGrainedSchedulerBackend's makeOffers():
// Make fake resource offers on all executors
private def makeOffers() {
  // Make sure no executor is killed while some task is launching on it
  val taskDescs = withLock {
    // Filter out executors under killing
    val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
    val workOffers = activeExecutors.map {
      case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
          Some(executorData.executorAddress.hostPort))
    }.toIndexedSeq
    scheduler.resourceOffers(workOffers)
  }
  if (!taskDescs.isEmpty) {
    launchTasks(taskDescs)
  }
}
makeOffers – resourceOffers()
TaskSchedulerImpl's resourceOffers() takes WorkerOffers as its parameter; they encapsulate the resource information of all Executors currently available to the application.
resourceOffers is called to allocate resources on the cluster's slaves. Tasks are assigned to Executors in round-robin order across the nodes so that the load is spread evenly.
resourceOffers() is the core of the task resource allocation mechanism.
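An abbreviated sketch of its core loop (Spark 2.4-era source with details such as barrier handling and blacklist checks omitted): for each TaskSetManager, taken in scheduling-pool order, offers are tried level by level, from the best locality the TaskSet still allows down to ANY:

for (taskSet <- sortedTaskSets) {
  var launchedAnyTask = false
  // walk the locality levels from best (PROCESS_LOCAL) towards ANY
  for (currentMaxLocality <- taskSet.myLocalityLevels) {
    var launchedTaskAtCurrentMaxLocality = false
    do {
      launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
        taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks, addressesWithDescs)
      launchedAnyTask |= launchedTaskAtCurrentMaxLocality
    } while (launchedTaskAtCurrentMaxLocality)
  }
}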
The final assignment of Executor resources for each TaskSet is done in resourceOfferSingleTaskSet().
private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: IndexedSeq[ArrayBuffer[TaskDescription]],
    addressesWithDescs: ArrayBuffer[(String, TaskDescription)]): Boolean = {
  var launchedTask = false
  // nodes and executors that are blacklisted for the entire application have already been
  // filtered out by this point
  for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    if (availableCpus(i) >= CPUS_PER_TASK) {
      try {
        for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
          tasks(i) += task
          val tid = task.taskId
          taskIdToTaskSetManager.put(tid, taskSet)
          taskIdToExecutorId(tid) = execId
          executorIdToRunningTaskIds(execId).add(tid)
          availableCpus(i) -= CPUS_PER_TASK
          assert(availableCpus(i) >= 0)
          // Only update hosts for a barrier task.
          if (taskSet.isBarrier) {
            // The executor address is expected to be non empty.
            addressesWithDescs += (shuffledOffers(i).address.get -> task)
          }
          launchedTask = true
        }
      } catch {
        case e: TaskNotSerializableException =>
          logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
          // Do not offer resources for this task, but don't throw an error to allow other
          // task sets to be submitted.
          return launchedTask
      }
    }
  }
  return launchedTask
}
makeOffers – launchTasks()
launchTasks takes the serialized task and sends a LaunchTask message to the executorEndpoint:
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
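For context, an abbreviated sketch of launchTasks (Spark 2.4-era; the check that the serialized task fits within the RPC message size limit is omitted):

private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    val serializedTask = TaskDescription.encode(task)
    val executorData = executorDataMap(task.executorId)
    executorData.freeCores -= scheduler.CPUS_PER_TASK
    // ship the serialized TaskDescription to the chosen executor
    executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
  }
}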
This executorEndpoint is actually a CoarseGrainedExecutorBackend.
See its companion object:
val env = SparkEnv.createExecutorEnv(
driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)
env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
After receiving the message, the ExecutorBackend calls the Executor's launchTask():
// The CoarseGrainedExecutorBackend itself is passed to the Executor as the first argument.
executor.launchTask(this, taskDesc)
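In context, the LaunchTask handler in CoarseGrainedExecutorBackend.receive looks roughly like this (Spark 2.4-era sketch):

case LaunchTask(data) =>
  if (executor == null) {
    exitExecutor(1, "Received LaunchTask command but executor was null")
  } else {
    // decode the TaskDescription that the driver serialized in launchTasks
    val taskDesc = TaskDescription.decode(data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    executor.launchTask(this, taskDesc)
  }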
Finally, the task is started through the executor's thread pool:
// Executor.scala
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
// The inner class TaskRunner deserializes the task, wraps it as a Runnable,
// and submits it to the thread pool for execution.
val tr = new TaskRunner(context, taskDescription)
runningTasks.put(taskDescription.taskId, tr)
threadPool.execute(tr)
}
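The threadPool used here is, roughly, an unbounded cached pool of daemon threads created inside Executor; this is a sketch, not an exact copy of the source:

import java.util.concurrent.{Executors, ThreadPoolExecutor}
import com.google.common.util.concurrent.ThreadFactoryBuilder

val threadFactory = new ThreadFactoryBuilder()
  .setDaemon(true)                                   // task threads must not keep the JVM alive
  .setNameFormat("Executor task launch worker-%d")   // the thread names visible in executor stack dumps
  .build()
// one thread per concurrently running task
val threadPool = Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]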