当前位置:   article > 正文

Spark架构原理-Master资源调度算法原理剖析与源码分析_判断spark是master

判断spark是master

原文地址:https://blog.csdn.net/zhanglh046/article/details/78485783

Master是通过schedule方法进行资源调度,告知worker启动executor等。

一、schedule方法

  • 判断master状态,只有alive状态的master才可以进行资源调度,standby是不能够调度的
  • 将可用的worker节点打乱,这样有利于driver的均衡
  • 进行driver资源调度,遍历处于等待状态的driver队列,发起driver
  • 在worker上开启executor进程
  1. private def schedule(): Unit = {
  2. // 只有alive状态的master才可以进行资源调度,standby是不能够调度的
  3. if (state != RecoveryState.ALIVE) {
  4. return
  5. }
  6. // 将可用的worker节点打乱,这样有利于driver的均衡
  7. val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  8. val numWorkersAlive = shuffledAliveWorkers.size
  9. var curPos = 0
  10. // 进行driver资源调度,遍历处于等待状态的driver队列
  11. for (driver <- waitingDrivers.toList) {
  12. var launched = false
  13. var numWorkersVisited= 0
  14. while (numWorkersVisited < numWorkersAlive && !launched) {
  15. // 获取worker
  16. val worker = shuffledAliveWorkers(curPos)
  17. // 记录worker访问数递增
  18. numWorkersVisited+= 1
  19. // 判断worker的可使用内存是否大于driver所需要的内存以及worker可使用cpu核数是否大于driver所需要的cpu核数
  20. if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
  21. // 满足条件发起driver
  22. launchDriver(worker, driver)
  23. // 将当前driver从等待队列中移除
  24. waitingDrivers-= driver
  25. // 标记该driver发起状态为true
  26. launched = true
  27. }
  28. // 将指针指向下一个worker,当然如果driver已经发起了,则为下一个准备发起下一个处于等待的driver
  29. curPos = (curPos + 1) % numWorkersAlive
  30. }
  31. }
  32. // 在worker上开启executor进程
  33. startExecutorsOnWorkers()
  34. }

二、startExecutorsOnWorkers 在worker上开启executor进程

  • 遍历处于等待状态的application,且处于等待的状态的application的所需要的cpu核数大于0
  • 得到每一个executor所需要的核数
  • 过滤出有效的可用worker,再从worker中过滤出worker剩余内存和CPU核数 不小于app对应executor所需要的内存和CPU核数,按照剩余的CPU核数反向排序worker
  • 在可用的worker上调度executor,启动executor有两种算法模式:
  1. 将应用程序尽可能多的分配到不同的worker上
  2. 和第一种相反,分配到尽可能少的worker上,通常用于计算密集型

每一个executor所需要的核数是可以配置的,一般来讲如果worker有足够的内存和CPU核数,同一个应用程序就可以在该worker启动多个executors;否则就不能再启动新的executor了,则需要到其他worker上去分配executor了

  • 在可用的worker上分配资源给executor
  1. private def startExecutorsOnWorkers(): Unit = {
  2. // 遍历处于等待状态的application,且处于等待的状态的application的所需要的cpu核数大于0
  3. // coresLeft=app请求的核数-已经分配给executor的核数的和
  4. for (app <- waitingApps if app.coresLeft > 0) {
  5. // 每一个executor所需要的核数
  6. val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
  7. // 过滤出有效的可用worker
  8. // 再从worker中过滤出worker剩余内存和CPU核数不小于app对应executor所需要的内存和CPU核数
  9. // 按照剩余的CPU核数反向排序woker
  10. val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
  11. .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
  12. worker.coresFree >= coresPerExecutor.getOrElse(1))
  13. .sortBy(_.coresFree).reverse
  14. // 在可用的worker上调度executor,启动executor有两种算法模式:
  15. // 一:将应用程序尽可能多的分配到不同的worker上
  16. // 二:和第一种相反,分配到尽可能少的worker上,通常用于计算密集型;
  17. // 每一个executor所需要的核数是可以配置的,一般来讲如果worker有足够的内存和CPU核数,同一个应用程序就可以
  18. // 在该worker启动多个executors;否则就不能再启动新的executor了,则需要到其他worker上去分配executor了
  19. val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
  20. // 在可用的worker上分配资源给executor
  21. for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
  22. allocateWorkerResourceToExecutors(
  23. app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
  24. }
  25. }
  26. }

三、scheduleExecutorsOnWorkers在每一个worker上调度资源

判断该worker能不能分配一个或者多个executor,能则分配相对应的executor所需要的CPU核数

  1. private def scheduleExecutorsOnWorkers(app: ApplicationInfo,
  2. usableWorkers: Array[WorkerInfo], spreadOutApps: Boolean): Array[Int] = {
  3. // 如果我们指定executor需要分配的核数,coresPerExecutor表示executor所需要的cpu核数
  4. val coresPerExecutor = app.desc.coresPerExecutor
  5. // app中每个executor所需要的最小cpu核数,如果没有默认最小核数为1
  6. val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
  7. // 如果我们没有指定executor需要分配的核数,则一个worker上只能启动一个executor
  8. val oneExecutorPerWorker = coresPerExecutor.isEmpty
  9. // 每一个executor所需要的内存
  10. val memoryPerExecutor = app.desc.memoryPerExecutorMB
  11. // 获取可用worker数量
  12. val numUsable = usableWorkers.length
  13. // 构建一个可用worker长度的数组,用于存放每个worker节点分配到的cpu核数(16,16,16,16)
  14. val assignedCores = new Array[Int](numUsable)
  15. // 构建一个可用worker长度的数组,用于存放每一个worker上新分配的executor数量(1,2,1,0)
  16. val assignedExecutors = new Array[Int](numUsable)
  17. // 针对当前应用程序,还需要分配的cpu核数,它应该是application还需要的cpu核数和worker总共剩余核数之和中最小的
  18. // 防止超过当前可用的cpu核数
  19. var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
  20. // 判断我们是否可以为这个application在指定的worker上发起一个executor
  21. def canLaunchExecutor(pos: Int): Boolean = {
  22. // 判断当前需要分配的cpu核数是否大于或者等于每个executor所需要的cpu核数,比如总共只能分配8核,但是
  23. // 每个executor所需要的cpu核数是12,那么就不能发起executor了,因为资源不够用
  24. val keepScheduling = coresToAssign >= minCoresPerExecutor
  25. // 当前worker剩余的核数 - 应用程序分配到该worker上的核数是否满足发起一个executor,比如现在worker剩余核数16
  26. // 然后又给application他分配了12核,即还剩4核可用,但是启动一个executor需要12核,那么4 < 12 表示内核不足使用了
  27. val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
  28. // 如果我们允许每一个worker启动多个executor,然后我们可以启动一个新的executor
  29. // 否则如果worker已经启动一个新executor,只需要将更多的内核分配给该executor即可
  30. val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
  31. // 如果需要发起新的executor,既需要判断cpu核数是否足够,还需要判断 executor是否超过限制总数以及否内存是否足够
  32. if (launchingNewExecutor) {
  33. val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
  34. val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
  35. val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
  36. keepScheduling && enoughCores && enoughMemory && underLimit
  37. } else {
  38. // 否则只是对已经存在的executor添加cpu核数,没必要检查内存和executor限制
  39. keepScheduling && enoughCores
  40. }
  41. }
  42. // 过滤出那些可用的worker节点
  43. var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
  44. while (freeWorkers.nonEmpty) {
  45. // 遍历每一个空闲的worker
  46. freeWorkers.foreach { pos =>
  47. var keepScheduling = true
  48. // 检测当前worker是否能够发起executor
  49. while (keepScheduling && canLaunchExecutor(pos)) {
  50. // 需要分配的核数减去每个executor所需要的最小核数
  51. coresToAssign -= minCoresPerExecutor
  52. // 对应的worker节点需要分配的cpu核数加上要启动该executor所需要的最小CPU核数
  53. assignedCores(pos) += minCoresPerExecutor
  54. // 如果每一个worker只允许启动一个executor,那么该worker启动的executor数量只能是1,否则应该加一个
  55. if (oneExecutorPerWorker) {
  56. assignedExecutors(pos) = 1
  57. } else {
  58. assignedExecutors(pos) += 1
  59. }
  60. // 如果需要将executor分配到更多的worker,那么就不再从当前worker节点继续分配,而是从下一个worker上继续分配
  61. if (spreadOutApps) {
  62. keepScheduling = false
  63. }
  64. }
  65. }
  66. // 因为进行了一次分配,需要再次从可用的worker节点中过滤可用的worker节点
  67. freeWorkers = freeWorkers.filter(canLaunchExecutor)
  68. }
  69. assignedCores
  70. }

四、allocateWorkerResourceToExecutors在worker上分配具体的资源

  • 获取该worker应该有多少个executor
  • 获取每一个executor应该分配的核数,如果没有指定则使用计算的应该分配的核数
  • 向worker上添加executor,创建ExecutorDesc对象,更新application已经分配到的cpu核数
  • 启动executor
  • 更新application的状态
  1. private def allocateWorkerResourceToExecutors(app: ApplicationInfo, assignedCores: Int,
  2. coresPerExecutor: Option[Int], worker: WorkerInfo): Unit = {
  3. // 获取该worker应该有多少个executor
  4. val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
  5. // 获取每一个executor应该分配的核数,如果没有指定则使用计算的应该分配的核数
  6. val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
  7. for (i <- 1 to numExecutors) {
  8. // 向worker上添加executor,创建ExecutorDesc对象,更新application已经分配到的cpu核数
  9. val exec = app.addExecutor(worker, coresToAssign)
  10. // 启动executor
  11. launchExecutor(worker, exec)
  12. // 更新application的状态
  13. app.state = ApplicationState.RUNNING
  14. }
  15. }

五、launchDriver 发起driver

  1. private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  2. logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  3. // worker添加driver
  4. worker.addDriver(driver)
  5. driver.worker = Some(worker)
  6. // 向worker发送LaunchDriver消息
  7. worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
  8. // 更新driver状态为RUNNING
  9. driver.state = DriverState.RUNNING
  10. }

六、launchExecutor发起executor

  1. private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  2. logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  3. // worker启动executor,并且更新worker的cpu和内存信息
  4. worker.addExecutor(exec)
  5. // 向worker发送LaunchExecutor消息
  6. worker.endpoint.send(LaunchExecutor(masterUrl,
  7. exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
  8. // 向application发送ExecutorAdded消息
  9. exec.application.driver.send(
  10. ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
  11. }
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/265882
推荐阅读
相关标签
  

闽ICP备14008679号