赞
踩
目录
2. yarn.YarnClusterApplication
4. 注册ApplicationMaster并申请资源启动ExecutorBackend
5. CoarseGrainedExecutorBackend
由于在实际工厂环境下使用的绝大多数的集群管理器是Hadoop YARN,因此我们关注的重点是Hadoop YARN模式下的Spark集群部署的YARN Cluste模式源码详解。
一、YARN Cluster
模式图解- 执行脚本提交任务,实际是启动一个SparkSubmit的JVM进程;
- SparkSubmit类中的main方法反射调用YarnClusterApplication的main方法;
- YarnClusterApplication创建Yarn客户端,然后向Yarn服务器发送执行指令:bin/java ApplicationMaster;
- Yarn框架收到指令后会在指定的NM中启动ApplicationMaster;
- ApplicationMaster启动Driver线程,执行用户的作业;
- AM向RM注册,申请资源;
- 获取资源后AM向NM发送指令:bin/java YarnCoarseGrainedExecutorBackend;
- CoarseGrainedExecutorBackend进程会接收消息,跟Driver通信,注册已经启动的Executor;然后启动计算对象Executor等待接收任务
- Driver线程继续执行完成作业的调度和任务的执行。
- Driver分配任务并监控任务的执行。
- 注意:SparkSubmit、ApplicationMaster和CoarseGrainedExecutorBackend是独立的进程;
- Driver是独立的线程;
- Executor和YarnClusterApplication是对象。
Yarn cluster
运行机制源码分析◼ 概述
- org.apache.spark.deploy.SparkSubmit
-
- --主要讲述job提交应用以后,环境的准备工作。主要包含以下:
- 1. spark向yarn提交job的过程
- 2. yarn中application、driver、executor、container是如何相互响应
◼ 提交应用
- bin/spark-submit \
- --class org.apache.spark.examples.SparkPi \
- --master yarn \
- --deploymode cluster \ 表示yarn的集群模式
- ./examples/jars/spark-examples_2.12-2.4.5.jar \
- 10
-
- -- 说明:
- --master yarn 默认是采用yarn的客户端模式,但是在实际过程中,我们都是使用yarn的集群模式。
- 所以增加:--deploymode cluster \
◼ 源码解析
- org.apache.spark.deploy.SparkSubmit
- 点击Submit中的main方法
- <****************************************main******************************************>
- override def main(args: Array[String]): Unit = {
- val submit = new SparkSubmit() {
- .......
- submit.doSubmit(args) -->执行提交程序,点击doSubmit
- }
- <**************************************doSubmit*****************************************>
- def doSubmit(args: Array[String]): Unit = {
- ............
- //解析参数,便于处理
- ①、 val appArgs = parseArguments(args) -->解析参数,解析应用提交的参数,点击parseArguments
- a、parse(args.asJava) -->具体进行参数的解析,点击parse,返回参数的解析,方法的内部调用了handle方法
- action = Option(action).getOrElse(SUBMIT),-->默认值为submit
- b、handle(opt: String, value: String) -->opt:参数的名称,value:参数的值。
- 左边是参数 => 右边是赋值的变量
- // --master yarn => master
- // --deploy-mode cluster => deployMode
- // --class SparkPI(WordCount) => 【mainClass】
-
- "如上为解析参数"
- ......
- // 如果没有指定 action, 则 action 的默认值是: action = Option(action).getOrElse(SUBMIT)
- appArgs.action match {
- case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
- case SparkSubmitAction.KILL => kill(appArgs)
- case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
- case SparkSubmitAction.PRINT_VERSION => printVersion()
- }
- }
- <**************************************Submit*****************************************>
- private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
-
- def doRunMain(): Unit = {
- ........
- runMain(args, uninitLog)
- ........
- }
- if (args.isStandaloneCluster && args.useRest) {
- .......
- } else {
- doRunMain()
- }
- }
-
- <*************************************doRunMain*****************************************>
- def doRunMain(): Unit = {
- ......
- runMain(args, uninitLog) -- 运行主程序,在runmain()方法中
- .......
- }
- <*************************************RunMain*****************************************>
- private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
- //1.准备提交环境
- val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
- .............
- var mainClass: Class[_] = null
- try {
- //2.通过类名加载这个类,'反射的方式'
- mainClass = Utils.classForName(childMainClass)
- ..........
- //3.创建第3步类的实例,并将类型转换为SparkApplication
- val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
- mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
- } else {
- new JavaMainApplication(mainClass)
- }
- ............
- try {
- app.start(childArgs.toArray, sparkConf)
- .............
- }
- <*************************************start*****************************************>
- //此方法是一个抽象方法,找到实现类YarnClusterApplication的start方法
- private[spark] trait SparkApplication {
- def start(args: Array[String], conf: SparkConf): Unit
- }
- --作用:
- 1. 调用YarnClusterApplication的start方法,创建yarn的resourcemanagerClient,RM的客户端
- 2. 执行RM客户端执行run方法
- 3. 在run方法中,启动一个应用程序application,也就是一个进程,并提交应用程序,则会执行这个进程的main方法。
- <*************************************start*****************************************>
- private[spark] class YarnClusterApplication extends SparkApplication {
- override def start(args: Array[String], conf: SparkConf): Unit = {
- new Client(new ClientArguments(args), conf, null).run()
- ①new ClientArguments(args),是配置参数的封装
- ②new Client,在client类中的属性有:
- --val yarnClient = YarnClient.createYarnClient,点击createYarnClient方法,在这个方法中:
- -- YarnClient client = new YarnClientImpl(),点击YarnClientImpl类,在类中有一个属性
- rmclient:resourcemanagerClient
- -- protected ApplicationClientProtocol rmClient
- "如上就是创建RM客户端对象",接下来执行run方法
- ③run(),RM客户端对象执行run方法,点击run,在run方法的内部:
- }
- <*************************************run*****************************************>
- def run(): Unit = {
- //1. 提交应用,返回应用的id。点击submitApplication(),查看具体提交的过程
- this.appId = submitApplication()
- .................
- }
- <**********************************submitApplication************************************>
- def submitApplication(): ApplicationId = {
- .........
- //1. 初始化hadoop的环境
- yarnClient.init(hadoopConf)
- // 2. 启动yarn客户端,与yarn之间进行连接
- yarnClient.start()
- ...............
- // 3. yarn客户端创建一个应用application
- val newApp = yarnClient.createApplication()
- val newAppResponse = newApp.getNewApplicationResponse()
- //4. 获取应用的id,在yarn应用程序中,每一个应用都是有唯一的应用id
- appId = newAppResponse.getApplicationId()
- ...............
- //5. 提交yarn应用程序,提交的是什么呢?
- --yarnClient.submitApplication(appContext),点击appContext
- --// Set up the appropriate contexts to launch our AM
- 配置java虚拟机的启动参数,点击createContainerLaunchContext,在这个方法的内部进行了command的封装:, 【集群模式】command = bin/java org.apache.spark.deploy.yarn.ApplicationMaster, 【client模式】command = bin/java org.apache.spark.deploy.yarn.ExecutorLauncher
- --val containerContext = createContainerLaunchContext(newAppResponse)基本参数配置的封装
- --val appContext = createApplicationSubmissionContext(newApp,containerContext)
- }
- //命令的封装
- <******************************createContainerLaunchContext*****************************>
- private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
- : ContainerLaunchContext = {
- ...........
- val javaOpts = ListBuffer[String]()
- ............
- // Add Xmx for AM memory
- javaOpts += "-Xmx" + amMemory + "m"
- .............
- javaOpts += "-Djava.io.tmpdir=" + tmpDir
- ..............
- // In our expts, using (default) throughput collector has severe perf ramifications in
- // multi-tenant machines
- javaOpts += "-XX:+UseConcMarkSweepGC"
- javaOpts += "-XX:MaxTenuringThreshold=31"
- javaOpts += "-XX:SurvivorRatio=8"
- javaOpts += "-XX:+CMSIncrementalMode"
- javaOpts += "-XX:+CMSIncrementalPacing"
- javaOpts += "-XX:CMSIncrementalDutyCycleMin=0"
- javaOpts += "-XX:CMSIncrementalDutyCycle=10"
- }
- ....................
- // TODO: it would be nicer to just make sure there are no null commands here
- val printableCommands = commands.map(s => if (s == null) "null" else s).toList
- amContainer.setCommands(printableCommands.asJava)
- }
- org.apache.spark.deploy.yarn.ApplicationMaster
- -- 作用
- 1. 封装ApplicationMaster的参数
- 2. 根据参数,创建ApplicationMaster对象
- 3. 执行ApplicationMaster的run方法,在run方法中,最后调用到runDriver方法,在这个方法中:
- a、启动用户的应用,并返回这个应用的"线程",具体实现如下:
- a、启动用户提交的应用程序;
- b、在ApplicationMaster中创建一个线程,线程的名称就是"Driver"
- c、启动这个线程,并执行run方法,在run方法中,就是执行我们提交的应用程序类的main方法
- d、返回这个"Driver"线程
- b、 执行一个方法,用于返回"sparkContext"的对象,如果没有返回,就不会执行下面的代码,当返回了这个上下文的对象以后:
- c、 ApplicationMaster通过ApplicationMaste的客户端,向ResourceManager注册自己,并申请资源
- d、 分配资源,具体实现如下:
- a、在ResourceManager端获取一个ApplicationMaster的客户端,返回一个分配器
- b、分配器进行资源的分配:
- a、ApplicationMaster的客户端申请一个分配器响应
- b、分配器响应返回所有被分配的容器container(资源列表)给到ApplicationMaster
- c、如果分配的资源列表的数量大于0,则对容器进行处理,处理的方式为:
- 1.AM内部会创建一个线程,并调用线程的run方法,在run方法中循环遍历RM返回的可用容器,然后进行
- 对每个容器进行匹配,此时涉及到首选位置,根据请求匹配选择哪些容器.首选位置的选择规则见首选位置说明。
- 2. 运行匹配后的资源,挨个遍历可用的容器,如果运行执行器的数量小于目标执行器的数量"假如需要4个执行
- 器,即为目标执行器,此时已经运行了2个执行器,即为运行执行器的数量,此时会启动下面的逻辑",
- 那么在这个容器中会创建一个线程池,一个线程池container对应一个ExecutorRunnable,并调用了这个对象的
- run方法,在这个线程池中,有一个nmClient(nameManagClient),说明AM能够找到NM,在这个run方法中,创建
- NM的客户端,初始化NM,并启动容器container,在启动容器中,封装一个指令, command:/bin/java
- /org.apache.spark.executor.CoarseGrainedExecutorBackend,并且启动了这个指令,显然是一个进程
- ,CoarseGrainedExecutorBackend,粗粒度的执行器后台。
◼ Driver的启动
- <****************************************main*****************************************>
- def main(args: Array[String]): Unit = {
- //1) 封装参数
- val amArgs = new ApplicationMasterArguments(args)
- ......
- //2)创建ApplicationMaster的对象
- master = new ApplicationMaster(amArgs, sparkConf, yarnConf)
- ......
- //3)执行run方法,点击run方法
- ugi.doAs(new PrivilegedExceptionAction[Unit]() {
- override def run(): Unit = System.exit(master.run())
- })
- }
- <****************************************run*****************************************>
- //启动Driver(用户所写的主类)
- final def run(): Int = {
- ......
- // 如果是集群模式,执行,点击runDriver
- if (isClusterMode) {
- runDriver()
- } else {
- // 如果是client模式,执行:
- runExecutorLauncher()
- }
- ......
- }
- <************************************如何启Driver***************************************>
- //因为Driver的起动是在ApplicationMaster内启动的,所以YARn Cluster的Driver位于NodeMannger,至于在哪一个NM,决定于ApplicationMaster,
- <**************************************runDriver***************************************>
- private def runDriver(): Unit = {
- ......
- //1. 启动用户的程序,返回一个线程,点击startUserApplication
- userClassThread = startUserApplication()
- ......
- }
- <*********************************启动用户的Application*********************************>
- <*********************************startUserApplication*********************************>
-
- private def startUserApplication(): Thread = {
- //解析用户
- var userArgs = args.userArgs
- ......
- //通过反射的方法获取加载了用户自己所写的main方法
- val mainMethod = userClassLoader.loadClass(args.userClass)
- .getMethod("main", classOf[Array[String]])
-
- //创建了新的线程,在线程力执行自己的main方法(执行操作在后面,当调用userThread.start()时运行)
- val userThread = new Thread {
- override def run(): Unit = {
- ........
- mainMethod.invoke(null, userArgs.toArray)
- ........
- }
- }
-
- //4. 设定线程的名字为driver,说明driver就是一个applicationMaster的一个线程
- userThread.setName("Driver") //Driver时一个线程名
- //5. 启动线程,执行线程的run方法,其实就是执行类userClass的main方法,userClass是哪个类呢?通过查到,就是我们提交应用的--class,sparkpi,或者是我们自定的类
- //实际执行上面创建的线程的run方法
- userThread.start()
- //返回用户线程
- userThread
- }
- **用户所写的类为Driver,是一个线程名,Driver是一个线程。
- private def runDriver(): Unit = {
- //此处是根据用户类线程启动Driver
- userClassThread = startUserApplication()
- .............
- try {
- if (sc != null) {
- val rpcEnv = sc.env.rpcEnv
- val userConf = sc.getConf
- val host = userConf.get(DRIVER_HOST_ADDRESS)
- val port = userConf.get(DRIVER_PORT)
-
- //5.注册AM
- //ApplicationMaster向ResouceMannager注册
- registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)
- //6.申请资源
- //此处放driverRef(Driver引用),是因为当ExcutorBackend启动后要向Driver进行注册
- createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)
- ............
- }
- <*********************************createAllocator*********************************>
- private def createAllocator(, , , , ): Unit = {
- ........
- // 1.AM的客户端,'在RM端',创建分配器,返回一个分配器
- allocator = client.createAllocator( , , , , , , )
- .........
- 2.获取分配资源,分配器分配资源,点击allocateResources
- allocator.allocateResources()
- .........
- }
- <*********************************allocateResources*********************************>
- //7.处理申请到的资源
- def allocateResources(): Unit = synchronized {
- ............
- // 1.AM的客户端,申请一个分配响应
- val allocateResponse = amClient.allocate(progressIndicator)
- // 2.分配器响应获取所有被分配的容器container(资源列表)
- val allocatedContainers = allocateResponse.getAllocatedContainers()
- ........
- // 3.如果可分配的容器数量大于0,则调用处理可用容器的方法,点击handle方法
- if (allocatedContainers.size > 0) {
- .........
- //处理已经分配到的资源
- handleAllocatedContainers(allocatedContainers.asScala)
- }
- ........
- }
- <*****************************handleAllocatedContainers*****************************>
- // 1.内部会创建一个线程,并调用线程的run方法,在run方法中循环遍历RM返回的可用容器,然后进行对每个容器进行匹配,此时涉及到首选位置,根据请求匹配选择哪些容器.首选位置的选择规则见首选位置说明。
- def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
-
- val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)
- ........
- // Match incoming requests by host
- val remainingAfterHostMatches = new ArrayBuffer[Container]
- .......
- //2.运行已经分配到资源(封装好的container)。实则启动Excutorbackend进程
- //启动Container
- runAllocatedContainers(containersToUse)
- .......
- }
-
- <*****************************runAllocatedContainers*****************************>
- private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
- // 1. 挨个遍历可用的容器资源
- for (container <- containersToUse) {
- executorIdCounter += 1
- ...........
- // 2. 每个容器中,如果运行执行器的数量小于目标执行器的数量,执行如下代码
- if (runningExecutors.size() < targetNumExecutors) {
- numExecutorsStarting.incrementAndGet()
- ........
- // 3. 线程池,在线程池的内部有:
- --launcherPool.execute(new Runnable
- // 1.执行的池子是一个线程池
- --launcherPool = ThreadUtils.newDaemonCachedThreadPool
- // 2.一个线程container对应一个ExecutorRunnable,并调用了这个对象的run方法
- --new ExecutorRunnable...run()
- }
- <*******************************************run****************************************>
- //run方法在ExecutorRunnable中:说明AM能够找到NM
- //启动Excutor的资源容器,是由ApplicationMaster与NM进行通信启动的(通过client建立通信)
- def run(): Unit = {
- // 创建NM的客户端
- nmClient = NMClient.createNMClient()
- // 初始化NM
- nmClient.init(conf)
- // 启动NM
- nmClient.start()
- // 启动容器,点击--startContainer()
- startContainer()
- }
- <*************************************startContainer**********************************>
- def startContainer(): java.util.Map[String, ByteBuffer] = {
- ........
- //封装启动Container的命令
- val commands = prepareCommand()
- //将封装好的指令传递到参数中
- ctx.setCommands(commands.asJava)
- ........
- //告诉NM启动EXcutorbackend
- //AM向NM提交申请,启动YarnCoarseGrainedExecutorBackend-->粗粒度的执行器后台,是一个进程
- nmClient.startContainer(container.get, ctx)
- }
- }
- <*************************************prepareCommand**********************************>
- //封装命令
- private def prepareCommand(): List[String] = {
- // Extra options for the JVM
- val javaOpts = ListBuffer[String]()
- // Set the JVM memory
- val executorMemoryString = executorMemory + "m"
- javaOpts += "-Xmx" + executorMemoryString
- ......................
- val commands = prefixEnv ++
- Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
- javaOpts ++
- Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
- "--driver-url", masterAddress,
- "--executor-id", executorId,
- "--hostname", hostname,
- "--cores", executorCores.toString,
- "--app-id", appId,
- "--resourceProfileId", resourceProfileId.toString) ++
- userClassPath ++
- ..................
- }
- //封装好以后执行startContainer中的 nmClient.startContainer(container.get, ctx)
◼ 首选位置说明
- -- 首选位置说明
- --1. 移动数据不如移动计算。
- --2. 首选位置:有多个,和本地化级别有关。
- --3. 本地化级别:将数据和计算所在的位置称之为本地化
- 1. 计算和数据在同一个Executor中,称之进程本地化
- 2. 计算和数据在同一个节点中,称之节点本地化
- 3. 计算和数据在同一个机架中,称之机架本地化
- 4. 任意
- org.apache.spark.executor.CoarseGrainedExecutorBackend:通信后台
- 执行一次bin/java就会执行一个新的进程,则是属于并行执行的感觉,和之前执行的内容是分开的。类似我们在Windows中开了一个微信和qq程序一样,各自执行,互不影响。
-
- - 作用:
- 执行CoarseGrainedExecutorBackend"执行器后台"的main方法,在main方法中:
- 1. 首先封装一些参数
- 2. 执行run方法,在run方法中:
- 1. 通过driver的URI,使得CoarseGrainedExecutorBackend与Driver进行关联
- 2. 通过通信环境创建了一个终端,名字为executor,创建一个CoarseGrainedExecutorBackend对象并调用onstart方法:
- 1. 获取driver的引用
- 2. ExecutorBackend向driver发送消息,注册executor的消息,也称之为反向注册
- 3. 在driver端会接收到这个消息,通过executor的引用,发送消息给到ExecutorBackend,注册executor成功
- 4. ExecutorBackend接收driver返回的executor注册成功的消息,
-
- -- 说明:
- executor是一个计算对象,在这个对象里面有一个线程池,每一个线程来处理一个从driver端发送过来的任务
- //YarnCoarseGrainedExecutorBackend 就是通过调用CoarseGrainedExecutorBackend的run方法
- private[spark] class YarnCoarseGrainedExecutorBackend(.......) {
- CoarseGrainedExecutorBackend.run(backendArgs, createFn)
- System.exit(0)
- }
- }
- <****************************************run*****************************************>
- //1.run方法
- def run(
- ........
- env.rpcEnv.setupEndpoint("Executor",
- backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile))
- arguments.workerUrl.foreach { url =>
- env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
- }
- env.rpcEnv.awaitTermination()
- }
- }
-
- <***************************************onStart****************************************>
- //2.onStart(反向注册)
- override def onStart(): Unit = {
- .......
- //3.3.获取Driver客户端对象并发送注册消息
- // This is a very fast action so we can use "ThreadUtils.sameThread"
- //获取driver的引用
- driver = Some(ref)
- //ExecutorBackend向driver发送消息,注册executor的消息,也称之为反向注册
- ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
- .......
- //在driver端会接收到这个消息,因为在driver端,有一个上下文的对象,sparkcontext,在这个类有一个属性: private var _schedulerBackend: SchedulerBackend = _,点击SchedulerBackend,是一个trait,找到实现类:CoarseGrainedSchedulerBackend,在这个类中,有一个方法:receiveAndReply(): ............
- case Success(_) =>
- //4.Driver回成功消息,则通知自己启动Executor
- self.send(RegisteredExecutor)
-
- // ExecutorBackend类中有一个recive方法,用来接收driver返回的executor注册成功的消息,executor是一个计算对象,在这个对象里面有一个线程池,每一个线程来处理一个从driver端发送过来的任务
- --executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
- }
-
- <**********************************RegisteredExecutor***********************************>
- override def receive: PartialFunction[Any, Unit] = {
- case RegisteredExecutor =>
- logInfo("Successfully registered with driver")
- try {
- //5.启动Executor
- //Executor负责执行任务
- executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
- resources = _resources)
- //6.向Driver发送消息,请求执行任务
- driver.get.send(LaunchedExecutor(executorId))
- ................
- //7.Driver对任务排序(FIFO,FAIR),如果任务不为空,则向ExecutorBackend发送启动任务消息,ExecutorBackend接收到该消息,则执行任务
- executor.launchTask(this, taskDesc)
- }
- <**********************************launchTask***********************************>
- def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
- //8.启动线程执行任务
- val tr = new TaskRunner(context, taskDescription)
- runningTasks.put(taskDescription.taskId, tr)
- //执行的进士tr(TaskRunner)里面的run方法
- threadPool.execute(tr)
- }
- <*****************************************run***************************************>
- //9.任务执行
- override def run(): Unit = {
- val value = Utils.tryWithSafeFinally {
- val res = task.run(
- taskAttemptId = taskId,
- attemptNumber = taskDescription.attemptNumber,
- metricsSystem = env.metricsSystem,
- resources = taskDescription.resources)
- threwException = false
- res
- }
- <***************************************task.run**************************************>
- //9.任务执行
- final def run( , , , , , ){
- try {
- runTask(context)
- } catch {
- }
- }
-
- <***************************************runTask**************************************>
- runtask是一个抽象类,其实现类ShuffleMapTask、ResultTAsk,任务的提交也到此结束。
- -- 1. application是在一个nodemanager中container中,并且在这个container中创建了一个driver线程
- -- 2. 在一个nodemanager中,可以创建多个container,在每个container中,会创建ExecutorBackend对象,在这个对象中,会创建一个executor对象,在这个对象中一个线程池,一个线程用来处理driver发来的一个task,至于能同时执行多少个task,和executor中的core数量有关。
- -- 3. ApplicationMaster周旋于Driver和ResourceManager之间
- -- 4. spark有两个进程,也就是两个分支
- 创建RM的客户端,创建AM,在AM中,创建Driver的线程
- "分支1":此时会执行Driver线程的run方法,在run方法中就是执行了应用程序的main方法
- "分支2":构建SparkContext上下文的对象,再向RM注册AM,然后申请资源和返回可用的资源,最后Driver进行资源的选择,按照首选位置的原则。
- 所以如下图片有一个错误:资源满足以后才执行main方法,实际上是创建了driver线程,还没有申请资源就已经开始执行main方法了。
- -- 5. 进程、线程、对象
- "进程":SparkSubmit、ApplicationMaster和CoarseGrainedExecutorBackend
- "线程":Driver,但是我们一般称SparkContext称之为Driver
- "对象":Executor和YarnClusterApplication
-
- -- 6. client和cluster模式的区别:
- Driver的位置不同,其余的逻辑是一样的。
- Cluster:在集群中,在nodemanager中的AM对象中,是一个线程
- client:在集群之外
- 1) 任务提交后,都会先启动Driver程序;
- 2) 随后Driver向集群管理器注册应用程序;
- 3) 之后集群管理器根据此任务的配置文件分配Executor并启动;
- 4) Driver开始执行main函数,Spark查询为懒执行,当执行到Action算子时开始反向推算,根据宽依赖进行Stage的划分,随后每一个Stage对应一个Taskset,Taskset中有多个Task,查找可用资源Executor进行调度;
- 5) 根据本地化原则,Task会被分发到指定的Executor去执行,在任务执行的过程中,Executor也会不断与Driver进行通信,报告任务运行情况。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。