
Spark 2.3 Source Code Analysis: CoarseGrainedExecutorBackend


Overview

The main messages that CoarseGrainedExecutorBackend sends to the CoarseGrainedSchedulerBackend side are:

  • RegisterExecutor: registers the executor. After the SchedulerBackend confirms that the execId may register, it replies with a registration-success message.
  • StatusUpdate: reports the state of the tasks running in the Executor.

The main messages that CoarseGrainedExecutorBackend handles from the CoarseGrainedSchedulerBackend side are (both groups of messages are sketched right after this list):

  • RegisteredExecutor: the reply confirming that the ExecutorBackend registered successfully; upon receiving it, the ExecutorBackend creates an Executor.
  • LaunchTask: tells the Executor to launch a task. The message carries the serialized task description, from which the Executor launches the task.
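
Both groups of messages are defined in CoarseGrainedClusterMessages.scala. The following is a condensed, paraphrased sketch of the relevant definitions in Spark 2.3 (unrelated messages omitted; check the source for the exact field lists):

// Paraphrased from org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages
// (Spark 2.3). RpcEndpointRef, TaskState and SerializableBuffer are Spark types.
sealed trait CoarseGrainedClusterMessage extends Serializable

// ExecutorBackend -> SchedulerBackend
case class RegisterExecutor(
    executorId: String,
    executorRef: RpcEndpointRef,
    hostname: String,
    cores: Int,
    logUrls: Map[String, String])
  extends CoarseGrainedClusterMessage

// A companion apply also accepts a raw ByteBuffer and wraps it in a SerializableBuffer.
case class StatusUpdate(
    executorId: String,
    taskId: Long,
    state: TaskState,
    data: SerializableBuffer)
  extends CoarseGrainedClusterMessage

// SchedulerBackend -> ExecutorBackend
case object RegisteredExecutor extends CoarseGrainedClusterMessage
case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage
case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
case class KillTask(taskId: Long, executor: String, interruptThread: Boolean, reason: String)
  extends CoarseGrainedClusterMessage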

Messages CoarseGrainedExecutorBackend handles from CoarseGrainedSchedulerBackend

 

override def receive: PartialFunction[Any, Unit] = {
  case RegisteredExecutor =>
    logInfo("Successfully registered with driver")
    try {
      executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
    } catch {
      case NonFatal(e) =>
        exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
    }

  case RegisterExecutorFailed(message) =>
    exitExecutor(1, "Slave registration failed: " + message)

  case LaunchTask(data) =>
    if (executor == null) {
      exitExecutor(1, "Received LaunchTask command but executor was null")
    } else {
      val taskDesc = TaskDescription.decode(data.value)
      logInfo("Got assigned task " + taskDesc.taskId)
      executor.launchTask(this, taskDesc)
    }

  case KillTask(taskId, _, interruptThread, reason) =>
    if (executor == null) {
      exitExecutor(1, "Received KillTask command but executor was null")
    } else {
      executor.killTask(taskId, interruptThread, reason)
    }

  case StopExecutor =>
    stopping.set(true)
    logInfo("Driver commanded a shutdown")
    // Cannot shutdown here because an ack may need to be sent back to the caller. So send
    // a message to self to actually do the shutdown.
    self.send(Shutdown)

  case Shutdown =>
    stopping.set(true)
    new Thread("CoarseGrainedExecutorBackend-stop-executor") {
      override def run(): Unit = {
        // executor.stop() will call `SparkEnv.stop()` which waits until RpcEnv stops totally.
        // However, if `executor.stop()` runs in some thread of RpcEnv, RpcEnv won't be able to
        // stop until `executor.stop()` returns, which becomes a dead-lock (See SPARK-14180).
        // Therefore, we put this line in a new thread.
        executor.stop()
      }
    }.start()

  case UpdateDelegationTokens(tokenBytes) =>
    logInfo(s"Received tokens of ${tokenBytes.length} bytes")
    SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf)
}
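
In the LaunchTask case above, executor.launchTask does little more than wrap the decoded TaskDescription in a TaskRunner and submit it to the executor's task thread pool, as in this condensed sketch paraphrased from Executor.scala (Spark 2.3):

// Paraphrased from org.apache.spark.executor.Executor (Spark 2.3).
// runningTasks is a ConcurrentHashMap[Long, TaskRunner]; threadPool is a cached thread pool.
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  val tr = new TaskRunner(context, taskDescription)
  runningTasks.put(taskDescription.taskId, tr)
  threadPool.execute(tr)
}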

Messages CoarseGrainedExecutorBackend sends to CoarseGrainedSchedulerBackend

RegisterExecutor

override def onStart() {
  logInfo("Connecting to driver: " + driverUrl)
  rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    driver = Some(ref)
    // send the RegisterExecutor message to the driver
    ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
  }(ThreadUtils.sameThread).onComplete {
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    case Success(msg) =>
      // Always receive `true`. Just ignore it
    case Failure(e) =>
      exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
  }(ThreadUtils.sameThread)
}
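
On the driver side, this ask is answered by CoarseGrainedSchedulerBackend.receiveAndReply. The heavily condensed sketch below (Spark 2.3; executorDataMap bookkeeping, blacklist checks and listener events omitted) also explains the "Always receive `true`" comment above: the Boolean reply only acknowledges the ask, while the real outcome arrives as a RegisteredExecutor or RegisterExecutorFailed message.

// Condensed from CoarseGrainedSchedulerBackend.receiveAndReply (Spark 2.3).
case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
  if (executorDataMap.contains(executorId)) {
    // Duplicate executor ID: tell the executor to exit, but still ack the ask.
    executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
    context.reply(true)
  } else {
    // ... record the executor in executorDataMap and update core counts ...
    executorRef.send(RegisteredExecutor)
    context.reply(true)
    makeOffers() // offer the new cores to pending tasks
  }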

statusUpdate

override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
  val msg = StatusUpdate(executorId, taskId, state, data)
  driver match {
    case Some(driverRef) => driverRef.send(msg)
    case None => logWarning(s"Drop $msg because has not yet connected to driver")
  }
}
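
statusUpdate itself is called from Executor.TaskRunner.run as a task moves through its lifecycle. The condensed call sites below are paraphrased from Executor.scala (Spark 2.3; failure paths omitted):

// Inside Executor.TaskRunner.run(), simplified:
execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER) // task has started
// ... deserialize and run the task, then serialize its result ...
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult) // task has finished
// On failure the state is TaskState.FAILED (or KILLED), with the serialized reason as data.

On the driver side, CoarseGrainedSchedulerBackend.receive matches the StatusUpdate message and forwards it to TaskSchedulerImpl.statusUpdate; once the task reaches a terminal state, the executor's cores are freed and new offers are made to that executor.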

 
