当前位置:   article > 正文

spark executor内幕解密_spark extors

spark extors

 

 

 

一:Spark Executor工作原理

1, 需要特别注意是在CoarseGrainedExecutorBackend启动时向Driver注册Executor其实质是注册ExecutorBackend实例,和Executor实例之间没有直接的关系!!!

2, CoarseGrainedExecutorBackendExecutor运行所在的进程名称,Executor才是正在处理Task的对象,Executor内部是通过线程池的方式来完成Task的计算的;

3, CoarseGrainedExecutorBackendExecutor是一一对应的;

4, CoarseGrainedExecutorBackend是一个消息通信体(其具体实现了ThreadSafeRPCEndpoint,可以发送信息给Driver并可以接受Driver中发过来的指令,例如启动Task等;

5, Driver进程有两个至关重要的Endpoint

a) ClientEndpoint:主要负责向Master注册当前的程序,是AppClient的内部成员;

b) DriverEndpoint:这是整个程序运行时候的驱动器,是CoarseGrainedExecutorBackend的内部成员;

6, Driver中通过ExecutorData封装并注册ExecutorBackend的信息到Driver的内存数据结构executorMapData中:

private[cluster] class ExecutorData(
   val executorEndpoint: RpcEndpointRef,
   val executorAddress: RpcAddress,
   override val executorHost: String,
   var freeCores: Int,
   override val totalCores: Int,
   override val logUrlMap: Map[String, String]
) extends ExecutorInfo(executorHost, totalCores, logUrlMap)

 

7, 实际在执行的时候DriverEndpoint会把信息吸入CoarseGrainedSchedulerBackend的内存数据结构executorMapData,所以说最终是注册给了CoarseGrainedSchedulerBackend,也就是说CoarseGrainedSchedulerBackend掌握了为当前程序分配的所有的ExecutorBackend进程,而在每一个ExecutorBackend进行实例中会通过Executor对象来负责具体Task的运行。在运行的时候使用synchronized关键字来保证executorMapData安全的并发写操作。

8, CoarseGrainedExecutorBackend收到DriverEndpoint发送过来的RegisteredExecutor消息后会启动Executor实例对象,而Executor实例对象是事实上负责真正Task计算的;

二:Executor具体是如何工作的?

1, Driver发送过来Task的时候,其实是发送给了CoarseGrainedExecutorBackend这个RpcEndpoint,而不是直接发送给了ExecutorExecutor由于不是消息循环体,所以永远也无法直接接受远程发过来的信息);

 

case LaunchTask(data) =>
  if (executor == null) {
    logError("Received LaunchTask command but executor was null")
    System.exit(1)
  } else {
    val taskDesc = ser.deserialize[TaskDescription](data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
      taskDesc.name, taskDesc.serializedTask)
  }

 

2, ExecutorBackend在收到Driver中发送过来的消息后会提供调用launchTask来交给Executor去执行:

 

case LaunchTask(data) =>
  if (executor == null) {
    logError("Received LaunchTask command but executor was null")
    System.exit(1)
  } else {
    val taskDesc = ser.deserialize[TaskDescription](data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
      taskDesc.name, taskDesc.serializedTask)
  }

 

 

3, 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/265891
推荐阅读
相关标签
  

闽ICP备14008679号