当前位置:   article > 正文

2-2SparkContext的submit以及原理大致剖析_context.submit

context.submit

1打包一个spark工程
2在spark安装目录的bin下,有一个spark-submit脚本
3exec “ S P A R K H O M E " / b i n / s p a r k − c l a s s o r g . a p a c h e . s p a r k . d e p l o y . S p a r k S u b m i t " {SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit " SPARKHOME"/bin/sparkclassorg.apache.spark.deploy.SparkSubmit"@”
可以将程序提交给各种模式
–master
不设置,local模式
设置spark://开头的URL就是standalonem哦是,会提交到制定的URL的master进程上去
如果设置Yarn开头的那就是yarn模式,会读取hadoop的配置文件链接resourcemanager


梳理一下自己写的spark程序提交上去,spark做了什么


1 自己写的application通过spark-submit提交上去
2 Driver通过反射的方式构建一个driverActor出来
3 初始化的时候,SparkContext构造DAGScheduler和TaskScheduler,TaskScheduler去找master向其注册application,底层相关代码是这样的:

**一:TaskScheduler的初始化机制**
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
(backend, scheduler)
①createTaskScheduler()=>TaskSchedulerImpl这就是我们所说的TaskScheduler。
②TaskSchedulerImpl 类:
1底层通过操作一个SchedulerBackend针对不同种类cluster(standalone,yarn,mesos),调度task
2它也可以通过一个LocalBackend并且将isLocal的参数设置为true,来在本地模式下工作
3它负责处理一些通用的逻辑,比如决定多个Job的调度顺序创建SchedulerPool调度池它有不同的优先策略,比如FIFO(先进先出),启动推测任务执行
4客户端首先调用它的initalize()方法和start()方法,然后通过runTasks()提交task sets
调用一些方法 封装一些参数与master通信
registerWithMaster()   与master通信成功
③又创建了一个SparkDeploySchedulerBackend,它在底层接收  的控制,实际上负责与Master的注册,Executor
的反注册,task发送到Executor等操作
Master接收到了,启动worker->executor反向注册到SparkDeploySchedulerBackend
//TaskScheduler底层主要基于SparkDeploySchedulerBackend组件来工作

**二:DAGScheduler**
private[spark] def dagScheduler_=(ds: DAGScheduler): Unit = {
  _dagScheduler = ds
}
DAGScheduler类描述:实现了面向Stage的调度机制的高层次的调度层,它会为每一个job计算一个stage的DAG(有向无环图),追踪RDD和stage的输出是否被物化(写入到了磁盘或者内存等)并且寻找一个消耗最优调度机制来运行Job
它会将Stage作为tasksets提交到底层的TaskSchedulerImpl上,在集群上运行task
除了处理stage的DAG,还负责运行每个task的最佳位置,基于当前的缓存状态,将这些最佳位置提交给底层的TaskSchedulerImpl。
此外,它还会处理由于shuffle输出文件丢失导致的失败,在这种情况下,旧的stage可能就会被重新提交,一个stage内部的失败,如果不是由于shuffle文件丢失导致的会被TaskScheduler处理,它会多次重试每一个task,直到最后才取消整个stage
DAGScheduler底层基于 DAGSchedulerEventProcessLoop组件进行通信(线程)

三:sparkUI:
4040端口,显示Application的信息=》启动一个jetty服务器来提供web服务从而显示网页
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

4 master接收到注册application的请求后,在worker上位这个application启动多个executor
master机制:

**一:主备切换机制**
基于zookeeper的切换——自动切换master。
步骤1: 使用持久化引擎读取持久化的storeApps、storeDrivers、storeWorkers,持久化引擎有FileSystemPersistenceEngine和ZookeeperPersistenceEngine,判读如果storedApps、storedDrivers、store的Workers有任何一个非空就继续向后执行. 持久化引擎的Application、Driver、Worker的信息重新注册到Master内部的内存缓存结构中
步骤2:将application和workers的状态都修改为Unkown,然后向应用程序对应的driver和worker发送standby master的地址,“让小弟知道谁是新的老大”。
步骤3:正在正常运行的driver和worker在上一步收到地址后,会向standby master节点发送响应信息。
步骤4:standby节点收到响应信息后,会调用自身的completeRecovery()方法处理未发送响应信息的driver和worker,把它们滤掉,造成的结果是不对那些driver和对应的application分配资源。
步骤5:此时,新的active master已经产生了,那么master会调用自身的schedule()方法,对那些等待资源调度的driver、application进行调度。例如为application在worker节点启动该application所需的Executor、在某个worker节点启动drive
**二:注册机理机制**
worker在启动之后,就会主动向master进行注册,master将讲状态为dead状态的worker过滤掉,对于状态为unknown的worker清除掉旧的worker信息替换为新的,然后将worker加入到内存缓存中(hashmap),用持久化引擎将worker信息进行持久化(文件系统,zookeeper等),调用schedule方法
spark-submit提交spark application会向master注册driver,将driver信息加入到内存缓存中(hashmap),加入等到调度队列(arraybuffer),用持久化引擎将driver信息进行持久化,调用schedule方法
application向master注册,将application信息加入到内存缓存中(hashmap),加入等到调度队列(arraybuffer),用持久化引擎将driver信息进行持久化,调用schedule方法
**三:改变状态机制**
case DriverStateChanged(driverId, state, exception) => {
  state match {
    case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
      removeDriver(driverId, state, exception)
    case _ =>
      throw new Exception(s"Received unexpected state update for driver $driverId: $state")
  }
}
**四:资源调度机制**
private def schedule(): Unit = {
  //首先判断master是不是alive
  if (state != RecoveryState.ALIVE) { return }
  //Random.shuffle对传入的集合的元素进行随机的打乱
  //去除了worker中所有之前来注册过的worker
  val shuffledWorkers = Random.shuffle(workers) 
  for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
    for (driver <- waitingDrivers) {
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver)
        waitingDrivers -= driver
      }
    }
  }
application的调度算法  
1默认spreadOutApps 将每个application要启动的executor都平均分布到各个worker上
2非spreadOutApps 将每一个application尽可能少的分配到worker上
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

5 worker为application启动executor

1master向worker发送请求:LaunchDriver和LaunchExecutor
2worker在内部启动了一个线程driverrunner,driverrunner创建driver工作目录,会启动driver进程,并在之后进行管理
3worker在内部启动一个线程创建executor创建目录,然后启动executor
  • 1
  • 2
  • 3

6 Job,每有一个action就会创建一个Job,将job提交给DAGScheduler
7 DAGScheduler 调用runjob()会将Job划分为多个stage,每个stage创建一个TaskSet

DAGScheduler的stage划分算法:会从触发action操作的那个rdd开始往前倒推,首先为最后一个rdd创建一个stage,往前倒推的时候,如果发现某个rdd与其父RDD的依赖关系为宽依赖,那么就又划分一个stage,以此类推,继续往前倒推,直到所有的RDD全部遍历完
前面流程已经到了runJob()
这个里面有几个重要的方法
**方法1** private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
//使用触发job的最后一个RDD,创建finalStage
  var finalStage: ResultStage = null
  try {
    finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }
  //先尝试提交最后一个stage
  submitStage(finalStage)
**方法2** private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
//关键方法1getMissingParentStages
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
//这里会反复递归调用  直到最初的stage,也就是他没有父stage了,那么此时就会提交这个最初的stage
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
//如果当前stage还有父stage,那么将这个stage放入waitingStages 等待执行的stage队列
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}
创建task并计算最佳位置
//获取某个stage的父stage
**方法3** private def getMissingParentStages(stage: Stage): List[Stage] = {
  val missing = new HashSet[Stage]
  val visited = new HashSet[RDD[_]]
  // We are manually maintaining a stack here to prevent StackOverflowError
  // caused by recursively visiting
  val waitingForVisit = new Stack[RDD[_]]
  def visit(rdd: RDD[_]) {
    if (!visited(rdd)) {
      visited += rdd
      val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
      if (rddHasUncachedPartitions) {
        for (dep <- rdd.dependencies) {
          dep match {
//如果是宽依赖
            case shufDep: ShuffleDependency[_, _, _] =>
//使用宽依赖的那个RDD创建一个stage,并且会将isShffleMap设置为true
//默认最够一个stage不是ShffleMap stage
//但是finalStage之前所有的stage都是ShffleMap stage
              val mapStage = getShuffleMapStage(shufDep, stage.firstJobId)
              if (!mapStage.isAvailable) {
                missing += mapStage
              }
//如果是窄依赖,递归继续推到栈中继续调用直到找到宽依赖或者遍历完
            case narrowDep: NarrowDependency[_] =>
              waitingForVisit.push(narrowDep.rdd)
          }
        }
      }
    }
  }
//首先往栈中推入了stage的最后一个RDD
  waitingForVisit.push(stage.rdd)
  while (waitingForVisit.nonEmpty) {
//对stage的最后一个RDD执行自己内部定义的visit方法
    visit(waitingForVisit.pop())
  }
  missing.toList
}
//小结:编写的spark application被划分为了几个job,每个job被划分成了几个stage,每个stage包含了哪些代码,当你发现某个stage执行的特别慢或者报错,才能针对的去排查或者性能调优
提交第一个stage,为stage创建一批task,数量等于partition数量
**方法4** private def submitMissingTasks(stage: Stage, jobId: Int) {
 //为stage创建指定数量的task,计算task的最佳位置
val tasks: Seq[Task[_]] = try {
    stage match {
      case stage: ShuffleMapStage =>
        partitionsToCompute.map { id =>
          val locs = taskIdToLocations(id) //计算task最佳位置
          val part = stage.rdd.partitions(id)
          new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
            taskBinary, part, locs, stage.internalAccumulators)
        }
      case stage: ResultStage =>
        val job = stage.activeJob.get
        partitionsToCompute.map { id =>
          val p: Int = stage.partitions(id)
          val part = stage.rdd.partitions(p)
          val locs = taskIdToLocations(id)
          new ResultTask(stage.id, stage.latestInfo.attemptId,
            taskBinary, part, locs, id, stage.internalAccumulators)
        }
    }
//最终提交了task
taskScheduler.submitTasks(new TaskSet(
  tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
  }
**方法5** val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
  stage match {
    case s: ShuffleMapStage =>
      partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
    case s: ResultStage =>
      val job = s.activeJob.get
      partitionsToCompute.map { id =>
        val p = s.partitions(id)
        (id, getPreferredLocs(stage.rdd, p))
      }.toMap
  }
}
//就是从stage的最后一个RDD开始,寻找被cache或者checkpoint的RDD,task的最佳位置就是cache或者checkpoint的RDD的位置,以为这样的话,task就在那个节点上执行不需要计算之前的RDD
**方法6** private def getPreferredLocsInternal(
    rdd: RDD[_],
    partition: Int,
    visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
  if (!visited.add((rdd, partition))) {
    return Nil
  }
//寻到当前RDD的partition是否缓存了
  val cached = getCacheLocs(rdd)(partition)
  if (cached.nonEmpty) {
    return cached
  }
//寻到当前RDD的partition是否被checkpoint了
  val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
  if (rddPrefs.nonEmpty) {
    return rddPrefs.map(TaskLocation(_))
  }
//最后递归调用自己去寻找父RDD看看对象的partition是否被cache或者checkpoint
  rdd.dependencies.foreach {
    case n: NarrowDependency[_] =>
      for (inPart <- n.getParents(partition)) {
        val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
        if (locs != Nil) {
          return locs
        }
      }
    case _ =>
  }
//如果从头到尾都没有被缓存或者checkpoint 那么就没有最佳位置
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155

8 TaskScheduler将TaskSet中的每一个task提交到Executor上,Executor每接到一个TASK,都会用TaskRunner来封装task,然后从线程池取一个线程来执行这个Task.TaskRunner将要执行的算子,以及函数,拷贝反序列化,然后执行task

TaskSchedulerImpl实现taskScheduler
//给每一个taskset都会创建一个tasksetManager,在后面负责它的那个taskset的任务执行的监视和管理
val manager = createTaskSetManager(taskSet, maxTaskFailures)
//重点:backend负责创建appclient,向master注册application的
backend.reviveOffers()
//最后是调用,将task发送到对应executor上
private def makeOffers() {
  val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq
  launchTasks(scheduler.resourceOffers(workOffers))
}
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] {
//将可用的executor打散从而尽可能负载均衡
val shuffledOffers = Random.shuffle(offers)
//tasks二维数组,每个子ArrayBuffer数量固定,就是这个executor可用的cores数量
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
val availableCpus = shuffledOffers.map(o => o.cores).toArray
val sortedTaskSets = rootPool.getSortedTaskSetQueue

}
    //对每个taskset从最好的一种本地化级别开始尝试将task在executor上启动,如果启动启动不了,调出来进入下一级别的本地化级别启动,以达到task在executor上全部启动
 var launchedTask = false
  for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
    do {
      launchedTask = resourceOfferSingleTaskSet(
          taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
    } while (launchedTask)
  }
  if (tasks.size > 0) {
    hasLaunchedTask = true
  }
  return tasks
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

9、ShuffleMapTask and ResultTask
task有两种,ShuffleMapTask ,ResultTask,只有最后一个stage是ResultTask,
所以整个saprk应用程序的执行,就是stage分批次作为taskset提交到Executor执行,每个task针对RDD的一个partition,执行我们自定义的算子和函数,大量的task并行的操作RDD
task原理:最核心的是RDD的iterator方法,针对task对应的partition执行我们自定义的function函数

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/102796
推荐阅读
相关标签
  

闽ICP备14008679号