The previous few articles studied the source code of DAGScheduler. Starting with this one, we turn to Executor, Spark's task execution component.
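First, let's look at which classes and objects the executor package contains:
Nothing too complicated.
Now to the main topic: the internal structure of Executor, which again consists of the Executor class itself and its companion object:
A Spark Executor is backed by a thread pool that runs tasks. Except in Mesos fine-grained mode, it communicates with the Driver through an internal RPC interface.
Here are its data structures:
The main ones are the thread pool threadPool; taskReaperPool and taskReaperForTask, which track thread kills and task kills; heartbeater and heartbeatReceiverRef, the heartbeat sender and the reference to the driver-side heartbeat receiver; and runningTasks, which tracks the tasks currently executing.
During initialization, a thread pool is created: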
    // Start worker thread pool
    private val threadPool = {
      val threadFactory = new ThreadFactoryBuilder()
        .setDaemon(true)
        .setNameFormat("Executor task launch worker-%d")
        .setThreadFactory(new ThreadFactory {
          override def newThread(r: Runnable): Thread =
            // Use UninterruptibleThread to run tasks so that we can allow running codes without being
            // interrupted by `Thread.interrupt()`. Some issues, such as KAFKA-1894, HADOOP-10622,
            // will hang forever if some methods are interrupted.
            new UninterruptibleThread(r, "unused") // thread name will be set by ThreadFactoryBuilder
        })
        .build()
      Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
    }
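It is really just a ThreadPoolExecutor configured with a ThreadFactory, the Java factory interface for creating threads; here the factory produces daemon (background) threads.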
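To make the idea concrete, here is a minimal sketch of a cached pool with a daemon-thread factory, using only the JDK. It is not Spark's code: it leaves out Guava's ThreadFactoryBuilder and UninterruptibleThread, and the names DaemonPoolSketch, newDaemonCachedPool and probe are made up for illustration.

```scala
import java.util.concurrent.{Callable, Executors, ThreadFactory, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object DaemonPoolSketch {
  // Hypothetical helper: a cached pool whose threads are daemons with numbered names.
  def newDaemonCachedPool(prefix: String): ThreadPoolExecutor = {
    val counter = new AtomicInteger(0)
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, s"$prefix-${counter.getAndIncrement()}")
        t.setDaemon(true) // daemon threads do not keep the JVM alive
        t
      }
    }
    // newCachedThreadPool is backed by a ThreadPoolExecutor, so the cast succeeds.
    Executors.newCachedThreadPool(factory).asInstanceOf[ThreadPoolExecutor]
  }

  // Runs one task on a fresh pool and returns (thread name, isDaemon) seen inside it.
  def probe(): (String, Boolean) = {
    val pool = newDaemonCachedPool("sketch-worker")
    try {
      val future = pool.submit(new Callable[(String, Boolean)] {
        override def call(): (String, Boolean) = {
          val t = Thread.currentThread()
          (t.getName, t.isDaemon)
        }
      })
      future.get(5, TimeUnit.SECONDS)
    } finally {
      pool.shutdown()
    }
  }
}
```

Calling DaemonPoolSketch.probe() shows that the task ran on a thread created by our factory, which is how the name format and the daemon flag end up on every worker.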
The other data structures are initialized as follows:
    // Maintains the list of running tasks.
    private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
    // Executor for the heartbeat task.
    private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")
    // must be initialized before running startDriverHeartbeat()
    private val heartbeatReceiverRef =
      RpcUtils.makeDriverRef(HeartbeatReceiver.ENDPOINT_NAME, conf, env.rpcEnv)
During initialization the executor also starts a scheduled task that reports heartbeats to the driver:
    /**
     * Schedules a task to report heartbeat and partial metrics for active tasks to driver.
     */
    private def startDriverHeartbeater(): Unit = {
      val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
      // Wait a random interval so the heartbeats don't end up in sync
      val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]
      val heartbeatTask = new Runnable() {
        override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat())
      }
      heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
    }
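The scheduling pattern above can be tried outside Spark. The following is a rough sketch with a plain JDK scheduler, assuming a caller-supplied beat function instead of reportHeartBeat(); HeartbeatSketch, initialDelayMs and runHeartbeats are hypothetical names, and the latch exists only so the example can stop after a few beats.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object HeartbeatSketch {
  // Same formula as the excerpt: one full interval plus a random fraction of an interval.
  def initialDelayMs(intervalMs: Long, rnd: Double): Long =
    intervalMs + (rnd * intervalMs).toInt

  // Schedules `beat` at a fixed rate and blocks until it has fired `beats` times
  // or `timeoutMs` elapses; returns true if all beats were observed in time.
  def runHeartbeats(intervalMs: Long, beats: Int, timeoutMs: Long)(beat: () => Unit): Boolean = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val latch = new CountDownLatch(beats)
    val task = new Runnable {
      override def run(): Unit = {
        // Swallow exceptions: scheduleAtFixedRate suppresses later runs if one run throws.
        try beat() catch { case _: Exception => () }
        latch.countDown()
      }
    }
    try {
      val delay = initialDelayMs(intervalMs, scala.util.Random.nextDouble())
      scheduler.scheduleAtFixedRate(task, delay, intervalMs, TimeUnit.MILLISECONDS)
      latch.await(timeoutMs, TimeUnit.MILLISECONDS)
    } finally {
      scheduler.shutdownNow()
    }
  }
}
```

With a 10s interval the first heartbeat fires somewhere between 10s and 20s after startup, so executors that start together do not all report at the same instant.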
Now let's look at the main methods:
These methods are all called from the corresponding methods of CoarseGrainedExecutorBackend.
CoarseGrainedExecutorBackend holds an Executor object:
Here is the startup method of CoarseGrainedExecutorBackend:
    override def onStart() {
      logInfo("Connecting to driver: " + driverUrl)
      rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
        // This is a very fast action so we can use "ThreadUtils.sameThread"
        driver = Some(ref)
        ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
      }(ThreadUtils.sameThread).onComplete {
        // This is a very fast action so we can use "ThreadUtils.sameThread"
        case Success(msg) =>
          // Always receive `true`. Just ignore it
        case Failure(e) =>
          exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
      }(ThreadUtils.sameThread)
    }
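All it does is register the executor with the Driver; if registration fails, the backend exits.
Let's see how the receive method handles incoming messages: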
    override def receive: PartialFunction[Any, Unit] = {
      case RegisteredExecutor =>
        logInfo("Successfully registered with driver")
        try {
          // After successful registration, create the corresponding Executor locally
          executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
        } catch {
          case NonFatal(e) =>
            exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
        }
      case RegisterExecutorFailed(message) =>
        exitExecutor(1, "Slave registration failed: " + message)
      case LaunchTask(data) =>
        if (executor == null) {
          exitExecutor(1, "Received LaunchTask command but executor was null")
        } else {
          val taskDesc = TaskDescription.decode(data.value)
          logInfo("Got assigned task " + taskDesc.taskId)
          executor.launchTask(this, taskDesc)
        }
      case KillTask(taskId, _, interruptThread, reason) =>
        if (executor == null) {
          exitExecutor(1, "Received KillTask command but executor was null")
        } else {
          executor.killTask(taskId, interruptThread, reason)
        }
      case StopExecutor =>
        stopping.set(true)
        logInfo("Driver commanded a shutdown")
        // Cannot shutdown here because an ack may need to be sent back to the caller. So send
        // a message to self to actually do the shutdown.
        self.send(Shutdown)
      case Shutdown =>
        stopping.set(true)
        new Thread("CoarseGrainedExecutorBackend-stop-executor") {
          override def run(): Unit = {
            // executor.stop() will call `SparkEnv.stop()` which waits until RpcEnv stops totally.
            // However, if `executor.stop()` runs in some thread of RpcEnv, RpcEnv won't be able to
            // stop until `executor.stop()` returns, which becomes a dead-lock (See SPARK-14180).
            // Therefore, we put this line in a new thread.
            executor.stop()
          }
        }.start()
      case UpdateDelegationTokens(tokenBytes) =>
        logInfo(s"Received tokens of ${tokenBytes.length} bytes")
        SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf)
    }
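This is much like the doOnReceive method we saw with DAGScheduler: messages are dispatched by pattern matching, and each case does the actual work. Take LaunchTask as an example. When the driver sends a LaunchTask message and executor is not null, the backend decodes the task description sent by the driver and calls executor.launchTask to start the task. If executor is still null, the backend exits instead.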
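The dispatch style is easy to reproduce in isolation. Below is a small sketch, not Spark's implementation: the message types only borrow names from the excerpt, Backend is a hypothetical stand-in that records what it would have done in a log, and a Boolean flag takes the place of the nullable executor reference.

```scala
import scala.collection.mutable.ArrayBuffer

object DispatchSketch {
  // Hypothetical message types, loosely modelled on the messages in the excerpt.
  sealed trait Message
  case object RegisteredExecutor extends Message
  final case class LaunchTask(taskId: Long) extends Message
  final case class KillTask(taskId: Long, reason: String) extends Message

  // Stand-in for the backend: records each action instead of performing it.
  final class Backend {
    private var executorReady = false // plays the role of `executor != null`
    val log: ArrayBuffer[String] = ArrayBuffer.empty[String]

    private def record(entry: String): Unit = { log += entry; () }

    // One case per message type, as in the receive method above.
    val receive: PartialFunction[Any, Unit] = {
      case RegisteredExecutor =>
        executorReady = true
        record("executor created")
      case LaunchTask(id) =>
        if (!executorReady) record(s"exit: LaunchTask $id before registration")
        else record(s"launch $id")
      case KillTask(id, reason) =>
        if (!executorReady) record(s"exit: KillTask $id before registration")
        else record(s"kill $id ($reason)")
    }
  }
}
```

Because receive is a PartialFunction, a message with no matching case is simply not defined for it, which the caller can check with isDefinedAt.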
    def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
      val tr = new TaskRunner(context, taskDescription)
      runningTasks.put(taskDescription.taskId, tr)
      threadPool.execute(tr)
    }
The code above is how the executor starts a task: it creates a TaskRunner, records it in runningTasks under the task ID, and hands it to the thread pool for execution.
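As a rough illustration of this register-then-execute pattern, here is a miniature version built on the JDK alone. MiniExecutor, runningCount and stop are invented for the example, and removing the entry in a finally block is a choice made in this sketch so the registry only ever holds live tasks.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

object LaunchSketch {
  // Hypothetical miniature: a registry of running tasks plus a worker pool.
  final class MiniExecutor {
    private val runningTasks = new ConcurrentHashMap[Long, Runnable]()
    private val threadPool = Executors.newCachedThreadPool()

    // Wraps the body in a Runnable, registers it, then submits it to the pool.
    def launchTask(taskId: Long)(body: () => Unit): Unit = {
      val runner = new Runnable {
        override def run(): Unit =
          try body()
          finally runningTasks.remove(taskId) // drop the entry once the task ends
      }
      runningTasks.put(taskId, runner)
      threadPool.execute(runner)
    }

    def runningCount: Int = runningTasks.size()

    // Stops accepting work and waits briefly for in-flight tasks to finish.
    def stop(): Unit = {
      threadPool.shutdown()
      threadPool.awaitTermination(5, TimeUnit.SECONDS)
      ()
    }
  }
}
```

Registering before calling execute means the task is already visible in the map by the time it starts running, so a kill request that arrives right after launch can still find it.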
TaskRunner is really just a Runnable that the thread pool executes:
    class TaskRunner(
        execBackend: ExecutorBackend,
        private val taskDescription: TaskDescription)
      extends Runnable
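It carries some information about the task:
At this point the executor's task execution flow is clear; the other flows are broadly similar.
Executor heartbeat reporting:
Finally, to recap: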
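The Executor is the object that CoarseGrainedExecutorBackend delegates to. Every command sent by the driver is handled in CoarseGrainedExecutorBackend: its receive method dispatches the driver's commands, creates the Executor once registration succeeds, and calls the Executor's interface to manage tasks. The Executor does the actual task execution and management: it owns the thread pool that runs tasks and keeps track of which tasks are running.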
That's all for the Executor source code for now.