赞
踩
目录
前面学习了Spark作业提交到Yarn的执行流程: Spark内核(执行原理)之环境准备
现在学习一下Driver的工作流程。Driver线程主要是初始化SparkContext对象,准备运行所需的上下文,然后一方面保持与ApplicationMaster的RPC连接,通过ApplicationMaster申请资源;另一方面根据用户业务逻辑开始调度任务,将任务下发到已有的空闲Executor上。
当ResourceManager向ApplicationMaster返回Container资源时,ApplicationMaster就尝试在对应的Container上启动Executor进程,Executor进程起来后,会向Driver反向注册,注册成功后保持与Driver的心跳,同时等待Driver分发任务,当分发的任务执行完毕后,将任务状态上报给Driver。
一个 Spark 应用程序包括 SparkContext、Job、Stage 以及 Task 四个概念:
注意:Application->Job->Stage->Task 每一层都是 1 对 n 的关系。
Spark 的任务调度总体来说分两路进行,一路是 Stage 级的调度,一路是 Task 级的调度,总
体调度流程如下图所示:
其中,Spark RDD 通过其 Transactions 操作,形成了 RDD 血缘(依赖)关系图,即 DAG,最后通过 Action 的调用,触发 Job 并调度执行,执行过程中会创建两个调度器:DAGScheduler 和 TaskScheduler。
Driver 初始化 SparkContext 过程中,会分别初始化 DAGScheduler、TaskScheduler、
SchedulerBackend 以及 HeartbeatReceiver,并启动 SchedulerBackend 以及 HeartbeatReceiver。
SchedulerBackend 通过 ApplicationMaster 申请资源,并不断从 TaskScheduler 中拿到合适的Task 分发到 Executor 执行。
HeartbeatReceiver 负责接收 Executor 的心跳信息,监控 Executor的存活状况,并通知到 TaskScheduler。
在了解 Stage 划分之前,先了解下RDD的依赖关系。
相邻的两个RDD的关系称为依赖关系,如 val rdd1 = rdd.map(_*2) 表示rdd1依赖于rdd,即新的RDD依赖于旧的RDD,多个连续的RDD的依赖关系,称之为血缘关系。
作用:
看这个例子,有这样的依赖关系 :RDD1->RDD2->RDD3->RDD4,RDD4是通过读取数据源数据创建得到的,代码层面的逻辑为:
- val RDD4 = sc.textFile("sssssssss")
- val RDD3 = RDD4.flatMap()
- val RDD2 = RDD3.map()
- val RDD1 = RDD2.reduceByKey()
假设程序运行到val RDD1 = RDD2.reduceByKey()时失败了,那么这个Task将要面临重启。那问题来了,RDD1怎么知道我的上一步是什么呢?从头开始重新运行的话,头在哪儿呢?我们知道RDD是不会保存数据的,它只保存了数据结构和计算逻辑,如果遇到task失败,那不得整个作业重跑?
所以RDD为了提供容错性,需要将RDD之间的依赖关系保存下来,这样一旦出现错误,可以根据血缘关系将数据源重新读取进行计算。这也就是依赖关系的作用所在。
先看一个示意图:
解读:
那么程序中如何查看依赖关系呢?
通过一个demo来测试一下,可以通过rdd.toDebugString来打印当前rdd的依赖信息:
- def main(args: Array[String]): Unit = {
- val sparkConf = new SparkConf().setMaster("local").setAppName("TRXTest")
- val sc = new SparkContext(sparkConf) //环境对象
- val lines = sc.textFile("datas/word.txt")
- // 打印血缘关系
- println("RDD4的依赖关系:")
- println(lines.toDebugString)
- println("**********************")
- val words = lines.flatMap(_.split(" "))
- println("RDD3的依赖关系:")
- println(words.toDebugString)
- println("**********************")
- val wordToOne = words.map(word => (word, 1))
- println("RDD2的依赖关系:")
- println(wordToOne.toDebugString)
- println("**********************")
- val wordToSum = wordToOne.reduceByKey(_ + _)
- println("RDD1的依赖关系:")
- println(wordToSum.toDebugString)
- println("**********************")
- val array = wordToSum.collect() // 收集
- array.foreach(println)
- sc.stop()
- }
运行结果:
- RDD4的依赖关系:
- (1) datas/word.txt MapPartitionsRDD[1] at textFile at Spark01_RDD_dep.scala:15 []
- | datas/word.txt HadoopRDD[0] at textFile at Spark01_RDD_dep.scala:15 []
- **********************
- RDD3的依赖关系:
- (1) MapPartitionsRDD[2] at flatMap at Spark01_RDD_dep.scala:20 []
- | datas/word.txt MapPartitionsRDD[1] at textFile at Spark01_RDD_dep.scala:15 []
- | datas/word.txt HadoopRDD[0] at textFile at Spark01_RDD_dep.scala:15 []
- **********************
- RDD2的依赖关系:
- (1) MapPartitionsRDD[3] at map at Spark01_RDD_dep.scala:24 []
- | MapPartitionsRDD[2] at flatMap at Spark01_RDD_dep.scala:20 []
- | datas/word.txt MapPartitionsRDD[1] at textFile at Spark01_RDD_dep.scala:15 []
- | datas/word.txt HadoopRDD[0] at textFile at Spark01_RDD_dep.scala:15 []
- **********************
- RDD1的依赖关系:
- (1) ShuffledRDD[4] at reduceByKey at Spark01_RDD_dep.scala:28 []
- +-(1) MapPartitionsRDD[3] at map at Spark01_RDD_dep.scala:24 []
- | MapPartitionsRDD[2] at flatMap at Spark01_RDD_dep.scala:20 []
- | datas/word.txt MapPartitionsRDD[1] at textFile at Spark01_RDD_dep.scala:15 []
- | datas/word.txt HadoopRDD[0] at textFile at Spark01_RDD_dep.scala:15 []
- **********************
从运行结果上看,跟前面画的图展示的一致。
从上面的demo返回结果可以看到,在依赖关系中有MapPartitionsRDD和ShuffledRDD两种,这是什么意思呢?
MapPartitionsRDD类型的依赖有
val RDD4 = sc.textFile("sssssssss")
val RDD3 = RDD4.flatMap()
val RDD2 = RDD3.map()
可以发现,假设RDD只有一个分区,这几个RDD在转化中都没有Shuffle操作,他们都各自只依赖于一个RDD,我们称之为OneToOne依赖(窄依赖)。即窄依赖表示每一个父(上游)RDD 的 Partition 最多被子(下游)RDD 的一个 Partition 使用,窄依赖我们形象的比喻为独生子女。
那么对应的val RDD1 = RDD2.reduceByKey() 由于产生了Shuffle操作,RDD1的依赖于上游RDD多个分区的数据,这个依赖,称之为Shuffle依赖(宽依赖)。即同一个父(上游)RDD 的 Partition 被多个子(下游)RDD 的 Partition 依赖,会引起 Shuffle,总结:宽依赖我们形象的比喻为多生。
1)对于窄依赖:
窄依赖的多个分区可以并行计算;
窄依赖的一个分区的数据如果丢失,只需要重新计算对应的分区的数据就可以了。
2)对于宽依赖:
它是Stage划分的依据,对于宽依赖,必须等到上一阶段计算完成才能计算下一阶段。
那么窄依赖和宽依赖有什么作用呢?这就是我们接下来要学习的:Stage 划分。
Spark 的任务调度是从 DAG 切割开始,主要是由 DAGScheduler 来完成。当遇到一个Action 操作后就会触发一个 Job 的计算,并交给 DAGScheduler 来提交。
一个复杂的业务逻辑如果有 shuffle,那么就意味着前面阶段产生结果后,才能 执行下一个阶段,即下一个阶段的计算要依赖上一个阶段的数据。那么我们按照 shuffle 进行划分(也就是按照宽依赖就行划分),就可以将一个 DAG 划分成多 个 Stage/阶段,在同一个 Stage 中,会有多个算子操作,可以形成一个 pipeline 流水线,流水线内的多个平行的分区可以并行执行。
看一个简单的例子 WordCount 的Stage划分流程:
在程序执行过程中,一个Stage是否被提交,需要判断它的父 Stage 是否执行完毕,只有在父 Stage 执行完毕才能提交当前 Stage,如果一个 Stage 没有父 Stage,那么从该 Stage 开始提交。
Stage 提交时会将 Task 信息(分区信息以及方法等)序列化并被打包成 TaskSet 交给 TaskScheduler,一个Partition 对应一个 Task,另一方面 TaskScheduler 会监控 Stage 的运行状态,只有 Executor 丢失或者 Task 由于 Fetch 失败才需要重新提交失败的 Stage 以调度运行失败的任务,其他类型的 Task 失败会在 TaskScheduler 的调度过程中重试。
相对来说 DAGScheduler 做的事情较为简单,仅仅是在 Stage 层面上划分 DAG,提交
Stage 并监控相关状态信息。TaskScheduler 则相对较为复杂。
如何将划分好的Task发送到Executor中执行呢?
TaskSetManager 负责监控管理同一个Stage中的Tasks,TaskScheduler就是TaskSetManager为单元来调度任务。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。