当前位置:   article > 正文

spark源码(三)spark 如何进行driver、executor任务的调度,以及executor向driver的注册_spark driver注册

spark driver注册

上篇文章我们介绍到,SparkContext初始化的时候,在TaskScheduler启动阶段会向Master发送RegisterApplication事件,下面我们进入Master主类,从它接收到该事件后接着分析。

另外由于这里的代码不好远程debug,所以本篇文章的讲解不会涉及到debug过程,纯纯的是枯燥的

源码解析。

  1. case RegisterApplication(description, driver) =>
  2. // 如果当前master的状态是standby(备用的),也就是当前master不是active master,那么application来请求注册,什么都不会干
  3. if (state == RecoveryState.STANDBY) {
  4. } else {
  5. logInfo("Registering app " + description.name)
  6. //根据 Driver( TaskScheduler 启动阶段生成的RPC节点 )传递过来参数创建ApplicationInfo对象
  7. val app = createApplication(description, driver)
  8. //重点一:将application注册到master上
  9. registerApplication(app)
  10. logInfo("Registered app " + description.name + " with ID " + app.id)
  11. //将application信息持久化存储下来
  12. persistenceEngine.addApplication(app)
  13. //重点二:向Driver( TaskScheduler 启动阶段生成的RPC节点 )发送Application注册的响应
  14. driver.send(RegisteredApplication(app.id, self))
  15. //重点三:调用master的调度方法
  16. schedule()
  17. }

可以看到master接收到RegisterApplication事件之后,会先注册application的信息,随后响应driver,最后调用master的任务调度方法。下面我们分别看一下:

重点一:将application注册到master上

这一步没什么难点,主要是将application相关的信息登记到master的各种集合中:

  1. private def registerApplication(app: ApplicationInfo): Unit = {
  2. //获取driver地址信息
  3. val appAddress = app.driver.address
  4. if (addressToApp.contains(appAddress)) {
  5. logInfo("Attempted to re-register application at same address: " + appAddress)
  6. return
  7. }
  8. //在application的度量系统中记录该application的信息
  9. applicationMetricsSystem.registerSource(app.appSource)
  10. //将application添加到各种信息记录的集合中
  11. apps += app
  12. idToApp(app.id) = app
  13. endpointToApp(app.driver) = app
  14. addressToApp(appAddress) = app
  15. //将application添加到等待队列中(重点)
  16. waitingApps += app
  17. }

注册代码很少,主要留心下最后一步将application加入等待队列即可,加入等待队列后,后面就会被master进行调度处理。这块的具体处理我们再重点三中进行讲述。

重点二:向Driver( TaskScheduler 启动阶段生成的RPC节点 )发送Application注册的响应

在看源码前,可以先留心下master中的driver rpc节点是指什么,通过前面的文章我们可以知道driver节点实际是TaskScheduler 启动阶段生成的RPC节点,对于我们的案例来说,其实就是StandaloneAppClient对象节点。了解了这些概念,后面我们看源码也会清晰很多。

就比如说这里向Driver发送RegisteredApplication响应,接收事件的处理逻辑是什么,我们如果想知道,就必须知道这个Driver节点到底是什么。发送事件没什么好说的,这里我们直接看下StandaloneAppClient的响应逻辑

  1. case RegisteredApplication(appId_, masterRef) =>
  2. //记录appId
  3. appId.set(appId_)
  4. //标识app已被注册过
  5. registered.set(true)
  6. //存储master RPC节点的引用
  7. master = Some(masterRef)
  8. //通知LanchServer当前app的appid
  9. listener.connected(appId.get)

可以看到响应逻辑很简单,就是一些常见的标识修改,信息记录等,关键的是如何快速定位到所发送的RPC节点。本案例是StandaloneAppClient接收,如果是cluster部署,还是吗?答案肯定为不是。

重点三:调用master的调度方法(核心,重中之重)

master的schedule方法是master的核心方法,它为等待的Application安排当前可用的资源因此每次有新应用加入或资源可用性发生变化时,都会调用此方法。另外在阅读源码前,有几个小点留意下,这样更便于理解源码:

1)driver的调度一定要在executor之前,因为executor需要向driver创建过程中TaskScheduler注册自己的信息。

2)waitting driver遍历时,一开始会挑选ALIVE状态的worker并随机排列,随后则是依次查看worker是否可以启动driver。如果还有未启动的driver,且还有worker没有被遍历到则继续遍历worker尝试在其上启动driver。通过这个代码逻辑我们也可以看出来,每个worker默认是至多运行一个driver。

  1. private def schedule(): Unit = {
  2. //如果当前节点不是ALIVE活跃状态,则直接退出
  3. if (state != RecoveryState.ALIVE) {
  4. return
  5. }
  6. // 将spark集群中可用的worker随机打乱排列
  7. val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  8. //记录活跃Worker的数量
  9. val numWorkersAlive = shuffledAliveWorkers.size
  10. var curPos = 0
  11. //遍历所有等待的driver,在资源足够时发起driver的启动构建
  12. //对于每个driver,我们依次遍历可用的worker,尝试在worker上发起driver的运行
  13. for (driver <- waitingDrivers.toList) {
  14. var launched = false
  15. var isClusterIdle = true
  16. //记录已经访问过的worker数
  17. var numWorkersVisited = 0
  18. //当还有剩余worker没有遍历到,且当前driver没有被发起时,进入while循环
  19. while (numWorkersVisited < numWorkersAlive && !launched) {
  20. //从可用worker列表中依据顺序取出下一个没有被访问到的worker
  21. val worker = shuffledAliveWorkers(curPos)
  22. //判断当前worker是不是driver和worker都没有启动过的空节点
  23. isClusterIdle = worker.drivers.isEmpty && worker.executors.isEmpty
  24. //访问的worker记录数加1
  25. numWorkersVisited += 1
  26. //重点一:如果worker节点的资源(内存、cpu等)满足driver的发起条件,则尝试在该worker上启动driver
  27. if (canLaunchDriver(worker, driver.desc)) {
  28. val allocated = worker.acquireResources(driver.desc.resourceReqs)
  29. driver.withResources(allocated)
  30. //重点二:在该worker上发起driver运行
  31. launchDriver(worker, driver)
  32. //driver发起后,将该driver从等待调度的队列中移除
  33. waitingDrivers -= drive
  34. launched = true
  35. }
  36. //移动可用worker数组的下标
  37. curPos = (curPos + 1) % numWorkersAlive
  38. }
  39. //如果存在worker节点既没运行driver也没运行executor,但是资源仍不符合driver运行要求,说明driver需要的资源很多,我们需要调整所有worker的资源配给
  40. if (!launched && isClusterIdle) {
  41. logWarning(s"Driver ${driver.id} requires more resource than any of Workers could have.")
  42. }
  43. }
  44. //重点三:启动executor
  45. startExecutorsOnWorkers()
  46. }

schedule方法中又有三处比较关键的点,分别是driver启动资源的判断、driver调度的发起、worker调度的发起。下面分别看一下:

重点一:如果worker节点的资源(内存、cpu等)满足driver的发起条件,则尝试在该worker上启动driver

  1. private def canLaunchDriver(worker: WorkerInfo, desc: DriverDescription): Boolean = {
  2. canLaunch(worker, desc.mem, desc.cores, desc.resourceReqs)
  3. }
  4. private def canLaunch(
  5. worker: WorkerInfo,
  6. memoryReq: Int, // driver的内存需求
  7. coresReq: Int, //driver的cpu需求
  8. resourceRequirements: Seq[ResourceRequirement]) //driver的resource需求
  9. : Boolean = {
  10. val enoughMem = worker.memoryFree >= memoryReq
  11. val enoughCores = worker.coresFree >= coresReq
  12. val enoughResources = ResourceUtils.resourcesMeetRequirements(
  13. worker.resourcesAmountFree, resourceRequirements)
  14. enoughMem && enoughCores && enoughResources
  15. }

 可以看到资源判断逻辑很简单,就是判断worker的内存、cpu、resource数三个指标是否同时符合driver的启动要求。这里留意下memoryFree 、coresFree 、resourcesAmountFree三个变量名,在后续讲解executor具体创建时还会涉及该变量。

重点二:在该worker上发起driver运行

在worker上发起driver绝对是重中之重,虽然我们的WordCount案例走的是client模式,不会有driver的调度和运行发起。但是由于这块实在太核心,所以这里也介绍下:

  1. private def launchDriver(worker: WorkerInfo, driver: DriverInfo): Unit = {
  2. logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  3. //将driver加入worker内存的缓存结构
  4. //将worker内使用的内存和cpu数量,都加上driver需要的内存和cpu数量
  5. worker.addDriver(driver)
  6. //把worker加入到driver内部的缓存结构中
  7. driver.worker = Some(worker)
  8. //向worker发送LaunchDriver事件,让worker来启动driver
  9. worker.endpoint.send(LaunchDriver(driver.id, driver.desc, driver.resources))
  10. //将driver状态设置为RUNNING
  11. driver.state = DriverState.RUNNING
  12. }

可以看到driver的发起其实比较简单,主要是将worker和driver相互引用,更新worker的资源使用情况,然后通过RPC节点,向对应的worker节点发送LaunchDriver事件,接下来我们到worker上看下其接收到LaunchDriver事件后的处理逻辑:

  1. case LaunchDriver(driverId, driverDesc, resources_) =>
  2. logInfo(s"Asked to launch driver $driverId")
  3. //构建并启动DriverRunner线程(该线程会启动一个Driver进程)
  4. val driver = new DriverRunner(
  5. conf,
  6. driverId,
  7. workDir,
  8. sparkHome,
  9. driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
  10. self,
  11. workerUri,
  12. securityMgr,
  13. resources_)
  14. drivers(driverId) = driver
  15. //启动DriverRunner
  16. driver.start()
  17. //当前worker加上driver运行需要的内存和cpu等资源
  18. coresUsed += driverDesc.cores
  19. memoryUsed += driverDesc.mem
  20. addResourcesUsed(resources_)

我们接着看下DriverRunner的start方法:

  1. private[worker] def start() = {
  2. //启动一个java线程
  3. new Thread("DriverRunner for " + driverId) {
  4. override def run(): Unit = {
  5. //设置线程执行结束的钩子处理函数
  6. var shutdownHook: AnyRef = null
  7. try {
  8. shutdownHook = ShutdownHookManager.addShutdownHook { () =>
  9. logInfo(s"Worker shutting down, killing driver $driverId")
  10. kill()
  11. }
  12. //重点:准备driver需要的jar文件并运行driver
  13. val exitCode = prepareAndRunDriver()
  14. //对driver的状态做一些封装处理
  15. finalState = if (exitCode == 0) {
  16. Some(DriverState.FINISHED)
  17. } else if (killed) {
  18. Some(DriverState.KILLED)
  19. } else {
  20. Some(DriverState.FAILED)
  21. }
  22. } catch {
  23. case e: Exception =>
  24. kill()
  25. finalState = Some(DriverState.ERROR)
  26. finalException = Some(e)
  27. } finally {
  28. if (shutdownHook != null) {
  29. ShutdownHookManager.removeShutdownHook(shutdownHook)
  30. }
  31. }
  32. // 这个DriverRunner线程,向它所属的worker,发送一个DriverStateChanged事件
  33. worker.send(DriverStateChanged(driverId, finalState.get, finalException))
  34. }
  35. }.start()
  36. }

在driverRunner中会单独启动一个线程准备driver运行环境和运行driver,并将运行结果封装后发送给当前worker节点,接着我们在看下worker如何准备driver运行环境与发起driver运行的。

  1. private[worker] def prepareAndRunDriver(): Int = {
  2. //创建工作目录
  3. val driverDir = createWorkingDirectory()
  4. //下载用户上传的jar(我们编写完spark应用程序,打包成的jar)
  5. val localJarFilename = downloadUserJar(driverDir)
  6. //准备额外的资源文件
  7. val resourceFileOpt = prepareResourcesFile(SPARK_DRIVER_PREFIX, resources, driverDir)
  8. def substituteVariables(argument: String): String = argument match {
  9. case "{{WORKER_URL}}" => workerUrl
  10. case "{{USER_JAR}}" => localJarFilename
  11. case other => other
  12. }
  13. val javaOpts = driverDesc.command.javaOpts ++ resourceFileOpt.map(f =>
  14. Seq(s"-D${DRIVER_RESOURCES_FILE.key}=${f.getAbsolutePath}")).getOrElse(Seq.empty)
  15. //构建ProcessBuilder,传入了driver启动命令,需要的内存大小等信息
  16. val builder = CommandUtils.buildProcessBuilder(driverDesc.command.copy(javaOpts = javaOpts),
  17. securityManager, driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)
  18. //通过ProcessBuilder启动driver进程
  19. runDriver(builder, driverDir, driverDesc.supervise)
  20. }

可以看到worker会为了driver先创建对应的工作目录,随后通过命令调用的方式调度driver的运行,这里就不进一步的研究运行的命令具体是什么了,这个跟我们阅读源码的初衷偏离了很多。所以关于driver的启动暂时先介绍到这,我们回归主线,接着介绍worker创建任务的调度过程。

重点三:在worker上启动executor

还记得我们前一章讲解的executor构建命令拼装吗,其最后都封装在了application中,因此我们在创建启动executor的时候可以直接到application中获取相关的信息。

在worker上调度和启动executor的源码如下:

  1. private def startExecutorsOnWorkers(): Unit = {
  2. for (app <- waitingApps) {
  3. //获取每个executor的cpu需求数,默认是1
  4. val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
  5. // 如果app需要的core大于等于每个executor需要的core,则接着处理,如果app剩余需要的cpu数据,小于executor构建需要的cpu数,则不在进行新executor的构建(节省资源)
  6. if (app.coresLeft >= coresPerExecutor) {
  7. // 过滤出ALIVE状态且资源符合executor创建的worker节点
  8. val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
  9. .filter(canLaunchExecutor(_, app.desc))
  10. .sortBy(_.coresFree).reverse
  11. //如果只有一个app等待创建executor,但是仍没有可用worker创建executor,说明集群资源不够,打印warn警告
  12. val appMayHang = waitingApps.length == 1 &&
  13. waitingApps.head.executors.isEmpty && usableWorkers.isEmpty
  14. if (appMayHang) {
  15. logWarning(s"App ${app.id} requires more resource than any of Workers could have.")
  16. }
  17. //重点一:计算每个worker要分配出去的cpu数,该方法返回的是一个数组
  18. val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
  19. //重点二:根据上一步计算的每个worker要分配出去的cpu数,遍历可用worker,如果该worker需要分配的cpu数大于0,则向该worker发送对应的LaunchExecutor事件
  20. for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
  21. allocateWorkerResourceToExecutors(
  22. app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
  23. }
  24. }
  25. }
  26. }

这里有一个比较绕的逻辑点,就是当application剩余需要的cpu数大于单个executor需要的cpu数据才进行新executor的创建,这一步的目的个人猜测是节省资源,要知道,如果application的剩余需求cpu小于单executor需要的cpu数,要么是已经存在了运行的executor,要么就是配置有问题,正常情况下很少出现application需求cpu数小于executor创建需求cpu数的情况。

整体来看,这块有两个重要的步骤,我们分别看下:

重点一:计算每个worker要分配出去的cpu数

计算worker分配cpu数的算法有两种,一种是spreadOutApps(默认):遍历worker,一次只在worker上分配一个executor,另一种是非spreadOutApps:尽量在一个worker启动所有executor,直到分配结束或者资源不够。下面我们看下详细代码:

  1. private def scheduleExecutorsOnWorkers(
  2. app: ApplicationInfo,
  3. usableWorkers: Array[WorkerInfo],
  4. spreadOutApps: Boolean): Array[Int] = {
  5. // 获取app中设置或者默认的每个executor需要的cpu和内存数
  6. val coresPerExecutor = app.desc.coresPerExecutor
  7. val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
  8. val oneExecutorPerWorker = coresPerExecutor.isEmpty
  9. val memoryPerExecutor = app.desc.memoryPerExecutorMB
  10. val resourceReqsPerExecutor = app.desc.resourceReqsPerExecutor
  11. //获取可用worker的数量
  12. val numUsable = usableWorkers.length
  13. //创建一个空数组,存储了每个worker要分配出去的cpu数量
  14. val assignedCores = new Array[Int](numUsable)
  15. //创建一个空数组,存储了每个worker要创建的executor的数量(资源足够或者spark.executor.cores未设置的情况下,可以在一个worker上启动多个executor)
  16. val assignedExecutors = new Array[Int](numUsable)
  17. //获取到底要分配多少cpu,取app剩余要分配的cpu数量和worker总共可用cpu数量的最小值
  18. var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
  19. // 内部方法,用于判断worker的资源是否符合发起executor创建
  20. def canLaunchExecutorForApp(pos: Int): Boolean = {
  21. val keepScheduling = coresToAssign >= minCoresPerExecutor
  22. val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
  23. val assignedExecutorNum = assignedExecutors(pos)
  24. val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutorNum == 0
  25. if (launchingNewExecutor) {
  26. val assignedMemory = assignedExecutorNum * memoryPerExecutor
  27. val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
  28. val assignedResources = resourceReqsPerExecutor.map {
  29. req => req.resourceName -> req.amount * assignedExecutorNum
  30. }.toMap
  31. val resourcesFree = usableWorkers(pos).resourcesAmountFree.map {
  32. case (rName, free) => rName -> (free - assignedResources.getOrElse(rName, 0))
  33. }
  34. val enoughResources = ResourceUtils.resourcesMeetRequirements(
  35. resourcesFree, resourceReqsPerExecutor)
  36. val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
  37. keepScheduling && enoughCores && enoughMemory && enoughResources && underLimit
  38. } else {
  39. keepScheduling && enoughCores
  40. }
  41. }
  42. // 获取可以符合executor构建条件的workers
  43. var freeWorkers = (0 until numUsable).filter(canLaunchExecutorForApp)
  44. while (freeWorkers.nonEmpty) {
  45. //遍历worker进行分配
  46. freeWorkers.foreach { pos =>
  47. var keepScheduling = true
  48. while (keepScheduling && canLaunchExecutorForApp(pos)) {
  49. //待分配cpu数减去executor最小分配cpu数
  50. coresToAssign -= minCoresPerExecutor
  51. //记录当前worker分配出去的cpu数
  52. assignedCores(pos) += minCoresPerExecutor
  53. // 如果没设置executor需要的cpu数量,默认executor需要的cpu数为1,且可以再一个worker上启动该app的多个executor,该worker消耗总cpu数仍是1
  54. if (oneExecutorPerWorker) {
  55. assignedExecutors(pos) = 1
  56. } else {
  57. assignedExecutors(pos) += 1
  58. }
  59. //spreadOutApps调度模式一次只在一个worker上创建executor。非spreadOutApps调度模式,会尽可能在一个worker上启动所有的executor,直到资源不够再结束。
  60. if (spreadOutApps) {
  61. keepScheduling = false
  62. }
  63. }
  64. }
  65. //分配一轮后,再次过滤可以接着进行分配的worker
  66. freeWorkers = freeWorkers.filter(canLaunchExecutorForApp)
  67. }
  68. //返回每个worker分配的core数量
  69. assignedCores
  70. }

这里其实判断worker资源能否符合executor创建的逻辑是比较复杂和重要的,但是由于这块跟我们主线关联不是特别大,且逻辑不易梳理,所以这块并没有详细介绍,后面有时间在补上。

经过上述步骤我们拿到了每个worker要分配的cpu数,接下来我们看下如何发起executor构建事件。

 重点二:向worker发送对应的LaunchExecutor事件

通过上一步计算出了每个worker要分配出去的cpu数,这里接着遍历可用worker,如果该worker需要分配的cpu数大于0,则向该worker发送对应的LaunchExecutor事件。源码如下:

  1. private def allocateWorkerResourceToExecutors(
  2. app: ApplicationInfo,
  3. assignedCores: Int,
  4. coresPerExecutor: Option[Int],
  5. worker: WorkerInfo): Unit = {
  6. // 根据当前worker要分配的cpu总核数和每个executor需要的cpu数,计算出当前worker要分配的executor数
  7. val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
  8. // 获取当前worker要分配的cpu数
  9. val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
  10. //循环创建指定数目的executor
  11. for (i <- 1 to numExecutors) {
  12. //从worker获取指定资源
  13. val allocated = worker.acquireResources(app.desc.resourceReqsPerExecutor)
  14. //创建ExecutorDesc对象,并将其记录到application中
  15. val exec = app.addExecutor(worker, coresToAssign, allocated)
  16. //重点:发起executor的创建
  17. launchExecutor(worker, exec)
  18. //修改application的状态为running
  19. app.state = ApplicationState.RUNNING
  20. }
  21. }

这块的逻辑主要是根据worker要分配的总cpu数和每个executor要分配的cpu数计算出要在该worker上创建的executor数据,随后封装executor的构建信息为ExecutorDesc对象,再通过launchExecutor进行处理。我们再深入看下:

  1. private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  2. logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  3. //将executor加入worker内部的缓存
  4. worker.addExecutor(exec)
  5. //重点一:通知Worker节点启动executor
  6. worker.endpoint.send(LaunchExecutor(masterUrl, exec.application.id, exec.id,
  7. exec.application.desc, exec.cores, exec.memory, exec.resources))
  8. //重点二:发消息告诉该application对应Driver该worker上的executor已经启动
  9. exec.application.driver.send(
  10. ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
  11. }

这块代码量不多,但是涉及到了两个RPC节点的通信,我们分别看下:

重点一:通知Worker节点启动executor

worker节点接收了LaunchExecutor事件后如何处理,这个我们放到下一章将,因为除了对executor如何构建感兴趣外,我们还想看看executor到底是个什么东西。所以这块准备单独开个文章讲。

重点二:将executor信息注册到Driver上

这里将executor注册进driver,在我们的WordCount案例中,因为我们是以client模式提交任务,所以实际上是将事件发送到TaskScheduler启动阶段创建的StandaloneAppClient节点上,下面我们到该节点对象上看下其接收后的处理逻辑:

  1. case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) =>
  2. val fullId = appId + "/" + id
  3. logInfo("Executor added: %s on %s (%s) with %d core(s)".format(fullId, workerId, hostPort,
  4. cores))
  5. listener.executorAdded(fullId, workerId, hostPort, cores, memory)

这里的listenr是一个接口,我们这里直接看它的实现类方法:

  1. override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int,
  2. memory: Int): Unit = {
  3. logInfo("Granted executor ID %s on hostPort %s with %d core(s), %s RAM".format(
  4. fullId, hostPort, cores, Utils.megabytesToString(memory)))
  5. }

可以看到driver接收到executor之后没有任何记录,那么它后续怎么下发task任务给executor呢?这里说实话我也不知道,先留个悬念,再讲解job、stage、task的任务切割时,我们再结合这里的代码好好理一下。同时也对这的问题进行解答。

临时解答一:CoarseGrainedExecutorBackend创建Executor即将完成时会向CoarseGrainedSchedulerBackend(StandaloneSchedulerBackend继承了它)发送RegisterExecutor事件,这个时候会存储executor的相关信息。所以driver中实际上是记录了executor的信息的,至于最后是不是使用的这些信息,我们还不得而知,这个在我们阅读task任务源码时再进行补充解答。

最终解答:TaskScheduler(TaskScheduler是泛成,具体是CoarseGrainedSchedulerBackend对象)向executor提交task任务时,会从executorDataMap集合中选取全部活跃的executor,随后再经过一系列的判断进而确定所用的executor。这里的executorDataMap则是在“临时解答一”中介绍的RegisterExecutor事件里被注入executor的对象信息。

本篇文章的内容到这里就算结束了,下面是一些上述内容的总结:

1、在master节点上,它认为的driver RPC节点实际上是在TaskScheduler启动阶段生成的StandaloneAppClient。

2、driver的调度优先于executor的创建(cluster部署模式driver需要调度,client模式则不需要)

3、每个worker默认仅运行一个driver在它上面

4、executor的启动信息存储在application中

5、如果application需要的core大于等于单个executor需要的core,则发起新executor的构建,如果application需要的cpu数据,小于executor构建需要的cpu数,则不在进行新executor的构建(节省资源,个人推测

6、计算worker分配cpu数的算法有两种,一种是spreadOutApps:遍历worker,一次只在worker上分配一个executor,另一种是非spreadOutApps:尽量在一个worker启动所有executor,直到分配结束或者资源不够

7、如果没设置executor需要的cpu数量,默认executor需要的cpu数为1,且可以再一个worker上启动该app的多个executor,该worker消耗总cpu数仍是1。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/265886
推荐阅读
相关标签
  

闽ICP备14008679号