赞
踩
1打包一个spark工程
2在spark安装目录的bin下,有一个spark-submit脚本
3exec “
S
P
A
R
K
H
O
M
E
"
/
b
i
n
/
s
p
a
r
k
−
c
l
a
s
s
o
r
g
.
a
p
a
c
h
e
.
s
p
a
r
k
.
d
e
p
l
o
y
.
S
p
a
r
k
S
u
b
m
i
t
"
{SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "
SPARKHOME"/bin/spark−classorg.apache.spark.deploy.SparkSubmit"@”
可以将程序提交给各种模式
–master
不设置,local模式
设置spark://开头的URL就是standalonem哦是,会提交到制定的URL的master进程上去
如果设置Yarn开头的那就是yarn模式,会读取hadoop的配置文件链接resourcemanager
1 自己写的application通过spark-submit提交上去
2 Driver通过反射的方式构建一个driverActor出来
3 初始化的时候,SparkContext构造DAGScheduler和TaskScheduler,TaskScheduler去找master向其注册application,底层相关代码是这样的:
**一:TaskScheduler的初始化机制** val scheduler = new TaskSchedulerImpl(sc) val masterUrls = sparkUrl.split(",").map("spark://" + _) val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls) scheduler.initialize(backend) (backend, scheduler) ①createTaskScheduler()=>TaskSchedulerImpl这就是我们所说的TaskScheduler。 ②TaskSchedulerImpl 类: 1底层通过操作一个SchedulerBackend针对不同种类cluster(standalone,yarn,mesos),调度task 2它也可以通过一个LocalBackend并且将isLocal的参数设置为true,来在本地模式下工作 3它负责处理一些通用的逻辑,比如决定多个Job的调度顺序创建SchedulerPool调度池它有不同的优先策略,比如FIFO(先进先出),启动推测任务执行 4客户端首先调用它的initalize()方法和start()方法,然后通过runTasks()提交task sets 调用一些方法 封装一些参数与master通信 registerWithMaster() 与master通信成功 ③又创建了一个SparkDeploySchedulerBackend,它在底层接收 的控制,实际上负责与Master的注册,Executor 的反注册,task发送到Executor等操作 Master接收到了,启动worker->executor反向注册到SparkDeploySchedulerBackend //TaskScheduler底层主要基于SparkDeploySchedulerBackend组件来工作 **二:DAGScheduler** private[spark] def dagScheduler_=(ds: DAGScheduler): Unit = { _dagScheduler = ds } DAGScheduler类描述:实现了面向Stage的调度机制的高层次的调度层,它会为每一个job计算一个stage的DAG(有向无环图),追踪RDD和stage的输出是否被物化(写入到了磁盘或者内存等)并且寻找一个消耗最优调度机制来运行Job 它会将Stage作为tasksets提交到底层的TaskSchedulerImpl上,在集群上运行task 除了处理stage的DAG,还负责运行每个task的最佳位置,基于当前的缓存状态,将这些最佳位置提交给底层的TaskSchedulerImpl。 此外,它还会处理由于shuffle输出文件丢失导致的失败,在这种情况下,旧的stage可能就会被重新提交,一个stage内部的失败,如果不是由于shuffle文件丢失导致的会被TaskScheduler处理,它会多次重试每一个task,直到最后才取消整个stage DAGScheduler底层基于 DAGSchedulerEventProcessLoop组件进行通信(线程) 三:sparkUI: 4040端口,显示Application的信息=》启动一个jetty服务器来提供web服务从而显示网页
4 master接收到注册application的请求后,在worker上位这个application启动多个executor
master机制:
**一:主备切换机制** 基于zookeeper的切换——自动切换master。 步骤1: 使用持久化引擎读取持久化的storeApps、storeDrivers、storeWorkers,持久化引擎有FileSystemPersistenceEngine和ZookeeperPersistenceEngine,判读如果storedApps、storedDrivers、store的Workers有任何一个非空就继续向后执行. 持久化引擎的Application、Driver、Worker的信息重新注册到Master内部的内存缓存结构中 步骤2:将application和workers的状态都修改为Unkown,然后向应用程序对应的driver和worker发送standby master的地址,“让小弟知道谁是新的老大”。 步骤3:正在正常运行的driver和worker在上一步收到地址后,会向standby master节点发送响应信息。 步骤4:standby节点收到响应信息后,会调用自身的completeRecovery()方法处理未发送响应信息的driver和worker,把它们滤掉,造成的结果是不对那些driver和对应的application分配资源。 步骤5:此时,新的active master已经产生了,那么master会调用自身的schedule()方法,对那些等待资源调度的driver、application进行调度。例如为application在worker节点启动该application所需的Executor、在某个worker节点启动drive **二:注册机理机制** worker在启动之后,就会主动向master进行注册,master将讲状态为dead状态的worker过滤掉,对于状态为unknown的worker清除掉旧的worker信息替换为新的,然后将worker加入到内存缓存中(hashmap),用持久化引擎将worker信息进行持久化(文件系统,zookeeper等),调用schedule方法 spark-submit提交spark application会向master注册driver,将driver信息加入到内存缓存中(hashmap),加入等到调度队列(arraybuffer),用持久化引擎将driver信息进行持久化,调用schedule方法 application向master注册,将application信息加入到内存缓存中(hashmap),加入等到调度队列(arraybuffer),用持久化引擎将driver信息进行持久化,调用schedule方法 **三:改变状态机制** case DriverStateChanged(driverId, state, exception) => { state match { case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED => removeDriver(driverId, state, exception) case _ => throw new Exception(s"Received unexpected state update for driver $driverId: $state") } } **四:资源调度机制** private def schedule(): Unit = { //首先判断master是不是alive if (state != RecoveryState.ALIVE) { return } //Random.shuffle对传入的集合的元素进行随机的打乱 //去除了worker中所有之前来注册过的worker val shuffledWorkers = Random.shuffle(workers) for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) { for (driver <- waitingDrivers) { if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { launchDriver(worker, driver) waitingDrivers -= driver } } } application的调度算法 1默认spreadOutApps 将每个application要启动的executor都平均分布到各个worker上 2非spreadOutApps 将每一个application尽可能少的分配到worker上
5 worker为application启动executor
1master向worker发送请求:LaunchDriver和LaunchExecutor
2worker在内部启动了一个线程driverrunner,driverrunner创建driver工作目录,会启动driver进程,并在之后进行管理
3worker在内部启动一个线程创建executor创建目录,然后启动executor
6 Job,每有一个action就会创建一个Job,将job提交给DAGScheduler
7 DAGScheduler 调用runjob()会将Job划分为多个stage,每个stage创建一个TaskSet
DAGScheduler的stage划分算法:会从触发action操作的那个rdd开始往前倒推,首先为最后一个rdd创建一个stage,往前倒推的时候,如果发现某个rdd与其父RDD的依赖关系为宽依赖,那么就又划分一个stage,以此类推,继续往前倒推,直到所有的RDD全部遍历完 前面流程已经到了runJob() 这个里面有几个重要的方法 **方法1** private[scheduler] def handleJobSubmitted(jobId: Int, finalRDD: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], callSite: CallSite, listener: JobListener, properties: Properties) { //使用触发job的最后一个RDD,创建finalStage var finalStage: ResultStage = null try { finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite) } catch { case e: Exception => logWarning("Creating new stage failed due to exception - job: " + jobId, e) listener.jobFailed(e) return } //先尝试提交最后一个stage submitStage(finalStage) **方法2** private def submitStage(stage: Stage) { val jobId = activeJobForStage(stage) if (jobId.isDefined) { logDebug("submitStage(" + stage + ")") if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) { //关键方法1getMissingParentStages val missing = getMissingParentStages(stage).sortBy(_.id) logDebug("missing: " + missing) //这里会反复递归调用 直到最初的stage,也就是他没有父stage了,那么此时就会提交这个最初的stage if (missing.isEmpty) { logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents") submitMissingTasks(stage, jobId.get) } else { for (parent <- missing) { submitStage(parent) } //如果当前stage还有父stage,那么将这个stage放入waitingStages 等待执行的stage队列 waitingStages += stage } } } else { abortStage(stage, "No active job for stage " + stage.id, None) } } 创建task并计算最佳位置 //获取某个stage的父stage **方法3** private def getMissingParentStages(stage: Stage): List[Stage] = { val missing = new HashSet[Stage] val visited = new HashSet[RDD[_]] // We are manually maintaining a stack here to prevent StackOverflowError // caused by recursively visiting val waitingForVisit = new Stack[RDD[_]] def visit(rdd: RDD[_]) { if (!visited(rdd)) { visited += rdd val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil) if (rddHasUncachedPartitions) { for (dep <- rdd.dependencies) { dep match { //如果是宽依赖 case shufDep: ShuffleDependency[_, _, _] => //使用宽依赖的那个RDD创建一个stage,并且会将isShffleMap设置为true //默认最够一个stage不是ShffleMap stage //但是finalStage之前所有的stage都是ShffleMap stage val mapStage = getShuffleMapStage(shufDep, stage.firstJobId) if (!mapStage.isAvailable) { missing += mapStage } //如果是窄依赖,递归继续推到栈中继续调用直到找到宽依赖或者遍历完 case narrowDep: NarrowDependency[_] => waitingForVisit.push(narrowDep.rdd) } } } } } //首先往栈中推入了stage的最后一个RDD waitingForVisit.push(stage.rdd) while (waitingForVisit.nonEmpty) { //对stage的最后一个RDD执行自己内部定义的visit方法 visit(waitingForVisit.pop()) } missing.toList } //小结:编写的spark application被划分为了几个job,每个job被划分成了几个stage,每个stage包含了哪些代码,当你发现某个stage执行的特别慢或者报错,才能针对的去排查或者性能调优 提交第一个stage,为stage创建一批task,数量等于partition数量 **方法4** private def submitMissingTasks(stage: Stage, jobId: Int) { //为stage创建指定数量的task,计算task的最佳位置 val tasks: Seq[Task[_]] = try { stage match { case stage: ShuffleMapStage => partitionsToCompute.map { id => val locs = taskIdToLocations(id) //计算task最佳位置 val part = stage.rdd.partitions(id) new ShuffleMapTask(stage.id, stage.latestInfo.attemptId, taskBinary, part, locs, stage.internalAccumulators) } case stage: ResultStage => val job = stage.activeJob.get partitionsToCompute.map { id => val p: Int = stage.partitions(id) val part = stage.rdd.partitions(p) val locs = taskIdToLocations(id) new ResultTask(stage.id, stage.latestInfo.attemptId, taskBinary, part, locs, id, stage.internalAccumulators) } } //最终提交了task taskScheduler.submitTasks(new TaskSet( tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties)) } **方法5** val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try { stage match { case s: ShuffleMapStage => partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap case s: ResultStage => val job = s.activeJob.get partitionsToCompute.map { id => val p = s.partitions(id) (id, getPreferredLocs(stage.rdd, p)) }.toMap } } //就是从stage的最后一个RDD开始,寻找被cache或者checkpoint的RDD,task的最佳位置就是cache或者checkpoint的RDD的位置,以为这样的话,task就在那个节点上执行不需要计算之前的RDD **方法6** private def getPreferredLocsInternal( rdd: RDD[_], partition: Int, visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = { if (!visited.add((rdd, partition))) { return Nil } //寻到当前RDD的partition是否缓存了 val cached = getCacheLocs(rdd)(partition) if (cached.nonEmpty) { return cached } //寻到当前RDD的partition是否被checkpoint了 val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList if (rddPrefs.nonEmpty) { return rddPrefs.map(TaskLocation(_)) } //最后递归调用自己去寻找父RDD看看对象的partition是否被cache或者checkpoint rdd.dependencies.foreach { case n: NarrowDependency[_] => for (inPart <- n.getParents(partition)) { val locs = getPreferredLocsInternal(n.rdd, inPart, visited) if (locs != Nil) { return locs } } case _ => } //如果从头到尾都没有被缓存或者checkpoint 那么就没有最佳位置
8 TaskScheduler将TaskSet中的每一个task提交到Executor上,Executor每接到一个TASK,都会用TaskRunner来封装task,然后从线程池取一个线程来执行这个Task.TaskRunner将要执行的算子,以及函数,拷贝反序列化,然后执行task
TaskSchedulerImpl实现taskScheduler //给每一个taskset都会创建一个tasksetManager,在后面负责它的那个taskset的任务执行的监视和管理 val manager = createTaskSetManager(taskSet, maxTaskFailures) //重点:backend负责创建appclient,向master注册application的 backend.reviveOffers() //最后是调用,将task发送到对应executor上 private def makeOffers() { val activeExecutors = executorDataMap.filterKeys(executorIsAlive) val workOffers = activeExecutors.map { case (id, executorData) => new WorkerOffer(id, executorData.executorHost, executorData.freeCores) }.toSeq launchTasks(scheduler.resourceOffers(workOffers)) } def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] { //将可用的executor打散从而尽可能负载均衡 val shuffledOffers = Random.shuffle(offers) //tasks二维数组,每个子ArrayBuffer数量固定,就是这个executor可用的cores数量 val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores)) val availableCpus = shuffledOffers.map(o => o.cores).toArray val sortedTaskSets = rootPool.getSortedTaskSetQueue } //对每个taskset从最好的一种本地化级别开始尝试将task在executor上启动,如果启动启动不了,调出来进入下一级别的本地化级别启动,以达到task在executor上全部启动 var launchedTask = false for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) { do { launchedTask = resourceOfferSingleTaskSet( taskSet, maxLocality, shuffledOffers, availableCpus, tasks) } while (launchedTask) } if (tasks.size > 0) { hasLaunchedTask = true } return tasks }
9、ShuffleMapTask and ResultTask
task有两种,ShuffleMapTask ,ResultTask,只有最后一个stage是ResultTask,
所以整个saprk应用程序的执行,就是stage分批次作为taskset提交到Executor执行,每个task针对RDD的一个partition,执行我们自定义的算子和函数,大量的task并行的操作RDD
task原理:最核心的是RDD的iterator方法,针对task对应的partition执行我们自定义的function函数
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。