当前位置:   article > 正文

spark计算引擎,资源调度,任务调度,累加器,广播变量

spark org.apache.spark.metrics.source.accumulatorsource

Spark

关键词:spark计算引擎,资源调度(申请资源),任务调度(执行task)

累加器,广播变量。

spark计算引擎,资源调度(申请资源),任务调度(执行task)

注:此此流程使用 yarn-client 模式

  1. 1-7 为资源调度(申请资源)
  2. 1在本地启动Driver程序
  3. 2.向RM申请启动AM
  4. 3. AM随机分配一个节点启动AM
  5. 4.启动AM
  6. 5.AM向RM申请启动Executor
  7. 6.AM分配一批节点启动Executor
  8. 7.Executor反向注册给Driver端
  9. 8-最后为任务调度
  10. 8.当代码中遇到一个action算子时,开始执行调度任务
  11. 9.构建DAG有向无环图
  12. 10.DAGSheduler构建宽窄依赖,将DAG有向无环图切分成多个Stage,Stage: 是一组可以并行计算的task
  13. 11.将stage以taskSet的形式发送给TaskScheuler
  14. 12.TaskScheduler将TaskSet的任务task发送到Executor中去执行。会尽量将task发送到数据所在的节点执行
  15. 13.发送task任务到Executor执行

其中还涉及到两个机制

重试机制:

1.如果task执行失败时TaskScheduler重试3次

2.如果还是失败DAGScheduler重试Stage4次

推测机制:

如果spark发现有task执行的很慢,会在发送一个一样的task去竞争

此图很重要

累加器

无累加器时

存在累加器

代码实现
  1. package com.core.day3
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.util.LongAccumulator
  4. import org.apache.spark.{SparkConf, SparkContext}
  5. object Demo21Accumulator {
  6. def main(args: Array[String]): Unit = {
  7. val conf = new SparkConf()
  8. conf.setAppName("Demo21Accumulator")
  9. conf.setMaster("local")
  10. val sc = new SparkContext(conf)
  11. val studentsRDD: RDD[String] = sc.textFile("data/students.txt")
  12. var count = 0
  13. studentsRDD.foreach(stu => {
  14. count += 1
  15. //在算子内部对算子外的一个普通变量进行累加,在算子外面读不到累加的结果
  16. //因为算子内的代码运行在Executor,算子外面的代码云行在Driver端
  17. //算子内的变量只是算子外面的一个副本
  18. //println(s"里面的:$count")
  19. })
  20. println(s"外面的:$count")
  21. /**
  22. * 累加器
  23. *
  24. */
  25. //1.定义累加器
  26. val accumulator: LongAccumulator = sc.longAccumulator
  27. val mapRDD: RDD[String] = studentsRDD.map(stu =>{
  28. accumulator.add(1)
  29. stu
  30. })
  31. mapRDD.foreach(println)
  32. println(s"accumulator:${accumulator.value}")
  33. }
  34. }

广播变量

代码实现
  1. package com.core.day3
  2. import org.apache.spark.broadcast.Broadcast
  3. import org.apache.spark.metrics.source
  4. import org.apache.spark.rdd.RDD
  5. import org.apache.spark.{SparkConf, SparkContext}
  6. import scala.io.Source
  7. //noinspection SourceNotClosed
  8. object Demo23Broadcast {
  9. def main(args: Array[String]): Unit = {
  10. val conf = new SparkConf()
  11. conf.setMaster("local")
  12. conf.setAppName("wc")
  13. val sc = new SparkContext(conf)
  14. /**
  15. * 广播变量
  16. * 当在算子内使用算子外的一个比较大的变量时,可以将这个变量广播出去,可以减少变量的副本数
  17. *
  18. */
  19. //读取学生表,以学号为key 构建一个map集合
  20. val studentMap: Map[String, String] = Source
  21. .fromFile("data/students.txt")
  22. .getLines()
  23. .toList
  24. .map(stu => {
  25. val id: String = stu.split(",")(0)
  26. (id,stu)
  27. }).toMap
  28. val scoresRDD: RDD[String] = sc.textFile("data/score.txt", 10)
  29. println(s"scoresRDD:${scoresRDD.getNumPartitions}")
  30. /**
  31. * 关联学生表和分数表
  32. * 循环分数表,使用学号到学生表的mao集合中查询学生的信息
  33. *
  34. */
  35. /**
  36. * 将Driver端的一个普通变量广播到Executor端
  37. *
  38. */
  39. val broadCastMap: Broadcast[Map[String, String]] = sc.broadcast(studentMap)
  40. val joinRDD: RDD[(String, String)] = scoresRDD.map(sco => {
  41. val id: String = sco.split(",")(0)
  42. //使用学号到学生表中获取学生的信息
  43. /**
  44. * 在算子内使用广播变量
  45. * 1、当第一个task在执行过程中如果使用了广播变量,会向Executor获取广播变量
  46. * 2、如果Executor中没有这个广播变量,Executor会去Driver端获取
  47. * 3、如果下一个task再使用到这个广播变量就可以直接用了
  48. *
  49. */
  50. //在算子内获取广播变量
  51. val map: Map[String, String] = broadCastMap.value
  52. val studentInfo: String = map.getOrElse(id, "默认值")
  53. (sco, studentInfo)
  54. })
  55. joinRDD.foreach(println)
  56. }
  57. }

Executor 不仅有线程池,还有blockManager

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/776391
推荐阅读
相关标签
  

闽ICP备14008679号