Spark
关键词:spark计算引擎,资源调度(申请资源),任务调度(执行task)
累加器,广播变量。
spark计算引擎,资源调度(申请资源),任务调度(执行task)
注:此此流程使用 yarn-client 模式
- 1-7 为资源调度(申请资源)
- 1在本地启动Driver程序
- 2.向RM申请启动AM
- 3. AM随机分配一个节点启动AM
- 4.启动AM
- 5.AM向RM申请启动Executor
- 6.AM分配一批节点启动Executor
- 7.Executor反向注册给Driver端
- 8-最后为任务调度
- 8.当代码中遇到一个action算子时,开始执行调度任务
- 9.构建DAG有向无环图
- 10.DAGSheduler构建宽窄依赖,将DAG有向无环图切分成多个Stage,Stage: 是一组可以并行计算的task
- 11.将stage以taskSet的形式发送给TaskScheuler
- 12.TaskScheduler将TaskSet的任务task发送到Executor中去执行。会尽量将task发送到数据所在的节点执行
- 13.发送task任务到Executor执行
其中还涉及到两个机制
重试机制:
1.如果task执行失败时TaskScheduler重试3次
2.如果还是失败DAGScheduler重试Stage4次
推测机制:
如果spark发现有task执行的很慢,会在发送一个一样的task去竞争
此图很重要
累加器
无累加器时
存在累加器
代码实现
- package com.core.day3
- import org.apache.spark.rdd.RDD
- import org.apache.spark.util.LongAccumulator
- import org.apache.spark.{SparkConf, SparkContext}
-
- object Demo21Accumulator {
- def main(args: Array[String]): Unit = {
- val conf = new SparkConf()
- conf.setAppName("Demo21Accumulator")
- conf.setMaster("local")
-
- val sc = new SparkContext(conf)
-
- val studentsRDD: RDD[String] = sc.textFile("data/students.txt")
-
- var count = 0
-
- studentsRDD.foreach(stu => {
-
- count += 1
-
- //在算子内部对算子外的一个普通变量进行累加,在算子外面读不到累加的结果
- //因为算子内的代码运行在Executor,算子外面的代码云行在Driver端
- //算子内的变量只是算子外面的一个副本
-
- //println(s"里面的:$count")
- })
- println(s"外面的:$count")
-
- /**
- * 累加器
- *
- */
- //1.定义累加器
- val accumulator: LongAccumulator = sc.longAccumulator
-
- val mapRDD: RDD[String] = studentsRDD.map(stu =>{
- accumulator.add(1)
- stu
- })
-
- mapRDD.foreach(println)
-
- println(s"accumulator:${accumulator.value}")
- }
- }
广播变量
代码实现
- package com.core.day3
-
- import org.apache.spark.broadcast.Broadcast
- import org.apache.spark.metrics.source
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
-
- import scala.io.Source
-
- //noinspection SourceNotClosed
- object Demo23Broadcast {
- def main(args: Array[String]): Unit = {
- val conf = new SparkConf()
-
- conf.setMaster("local")
-
- conf.setAppName("wc")
-
- val sc = new SparkContext(conf)
-
- /**
- * 广播变量
- * 当在算子内使用算子外的一个比较大的变量时,可以将这个变量广播出去,可以减少变量的副本数
- *
- */
-
- //读取学生表,以学号为key 构建一个map集合
- val studentMap: Map[String, String] = Source
- .fromFile("data/students.txt")
- .getLines()
- .toList
- .map(stu => {
- val id: String = stu.split(",")(0)
- (id,stu)
- }).toMap
-
- val scoresRDD: RDD[String] = sc.textFile("data/score.txt", 10)
-
- println(s"scoresRDD:${scoresRDD.getNumPartitions}")
-
- /**
- * 关联学生表和分数表
- * 循环分数表,使用学号到学生表的mao集合中查询学生的信息
- *
- */
-
- /**
- * 将Driver端的一个普通变量广播到Executor端
- *
- */
- val broadCastMap: Broadcast[Map[String, String]] = sc.broadcast(studentMap)
-
- val joinRDD: RDD[(String, String)] = scoresRDD.map(sco => {
- val id: String = sco.split(",")(0)
-
- //使用学号到学生表中获取学生的信息
-
- /**
- * 在算子内使用广播变量
- * 1、当第一个task在执行过程中如果使用了广播变量,会向Executor获取广播变量
- * 2、如果Executor中没有这个广播变量,Executor会去Driver端获取
- * 3、如果下一个task再使用到这个广播变量就可以直接用了
- *
- */
- //在算子内获取广播变量
- val map: Map[String, String] = broadCastMap.value
- val studentInfo: String = map.getOrElse(id, "默认值")
-
- (sco, studentInfo)
- })
-
-
- joinRDD.foreach(println)
- }
- }