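CoarseGrainedExecutorBackend is the Executor's daemon process; it creates the Executor and keeps it running. While a Spark application is running, CoarseGrainedExecutorBackend is a separate JVM process, so running the jps command during a job will list it.
Let's look at how CoarseGrainedExecutorBackend starts up. The source version is spark-2.4.0.
The previous post covered how an Executor is launched: ExecutorRunner's fetchAndRunExecutor method is called, and it builds a ProcessBuilder command that launches CoarseGrainedExecutorBackend.
Start with the CoarseGrainedExecutorBackend class itself: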
private[spark] class CoarseGrainedExecutorBackend(
    override val rpcEnv: RpcEnv,
    driverUrl: String,
    executorId: String,
    hostname: String,
    cores: Int,
    userClassPath: Seq[URL],
    env: SparkEnv)
  extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging {

  private[this] val stopping = new AtomicBoolean(false)
  var executor: Executor = null
  @volatile var driver: Option[RpcEndpointRef] = None

  // If this CoarseGrainedExecutorBackend is changed to support multiple threads, then this may need
  // to be changed so that we don't share the serializer instance across threads
  private[this] val ser: SerializerInstance = env.closureSerializer.newInstance()
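Two things stand out:
1. CoarseGrainedExecutorBackend extends ThreadSafeRpcEndpoint and ExecutorBackend. It is therefore an RpcEndpoint and can exchange messages with other endpoints, and it follows the usual endpoint lifecycle: onStart -> receive* -> onStop.
2. It maintains two fields, executor and driver. executor runs the tasks, and driver is the RpcEndpointRef used to talk to the Driver.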
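To make the lifecycle order concrete, here is a minimal sketch in plain Scala. It does not use Spark's RPC classes: MiniEndpoint, runEndpoint and RecordingEndpoint are hypothetical names invented for this illustration, and the toy dispatcher simply drops messages that the endpoint does not handle.

```scala
import scala.collection.mutable.ListBuffer

object LifecycleDemo {
  // Hypothetical stand-in for an RPC endpoint; not Spark's RpcEndpoint trait.
  trait MiniEndpoint {
    def onStart(): Unit = ()
    def receive: PartialFunction[Any, Unit]
    def onStop(): Unit = ()
  }

  // Toy dispatcher: start the endpoint, deliver messages one at a time, then stop it.
  def runEndpoint(endpoint: MiniEndpoint, messages: Seq[Any]): Unit = {
    endpoint.onStart()
    messages.foreach { msg =>
      // Messages the endpoint does not understand are silently dropped here.
      if (endpoint.receive.isDefinedAt(msg)) endpoint.receive(msg)
    }
    endpoint.onStop()
  }

  // Records each lifecycle callback so the order can be inspected afterwards.
  class RecordingEndpoint extends MiniEndpoint {
    val events = ListBuffer.empty[String]
    override def onStart(): Unit = events += "onStart"
    override def receive: PartialFunction[Any, Unit] = {
      case text: String => events += s"receive($text)"
    }
    override def onStop(): Unit = events += "onStop"
  }

  def main(args: Array[String]): Unit = {
    val endpoint = new RecordingEndpoint
    runEndpoint(endpoint, Seq("RegisteredExecutor", "LaunchTask"))
    println(endpoint.events.mkString(" -> "))
  }
}
```

Delivering messages one at a time is what the "thread safe" part of ThreadSafeRpcEndpoint is about: the endpoint never has to handle two messages concurrently.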
Next, the main method:
def main(args: Array[String]) {
  // Variables that the command-line arguments will fill in
  var driverUrl: String = null
  var executorId: String = null
  var hostname: String = null
  var cores: Int = 0
  var appId: String = null
  var workerUrl: Option[String] = None
  val userClassPath = new mutable.ListBuffer[URL]()

  // Turn the arguments into a list
  var argv = args.toList
  while (!argv.isEmpty) {
    // Pattern match on the head of the list to pick off "--flag value" pairs
    argv match {
      case ("--driver-url") :: value :: tail =>
        driverUrl = value
        argv = tail
      case ("--executor-id") :: value :: tail =>
        executorId = value
        argv = tail
      case ("--hostname") :: value :: tail =>
        hostname = value
        argv = tail
      case ("--cores") :: value :: tail =>
        cores = value.toInt
        argv = tail
      case ("--app-id") :: value :: tail =>
        appId = value
        argv = tail
      case ("--worker-url") :: value :: tail =>
        // Worker url is used in spark standalone mode to enforce fate-sharing with worker
        workerUrl = Some(value)
        argv = tail
      case ("--user-class-path") :: value :: tail =>
        userClassPath += new URL(value)
        argv = tail
      case Nil =>
      case tail =>
        // scalastyle:off println
        System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
        // scalastyle:on println
        printUsageAndExit()
    }
  }

  if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
    appId == null) {
    printUsageAndExit()
  }

  // Call run with the parsed values
  run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
  System.exit(0)
}
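The list-pattern parsing used by main can be tried in isolation. The sketch below is a simplified, immutable variant that handles only three of the flags; BackendArgs and parse are hypothetical names, and the driver URL in the sample is a made-up value used only for illustration.

```scala
object ArgParseDemo {
  // Hypothetical holder for a subset of the backend's arguments.
  final case class BackendArgs(
      driverUrl: Option[String] = None,
      executorId: Option[String] = None,
      cores: Int = 0)

  // Same idea as main(): peel "--flag value" pairs off the front of the list.
  // Anything that does not match a known pair is reported as unrecognized.
  @scala.annotation.tailrec
  def parse(
      argv: List[String],
      acc: BackendArgs = BackendArgs()): Either[String, BackendArgs] =
    argv match {
      case "--driver-url" :: value :: tail =>
        parse(tail, acc.copy(driverUrl = Some(value)))
      case "--executor-id" :: value :: tail =>
        parse(tail, acc.copy(executorId = Some(value)))
      case "--cores" :: value :: tail =>
        parse(tail, acc.copy(cores = value.toInt))
      case Nil =>
        Right(acc)
      case unknown =>
        Left(s"Unrecognized options: ${unknown.mkString(" ")}")
    }

  def main(args: Array[String]): Unit = {
    // Made-up sample values, not taken from a real cluster.
    val sample = List(
      "--driver-url", "spark://CoarseGrainedScheduler@host-a:35000",
      "--executor-id", "1",
      "--cores", "2")
    println(parse(sample))
  }
}
```

Returning an Either instead of calling printUsageAndExit keeps the sketch testable; the real method exits the JVM on bad input.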
Now the run method:
private def run(
    driverUrl: String,
    executorId: String,
    hostname: String,
    cores: Int,
    appId: String,
    workerUrl: Option[String],
    userClassPath: Seq[URL]) {

  Utils.initDaemon(log)

  SparkHadoopUtil.get.runAsSparkUser { () =>
    // Debug code
    Utils.checkHost(hostname)

    // Bootstrap to fetch the driver's Spark properties.
    val executorConf = new SparkConf
    // Create a temporary RpcEnv that is used only to fetch the configuration
    val fetcher = RpcEnv.create(
      "driverPropsFetcher",
      hostname,
      -1,
      executorConf,
      new SecurityManager(executorConf),
      clientMode = true)
    // Obtain a reference to the driver endpoint
    val driver = fetcher.setupEndpointRefByURI(driverUrl)
    // Send RetrieveSparkAppConfig to the driver to get the Spark properties
    val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig)
    val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", appId))
    // Shut down the fetcher
    fetcher.shutdown()

    // Create SparkEnv using properties we fetched from the driver.
    val driverConf = new SparkConf()
    for ((key, value) <- props) {
      // this is required for SSL in standalone mode
      if (SparkConf.isExecutorStartupConf(key)) {
        driverConf.setIfMissing(key, value)
      } else {
        // Copy the property received from the driver into driverConf
        driverConf.set(key, value)
      }
    }

    cfg.hadoopDelegationCreds.foreach { tokens =>
      SparkHadoopUtil.get.addDelegationTokens(tokens, driverConf)
    }

    // Create the Executor's SparkEnv
    val env = SparkEnv.createExecutorEnv(
      driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)

    // Create the CoarseGrainedExecutorBackend instance and register it with the
    // rpcEnv of the Executor's own SparkEnv
    env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
      env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
    // Create a WorkerWatcher, which shuts down CoarseGrainedExecutorBackend
    // when the worker fails
    workerUrl.foreach { url =>
      env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
    }
    // Block until the RpcEnv terminates
    env.rpcEnv.awaitTermination()
  }
}
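The method above does the following:
1. Creates a temporary RpcEnv (driverPropsFetcher).
2. Fetches the Spark configuration from the driver.
3. Creates the Executor's SparkEnv.
4. Creates the CoarseGrainedExecutorBackend instance and registers it with the rpcEnv of the Executor's own SparkEnv.
5. If a worker URL was passed, registers a WorkerWatcher so that CoarseGrainedExecutorBackend is shut down when the Worker fails.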
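The property-merging loop in run treats two groups of keys differently: "startup" keys keep any value the executor already has (setIfMissing), while every other key is overwritten with the driver's value (set). The sketch below models that rule with plain maps. It is only an illustration: isStartupConf is a simplified, assumed stand-in for SparkConf.isExecutorStartupConf, and merge is a hypothetical helper.

```scala
import scala.collection.mutable

object ConfMergeDemo {
  // Assumption: a reduced stand-in for SparkConf.isExecutorStartupConf,
  // covering only two key prefixes for the sake of the example.
  def isStartupConf(key: String): Boolean =
    key.startsWith("spark.ssl") || key.startsWith("spark.rpc")

  // Merge the driver's properties into the executor's local settings.
  def merge(
      local: Map[String, String],
      fromDriver: Seq[(String, String)]): Map[String, String] = {
    val conf = mutable.Map(local.toSeq: _*)
    for ((key, value) <- fromDriver) {
      if (isStartupConf(key)) {
        // Equivalent of setIfMissing: a locally set value wins.
        if (!conf.contains(key)) conf(key) = value
      } else {
        // Equivalent of set: the driver's value wins.
        conf(key) = value
      }
    }
    conf.toMap
  }

  def main(args: Array[String]): Unit = {
    val local = Map("spark.rpc.askTimeout" -> "30s")
    val fromDriver = Seq("spark.rpc.askTimeout" -> "120s", "spark.app.id" -> "app-1")
    println(merge(local, fromDriver))
  }
}
```

The reason for the split is that the executor needed some settings just to reach the driver in the first place, so those must not be replaced after the fact.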
When CoarseGrainedExecutorBackend is registered with the rpcEnv, its onStart method is called. Here is that method:
override def onStart() {
  logInfo("Connecting to driver: " + driverUrl)
  rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    driver = Some(ref)
    // Send the RegisterExecutor message to the driver
    ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
  }(ThreadUtils.sameThread).onComplete {
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    case Success(msg) =>
      // Always receive `true`. Just ignore it
    case Failure(e) =>
      exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
  }(ThreadUtils.sameThread)
}

The RegisterExecutor message is delivered to the driver side, where the DriverEndpoint inside CoarseGrainedSchedulerBackend handles it. Here is that handler:

case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
  // Check whether this executor ID is already registered
  if (executorDataMap.contains(executorId)) {
    // If so, tell the executor that registration failed
    executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
    context.reply(true)
  // Check whether the host is blacklisted
  } else if (scheduler.nodeBlacklist.contains(hostname)) {
    // If the cluster manager gives us an executor on a blacklisted node (because it
    // already started allocating those resources before we informed it of our blacklist,
    // or if it ignored our blacklist), then we reject that executor immediately.
    // The host is blacklisted, so reply with a registration failure
    logInfo(s"Rejecting $executorId as it has been blacklisted.")
    executorRef.send(RegisterExecutorFailed(s"Executor is blacklisted: $executorId"))
    context.reply(true)
  } else {
    // If the executor's rpc env is not listening for incoming connections, `hostPort`
    // will be null, and the client connection should be used to contact the executor.
    val executorAddress = if (executorRef.address != null) {
        executorRef.address
      } else {
        context.senderAddress
      }
    logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
    // Record the RPC address -> executor ID mapping
    addressToExecutorId(executorAddress) = executorId
    // Update the total number of cores
    totalCoreCount.addAndGet(cores)
    // Update the total number of registered executors
    totalRegisteredExecutors.addAndGet(1)
    // Create the ExecutorData
    val data = new ExecutorData(executorRef, executorAddress, hostname,
      cores, cores, logUrls)
    // This must be synchronized because variables mutated
    // in this block are read when requesting executors
    CoarseGrainedSchedulerBackend.this.synchronized {
      executorDataMap.put(executorId, data)
      if (currentExecutorIdCounter < executorId.toInt) {
        currentExecutorIdCounter = executorId.toInt
      }
      if (numPendingExecutors > 0) {
        numPendingExecutors -= 1
        logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
      }
    }
    // Tell CoarseGrainedExecutorBackend that the executor registered successfully
    executorRef.send(RegisteredExecutor)
    // Note: some tests expect the reply to come after we put the executor in the map
    context.reply(true)
    // Post a SparkListenerExecutorAdded event to the listenerBus
    listenerBus.post(
      SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
    // Call makeOffers to offer resources to pending tasks
    makeOffers()
  }
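The handler above does the following:
1. Checks whether the executor is already registered, and whether its host is blacklisted.
2. Adds the RPC address -> executor ID mapping to the addressToExecutorId hash map.
3. Updates the totalCoreCount total.
4. Updates the total number of registered executors.
5. Wraps the executor's details in an ExecutorData object.
6. Puts the executor ID and its ExecutorData object into the executorDataMap cache.
7. Updates currentExecutorIdCounter and numPendingExecutors.
8. Sends a message to CoarseGrainedExecutorBackend saying that the executor registered successfully.
9. Posts a SparkListenerExecutorAdded event to the listenerBus.
10. Calls makeOffers to offer resources to tasks.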
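The bookkeeping part of these steps can be modelled in a few lines. The sketch below is a toy, not Spark's DriverEndpoint: MiniDriver, Reply, Registered and RegistrationFailed are hypothetical names, and the listener event and makeOffers call are left out. It keeps the same order of checks (duplicate ID first, then blacklist) and the same counters.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

object RegistrationDemo {
  // Hypothetical reply types standing in for the messages sent back to the executor.
  sealed trait Reply
  case object Registered extends Reply
  final case class RegistrationFailed(reason: String) extends Reply

  // Toy driver-side registry; real Spark keeps richer ExecutorData per executor.
  final class MiniDriver(blacklistedHosts: Set[String]) {
    private val executorDataMap = mutable.HashMap.empty[String, (String, Int)]
    val totalCoreCount = new AtomicInteger(0)
    val totalRegisteredExecutors = new AtomicInteger(0)

    def register(executorId: String, hostname: String, cores: Int): Reply =
      synchronized {
        if (executorDataMap.contains(executorId)) {
          // Reject a second registration under the same ID.
          RegistrationFailed("Duplicate executor ID: " + executorId)
        } else if (blacklistedHosts.contains(hostname)) {
          // Reject executors that landed on a blacklisted host.
          RegistrationFailed(s"Executor is blacklisted: $executorId")
        } else {
          // Update the totals, then cache the executor's details.
          totalCoreCount.addAndGet(cores)
          totalRegisteredExecutors.addAndGet(1)
          executorDataMap.put(executorId, (hostname, cores))
          Registered
        }
      }
  }

  def main(args: Array[String]): Unit = {
    val driver = new MiniDriver(blacklistedHosts = Set("bad-host"))
    println(driver.register("1", "host-a", 4))
    println(driver.register("1", "host-a", 4))
    println(driver.register("2", "bad-host", 4))
  }
}
```

Note that a rejected registration leaves the counters untouched, which is why the checks come before any state is changed.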
Back on the CoarseGrainedExecutorBackend side:
case RegisteredExecutor =>
  logInfo("Successfully registered with driver")
  try {
    executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
  } catch {
    case NonFatal(e) =>
      exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
  }
Once the RegisteredExecutor message arrives, the backend creates the Executor. That completes the startup flow of CoarseGrainedExecutorBackend.
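As a rough illustration of this last step, the sketch below models how a backend might react to the driver's reply. It is not Spark code: Outcome, ExecutorCreated, Exit and handle are hypothetical names, createExecutor stands in for the `new Executor(...)` call, and the failure branch is an assumption, since the excerpt above shows only the success case.

```scala
import scala.util.control.NonFatal

object BackendReceiveDemo {
  // Simplified copies of the two replies the driver can send.
  sealed trait DriverMessage
  case object RegisteredExecutor extends DriverMessage
  final case class RegisterExecutorFailed(message: String) extends DriverMessage

  // Hypothetical result type; the real backend mutates a field or exits the JVM.
  sealed trait Outcome
  final case class ExecutorCreated(name: String) extends Outcome
  final case class Exit(code: Int, reason: String) extends Outcome

  // createExecutor is a hypothetical factory standing in for `new Executor(...)`.
  def handle(msg: DriverMessage, executorId: String)(
      createExecutor: String => String): Outcome =
    msg match {
      case RegisteredExecutor =>
        try {
          ExecutorCreated(createExecutor(executorId))
        } catch {
          // Only non-fatal errors are turned into an exit; fatal ones propagate.
          case NonFatal(e) =>
            Exit(1, "Unable to create executor due to " + e.getMessage)
        }
      case RegisterExecutorFailed(message) =>
        // Assumed reaction to a rejected registration.
        Exit(1, "Registration failed: " + message)
    }

  def main(args: Array[String]): Unit = {
    println(handle(RegisteredExecutor, "1")(id => s"executor-$id"))
    println(handle(RegisterExecutorFailed("Duplicate executor ID: 1"), "1")(identity))
  }
}
```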