当前位置:   article > 正文

spark源码(一)--sparkContext的初始化_sc spark.sparkcontext

sc spark.sparkcontext

在研究spark的源码的时候,个人认为最容易理解的方式就是按照一个程序在spark框架中的执行流程,一步一步的探索源码。
那么,我们就要弄清楚,一个程序究竟是怎么执行的。

针对一个spark application:
1.在自己本地的机器上编写spark代码
2.将代码打成jar包,通过spark-sunbmit方式提交到spark集群上去执行
3.spark-submit会通过反射创建一个driveActor出来,这个driveActor会去执行我们的代码(所以,一个application会对应一个drive)
4.driveActor在执行我们编写的spark代码的时候,第一行就会创建一个sparkcontext实例,这是spark程序的入口,也是探究源码的起点。

首先找到spark-2.4.0\core\SparkContext.scala 这个包 进入 SparkContext 这个类
SparkContext的官方定义 :
/**

  • Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
  • cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
    *spark程序的主要入口 代表着与spark集群的通信 并且能够创建 rdd 累加器 已经广播变量
  • Only one SparkContext may be active per JVM. You must stop() the active SparkContext before
  • creating a new one. This limitation may eventually be removed; see SPARK-2243 for more details.
    *一个JVM里面只能有一个sparkcontext 所以在创建新的sparkcontext之前必须调用stop()方法关闭上一个sc
  • @param config a Spark Config object describing the application configuration. Any settings in
  • this config overrides the default configs as well as system properties.
    *sparkconf 是描述一个程序的配置信息 设置的任何配置都会覆盖系统的默认配置
    */

在我们初始化SparkContext 的时候会创建一个taskScheduler 和 DAGscheduler
创建taskscheduler :createTaskScheduler() 根据不同的cluster master类型进行模式匹配 返回一个(bankend,taskscheduerimpl)
1).实例化一个 TaskschedulerImpl 主要作用是对task进行管理调度 (默认task最大重试次数为4)
a.底层通过bankend,可以针对不同的cluster进行task调度
对master进行模式匹配,不同的master创建不同的Impl 和 bankend
b.负责处理一些通用逻辑,比如决定多个job的调度顺序 (再次提醒,这里的决定多个job的调度顺序是针对同一个application来说的,也就是调度一个任务根据action算子划分成的不同的job)
其实就是调度池决定了多个task的调度顺序(默认FIFO),所以前面的task先执行,也就是前面的job先执行
c.客户端需要先调用他的start() 和 initialize()
initialize()决定了task的调度模式(FIFO FAIR) start()决定了向master注册
2).创建一个 bankend 主要作用是向master注册application
1.Backend 主动去连接master注册application
注册是从 TaskschedulerImpl.start()开始的 -> bankend.start() ->
创建appDesc(spark-submit设置的参数) ->
实例化AppClient,负责application与集群之间的通信 ->
AppClient.onStart()开始通信 ->
registerWithMaster() ->
tryRegisterAllMasters()异步通过线程池一次性向所有master注册

下面来看源码

```scala
/**
      * SparkContext初始化的时候首先初始化了一个schedulerBankend 和 taskscheduler
      */
    val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
  
进入createTaskScheduler()这个方法
  /**
   * Create a task scheduler based on a given master URL.
   * Return a 2-tuple of the scheduler backend and the task scheduler.
   */
  /**
    * createTaskScheduler():
    * 1.模式匹配,根据不同的集群模式创建不同的 TaskScheduler 和 bankend
    * 2.实例化一个TaskSchedulerImpl,
    *   首先调用它的initialize()方法创建一个调度池进行task调度 FIFO FAIR(也就决定了job的执行顺序)
    *    然后调用start() 里面啥也没做,就是在里面调用了bankend.start()
    * 3.实例化一个bankend,通过start()创建一个appdescription(app的所有基本信息),然后封装成一个appClient
    *   通过tryregisterwithmaster向master进行注册
    */
    
 master match {
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  case "local" =>
    // 本地模式 多线程运行spark任务
  case LOCAL_N_REGEX(threads) =>
    def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
    // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
    val threadCount = if (threads == "*") localCpuCount else threads.toInt
    if (threadCount <= 0) {
      throw new SparkException(s"Asked to run locally with $threadCount threads")
    }
    val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
    // 初始化调度池的线程数为本地cpu数量
    val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
    scheduler.initialize(backend)
    (backend, scheduler)

  /**
    * standalone模式
    * 首先实例化一个TaskSchedulerImpl 底层通过bankend创建调度池
    * SchedulerBackend本身是个接口,后面的所有bankend都是继承自这个接口
    */
  case SPARK_REGEX(sparkUrl) =>
    val scheduler = new TaskSchedulerImpl(sc)
    val masterUrls = sparkUrl.split(",").map("spark://" + _)
    val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
    scheduler.initialize(backend)
    (backend, scheduler)

    // 集群模式
  case masterUrl =>
    val cm = getClusterManager(masterUrl) match {
      case Some(clusterMgr) => clusterMgr
      case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
    }
    try {
      val scheduler = cm.createTaskScheduler(sc, masterUrl)
      val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
      cm.initialize(scheduler, backend)
      (backend, scheduler)
    } catch {
      case se: SparkException => throw se
      case NonFatal(e) =>
        throw new SparkException("External scheduler cannot be instantiated", e)
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

standalone模式和集群模式的主要区别是谁去创建 (backend, scheduler)
这里以standalone模式为例探索源码
首先进入 TaskSchedulerImpl 这个类看看是如何对task进行调度的,具体的路径是spark-2.4.0\core\src\main\scala\org\apache\spark\scheduler\TaskSchedulerImpl
官方定义 : 
/**
 * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
 * It can also work with a local setup by using a `LocalSchedulerBackend` and setting
 * isLocal to true. It handles common logic, like determining a scheduling order across jobs, waking
 * up to launch speculative tasks, etc.
 *底层通过SchedulerBackend可以针对多种类型的集群进行task调度 还处理一些常见逻辑像决定task的执行顺序 唤醒推测执行等
 * Clients should first call initialize() and start(), then submit task sets through the
 * submitTasks method.
 *客户端应该首先执行他的initialize()方法and start()方法 然后通过submitTasks提交task
 */
 

```scala
private[spark] class TaskSchedulerImpl(
  	val sc: SparkContext,
  	val maxTaskFailures: Int,
  	isLocal: Boolean = false)
 extends TaskScheduler with Logging {
    
    def this(sc: SparkContext) = {
    this(sc, sc.conf.get(config.MAX_TASK_FAILURES)) // 这里定义了task最大失败次数为4
  	 }
  
  	// CPUs to request per task 每个task请求的cpu 默认为1 最后手动设置为2-4
  	val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
  	
  	// default scheduler is FIFO  task调度方式默认为FIFO
    private val schedulingModeConf = conf.get(SCHEDULER_MODE_PROPERTY, SchedulingMode.FIFO.toString)

	  /**
	    * @param backend  接受一个bankend 调用initialize()方法创建调度池
	    */
	  def initialize(backend: SchedulerBackend) {
	    this.backend = backend
	    schedulableBuilder = {
	      // 对调度模型进行匹配
	      schedulingMode match {
	        case SchedulingMode.FIFO =>
	          new FIFOSchedulableBuilder(rootPool)  // 创建FIFO调度池
	        case SchedulingMode.FAIR =>
	          new FairSchedulableBuilder(rootPool, conf) // FAIR调度
	        case _ =>
	          throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
	          s"$schedulingMode")
	      }
	    }
	    schedulableBuilder.buildPools() // 默认创建的FIFO调度
	  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

initialize()方法调用到此完毕,这里已经创建了一个默认为FIFO的task调度池。接下来调用start()方法向master注册这个application

  override def start() {
    backend.start()
    }
  • 1
  • 2
  • 3

这里的bankend是一个接口,在实现类里面重写了start()方法,所以我们找到它对应的实现类,具体路径是 spark-2.4.0\core\src\main\scala\org\apache\spark\scheduler\cluster\StandaloneSchedulerBackend.scala

  override def start() {
    super.start()
    val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)  // 每个executor需要几个core
    // 通过我们自己配置的一系列application信息比如appName,coresPerExecutor 等等创建一个appDesc
    val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
      webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
      // 通过appDesc 创建一个appClient,appClient负责向master注册这个application
    client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
    // appClient 向master注册application 
    client.start()
    launcherBackend.setState(SparkAppHandle.State.SUBMITTED) 	    // 改变这个application的状态为已提交
    waitForRegistration()  	    // 等待注册这个application
    launcherBackend.setState(SparkAppHandle.State.RUNNING)  	    // 注册完毕将application的状态改变为正在运行
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

这里我们看看 client.start()这个方法是怎么想master进行注册的。首先找到StandaloneAppClient这个类
官方定义:
/**

  • Interface allowing applications to speak with a Spark standalone cluster manager.
  • 应用程序和spark集群的通信接口
  • Takes a master URL, an app description, and a listener for cluster events, and calls
  • back the listener when various events occur.
  • 接受一个master的URL appDesc 和 一个事件侦听器 并且在发生各种事件的时候回调侦听器
    /
    找到client的 onStart()方法:
    override def onStart(): Unit = {
    try {
    registerWithMaster(1)
    }
    }
    这里主要是调用了registerWithMaster()这个方法,找到该方法
    /
    *
    • Register with all masters asynchronously. It will call registerWithMaster every

    • REGISTRATION_TIMEOUT_SECONDS seconds until exceeding REGISTRATION_RETRIES times.

    • Once we connect to a master successfully, all scheduling work and Futures will be cancelled.
      *异步向所有master进行注册,每隔20秒注册一次,最多注册3次

    • nthRetry means this is the nth attempt to register with master.

    • 接受一个参数,表示第几次向masetr进行注册
      */

         private def registerWithMaster(nthRetry: Int) {
            // registerMasterFutures接受一个还未注册的master的集合,然后向这些master进行注册
            registerMasterFutures.set(tryRegisterAllMasters())
            registrationRetryTimer.set(registrationRetryThread.schedule(new Runnable {
              override def run(): Unit = {
                // 如果这个已经注册过了,就应该停止向这个master再次进行注册
                if (registered.get) {
                  registerMasterFutures.get.foreach(_.cancel(true))
                  registerMasterThreadPool.shutdownNow()
                  // 如果注册次数大于3次 放弃注册
                } else if (nthRetry >= REGISTRATION_RETRIES) {
                  markDead("All masters are unresponsive! Giving up.")
                } else {
                  // 如果还没有注册成功,放弃这次注册,注册次数+1
                  registerMasterFutures.get.foreach(_.cancel(true))
                  registerWithMaster(nthRetry + 1)
                }
              }
            }, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
          }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20

registerWithMaster()方法中首先调用了 tryRegisterAllMasters()这个方法 向所有master进行注册
/**
* Register with all masters asynchronously and returns an array Futures for cancellation.
* 通过线程池异步向所有的master注册,返回一个数组,里面放的是没有注册成功的master地址
*/

	    private def tryRegisterAllMasters(): Array[JFuture[_]] = {
	      for (masterAddress <- masterRpcAddresses) yield {
	        registerMasterThreadPool.submit(new Runnable {
	          override def run(): Unit = try {
	            if (registered.get) {
	              return
	            }
	            logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
	            val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
	            // 向master发送appDesc 等待master的处理
	            masterRef.send(RegisterApplication(appDescription, self))
	          } catch {
	            case ie: InterruptedException => // Cancelled 没有注册成功的
	            case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
	          }
	        })
	      }
	    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

到这里客户端向master注册application就已经完成了,后面就需要master和客户端保持通信,处理客户端的注册请求。
这也是下一篇博客的主要内容。

整体来说,sparkContext初始化时,taskScheduler的初始化已经完成,另外还有DAGScheduler的初始化。由于DAGScheduler源码在执行job的时候才会涉及,所以我们后面进行探究。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/655837
推荐阅读
相关标签
  

闽ICP备14008679号