In the previous article we saw that, during SparkContext initialization, the TaskScheduler startup phase sends a RegisterApplication event to the Master. In this article we move into the Master class and continue the analysis from the moment it receives that event.
Also, since this code is hard to debug remotely, this article does not include a debugging walkthrough; it is pure (and admittedly dry) source-code analysis.
    case RegisterApplication(description, driver) =>
      // If this Master is in STANDBY state (i.e. it is not the active Master), the registration request is ignored
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send a response
      } else {
        logInfo("Registering app " + description.name)
        // Build an ApplicationInfo from the parameters passed in by the driver
        // (the RPC endpoint created during TaskScheduler startup)
        val app = createApplication(description, driver)
        // Key point 1: register the application with the Master
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        // Persist the application's information
        persistenceEngine.addApplication(app)
        // Key point 2: send the registration response back to the driver
        // (the RPC endpoint created during TaskScheduler startup)
        driver.send(RegisteredApplication(app.id, self))
        // Key point 3: invoke the Master's scheduling method
        schedule()
      }

As you can see, after the Master receives the RegisterApplication event it first registers the application's information, then replies to the driver, and finally invokes the Master's scheduling method. Let's look at each step in turn:
Key point 1: registering the application with the Master
There is nothing tricky here; this step mainly records the application's information in the Master's various bookkeeping collections:
    private def registerApplication(app: ApplicationInfo): Unit = {
      // Get the driver's address
      val appAddress = app.driver.address
      if (addressToApp.contains(appAddress)) {
        logInfo("Attempted to re-register application at same address: " + appAddress)
        return
      }
      // Register the application with the application metrics system
      applicationMetricsSystem.registerSource(app.appSource)
      // Add the application to the various bookkeeping collections
      apps += app
      idToApp(app.id) = app
      endpointToApp(app.driver) = app
      addressToApp(appAddress) = app
      // Add the application to the waiting queue (important)
      waitingApps += app
    }

The registration code is short; the main thing to note is the last step, which adds the application to the waiting queue. Once in that queue, the application will later be picked up by the Master's scheduler, which we cover under key point 3.
Key point 2: sending the application-registration response to the driver (the RPC endpoint created during TaskScheduler startup)
Before reading the source, it is worth being clear about what the Master means by the "driver" RPC endpoint. From the previous articles we know that this driver endpoint is the RPC endpoint created during TaskScheduler startup; in our example it is the StandaloneAppClient endpoint. With that in mind, the code that follows is much easier to read.
For example, to understand how the RegisteredApplication response sent to the driver is handled, we must know what that driver endpoint actually is. The sending side needs no further explanation, so let's look directly at how StandaloneAppClient handles the response:
    case RegisteredApplication(appId_, masterRef) =>
      // Record the appId
      appId.set(appId_)
      // Mark the app as registered
      registered.set(true)
      // Keep a reference to the Master's RPC endpoint
      master = Some(masterRef)
      // Notify the LauncherServer (via the listener) of this app's appId
      listener.connected(appId.get)
The response handling is simple: a few flags are flipped and some information is recorded. The key skill is being able to quickly locate the RPC endpoint a message was sent to. In our case the receiver is StandaloneAppClient; would it still be in a cluster deployment? The author's answer here is no.
Key point 3: invoking the Master's scheduling method (the core of the core)
The Master's schedule method is its central method: it allocates the currently available resources among the waiting applications. It is therefore called every time a new application joins or resource availability changes. Before reading the source, keep a couple of small points in mind; they make the code easier to follow:
1) Drivers must be scheduled before executors, because an executor needs to register itself with the TaskScheduler that is created as part of the driver's startup.
2) When iterating over the waiting drivers, the Master first picks the ALIVE workers and shuffles them randomly, then checks the workers one by one to see whether each can launch the driver. As long as some driver has not been launched and some workers have not yet been visited, it keeps walking the worker list and trying to launch the driver. This logic also shows that drivers are handed out round-robin across the shuffled workers, so by default a worker does not pick up a second driver until every other alive worker has been considered.
    private def schedule(): Unit = {
      // If this Master is not in ALIVE state, bail out immediately
      if (state != RecoveryState.ALIVE) {
        return
      }
      // Randomly shuffle the ALIVE workers in the cluster
      val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
      // Number of alive workers
      val numWorkersAlive = shuffledAliveWorkers.size
      var curPos = 0
      // Iterate over all waiting drivers and launch each one as soon as a worker with enough resources is found.
      // For each driver we walk through the usable workers in turn and try to launch the driver on one of them.
      for (driver <- waitingDrivers.toList) {
        var launched = false
        var isClusterIdle = true
        // Number of workers visited so far
        var numWorkersVisited = 0
        // Keep looping while there are unvisited workers and this driver has not been launched yet
        while (numWorkersVisited < numWorkersAlive && !launched) {
          // Take the next unvisited worker from the shuffled list
          val worker = shuffledAliveWorkers(curPos)
          // Check whether this worker is an idle node running neither drivers nor executors
          isClusterIdle = worker.drivers.isEmpty && worker.executors.isEmpty
          // One more worker visited
          numWorkersVisited += 1
          // Key point 1: if the worker's resources (memory, CPU, etc.) satisfy the driver's requirements,
          // try to launch the driver on this worker
          if (canLaunchDriver(worker, driver.desc)) {
            val allocated = worker.acquireResources(driver.desc.resourceReqs)
            driver.withResources(allocated)
            // Key point 2: launch the driver on this worker
            launchDriver(worker, driver)
            // Once launched, remove the driver from the waiting queue
            waitingDrivers -= driver
            launched = true
          }
          // Advance the index into the usable-worker array
          curPos = (curPos + 1) % numWorkersAlive
        }
        // If some worker runs neither a driver nor an executor and the driver still could not be placed,
        // the driver is asking for more resources than any single worker can offer
        if (!launched && isClusterIdle) {
          logWarning(s"Driver ${driver.id} requires more resource than any of Workers could have.")
        }
      }
      // Key point 3: start executors on the workers
      startExecutorsOnWorkers()
    }

The schedule method itself has three key parts: checking whether a worker has enough resources to launch a driver, actually launching the driver, and starting executors on the workers. Let's look at each:
Key point 1: if a worker's resources (memory, CPU, etc.) satisfy the driver's requirements, try to launch the driver on that worker
    private def canLaunchDriver(worker: WorkerInfo, desc: DriverDescription): Boolean = {
      canLaunch(worker, desc.mem, desc.cores, desc.resourceReqs)
    }

    private def canLaunch(
        worker: WorkerInfo,
        memoryReq: Int,                                   // memory the driver needs
        coresReq: Int,                                    // cores the driver needs
        resourceRequirements: Seq[ResourceRequirement])   // other resources the driver needs
      : Boolean = {
      val enoughMem = worker.memoryFree >= memoryReq
      val enoughCores = worker.coresFree >= coresReq
      val enoughResources = ResourceUtils.resourcesMeetRequirements(
        worker.resourcesAmountFree, resourceRequirements)
      enoughMem && enoughCores && enoughResources
    }

The resource check is straightforward: the worker's memory, CPU and other resource counts must all simultaneously meet the driver's requirements. Take note of the three names memoryFree, coresFree and resourcesAmountFree; they will come up again when we discuss how executors are actually created.
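For intuition, here is a minimal sketch of how such "free" values are derived on the worker-bookkeeping side (an assumed simplification for illustration, not Spark's actual WorkerInfo class): the Master tracks totals and used amounts per worker, and the free amounts are the differences that launchDriver/launchExecutor later shrink.

    // Minimal sketch, assuming a WorkerInfo-like structure; the names mirror the ones above,
    // but this is an illustration, not the real class.
    class WorkerInfoSketch(val cores: Int, val memoryMb: Int) {
      var coresUsed: Int = 0
      var memoryUsed: Int = 0
      def coresFree: Int = cores - coresUsed        // shrinks as drivers/executors are placed here
      def memoryFree: Int = memoryMb - memoryUsed   // same idea for memory (in MB)
    }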
Key point 2: launching the driver on the chosen worker
Launching the driver on a worker is absolutely central. Our WordCount example runs in client mode, so there is no driver scheduling or launching involved there, but this part is so important that it is worth covering anyway:
    private def launchDriver(worker: WorkerInfo, driver: DriverInfo): Unit = {
      logInfo("Launching driver " + driver.id + " on worker " + worker.id)
      // Add the driver to the worker's in-memory bookkeeping,
      // which also adds the driver's memory and core requirements to the worker's used totals
      worker.addDriver(driver)
      // Record the worker inside the driver's own bookkeeping
      driver.worker = Some(worker)
      // Send a LaunchDriver event to the worker so that the worker starts the driver
      worker.endpoint.send(LaunchDriver(driver.id, driver.desc, driver.resources))
      // Mark the driver as RUNNING
      driver.state = DriverState.RUNNING
    }
Launching the driver is fairly simple: the worker and driver record references to each other, the worker's resource usage is updated, and then a LaunchDriver event is sent over RPC to the chosen worker. Next, let's switch to the Worker side and see how it handles the LaunchDriver event:
    case LaunchDriver(driverId, driverDesc, resources_) =>
      logInfo(s"Asked to launch driver $driverId")
      // Build and start a DriverRunner thread (which in turn launches a driver process)
      val driver = new DriverRunner(
        conf,
        driverId,
        workDir,
        sparkHome,
        driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
        self,
        workerUri,
        securityMgr,
        resources_)
      drivers(driverId) = driver
      // Start the DriverRunner
      driver.start()
      // Add the memory, cores and other resources the driver needs to this worker's used totals
      coresUsed += driverDesc.cores
      memoryUsed += driverDesc.mem
      addResourcesUsed(resources_)

Next, let's look at DriverRunner's start method:
    private[worker] def start() = {
      // Start a Java thread
      new Thread("DriverRunner for " + driverId) {
        override def run(): Unit = {
          // Register a shutdown hook so the driver is killed if the worker shuts down
          var shutdownHook: AnyRef = null
          try {
            shutdownHook = ShutdownHookManager.addShutdownHook { () =>
              logInfo(s"Worker shutting down, killing driver $driverId")
              kill()
            }

            // Key point: prepare the jars the driver needs and run the driver
            val exitCode = prepareAndRunDriver()

            // Wrap up the driver's final state
            finalState = if (exitCode == 0) {
              Some(DriverState.FINISHED)
            } else if (killed) {
              Some(DriverState.KILLED)
            } else {
              Some(DriverState.FAILED)
            }
          } catch {
            case e: Exception =>
              kill()
              finalState = Some(DriverState.ERROR)
              finalException = Some(e)
          } finally {
            if (shutdownHook != null) {
              ShutdownHookManager.removeShutdownHook(shutdownHook)
            }
          }

          // The DriverRunner thread sends a DriverStateChanged event back to its owning worker
          worker.send(DriverStateChanged(driverId, finalState.get, finalException))
        }
      }.start()
    }

DriverRunner starts a dedicated thread that prepares the driver's runtime environment, runs the driver, and then wraps the result and reports it back to the worker. Next, let's see how the runner actually prepares that environment and launches the driver:
    private[worker] def prepareAndRunDriver(): Int = {
      // Create the working directory
      val driverDir = createWorkingDirectory()
      // Download the user-uploaded jar (the jar our Spark application was packaged into)
      val localJarFilename = downloadUserJar(driverDir)
      // Prepare the extra resources file
      val resourceFileOpt = prepareResourcesFile(SPARK_DRIVER_PREFIX, resources, driverDir)

      def substituteVariables(argument: String): String = argument match {
        case "{{WORKER_URL}}" => workerUrl
        case "{{USER_JAR}}" => localJarFilename
        case other => other
      }
      val javaOpts = driverDesc.command.javaOpts ++ resourceFileOpt.map(f =>
        Seq(s"-D${DRIVER_RESOURCES_FILE.key}=${f.getAbsolutePath}")).getOrElse(Seq.empty)
      // Build a ProcessBuilder with the driver's launch command, required memory, and so on
      val builder = CommandUtils.buildProcessBuilder(driverDesc.command.copy(javaOpts = javaOpts),
        securityManager, driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)
      // Launch the driver process via the ProcessBuilder
      runDriver(builder, driverDir, driverDesc.supervise)
    }

So the worker first creates a working directory for the driver and then launches the driver by running a command. We won't dig further into what that command actually is, since that would stray too far from the goal of this source walkthrough. That wraps up driver startup for now; let's return to the main thread and look at how executors are scheduled onto workers.
Key point 3: starting executors on the workers
Remember the executor launch command we saw being assembled in the previous chapter? It all ends up packaged inside the application, so when creating and starting executors we can pull the relevant information straight from the application.
The source for scheduling and starting executors on workers is as follows:
    private def startExecutorsOnWorkers(): Unit = {
      for (app <- waitingApps) {
        // Cores each executor needs; defaults to 1 if not set
        val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
        // Only proceed if the cores the app still needs are at least the cores one executor needs;
        // if the remaining demand is smaller than one executor's share, no new executor is built (saves resources)
        if (app.coresLeft >= coresPerExecutor) {
          // Filter out the ALIVE workers whose resources allow launching an executor,
          // sorted by free cores in descending order
          val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
            .filter(canLaunchExecutor(_, app.desc))
            .sortBy(_.coresFree).reverse
          // If only one app is waiting for executors and no worker can host one,
          // the cluster is short on resources, so log a warning
          val appMayHang = waitingApps.length == 1 &&
            waitingApps.head.executors.isEmpty && usableWorkers.isEmpty
          if (appMayHang) {
            logWarning(s"App ${app.id} requires more resource than any of Workers could have.")
          }
          // Key point 1: compute how many cores each worker should hand out; this returns an array
          val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

          // Key point 2: using the per-worker core counts computed above, walk the usable workers
          // and, for every worker with a positive assignment, send it the corresponding LaunchExecutor events
          for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
            allocateWorkerResourceToExecutors(
              app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
          }
        }
      }
    }

One slightly roundabout point: a new executor is only built when the cores the application still needs are at least the cores a single executor needs. My guess is that this is about not wasting resources: if the application's remaining demand is smaller than one executor's share, either executors are already running for it or the configuration is off; under normal circumstances the app's remaining core demand is rarely smaller than the per-executor core setting.
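A tiny worked example of this check, using hypothetical numbers (the names mirror the snippet above, the values are made up, and coresLeft is essentially "cores requested minus cores already granted"):

    // Hypothetical numbers for illustration only.
    val requestedCores   = 10   // total cores the app asked for
    val coresGranted     = 6    // cores already held by its running executors
    val coresPerExecutor = 4    // spark.executor.cores
    val coresLeft  = requestedCores - coresGranted    // 4
    val launchMore = coresLeft >= coresPerExecutor    // true: one more 4-core executor still fits the demand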
Overall there are two important steps here; let's take them one at a time:
Key point 1: computing how many cores each worker should hand out
There are two allocation strategies. With spreadOutApps (the default), the scheduler walks across the workers and places only one executor on a worker per pass. Without spreadOutApps, it tries to start as many executors as possible on one worker before moving on, until the allocation is complete or the worker runs out of resources. The detailed code is below (a small toy comparison of the two modes follows shortly after it):
    private def scheduleExecutorsOnWorkers(
        app: ApplicationInfo,
        usableWorkers: Array[WorkerInfo],
        spreadOutApps: Boolean): Array[Int] = {
      // Per-executor core and memory requirements, either configured on the app or defaulted
      val coresPerExecutor = app.desc.coresPerExecutor
      val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
      val oneExecutorPerWorker = coresPerExecutor.isEmpty
      val memoryPerExecutor = app.desc.memoryPerExecutorMB
      val resourceReqsPerExecutor = app.desc.resourceReqsPerExecutor
      // Number of usable workers
      val numUsable = usableWorkers.length
      // Array holding the number of cores each worker will hand out
      val assignedCores = new Array[Int](numUsable)
      // Array holding the number of executors to create on each worker
      // (with enough resources, or when spark.executor.cores is unset, one worker can host several executors)
      val assignedExecutors = new Array[Int](numUsable)
      // Total cores to assign: the minimum of what the app still needs and what the workers have free in total
      var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

      // Inner helper: can worker `pos` still accept (part of) an executor for this app?
      def canLaunchExecutorForApp(pos: Int): Boolean = {
        val keepScheduling = coresToAssign >= minCoresPerExecutor
        val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
        val assignedExecutorNum = assignedExecutors(pos)

        // We are starting a brand-new executor unless the app is in one-executor-per-worker mode
        // and this worker already has one
        val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutorNum == 0
        if (launchingNewExecutor) {
          val assignedMemory = assignedExecutorNum * memoryPerExecutor
          val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
          val assignedResources = resourceReqsPerExecutor.map {
            req => req.resourceName -> req.amount * assignedExecutorNum
          }.toMap
          val resourcesFree = usableWorkers(pos).resourcesAmountFree.map {
            case (rName, free) => rName -> (free - assignedResources.getOrElse(rName, 0))
          }
          val enoughResources = ResourceUtils.resourcesMeetRequirements(
            resourcesFree, resourceReqsPerExecutor)
          // Also respect the app's executor limit
          val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
          keepScheduling && enoughCores && enoughMemory && enoughResources && underLimit
        } else {
          keepScheduling && enoughCores
        }
      }

      // Workers that can currently accept an executor for this app
      var freeWorkers = (0 until numUsable).filter(canLaunchExecutorForApp)
      while (freeWorkers.nonEmpty) {
        // Walk the candidate workers and assign cores
        freeWorkers.foreach { pos =>
          var keepScheduling = true
          while (keepScheduling && canLaunchExecutorForApp(pos)) {
            // Subtract one executor's minimum cores from the pool left to assign
            coresToAssign -= minCoresPerExecutor
            // Record the cores this worker has handed out
            assignedCores(pos) += minCoresPerExecutor

            // If spark.executor.cores is unset, the minimum per-executor demand defaults to 1 core and the app
            // gets a single (growing) executor on this worker, so the executor count here stays at 1
            if (oneExecutorPerWorker) {
              assignedExecutors(pos) = 1
            } else {
              assignedExecutors(pos) += 1
            }

            // In spreadOutApps mode, only one executor is placed on a worker per pass.
            // Otherwise, keep launching executors on this worker until its resources run out.
            if (spreadOutApps) {
              keepScheduling = false
            }
          }
        }
        // After each pass, re-filter the workers that can still accept assignments
        freeWorkers = freeWorkers.filter(canLaunchExecutorForApp)
      }
      // Return the number of cores assigned to each worker
      assignedCores
    }

The logic that decides whether a worker's resources allow an executor to be launched (canLaunchExecutorForApp) is actually fairly involved and important, but it is not strongly tied to our main thread and is not easy to untangle, so it is not covered in detail here; I may come back to it later.
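To see the effect of the two modes concretely, here is a toy re-implementation of just the core allocation loop above, with made-up numbers; it is a sketch for intuition, not the Spark code itself:

    object SpreadOutDemo {
      // freeCores: free cores per usable worker; coresNeeded: cores the app still wants;
      // coresPerExecutor: cores per executor; spreadOut: the spreadOutApps flag.
      def allocate(freeCores: Array[Int], coresNeeded: Int, coresPerExecutor: Int, spreadOut: Boolean): Array[Int] = {
        val assigned = Array.fill(freeCores.length)(0)
        var toAssign = coresNeeded
        def candidates: Seq[Int] = freeCores.indices.filter { i =>
          toAssign >= coresPerExecutor && freeCores(i) - assigned(i) >= coresPerExecutor
        }
        var pool = candidates
        while (pool.nonEmpty) {
          pool.foreach { i =>
            var keepScheduling = true
            while (keepScheduling && toAssign >= coresPerExecutor && freeCores(i) - assigned(i) >= coresPerExecutor) {
              assigned(i) += coresPerExecutor
              toAssign -= coresPerExecutor
              if (spreadOut) keepScheduling = false   // spreadOut: move to the next worker after one executor
            }
          }
          pool = candidates
        }
        assigned
      }

      def main(args: Array[String]): Unit = {
        // 3 workers with 8 free cores each; the app still needs 12 cores; spark.executor.cores = 4
        println(allocate(Array(8, 8, 8), 12, 4, spreadOut = true).mkString(","))   // 4,4,4  (one executor per worker)
        println(allocate(Array(8, 8, 8), 12, 4, spreadOut = false).mkString(","))  // 8,4,0  (pack the first worker)
      }
    }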
With the above steps we now have the number of cores each worker should provide; next, let's see how the executor-launch events are actually sent.
Key point 2: sending the LaunchExecutor events to the workers
Having computed each worker's share of cores in the previous step, the Master now iterates over the usable workers and, for every worker with a positive share, sends it the corresponding LaunchExecutor events. The source is as follows:
    private def allocateWorkerResourceToExecutors(
        app: ApplicationInfo,
        assignedCores: Int,
        coresPerExecutor: Option[Int],
        worker: WorkerInfo): Unit = {
      // Compute how many executors this worker gets from its total assigned cores and the per-executor cores
      val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
      // Cores given to each executor
      val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
      // Create the requested number of executors in a loop
      for (i <- 1 to numExecutors) {
        // Acquire the required resources from the worker
        val allocated = worker.acquireResources(app.desc.resourceReqsPerExecutor)
        // Build an ExecutorDesc and record it in the application
        val exec = app.addExecutor(worker, coresToAssign, allocated)
        // Key point: launch the executor
        launchExecutor(worker, exec)
        // Mark the application as RUNNING
        app.state = ApplicationState.RUNNING
      }
    }

This logic derives the number of executors to create on the worker from the worker's total assigned cores and the per-executor core count, wraps each executor's launch information into an ExecutorDesc object, and hands each one to launchExecutor.
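A quick sanity check of that arithmetic with hypothetical numbers (the vals mirror the parameters above; the values themselves are made up):

    // Hypothetical: the scheduling pass gave this worker 8 cores and spark.executor.cores = 2.
    val assignedCores = 8
    val coresPerExecutor: Option[Int] = Some(2)
    val numExecutors  = coresPerExecutor.map(assignedCores / _).getOrElse(1)   // 4 executors
    val coresToAssign = coresPerExecutor.getOrElse(assignedCores)              // 2 cores each
    // If spark.executor.cores is unset (None), a single executor takes all 8 assigned cores.

With that, let's dig into launchExecutor: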
    private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
      logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
      // Add the executor to the worker's in-memory bookkeeping
      worker.addExecutor(exec)
      // Key point 1: tell the Worker node to start the executor
      worker.endpoint.send(LaunchExecutor(masterUrl, exec.application.id, exec.id,
        exec.application.desc, exec.cores, exec.memory, exec.resources))
      // Key point 2: tell the application's driver that an executor has been added on this worker
      exec.application.driver.send(
        ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
    }
Not much code here, but it talks to two different RPC endpoints; let's look at each:
Key point 1: telling the Worker node to start the executor
How the Worker handles the LaunchExecutor event is left for the next chapter: besides how the executor is built, we also want to understand what an executor actually is, so that deserves an article of its own.
Key point 2: registering the executor's information with the driver
Here the executor is registered with the driver. In our WordCount example the job is submitted in client mode, so the event is actually sent to the StandaloneAppClient endpoint created during TaskScheduler startup. Let's look at how that endpoint handles it:
    case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) =>
      val fullId = appId + "/" + id
      logInfo("Executor added: %s on %s (%s) with %d core(s)".format(fullId, workerId, hostPort,
        cores))
      listener.executorAdded(fullId, workerId, hostPort, cores, memory)
The listener here is an interface; let's jump straight to the implementing class's method:
    override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int,
        memory: Int): Unit = {
      logInfo("Granted executor ID %s on hostPort %s with %d core(s), %s RAM".format(
        fullId, hostPort, cores, Utils.megabytesToString(memory)))
    }
So the driver does not record anything when it is told about the executor. How, then, does it later hand tasks to that executor? Honestly, I don't know either at this point; let's leave it as a cliffhanger and come back to this code when we discuss how jobs, stages and tasks are split, and answer the question there.
Interim answer 1: when CoarseGrainedExecutorBackend is about to finish creating its Executor, it sends a RegisterExecutor event to CoarseGrainedSchedulerBackend (which StandaloneSchedulerBackend extends), and that is when the executor's information gets stored. So the driver does record executor information after all; whether that is the information ultimately used, we don't yet know, and we'll fill in that answer when reading the task-related source.
Final answer: when the TaskScheduler ("TaskScheduler" used loosely here; concretely it is the CoarseGrainedSchedulerBackend object) submits tasks to executors, it takes all the active executors from the executorDataMap collection and then runs through a series of checks to pick the executors to use. That executorDataMap is populated with the executors' information inside the RegisterExecutor handling described in interim answer 1.
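As a rough mental model of that bookkeeping (a toy sketch only: the names mirror Spark's, but the types and signatures are simplified stand-ins, not the real API):

    import scala.collection.mutable

    // Toy stand-in for the per-executor record; the real ExecutorData carries much more
    // (the executor's RpcEndpointRef, host, attributes, resources, ...).
    case class ExecutorDataSketch(host: String, totalCores: Int, var freeCores: Int)

    object DriverBookkeepingSketch {
      // Filled in when a RegisterExecutor message arrives from a CoarseGrainedExecutorBackend
      val executorDataMap = mutable.HashMap.empty[String, ExecutorDataSketch]

      def registerExecutor(execId: String, host: String, cores: Int): Unit =
        executorDataMap(execId) = ExecutorDataSketch(host, cores, cores)

      // When tasks are submitted later, the scheduler builds its offers from the executors known here
      def activeOffers(): Seq[(String, ExecutorDataSketch)] =
        executorDataMap.toSeq.filter { case (_, data) => data.freeCores > 0 }
    }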
That wraps up this article. Here is a summary of the points above:
1. On the Master, what it calls the driver RPC endpoint is actually the StandaloneAppClient created during TaskScheduler startup.
2. Driver scheduling happens before executor creation (drivers need scheduling in cluster deploy mode; in client mode they don't).
3. Because driver placement walks the shuffled alive workers round-robin, by default each worker ends up running at most one driver.
4. The information needed to launch executors is stored in the application.
5. A new executor is only built when the cores the application still needs are at least the cores a single executor needs; if the remaining demand is smaller than one executor's share, no new executor is built (to save resources, which is my own guess).
6. There are two strategies for assigning worker cores: spreadOutApps walks across the workers and places one executor per worker per pass; non-spreadOutApps starts as many executors as possible on one worker until the allocation finishes or resources run out.
7. If the per-executor core count is not set, the minimum per-executor demand defaults to 1 core and each worker hosts at most one executor for the app; all the cores assigned to that worker go to that single executor.