赞
踩
在研究spark的源码的时候,个人认为最容易理解的方式就是按照一个程序在spark框架中的执行流程,一步一步的探索源码。
那么,我们就要弄清楚,一个程序究竟是怎么执行的。
针对一个spark application:
1.在自己本地的机器上编写spark代码
2.将代码打成jar包,通过spark-sunbmit方式提交到spark集群上去执行
3.spark-submit会通过反射创建一个driveActor出来,这个driveActor会去执行我们的代码(所以,一个application会对应一个drive)
4.driveActor在执行我们编写的spark代码的时候,第一行就会创建一个sparkcontext实例,这是spark程序的入口,也是探究源码的起点。
首先找到spark-2.4.0\core\SparkContext.scala 这个包 进入 SparkContext 这个类
SparkContext的官方定义 :
/**
stop()
the active SparkContext before在我们初始化SparkContext 的时候会创建一个taskScheduler 和 DAGscheduler
创建taskscheduler :createTaskScheduler() 根据不同的cluster master类型进行模式匹配 返回一个(bankend,taskscheduerimpl)
1).实例化一个 TaskschedulerImpl 主要作用是对task进行管理调度 (默认task最大重试次数为4)
a.底层通过bankend,可以针对不同的cluster进行task调度
对master进行模式匹配,不同的master创建不同的Impl 和 bankend
b.负责处理一些通用逻辑,比如决定多个job的调度顺序 (再次提醒,这里的决定多个job的调度顺序是针对同一个application来说的,也就是调度一个任务根据action算子划分成的不同的job)
其实就是调度池决定了多个task的调度顺序(默认FIFO),所以前面的task先执行,也就是前面的job先执行
c.客户端需要先调用他的start() 和 initialize()
initialize()决定了task的调度模式(FIFO FAIR) start()决定了向master注册
2).创建一个 bankend 主要作用是向master注册application
1.Backend 主动去连接master注册application
注册是从 TaskschedulerImpl.start()开始的 -> bankend.start() ->
创建appDesc(spark-submit设置的参数) ->
实例化AppClient,负责application与集群之间的通信 ->
AppClient.onStart()开始通信 ->
registerWithMaster() ->
tryRegisterAllMasters()异步通过线程池一次性向所有master注册
下面来看源码
```scala /** * SparkContext初始化的时候首先初始化了一个schedulerBankend 和 taskscheduler */ val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode) 进入createTaskScheduler()这个方法 /** * Create a task scheduler based on a given master URL. * Return a 2-tuple of the scheduler backend and the task scheduler. */ /** * createTaskScheduler(): * 1.模式匹配,根据不同的集群模式创建不同的 TaskScheduler 和 bankend * 2.实例化一个TaskSchedulerImpl, * 首先调用它的initialize()方法创建一个调度池进行task调度 FIFO FAIR(也就决定了job的执行顺序) * 然后调用start() 里面啥也没做,就是在里面调用了bankend.start() * 3.实例化一个bankend,通过start()创建一个appdescription(app的所有基本信息),然后封装成一个appClient * 通过tryregisterwithmaster向master进行注册 */ master match {
case "local" => // 本地模式 多线程运行spark任务 case LOCAL_N_REGEX(threads) => def localCpuCount: Int = Runtime.getRuntime.availableProcessors() // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads. val threadCount = if (threads == "*") localCpuCount else threads.toInt if (threadCount <= 0) { throw new SparkException(s"Asked to run locally with $threadCount threads") } val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true) // 初始化调度池的线程数为本地cpu数量 val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount) scheduler.initialize(backend) (backend, scheduler) /** * standalone模式 * 首先实例化一个TaskSchedulerImpl 底层通过bankend创建调度池 * SchedulerBackend本身是个接口,后面的所有bankend都是继承自这个接口 */ case SPARK_REGEX(sparkUrl) => val scheduler = new TaskSchedulerImpl(sc) val masterUrls = sparkUrl.split(",").map("spark://" + _) val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls) scheduler.initialize(backend) (backend, scheduler) // 集群模式 case masterUrl => val cm = getClusterManager(masterUrl) match { case Some(clusterMgr) => clusterMgr case None => throw new SparkException("Could not parse Master URL: '" + master + "'") } try { val scheduler = cm.createTaskScheduler(sc, masterUrl) val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler) cm.initialize(scheduler, backend) (backend, scheduler) } catch { case se: SparkException => throw se case NonFatal(e) => throw new SparkException("External scheduler cannot be instantiated", e) } }
standalone模式和集群模式的主要区别是谁去创建 (backend, scheduler) 这里以standalone模式为例探索源码 首先进入 TaskSchedulerImpl 这个类看看是如何对task进行调度的,具体的路径是spark-2.4.0\core\src\main\scala\org\apache\spark\scheduler\TaskSchedulerImpl 官方定义 : /** * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend. * It can also work with a local setup by using a `LocalSchedulerBackend` and setting * isLocal to true. It handles common logic, like determining a scheduling order across jobs, waking * up to launch speculative tasks, etc. *底层通过SchedulerBackend可以针对多种类型的集群进行task调度 还处理一些常见逻辑像决定task的执行顺序 唤醒推测执行等 * Clients should first call initialize() and start(), then submit task sets through the * submitTasks method. *客户端应该首先执行他的initialize()方法and start()方法 然后通过submitTasks提交task */ ```scala private[spark] class TaskSchedulerImpl( val sc: SparkContext, val maxTaskFailures: Int, isLocal: Boolean = false) extends TaskScheduler with Logging { def this(sc: SparkContext) = { this(sc, sc.conf.get(config.MAX_TASK_FAILURES)) // 这里定义了task最大失败次数为4 } // CPUs to request per task 每个task请求的cpu 默认为1 最后手动设置为2-4 val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1) // default scheduler is FIFO task调度方式默认为FIFO private val schedulingModeConf = conf.get(SCHEDULER_MODE_PROPERTY, SchedulingMode.FIFO.toString) /** * @param backend 接受一个bankend 调用initialize()方法创建调度池 */ def initialize(backend: SchedulerBackend) { this.backend = backend schedulableBuilder = { // 对调度模型进行匹配 schedulingMode match { case SchedulingMode.FIFO => new FIFOSchedulableBuilder(rootPool) // 创建FIFO调度池 case SchedulingMode.FAIR => new FairSchedulableBuilder(rootPool, conf) // FAIR调度 case _ => throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " + s"$schedulingMode") } } schedulableBuilder.buildPools() // 默认创建的FIFO调度 }
initialize()方法调用到此完毕,这里已经创建了一个默认为FIFO的task调度池。接下来调用start()方法向master注册这个application
override def start() {
backend.start()
}
这里的bankend是一个接口,在实现类里面重写了start()方法,所以我们找到它对应的实现类,具体路径是 spark-2.4.0\core\src\main\scala\org\apache\spark\scheduler\cluster\StandaloneSchedulerBackend.scala
override def start() {
super.start()
val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt) // 每个executor需要几个core
// 通过我们自己配置的一系列application信息比如appName,coresPerExecutor 等等创建一个appDesc
val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
// 通过appDesc 创建一个appClient,appClient负责向master注册这个application
client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
// appClient 向master注册application
client.start()
launcherBackend.setState(SparkAppHandle.State.SUBMITTED) // 改变这个application的状态为已提交
waitForRegistration() // 等待注册这个application
launcherBackend.setState(SparkAppHandle.State.RUNNING) // 注册完毕将application的状态改变为正在运行
}
这里我们看看 client.start()这个方法是怎么想master进行注册的。首先找到StandaloneAppClient这个类
官方定义:
/**
Register with all masters asynchronously. It will call registerWithMaster
every
REGISTRATION_TIMEOUT_SECONDS seconds until exceeding REGISTRATION_RETRIES times.
Once we connect to a master successfully, all scheduling work and Futures will be cancelled.
*异步向所有master进行注册,每隔20秒注册一次,最多注册3次
nthRetry means this is the nth attempt to register with master.
接受一个参数,表示第几次向masetr进行注册
*/
private def registerWithMaster(nthRetry: Int) { // registerMasterFutures接受一个还未注册的master的集合,然后向这些master进行注册 registerMasterFutures.set(tryRegisterAllMasters()) registrationRetryTimer.set(registrationRetryThread.schedule(new Runnable { override def run(): Unit = { // 如果这个已经注册过了,就应该停止向这个master再次进行注册 if (registered.get) { registerMasterFutures.get.foreach(_.cancel(true)) registerMasterThreadPool.shutdownNow() // 如果注册次数大于3次 放弃注册 } else if (nthRetry >= REGISTRATION_RETRIES) { markDead("All masters are unresponsive! Giving up.") } else { // 如果还没有注册成功,放弃这次注册,注册次数+1 registerMasterFutures.get.foreach(_.cancel(true)) registerWithMaster(nthRetry + 1) } } }, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)) }
registerWithMaster()方法中首先调用了 tryRegisterAllMasters()这个方法 向所有master进行注册
/**
* Register with all masters asynchronously and returns an array Future
s for cancellation.
* 通过线程池异步向所有的master注册,返回一个数组,里面放的是没有注册成功的master地址
*/
private def tryRegisterAllMasters(): Array[JFuture[_]] = { for (masterAddress <- masterRpcAddresses) yield { registerMasterThreadPool.submit(new Runnable { override def run(): Unit = try { if (registered.get) { return } logInfo("Connecting to master " + masterAddress.toSparkURL + "...") val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME) // 向master发送appDesc 等待master的处理 masterRef.send(RegisterApplication(appDescription, self)) } catch { case ie: InterruptedException => // Cancelled 没有注册成功的 case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e) } }) } }
到这里客户端向master注册application就已经完成了,后面就需要master和客户端保持通信,处理客户端的注册请求。
这也是下一篇博客的主要内容。
整体来说,sparkContext初始化时,taskScheduler的初始化已经完成,另外还有DAGScheduler的初始化。由于DAGScheduler源码在执行job的时候才会涉及,所以我们后面进行探究。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。