赞
踩
CoarseGrainedExecutorBackend向CoarseGrainedSchedulerBackend端发送的消息主要有RegisterExecutor和StatusUpdate两种(对应代码见下文的registerExecutor与statusUpdate两节)。
CoarseGrainedExecutorBackend处理来自CoarseGrainedSchedulerBackend端的消息主要如下:
/**
 * Handles the one-way messages this backend receives.
 *
 * Branches:
 *  - `RegisteredExecutor`: creates the `Executor`; on a non-fatal failure calls
 *    `exitExecutor` with code 1.
 *  - `RegisterExecutorFailed`: calls `exitExecutor` with code 1 and the received message.
 *  - `LaunchTask` / `KillTask`: forwarded to the executor, or `exitExecutor` with code 1
 *    when no executor has been created yet.
 *  - `StopExecutor`: marks the backend as stopping and sends `Shutdown` to itself.
 *  - `Shutdown`: stops the executor on a dedicated thread.
 *  - `UpdateDelegationTokens`: hands the received token bytes to `SparkHadoopUtil`.
 */
override def receive: PartialFunction[Any, Unit] = {
  case RegisteredExecutor =>
    logInfo("Successfully registered with driver")
    try {
      executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
    } catch {
      // Only non-fatal errors are handled here; fatal ones propagate.
      case NonFatal(e) =>
        exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
    }

  case RegisterExecutorFailed(message) =>
    exitExecutor(1, "Slave registration failed: " + message)

  case LaunchTask(data) =>
    if (executor == null) {
      exitExecutor(1, "Received LaunchTask command but executor was null")
    } else {
      val taskDesc = TaskDescription.decode(data.value)
      logInfo("Got assigned task " + taskDesc.taskId)
      executor.launchTask(this, taskDesc)
    }

  case KillTask(taskId, _, interruptThread, reason) =>
    if (executor == null) {
      exitExecutor(1, "Received KillTask command but executor was null")
    } else {
      executor.killTask(taskId, interruptThread, reason)
    }

  case StopExecutor =>
    stopping.set(true)
    logInfo("Driver commanded a shutdown")
    // Cannot shutdown here because an ack may need to be sent back to the caller. So send
    // a message to self to actually do the shutdown.
    self.send(Shutdown)

  case Shutdown =>
    stopping.set(true)
    new Thread("CoarseGrainedExecutorBackend-stop-executor") {
      override def run(): Unit = {
        // `executor` is only assigned in the RegisteredExecutor branch, so it is still null
        // if Shutdown arrives before registration completed or if creating the Executor
        // failed. Calling stop() on it would throw a NullPointerException in this thread
        // and nothing would be stopped, so exit with a failure status instead.
        if (executor == null) {
          System.exit(1)
        } else {
          // executor.stop() will call `SparkEnv.stop()` which waits until RpcEnv stops totally.
          // However, if `executor.stop()` runs in some thread of RpcEnv, RpcEnv won't be able to
          // stop until `executor.stop()` returns, which becomes a dead-lock (See SPARK-14180).
          // Therefore, we put this line in a new thread.
          executor.stop()
        }
      }
    }.start()

  case UpdateDelegationTokens(tokenBytes) =>
    logInfo(s"Received tokens of ${tokenBytes.length} bytes")
    SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf)
}

registerExecutor
/**
 * Looks up the driver endpoint by `driverUrl` and registers this executor with it.
 *
 * Once the endpoint reference is available it is stored in `driver` and a
 * `RegisterExecutor` request is sent to it. If either step fails, `exitExecutor` is
 * called with code 1 and `notifyDriver = false`.
 */
override def onStart(): Unit = {
  logInfo("Connecting to driver: " + driverUrl)

  // Both callbacks below are very fast actions, so "ThreadUtils.sameThread" is used for them.
  val registration = rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
    driver = Some(ref)
    // Send the RegisterExecutor message.
    ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
  }(ThreadUtils.sameThread)

  registration.onComplete {
    case Success(_) =>
      // Always receive `true`. Just ignore it
    case Failure(e) =>
      exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
  }(ThreadUtils.sameThread)
}
statusUpdate
/**
 * Reports a task's state to the driver.
 *
 * Builds a `StatusUpdate` for this executor and sends it to the driver reference held in
 * `driver`. When no driver reference has been set yet, the update is dropped and a
 * warning is logged.
 *
 * @param taskId id of the task being reported
 * @param state  state to report for the task
 * @param data   payload attached to the update
 */
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer): Unit = {
  val msg = StatusUpdate(executorId, taskId, state, data)
  driver.fold(
    logWarning(s"Drop $msg because has not yet connected to driver")
  )(_.send(msg))
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。