当前位置:   article > 正文

Spark内核之YARN Cluster模式源码详解(Submit详解)_spark-yarn cluster

spark-yarn cluster

目录

一、YARN Cluster 模式图解

二、Yarn cluster运行机制源码分析

1.SparkSubmit

2. yarn.YarnClusterApplication

3. ApplicationMaster

4.  注册ApplicationMaster并申请资源启动ExecutorBackend

5. CoarseGrainedExecutorBackend

6.  总结

7.  Spark通用运行流程概述



由于在实际工厂环境下使用的绝大多数的集群管理器是Hadoop YARN,因此我们关注的重点是Hadoop YARN模式下的Spark集群部署的YARN Cluste模式源码详解。

一、YARN Cluster 模式图解

  1. 执行脚本提交任务,实际是启动一个SparkSubmitJVM进程;
  2. SparkSubmit类中的main方法反射调用YarnClusterApplication的main方法;
  3. YarnClusterApplication创建Yarn客户端,然后向Yarn服务器发送执行指令:bin/java ApplicationMaster
  4. Yarn框架收到指令后会在指定的NM中启动ApplicationMaster
  5. ApplicationMaster启动Driver线程,执行用户的作业;
  6. AMRM注册,申请资源;
  7. 获取资源后AMNM发送指令:bin/java YarnCoarseGrainedExecutorBackend
  8. CoarseGrainedExecutorBackend进程会接收消息,跟Driver通信,注册已经启动的Executor;然后启动计算对象Executor等待接收任务
  9. Driver线程继续执行完成作业的调度和任务的执行。
  10. Driver分配任务并监控任务的执行。
  11. 注意:SparkSubmitApplicationMasterCoarseGrainedExecutorBackend是独立的进程;
  12. Driver是独立的线程;
  13. ExecutorYarnClusterApplication是对象。

二、Yarn cluster运行机制源码分析

1.SparkSubmit

◼  概述

  1. org.apache.spark.deploy.SparkSubmit
  2. --主要讲述job提交应用以后,环境的准备工作。主要包含以下:
  3. 1. spark向yarn提交job的过程
  4. 2. yarn中application、driver、executor、container是如何相互响应

◼  提交应用

  1. bin/spark-submit \
  2. --class org.apache.spark.examples.SparkPi \
  3. --master yarn \
  4. --deploymode cluster \ 表示yarn的集群模式
  5. ./examples/jars/spark-examples_2.12-2.4.5.jar \
  6. 10
  7. -- 说明:
  8. --master yarn 默认是采用yarn的客户端模式,但是在实际过程中,我们都是使用yarn的集群模式。
  9. 所以增加:--deploymode cluster \

◼  源码解析

  1. org.apache.spark.deploy.SparkSubmit
  2. 点击Submit中的main方法
  3. <****************************************main******************************************>
  4. override def main(args: Array[String]): Unit = {
  5. val submit = new SparkSubmit() {
  6. .......
  7. submit.doSubmit(args) -->执行提交程序,点击doSubmit
  8. }
  9. <**************************************doSubmit*****************************************>
  10. def doSubmit(args: Array[String]): Unit = {
  11. ............
  12. //解析参数,便于处理
  13. ①、 val appArgs = parseArguments(args) -->解析参数,解析应用提交的参数,点击parseArguments
  14. a、parse(args.asJava) -->具体进行参数的解析,点击parse,返回参数的解析,方法的内部调用了handle方法
  15. action = Option(action).getOrElse(SUBMIT),-->默认值为submit
  16. b、handle(opt: String, value: String) -->opt:参数的名称,value:参数的值。
  17. 左边是参数 => 右边是赋值的变量
  18. // --master yarn => master
  19. // --deploy-mode cluster => deployMode
  20. // --class SparkPI(WordCount) => 【mainClass】
  21. "如上为解析参数"
  22. ......
  23. // 如果没有指定 action, 则 action 的默认值是: action = Option(action).getOrElse(SUBMIT)
  24. appArgs.action match {
  25. case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
  26. case SparkSubmitAction.KILL => kill(appArgs)
  27. case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  28. case SparkSubmitAction.PRINT_VERSION => printVersion()
  29. }
  30. }
  31. <**************************************Submit*****************************************>
  32. private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  33. def doRunMain(): Unit = {
  34. ........
  35. runMain(args, uninitLog)
  36. ........
  37. }
  38. if (args.isStandaloneCluster && args.useRest) {
  39. .......
  40. } else {
  41. doRunMain()
  42. }
  43. }
  44. <*************************************doRunMain*****************************************>
  45. def doRunMain(): Unit = {
  46. ......
  47. runMain(args, uninitLog) -- 运行主程序,在runmain()方法中
  48. .......
  49. }
  50. <*************************************RunMain*****************************************>
  51. private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  52. //1.准备提交环境
  53. val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
  54. .............
  55. var mainClass: Class[_] = null
  56. try {
  57. //2.通过类名加载这个类,'反射的方式'
  58. mainClass = Utils.classForName(childMainClass)
  59. ..........
  60. //3.创建第3步类的实例,并将类型转换为SparkApplication
  61. val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
  62. mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
  63. } else {
  64. new JavaMainApplication(mainClass)
  65. }
  66. ............
  67. try {
  68. app.start(childArgs.toArray, sparkConf)
  69. .............
  70. }
  71. <*************************************start*****************************************>
  72. //此方法是一个抽象方法,找到实现类YarnClusterApplication的start方法
  73. private[spark] trait SparkApplication {
  74. def start(args: Array[String], conf: SparkConf): Unit
  75. }

2. yarn.YarnClusterApplication

  1. --作用:
  2. 1. 调用YarnClusterApplication的start方法,创建yarn的resourcemanagerClient,RM的客户端
  3. 2. 执行RM客户端执行run方法
  4. 3. 在run方法中,启动一个应用程序application,也就是一个进程,并提交应用程序,则会执行这个进程的main方法。
  1. <*************************************start*****************************************>
  2. private[spark] class YarnClusterApplication extends SparkApplication {
  3. override def start(args: Array[String], conf: SparkConf): Unit = {
  4. new Client(new ClientArguments(args), conf, null).run()
  5. new ClientArguments(args),是配置参数的封装
  6. new Client,在client类中的属性有:
  7. --val yarnClient = YarnClient.createYarnClient,点击createYarnClient方法,在这个方法中:
  8. -- YarnClient client = new YarnClientImpl(),点击YarnClientImpl类,在类中有一个属性
  9. rmclient:resourcemanagerClient
  10. -- protected ApplicationClientProtocol rmClient
  11. "如上就是创建RM客户端对象",接下来执行run方法
  12. run(),RM客户端对象执行run方法,点击run,在run方法的内部:
  13. }
  14. <*************************************run*****************************************>
  15. def run(): Unit = {
  16. //1. 提交应用,返回应用的id。点击submitApplication(),查看具体提交的过程
  17. this.appId = submitApplication()
  18. .................
  19. }
  20. <**********************************submitApplication************************************>
  21. def submitApplication(): ApplicationId = {
  22. .........
  23. //1. 初始化hadoop的环境
  24. yarnClient.init(hadoopConf)
  25. // 2. 启动yarn客户端,与yarn之间进行连接
  26. yarnClient.start()
  27. ...............
  28. // 3. yarn客户端创建一个应用application
  29. val newApp = yarnClient.createApplication()
  30. val newAppResponse = newApp.getNewApplicationResponse()
  31. //4. 获取应用的id,在yarn应用程序中,每一个应用都是有唯一的应用id
  32. appId = newAppResponse.getApplicationId()
  33. ...............
  34. //5. 提交yarn应用程序,提交的是什么呢?
  35. --yarnClient.submitApplication(appContext),点击appContext
  36. --// Set up the appropriate contexts to launch our AM
  37. 配置java虚拟机的启动参数,点击createContainerLaunchContext,在这个方法的内部进行了command的封装:, 【集群模式】command = bin/java org.apache.spark.deploy.yarn.ApplicationMaster, 【client模式】command = bin/java org.apache.spark.deploy.yarn.ExecutorLauncher
  38. --val containerContext = createContainerLaunchContext(newAppResponse)基本参数配置的封装
  39. --val appContext = createApplicationSubmissionContext(newApp,containerContext)
  40. }
  41. //命令的封装
  42. <******************************createContainerLaunchContext*****************************>
  43. private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
  44. : ContainerLaunchContext = {
  45. ...........
  46. val javaOpts = ListBuffer[String]()
  47. ............
  48. // Add Xmx for AM memory
  49. javaOpts += "-Xmx" + amMemory + "m"
  50. .............
  51. javaOpts += "-Djava.io.tmpdir=" + tmpDir
  52. ..............
  53. // In our expts, using (default) throughput collector has severe perf ramifications in
  54. // multi-tenant machines
  55. javaOpts += "-XX:+UseConcMarkSweepGC"
  56. javaOpts += "-XX:MaxTenuringThreshold=31"
  57. javaOpts += "-XX:SurvivorRatio=8"
  58. javaOpts += "-XX:+CMSIncrementalMode"
  59. javaOpts += "-XX:+CMSIncrementalPacing"
  60. javaOpts += "-XX:CMSIncrementalDutyCycleMin=0"
  61. javaOpts += "-XX:CMSIncrementalDutyCycle=10"
  62. }
  63. ....................
  64. // TODO: it would be nicer to just make sure there are no null commands here
  65. val printableCommands = commands.map(s => if (s == null) "null" else s).toList
  66. amContainer.setCommands(printableCommands.asJava)
  67. }

3. ApplicationMaster

  1. org.apache.spark.deploy.yarn.ApplicationMaster
  2. -- 作用
  3. 1. 封装ApplicationMaster的参数
  4. 2. 根据参数,创建ApplicationMaster对象
  5. 3. 执行ApplicationMaster的run方法,在run方法中,最后调用到runDriver方法,在这个方法中:
  6. a、启动用户的应用,并返回这个应用的"线程",具体实现如下:
  7. a、启动用户提交的应用程序;
  8. b、在ApplicationMaster中创建一个线程,线程的名称就是"Driver"
  9. c、启动这个线程,并执行run方法,在run方法中,就是执行我们提交的应用程序类的main方法
  10. d、返回这个"Driver"线程
  11. b、 执行一个方法,用于返回"sparkContext"的对象,如果没有返回,就不会执行下面的代码,当返回了这个上下文的对象以后:
  12. c、 ApplicationMaster通过ApplicationMaste的客户端,向ResourceManager注册自己,并申请资源
  13. d、 分配资源,具体实现如下:
  14. a、在ResourceManager端获取一个ApplicationMaster的客户端,返回一个分配器
  15. b、分配器进行资源的分配:
  16. a、ApplicationMaster的客户端申请一个分配器响应
  17. b、分配器响应返回所有被分配的容器container(资源列表)给到ApplicationMaster
  18. c、如果分配的资源列表的数量大于0,则对容器进行处理,处理的方式为:
  19. 1.AM内部会创建一个线程,并调用线程的run方法,在run方法中循环遍历RM返回的可用容器,然后进行
  20. 对每个容器进行匹配,此时涉及到首选位置,根据请求匹配选择哪些容器.首选位置的选择规则见首选位置说明。
  21. 2. 运行匹配后的资源,挨个遍历可用的容器,如果运行执行器的数量小于目标执行器的数量"假如需要4个执行
  22. 器,即为目标执行器,此时已经运行了2个执行器,即为运行执行器的数量,此时会启动下面的逻辑"
  23. 那么在这个容器中会创建一个线程池,一个线程池container对应一个ExecutorRunnable,并调用了这个对象的
  24. run方法,在这个线程池中,有一个nmClient(nameManagClient),说明AM能够找到NM,在这个run方法中,创建
  25. NM的客户端,初始化NM,并启动容器container,在启动容器中,封装一个指令, command:/bin/java
  26. /org.apache.spark.executor.CoarseGrainedExecutorBackend,并且启动了这个指令,显然是一个进程
  27. CoarseGrainedExecutorBackend,粗粒度的执行器后台。

◼  Driver的启动

  1. <****************************************main*****************************************>
  2. def main(args: Array[String]): Unit = {
  3. //1) 封装参数
  4. val amArgs = new ApplicationMasterArguments(args)
  5. ......
  6. //2)创建ApplicationMaster的对象
  7. master = new ApplicationMaster(amArgs, sparkConf, yarnConf)
  8. ......
  9. //3)执行run方法,点击run方法
  10. ugi.doAs(new PrivilegedExceptionAction[Unit]() {
  11. override def run(): Unit = System.exit(master.run())
  12. })
  13. }
  14. <****************************************run*****************************************>
  15. //启动Driver(用户所写的主类)
  16. final def run(): Int = {
  17. ......
  18. // 如果是集群模式,执行,点击runDriver
  19. if (isClusterMode) {
  20. runDriver()
  21. } else {
  22. // 如果是client模式,执行:
  23. runExecutorLauncher()
  24. }
  25. ......
  26. }
  27. <************************************如何启Driver***************************************>
  28. //因为Driver的起动是在ApplicationMaster内启动的,所以YARn Cluster的Driver位于NodeMannger,至于在哪一个NM,决定于ApplicationMaster,
  29. <**************************************runDriver***************************************>
  30. private def runDriver(): Unit = {
  31. ......
  32. //1. 启动用户的程序,返回一个线程,点击startUserApplication
  33. userClassThread = startUserApplication()
  34. ......
  35. }
  36. <*********************************启动用户的Application*********************************>
  37. <*********************************startUserApplication*********************************>
  38. private def startUserApplication(): Thread = {
  39. //解析用户
  40. var userArgs = args.userArgs
  41. ......
  42. //通过反射的方法获取加载了用户自己所写的main方法
  43. val mainMethod = userClassLoader.loadClass(args.userClass)
  44. .getMethod("main", classOf[Array[String]])
  45. //创建了新的线程,在线程力执行自己的main方法(执行操作在后面,当调用userThread.start()时运行)
  46. val userThread = new Thread {
  47. override def run(): Unit = {
  48. ........
  49. mainMethod.invoke(null, userArgs.toArray)
  50. ........
  51. }
  52. }
  53. //4. 设定线程的名字为driver,说明driver就是一个applicationMaster的一个线程
  54. userThread.setName("Driver") //Driver时一个线程名
  55. //5. 启动线程,执行线程的run方法,其实就是执行类userClass的main方法,userClass是哪个类呢?通过查到,就是我们提交应用的--class,sparkpi,或者是我们自定的类
  56. //实际执行上面创建的线程的run方法
  57. userThread.start()
  58. //返回用户线程
  59. userThread
  60. }
  61. **用户所写的类为Driver,是一个线程名,Driver是一个线程。

4.  注册ApplicationMaster并申请资源启动ExecutorBackend

  1. private def runDriver(): Unit = {
  2. //此处是根据用户类线程启动Driver
  3. userClassThread = startUserApplication()
  4. .............
  5. try {
  6. if (sc != null) {
  7. val rpcEnv = sc.env.rpcEnv
  8. val userConf = sc.getConf
  9. val host = userConf.get(DRIVER_HOST_ADDRESS)
  10. val port = userConf.get(DRIVER_PORT)
  11. //5.注册AM
  12. //ApplicationMaster向ResouceMannager注册
  13. registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)
  14. //6.申请资源
  15. //此处放driverRef(Driver引用),是因为当ExcutorBackend启动后要向Driver进行注册
  16. createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)
  17. ............
  18. }
  19. <*********************************createAllocator*********************************>
  20. private def createAllocator(, , , , ): Unit = {
  21. ........
  22. // 1.AM的客户端,'在RM端',创建分配器,返回一个分配器
  23. allocator = client.createAllocator( , , , , , , )
  24. .........
  25. 2.获取分配资源,分配器分配资源,点击allocateResources
  26. allocator.allocateResources()
  27. .........
  28. }
  29. <*********************************allocateResources*********************************>
  30. //7.处理申请到的资源
  31. def allocateResources(): Unit = synchronized {
  32. ............
  33. // 1.AM的客户端,申请一个分配响应
  34. val allocateResponse = amClient.allocate(progressIndicator)
  35. // 2.分配器响应获取所有被分配的容器container(资源列表)
  36. val allocatedContainers = allocateResponse.getAllocatedContainers()
  37. ........
  38. // 3.如果可分配的容器数量大于0,则调用处理可用容器的方法,点击handle方法
  39. if (allocatedContainers.size > 0) {
  40. .........
  41. //处理已经分配到的资源
  42. handleAllocatedContainers(allocatedContainers.asScala)
  43. }
  44. ........
  45. }
  46. <*****************************handleAllocatedContainers*****************************>
  47. // 1.内部会创建一个线程,并调用线程的run方法,在run方法中循环遍历RM返回的可用容器,然后进行对每个容器进行匹配,此时涉及到首选位置,根据请求匹配选择哪些容器.首选位置的选择规则见首选位置说明。
  48. def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
  49. val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)
  50. ........
  51. // Match incoming requests by host
  52. val remainingAfterHostMatches = new ArrayBuffer[Container]
  53. .......
  54. //2.运行已经分配到资源(封装好的container)。实则启动Excutorbackend进程
  55. //启动Container
  56. runAllocatedContainers(containersToUse)
  57. .......
  58. }
  59. <*****************************runAllocatedContainers*****************************>
  60. private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
  61. // 1. 挨个遍历可用的容器资源
  62. for (container <- containersToUse) {
  63. executorIdCounter += 1
  64. ...........
  65. // 2. 每个容器中,如果运行执行器的数量小于目标执行器的数量,执行如下代码
  66. if (runningExecutors.size() < targetNumExecutors) {
  67. numExecutorsStarting.incrementAndGet()
  68. ........
  69. // 3. 线程池,在线程池的内部有:
  70. --launcherPool.execute(new Runnable
  71. // 1.执行的池子是一个线程池
  72. --launcherPool = ThreadUtils.newDaemonCachedThreadPool
  73. // 2.一个线程container对应一个ExecutorRunnable,并调用了这个对象的run方法
  74. --new ExecutorRunnable...run()
  75. }
  76. <*******************************************run****************************************>
  77. //run方法在ExecutorRunnable中:说明AM能够找到NM
  78. //启动Excutor的资源容器,是由ApplicationMaster与NM进行通信启动的(通过client建立通信)
  79. def run(): Unit = {
  80. // 创建NM的客户端
  81. nmClient = NMClient.createNMClient()
  82. // 初始化NM
  83. nmClient.init(conf)
  84. // 启动NM
  85. nmClient.start()
  86. // 启动容器,点击--startContainer()
  87. startContainer()
  88. }
  89. <*************************************startContainer**********************************>
  90. def startContainer(): java.util.Map[String, ByteBuffer] = {
  91. ........
  92. //封装启动Container的命令
  93. val commands = prepareCommand()
  94. //将封装好的指令传递到参数中
  95. ctx.setCommands(commands.asJava)
  96. ........
  97. //告诉NM启动EXcutorbackend
  98. //AM向NM提交申请,启动YarnCoarseGrainedExecutorBackend-->粗粒度的执行器后台,是一个进程
  99. nmClient.startContainer(container.get, ctx)
  100. }
  101. }
  102. <*************************************prepareCommand**********************************>
  103. //封装命令
  104. private def prepareCommand(): List[String] = {
  105. // Extra options for the JVM
  106. val javaOpts = ListBuffer[String]()
  107. // Set the JVM memory
  108. val executorMemoryString = executorMemory + "m"
  109. javaOpts += "-Xmx" + executorMemoryString
  110. ......................
  111. val commands = prefixEnv ++
  112. Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
  113. javaOpts ++
  114. Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
  115. "--driver-url", masterAddress,
  116. "--executor-id", executorId,
  117. "--hostname", hostname,
  118. "--cores", executorCores.toString,
  119. "--app-id", appId,
  120. "--resourceProfileId", resourceProfileId.toString) ++
  121. userClassPath ++
  122. ..................
  123. }
  124. //封装好以后执行startContainer中的 nmClient.startContainer(container.get, ctx)

◼ 首选位置说明

  1. -- 首选位置说明
  2.         --1. 移动数据不如移动计算。 
  3.         --2. 首选位置:有多个,和本地化级别有关。
  4.         --3. 本地化级别:将数据和计算所在的位置称之为本地化
  5.                1. 计算和数据在同一个Executor中,称之进程本地化
  6.                2. 计算和数据在同一个节点中,称之节点本地化
  7.                3. 计算和数据在同一个机架中,称之机架本地化
  8.                4. 任意

5. CoarseGrainedExecutorBackend

  1. org.apache.spark.executor.CoarseGrainedExecutorBackend:通信后台
  2. 执行一次bin/java就会执行一个新的进程,则是属于并行执行的感觉,和之前执行的内容是分开的。类似我们在Windows中开了一个微信和qq程序一样,各自执行,互不影响。
  3. - 作用:
  4. 执行CoarseGrainedExecutorBackend"执行器后台"的main方法,在main方法中:
  5. 1. 首先封装一些参数
  6. 2. 执行run方法,在run方法中:
  7. 1. 通过driver的URI,使得CoarseGrainedExecutorBackendDriver进行关联
  8. 2. 通过通信环境创建了一个终端,名字为executor,创建一个CoarseGrainedExecutorBackend对象并调用onstart方法:
  9. 1. 获取driver的引用
  10. 2. ExecutorBackend向driver发送消息,注册executor的消息,也称之为反向注册
  11. 3. 在driver端会接收到这个消息,通过executor的引用,发送消息给到ExecutorBackend,注册executor成功
  12. 4. ExecutorBackend接收driver返回的executor注册成功的消息,
  13. -- 说明:
  14. executor是一个计算对象,在这个对象里面有一个线程池,每一个线程来处理一个从driver端发送过来的任务
  1. //YarnCoarseGrainedExecutorBackend 就是通过调用CoarseGrainedExecutorBackend的run方法
  2. private[spark] class YarnCoarseGrainedExecutorBackend(.......) {
  3. CoarseGrainedExecutorBackend.run(backendArgs, createFn)
  4. System.exit(0)
  5. }
  6. }
  7. <****************************************run*****************************************>
  8. //1.run方法
  9. def run(
  10. ........
  11. env.rpcEnv.setupEndpoint("Executor",
  12. backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile))
  13. arguments.workerUrl.foreach { url =>
  14. env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
  15. }
  16. env.rpcEnv.awaitTermination()
  17. }
  18. }
  19. <***************************************onStart****************************************>
  20. //2.onStart(反向注册)
  21. override def onStart(): Unit = {
  22. .......
  23. //3.3.获取Driver客户端对象并发送注册消息
  24. // This is a very fast action so we can use "ThreadUtils.sameThread"
  25. //获取driver的引用
  26. driver = Some(ref)
  27. //ExecutorBackend向driver发送消息,注册executor的消息,也称之为反向注册
  28. ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
  29. .......
  30. //在driver端会接收到这个消息,因为在driver端,有一个上下文的对象,sparkcontext,在这个类有一个属性: private var _schedulerBackend: SchedulerBackend = _,点击SchedulerBackend,是一个trait,找到实现类:CoarseGrainedSchedulerBackend,在这个类中,有一个方法:receiveAndReply(): ............
  31. case Success(_) =>
  32. //4.Driver回成功消息,则通知自己启动Executor
  33. self.send(RegisteredExecutor)
  34. // ExecutorBackend类中有一个recive方法,用来接收driver返回的executor注册成功的消息,executor是一个计算对象,在这个对象里面有一个线程池,每一个线程来处理一个从driver端发送过来的任务
  35. --executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
  36. }
  37. <**********************************RegisteredExecutor***********************************>
  38. override def receive: PartialFunction[Any, Unit] = {
  39. case RegisteredExecutor =>
  40. logInfo("Successfully registered with driver")
  41. try {
  42. //5.启动Executor
  43. //Executor负责执行任务
  44. executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
  45. resources = _resources)
  46. //6.向Driver发送消息,请求执行任务
  47. driver.get.send(LaunchedExecutor(executorId))
  48. ................
  49. //7.Driver对任务排序(FIFO,FAIR),如果任务不为空,则向ExecutorBackend发送启动任务消息,ExecutorBackend接收到该消息,则执行任务
  50. executor.launchTask(this, taskDesc)
  51. }
  52. <**********************************launchTask***********************************>
  53. def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  54. //8.启动线程执行任务
  55. val tr = new TaskRunner(context, taskDescription)
  56. runningTasks.put(taskDescription.taskId, tr)
  57. //执行的进士tr(TaskRunner)里面的run方法
  58. threadPool.execute(tr)
  59. }
  60. <*****************************************run***************************************>
  61. //9.任务执行
  62. override def run(): Unit = {
  63. val value = Utils.tryWithSafeFinally {
  64. val res = task.run(
  65. taskAttemptId = taskId,
  66. attemptNumber = taskDescription.attemptNumber,
  67. metricsSystem = env.metricsSystem,
  68. resources = taskDescription.resources)
  69. threwException = false
  70. res
  71. }
  72. <***************************************task.run**************************************>
  73. //9.任务执行
  74. final def run( , , , , , ){
  75. try {
  76. runTask(context)
  77. } catch {
  78. }
  79. }
  80. <***************************************runTask**************************************>
  81. runtask是一个抽象类,其实现类ShuffleMapTaskResultTAsk,任务的提交也到此结束。

6.  总结

  1. -- 1. application是在一个nodemanager中container中,并且在这个container中创建了一个driver线程
  2. -- 2. 在一个nodemanager中,可以创建多个container,在每个container中,会创建ExecutorBackend对象,在这个对象中,会创建一个executor对象,在这个对象中一个线程池,一个线程用来处理driver发来的一个task,至于能同时执行多少个task,和executor中的core数量有关。
  3. -- 3. ApplicationMaster周旋于Driver和ResourceManager之间
  4. -- 4. spark有两个进程,也就是两个分支
  5. 创建RM的客户端,创建AM,在AM中,创建Driver的线程
  6. "分支1":此时会执行Driver线程的run方法,在run方法中就是执行了应用程序的main方法
  7. "分支2":构建SparkContext上下文的对象,再向RM注册AM,然后申请资源和返回可用的资源,最后Driver进行资源的选择,按照首选位置的原则。
  8. 所以如下图片有一个错误:资源满足以后才执行main方法,实际上是创建了driver线程,还没有申请资源就已经开始执行main方法了。
  9. -- 5. 进程、线程、对象
  10. "进程":SparkSubmit、ApplicationMaster和CoarseGrainedExecutorBackend
  11. "线程":Driver,但是我们一般称SparkContext称之为Driver
  12. "对象":Executor和YarnClusterApplication
  13. -- 6. client和cluster模式的区别:
  14. Driver的位置不同,其余的逻辑是一样的。
  15. Cluster:在集群中,在nodemanager中的AM对象中,是一个线程
  16. client:在集群之外

 

 

7.  Spark通用运行流程概述

  1. 1) 任务提交后,都会先启动Driver程序;
  2. 2) 随后Driver向集群管理器注册应用程序;
  3. 3) 之后集群管理器根据此任务的配置文件分配Executor并启动;
  4. 4) Driver开始执行main函数,Spark查询为懒执行,当执行到Action算子时开始反向推算,根据宽依赖进行Stage的划分,随后每一个Stage对应一个TasksetTaskset中有多个Task,查找可用资源Executor进行调度;
  5. 5) 根据本地化原则,Task会被分发到指定的Executor去执行,在任务执行的过程中,Executor也会不断与Driver进行通信,报告任务运行情况。

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/码创造者/article/detail/789150
推荐阅读
相关标签
  

闽ICP备14008679号