Launching an Executor inside a Worker actually launches a CoarseGrainedExecutorBackend.
It has a receive method that handles the messages the Driver sends back after the executor registers successfully:
override def receive: PartialFunction[Any, Unit] = {
  case RegisteredExecutor =>
    logInfo("Successfully registered with driver")
    try {
      executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
    } catch {
      case NonFatal(e) =>
        exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
    }

  case RegisterExecutorFailed(message) =>
    exitExecutor(1, "Slave registration failed: " + message)

  case LaunchTask(data) =>
    if (executor == null) {
      exitExecutor(1, "Received LaunchTask command but executor was null")
    } else {
      val taskDesc = ser.deserialize[TaskDescription](data.value)
      logInfo("Got assigned task " + taskDesc.taskId)
      executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
        taskDesc.name, taskDesc.serializedTask)
    }

  case KillTask(taskId, _, interruptThread) =>
    if (executor == null) {
      exitExecutor(1, "Received KillTask command but executor was null")
    } else {
      executor.killTask(taskId, interruptThread)
    }

  case StopExecutor =>
    stopping.set(true)
    logInfo("Driver commanded a shutdown")
    // Cannot shutdown here because an ack may need to be sent back to the caller. So send
    // a message to self to actually do the shutdown.
    self.send(Shutdown)

  case Shutdown =>
    stopping.set(true)
    new Thread("CoarseGrainedExecutorBackend-stop-executor") {
      override def run(): Unit = {
        // executor.stop() will call `SparkEnv.stop()` which waits until RpcEnv stops totally.
        // However, if `executor.stop()` runs in some thread of RpcEnv, RpcEnv won't be able to
        // stop until `executor.stop()` returns, which becomes a dead-lock (See SPARK-14180).
        // Therefore, we put this line in a new thread.
        executor.stop()
      }
    }.start()
}
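The Shutdown case above is worth pausing on: executor.stop() must not run on an RpcEnv thread, or it would wait on the very thread it is running in (SPARK-14180). Below is a minimal, self-contained Java sketch (not Spark code; all names here are made up for illustration) of the same pattern: the message loop hands the blocking stop work to a freshly spawned thread so the loop thread itself can exit.

```java
// Minimal sketch (NOT Spark code): why Shutdown is handled on a separate thread.
// If the blocking stop work ran inside the message-loop thread, joining that
// thread would dead-lock -- so it is handed off to a new thread instead.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;

public class ShutdownSketch {
    private final BlockingQueue<String> inbox = new ArrayBlockingQueue<>(16);
    private final CountDownLatch stopped = new CountDownLatch(1);
    private Thread loop;

    void start() {
        loop = new Thread(() -> {
            try {
                while (true) {
                    String msg = inbox.take();
                    if (msg.equals("Shutdown")) {
                        // Hand the blocking stop work to a new thread, as the
                        // backend does, so the loop thread itself can finish.
                        new Thread(this::stopEverything).start();
                        return;
                    }
                }
            } catch (InterruptedException ignored) { }
        }, "message-loop");
        loop.start();
    }

    void stopEverything() {
        try {
            // join() waits for the loop thread; calling this from inside the
            // loop thread itself would never return.
            loop.join();
            stopped.countDown();
        } catch (InterruptedException ignored) { }
    }

    public static void main(String[] args) throws Exception {
        ShutdownSketch s = new ShutdownSketch();
        s.start();
        s.inbox.put("Shutdown");
        s.stopped.await();
        System.out.println("stopped cleanly");
    }
}
```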
The RegisteredExecutor case means the executor can now be created:
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
Once this line runs, the Executor has been created.
Next, let's look at how a Task is launched on the Executor:
case LaunchTask(data) =>
  if (executor == null) {
    exitExecutor(1, "Received LaunchTask command but executor was null")
  } else {
    // First deserialize the task: the TaskScheduler serialized it before
    // handing it to the executor, so the executor must deserialize it here.
    val taskDesc = ser.deserialize[TaskDescription](data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    // Hand the task to the executor via launchTask
    executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
      taskDesc.name, taskDesc.serializedTask)
  }
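The deserialization step mirrors what the TaskScheduler did on the Driver side: a task description is serialized into bytes, shipped over the wire, and reconstructed on the executor. As a minimal Java sketch (not Spark's serializer; the TaskDescription class below is a hypothetical stand-in), the round trip through a ByteBuffer looks like this:

```java
// Minimal sketch (NOT Spark's serializer): round-tripping a task descriptor
// through a ByteBuffer, mirroring ser.deserialize[TaskDescription](data.value).
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;

public class SerdeSketch {
    // Hypothetical stand-in for Spark's TaskDescription.
    static class TaskDescription implements Serializable {
        final long taskId;
        final String name;
        TaskDescription(long taskId, String name) { this.taskId = taskId; this.name = name; }
    }

    // Driver side: object -> bytes -> ByteBuffer.
    static ByteBuffer serialize(TaskDescription t) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(t);
        }
        return ByteBuffer.wrap(bos.toByteArray());
    }

    // Executor side: ByteBuffer -> bytes -> object.
    static TaskDescription deserialize(ByteBuffer buf) throws Exception {
        byte[] bytes = new byte[buf.remaining()];
        buf.get(bytes);
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (TaskDescription) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        ByteBuffer wire = serialize(new TaskDescription(42L, "shuffle-map-task"));
        TaskDescription back = deserialize(wire);
        System.out.println("Got assigned task " + back.taskId + " (" + back.name + ")");
    }
}
```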
Stepping into the executor's launchTask method:
def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer): Unit = {
  // Create a TaskRunner for each task. TaskRunner implements the Runnable
  // interface, so it can be handed to a thread pool.
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber,
    taskName, serializedTask)
  // Record it in the in-memory map of running tasks. That map is declared as:
  //   // Maintains the list of running tasks.
  //   private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
  // i.e. the bookkeeping structure is a ConcurrentHashMap.
  runningTasks.put(taskId, tr)
  // Wrap the task in the TaskRunner and submit it to the Java thread pool,
  // which schedules it onto one of its worker threads.
  threadPool.execute(tr)
}
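The pattern here — wrap each task in a Runnable, record it in a ConcurrentHashMap keyed by task id, then hand it to a thread pool — can be sketched in a few lines of plain Java (this is an illustration of the pattern, not Spark's actual Executor; Spark's pool is a cached daemon thread pool):

```java
// Minimal sketch (NOT Spark code) of launchTask's bookkeeping: wrap each task
// in a Runnable, record it in a ConcurrentHashMap, submit it to a thread pool.
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LaunchTaskSketch {
    // Maintains the set of running tasks, keyed by task id.
    static final ConcurrentHashMap<Long, TaskRunner> runningTasks = new ConcurrentHashMap<>();
    // A cached thread pool, similar in spirit to the Executor's task pool.
    static final ExecutorService threadPool = Executors.newCachedThreadPool();

    static class TaskRunner implements Runnable {
        final long taskId;
        TaskRunner(long taskId) { this.taskId = taskId; }
        @Override public void run() {
            // Real task execution would deserialize and run the task here.
            runningTasks.remove(taskId); // drop from the map when finished
        }
    }

    static void launchTask(long taskId) {
        TaskRunner tr = new TaskRunner(taskId);
        runningTasks.put(taskId, tr);   // in-memory bookkeeping
        threadPool.execute(tr);         // the pool runs it on a worker thread
    }

    public static void main(String[] args) throws Exception {
        for (long id = 1; id <= 3; id++) launchTask(id);
        threadPool.shutdown();
        threadPool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("tasks still running: " + runningTasks.size());
    }
}
```

ConcurrentHashMap is the right choice here because tasks are added by the RPC thread and removed by pool worker threads concurrently; no external locking is needed.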
(Image source: 北风网)