赞
踩
每一个Spark应用都会创建一个sparksession,用来跟Spark集群交互,如果提交任务的模式为cluster
模式,则Driver
进程会被随机在某个worker结点上启动,然后真正执行用户提供的入口类,或是使用Spark内置的入口类,同时会在Driver进程创建SparkContext
对象,提供Spark应用生命周期内所涉及到的各种系统角色。
使用Spark功能的主入口,用户可以通过此实例在集群中创建RDD、求累加各以及广播变量。
由于Driver
进程会真正执行Spark应用的入口程序,因此它只会在driver端被创建。
默认情况下,一个JVM环境只会创建一个SparkContext实例,但用户可以修改spark.driver.allowMultipleContexts
的默认值,来启用多个实例。SparkContext内部会启动多个线程来完成不同的工作,比如分发事件到监听者、动态分配和回收executors、接收executor的心跳等,因此需要用户主动调用stop()
接口来关闭此实例。
当用户通过Action函数触发RDD
上的计算时,(通常指count
、sum
、take
等的返回结果为非RDD的函数),便会生成一个Job
,被提交到Spark集群运行。而提交Job的入口便是此实例对象,比如RDD中的count()
方法的定义如下:
/**
* Return the number of elements in the RDD.
* sc 是SparkContext实例的变量名
*/
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
可以看到当一个Action方法被调用时,会通过调用SparkContext
的runJob方法,并将当前RDD作为函数参数,触发计算过程,最终通过一系列地包装,将Job
任务的生成交由DAGScheduler
对象。
面向Stage的调度器,它会为每一个Job计算相应的stage,并跟踪哪个RDD和Stage的输出结果被物化了,并找到一个最小的调度方案来跑Job。DAGScheduler会把所有的stage封装成一系列的TaskSet
,提交给底层的一个TaskScheduler
的实现类,由它来负责任务的执行。TaskSet
包含了完整的依赖任务,这些任务都可以基于已经计算出现的数据结果,比如上游Stage的map任务的输出,完成自己的工作,即使这些任务会由于依赖数据不可用而失败。
Spark Stage的生成是通过在RDD图中划分shuffle边界而得到的。RDD的窄依赖,比如map()和filter()方法,会被以流水线的方式划分到一个stage的任务集中(TaskSet),但有shuffle行为的操作需要依赖多个其它的stage的完成(比如一个stage输出一组map数据文件,而另外一个stage会在被阻塞之后读取这些文件)。最终每一个stage都只会与其它stage有shuffle依赖,而且其可能会执行多个操作。
只有当各种类型的RDD之上的RDD.compute()
方法被触发时,才会真正地开始执行整个流水线。
另外对于一个DAG图中的所有stage,DAGScheduler也会根据当前集群的状态来决定每一个任务应该在哪个地方执行,最终才会把这些任务发送到底层的TashScheduler
。同时DAGScheduler会保证当一个stage的所需要的shuffle数据文件丢失时,同跑上游的stage,而TaskScheduler会保证当一个stage内部产生错误时(非数据文件丢失)会尝试重跑每个任务,在一定次数的尝试之后,取消整个stage的执行。
在尝试查看这部分的实现时,有如下一些概念需要明确:
ActiveJob
表示,两类),上层提交给调度器的工作单元。比如调用count()
方法时,一个新的Job就会生成,而这个Job会在执行了多个stage之后生成中间的数据。Stage
表示),包含了一组合任务集(TaskSet
),每一个stage会生成当前Job所需的中间结果,而每一个任务都会在同一个RDD数据集的所有Partition(分区)数据块上执行同一个方法。而由shuffle边界(Shuffle Boundries,或称之为Barrier屏障)所分隔的stage,会所有barrier之前的stage执行完成之后才开始执行。因此这里有两类的Stage,一个是ResultStage
,表示最后一个stage,输出action方法的结果;另一个是ShuffleMapStage
,为每一个shuffle操作输出数据文件。通常这些Stage产生的结果都是可以跨Job共享的,由于这不同的Job使用了同一份RDD数据。firstJobId
域,用于标识第一次提交当前Stage的Job的ID,如果使用的是FIFO的调度方式,通过此域可以提供首先执行前置Job中的stage或是有失败时快速恢复的能力。StageInfo
对象,然后转发给listeners(监听器)或是WEB UI。attempts
。如是TaskScheduler
报告某个任务由于FetchFailed
或是ExeuctorLost
事件而失败时,DAGScheduler会重新提交失败的stage。/** * SparkContext::runJob(..)方法会产生一个JobSubmitted消息,发送给DagScheduler的消息线程线程, * 最终调用如下的方法,开始从RDD依赖图生成Stage,然后再生成相应的ShuffleMapTask和ResultTask。 */ private[scheduler] def handleJobSubmitted(jobId: Int, finalRDD: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], callSite: CallSite, listener: JobListener, properties: Properties) { var finalStage: ResultStage = null try { // New stage creation may throw an exception if, for example, jobs are run on a // HadoopRDD whose underlying HDFS files have been deleted. finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) } catch { case e: Exception => logWarning("Creating new stage failed due to exception - job: " + jobId, e) listener.jobFailed(e) return } val job = new ActiveJob(jobId, finalStage, callSite, listener, properties) clearCacheLocs() logInfo("Got job %s (%s) with %d output partitions".format( job.jobId, callSite.shortForm, partitions.length)) logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")") logInfo("Parents of final stage: " + finalStage.parents) logInfo("Missing parents: " + getMissingParentStages(finalStage)) val jobSubmissionTime = clock.getTimeMillis() jobIdToActiveJob(jobId) = job activeJobs += job finalStage.setActiveJob(job) val stageIds = jobIdToStageIds(jobId).toArray val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) listenerBus.post( SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) submitStage(finalStage) }
DAGScheduler
接收到来自SparkContext
对象提交任务的请求后,会根据传递过来的RDD的paritions数量、用户提供的方法函数、JobId等信息,通过逆拓扑序的方法,递归地构建Stage
、TaskSet
。
提交Stage的过程描述如下:
至此,所有可能的祖先ShuffleMapStage被依次创建完毕,每一个ShuffleMapStage只包含以下其直接父依赖所对应的ShuffleMapStage。
## rdd方法调用图 rdd1 -- map() --> rdd2 -- filter() -> rdd3 -- groupbykey() --> rdd3 \ \ \ \ filter() --> rdd4 -- reducebykey() --> rdd5 -- join() --> rdd6 -- count() --> result ## rdd方法调用图转换成依赖图的结果如下 ## ndep == narrow dependency ## sdep == shuffle dependency rdd1 -- ndep --> rdd2 -- ndep -> rdd3 -- [sdep] --> rdd3 \ \ \ \ ndep --> rdd4 -- [sdep] --> rdd5 -- ndep --> rdd6 -- action --> result ## 如果result stage所对应的RDD的直接父依赖,不是ShuffleDependency,则继续回溯父依赖(NarrowDependency)的父依赖, ## 直到找到了每个直接父依赖可能存在的、离result stage最近的依赖,如stage1, stage2,则算完成第一轮创建。 ## 然后再递归创建stage1、stage2所对应的父ShuffleDependencty。 ## 依赖图换成stage图的结果如下 stage1 --> stage1 -> stage1 --> stage1 \ --> stage0 --> stage0 / stage2 --> stage2 --> stage2 --> stage2
Driver进程在初始化SparkContext的实例时,会创建任务调度相关的组件,它包括集群管理器SchedulerBackend
、任务调度器TaskScheduler
的实例,分别用来管理Spark应用关联的集群和任务分发和执行。根据用户的执行环境不同,一共有三种类型的SchedulerBackend(Cluster Manager)可能被创建:
CoarseGrainedSchedulerBackend
类,除了基本的分配和回收Executor功能外,它提供了用于和standalone集群交互的接口,不论是用户使用的是Spark开发模式(用户本地会启动一个常驻的子进程来交互式地提交任务)还是提交模式(spark-submit),都需要通过此实例与集群间接交互。核心的源码摘取如下:
private def createTaskScheduler( sc: SparkContext, master: String, deployMode: String): (SchedulerBackend, TaskScheduler) = { import SparkMasterRegex._ master match { case "local" => val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true) val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1) scheduler.initialize(backend) (backend, scheduler) case LOCAL_N_REGEX(threads) => val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true) val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount) scheduler.initialize(backend) (backend, scheduler) case LOCAL_N_FAILURES_REGEX(threads, maxFailures) => val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true) val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount) scheduler.initialize(backend) (backend, scheduler) case SPARK_REGEX(sparkUrl) => val scheduler = new TaskSchedulerImpl(sc) val masterUrls = sparkUrl.split(",").map("spark://" + _) val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls) scheduler.initialize(backend) (backend, scheduler) case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) => val scheduler = new TaskSchedulerImpl(sc) val localCluster = new LocalSparkCluster( numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf) val masterUrls = localCluster.start() val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls) scheduler.initialize(backend) backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => { localCluster.stop() } (backend, scheduler) case masterUrl => val cm = getClusterManager(masterUrl) match { case Some(clusterMgr) => clusterMgr case None => throw new SparkException("Could not parse Master URL: '" + master + "'") } try { val scheduler = cm.createTaskScheduler(sc, masterUrl) val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler) cm.initialize(scheduler, backend) (backend, scheduler) } catch { case se: SparkException => throw se case NonFatal(e) => throw new SparkException("External scheduler cannot be instantiated", e) } } }
CoarseGrainedSchedulerBackend
(通常所说的driver,StandaloneSchedulerBackend
但是它的一个子类)为该类的一个实现类,用于管理所有的executors(这里的executor指的是一个在worker节点启动的后端进程CoarseGrainedExecutorBackend
,管理真正的Executor实例),这些executor的生命周期是与Spark Job的绑定的,而非每个Task
,这样就避免了过多的资源的申请、创建、回收等的过程,提高整个集群的性能。
当一个Spark应用被提交到集群时,会先传递给DAGScheduler
进行Stage
的划分及TaskSet
的生成,而后续任务的调度和执行由DAGScheduler
通过RPCJobSubmitted
消息转发给SchedulerBackend
对象。
其实现类为TaskSchedulerImpl
,它会接收来自DAGScheduler
生成的一系列TaskSet
,然后创建对应的TaskSetManager
(实际负责调度TaskSet中的每一个任务),并将TaskSetManager
添加到自己的等待资源池里,等待后续的调度(调度方式目前有两种,一是先进选出调度,一种是公平调度)。
def initialize(backend: SchedulerBackend) { // TaskScheduler内部使用的调度线程池,负责选择适合的TaskSetManager执行 this.backend = backend schedulableBuilder = { schedulingMode match { case SchedulingMode.FIFO => new FIFOSchedulableBuilder(rootPool) case SchedulingMode.FAIR => new FairSchedulableBuilder(rootPool, conf) case _ => throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " + s"$schedulingMode") } } schedulableBuilder.buildPools() }
当TaskSchedulerImpl接收到可调度的任务集后,就通知driver进程,尝试调度Task
执行。
override def submitTasks(taskSet: TaskSet) { val tasks = taskSet.tasks this.synchronized { val manager = createTaskSetManager(taskSet, maxTaskFailures) val stage = taskSet.stageId val stageTaskSets = taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager]) stageTaskSets.foreach { case (_, ts) => ts.isZombie = true } stageTaskSets(taskSet.stageAttemptId) = manager // 将task set manager添加到调度线程池中 schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) if (!isLocal && !hasReceivedTask) { starvationTimer.scheduleAtFixedRate(new TimerTask() { override def run() { if (!hasLaunchedTask) { } else { this.cancel() } } }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS) } hasReceivedTask = true } // 向SchedulerBackend实例,发送ReviveOffers消息,通知它有新的作业生成了,可以分配这些任务 // 到合适的Executor上执行了 backend.reviveOffers() }
TaskScheduler
从CoarseGrainedSchedulerBackend
接收已经注册的、可用的空闲executors信息,从自己的等待池中遍历所有的TaskSet
,尝试调度每一个TaskSet执行,但很多情况下,并不是每一个TaskSet
中的所有Task
被一次分配资源并执行,因此TaskScheduler
完成为某一个等待中的TaskSet找到一组可用的executors,至于如何在这些executors上分配自己管理的Task,则将由TaskSetManager
完成。TaskSetManager
尝试分配任务到时某个executor上的核心代码如下:
@throws[TaskNotSerializableException] def resourceOffer( execId: String, host: String, maxLocality: TaskLocality.TaskLocality) : Option[TaskDescription] = { val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist => blacklist.isNodeBlacklistedForTaskSet(host) || blacklist.isExecutorBlacklistedForTaskSet(execId) } if (!isZombie && !offerBlacklisted) { var allowedLocality = maxLocality if (maxLocality != TaskLocality.NO_PREF) { allowedLocality = getAllowedLocalityLevel(curTime) if (allowedLocality > maxLocality) { // We're not allowed to search for farther-away tasks allowedLocality = maxLocality } } dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) => // Found a task; do some bookkeeping and return a task description val task = tasks(index) val taskId = sched.newTaskId() // Do various bookkeeping copiesRunning(index) += 1 val attemptNum = taskAttempts(index).size val info = new TaskInfo(taskId, index, attemptNum, curTime, execId, host, taskLocality, speculative) taskInfos(taskId) = info taskAttempts(index) = info :: taskAttempts(index) // Update our locality level for delay scheduling // NO_PREF will not affect the variables related to delay scheduling if (maxLocality != TaskLocality.NO_PREF) { currentLocalityIndex = getLocalityIndex(taskLocality) lastLaunchTime = curTime } // Serialize and return the task val serializedTask: ByteBuffer = try { ser.serialize(task) } catch { case NonFatal(e) => throw new TaskNotSerializableException(e) } if (serializedTask.limit() > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 && !emittedTaskSizeWarning) { emittedTaskSizeWarning = true addRunningTask(taskId) sched.dagScheduler.taskStarted(task, info) new TaskDescription( taskId, attemptNum, execId, taskName, index, task.partitionId, addedFiles, addedJars, task.localProperties, serializedTask) } } else { None } }
通过上面过程生成的TaskDescriptions的信息,都是可以执行的,最终通过CoarseGrainedSchedulerBackend::launchTasks
方法,将这些任务分发到绑定的executor上执行:
class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv) extends ExecutorAllocationClient with SchedulerBackend with Logging { // Launch tasks returned by a set of resource offers private def launchTasks(tasks: Seq[Seq[TaskDescription]]) { for (task <- tasks.flatten) { val serializedTask = TaskDescription.encode(task) if (serializedTask.limit >= maxRpcMessageSize) { scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr => try { var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + "spark.rpc.message.maxSize (%d bytes). Consider increasing " + "spark.rpc.message.maxSize or using broadcast variables for large values." msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize) taskSetMgr.abort(msg) } catch { case e: Exception => logError("Exception in error callback", e) } } } else { val executorData = executorDataMap(task.executorId) executorData.freeCores -= scheduler.CPUS_PER_TASK logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " + s"${executorData.executorHost}.") // Worker结点上的CoarseGrainedExecutorBackend实例(实际上就是Executor)会接收到 // LaunchTask消息,然后将其扔到自己的工作线程池中执行。 executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask))) } } } }
每一个该类的实例,都对应于一个TaskSet
,它负责调度TaskSet中的任务,并跟踪、记录每一个任务的状态和行为。
TaskSetManager
内部使用本地化可感知的延迟算法,为自己管理的TaskSet
中的每一个任务Task
(主要是ShuffleMapTask
或ResultTask
类型的任务)分配executor,同时创建相应的TaskDescription
集体,返回给上层角色。
Executor
执行器分配管理器,(当用户开启了动态资源分配策略时spark.dynamicAllocation.enabled
时会在SparkContext
内部创建此对象),基于工作负载动态分配和回收executors,它内部维护了一个可变的目标数据变量,表示当前应用需要多少个活动的executor才能解决任务的积压问题,它的最小值为配置的初始化值,并跟随堆积和正在运行的任务的数量而变化。manager会周期性地与cluster manager
(SchedulerBackend后端)同步executor的目标数量的值。
如果当前executor的数量大于目前的负载,会被减少至能够容纳当前正在运行和堆积的任务的数量。而需要增加executor数量的情况发生在有任务堆积待执行的时候,并且在N秒内不能够处理完等待队列中的任务;或者之前增加executor数量的操作不能够在M秒内消费完队列中的任务时候,继续增加executor数量的操作,然后继续下一轮判定。在每一轮的判定中,executor数量的增加是指数级的,直到达到一个上界值,而这个上界是基于配置的spark属性及当前正在运行和等待的任务数量共同决定的。
至于为何以指数的递增方式增加executor,这里有两个方面的合理性:
移除executor的逻辑很简单,如果某个executor在K秒内没有运行过任务,那么就应该移除它。ExecutorAllocationManager
没有重试的逻辑,以增加或是减少executor,因此如何才能保证达到请求的executor数量是由ClusterManager保证的。
与此管理器相关的、可配置的属性有以下几个:
* spark.dynamicAllocation.enabled - 是否开启动态分配功能
* spark.dynamicAllocation.minExecutors - 活动executor的最小数量
* spark.dynamicAllocation.maxExecutors - 活动executor的最大数量
* spark.dynamicAllocation.initialExecutors - 应用启动时请求的executor数量
*
* spark.dynamicAllocation.executorAllocationRatio - 控制executor数量的因子,实际的最大executor数量=(任务运行数+等待任务数)* ration / executor.cores
*
* spark.dynamicAllocation.schedulerBacklogTimeout (M) - 如果在timeout时间后依然有堆积的任务,则尝试增加executor的数量
*
* spark.dynamicAllocation.sustainedSchedulerBacklogTimeout (N) - 如果
*
* spark.dynamicAllocation.executorIdleTimeout (K) - timeout时间后移除executor
rdd.take(10)
方法),就开始在Spark集群中计算数据。MapPartitionsRDD
或是ShuffledRDD
等,并封装调用的RDD为其父依赖,MapPartitionsRDD
对应于OneToOneDependency
,ShuffledRDD
对应于ShuffleDependency
),创建Stage
图,而后尝试提交最后一个Stage(即ResultStage
)执行。Task
。当回溯到某个Stage没有父依赖时,意味着当前Stage应该被首先执行,且可以提交执行,因此将当前Stage添加到running队列中,并为这个Stage的每一个待生成的Partition创建对应的Task,而后提交执行。TaskSchedulerImpl
接收到任务后,且有worker资源启动任务时,就会将任务分配到可用的worker执行,例如ShuffleMapTask
会执行Reduce/Map操作,即先reduce之前依赖的Partition的数据,再Map输出reduce之后的Partition数据。MapStatus
的实例,描述当前Task的状态信息,并序列化后发送给driver端。Stage_0
和Stage_1
可以同时执行,但只有先完成Stage_0
才会执行Stage_1
。Spark应用提交到Standalone集群过程的时序图如下所示,一个App提交给master后,最终会在集群内启动一个driver进程,通过各种RPC交互与集群交互,完成应用的运行。
最终一个Spark应用从Driver进程到启动Executor执行的过程的时序图如下:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。