当前位置:   article > 正文

broadCast、cache、checkpoint、累加器_cache broadcast

cache broadcast

broadcast(广播变量)

  /**
    * Broadcast-variable demo: filters the words of a text file against a small
    * lookup set that is shared with the tasks through a broadcast variable.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[*]").setAppName(this.getClass.getName)
    val sc = new SparkContext(conf)
    try {
      sc.setLogLevel("WARN")
      // Words to keep; wrapped in a broadcast variable instead of being captured by the closure.
      val set = Set("hello")
      val broadcast: Broadcast[Set[String]] = sc.broadcast(set)
      val rdd1: RDD[String] = sc.textFile("hdfs://hadoop10:9000/a.txt")
      rdd1
        .flatMap(_.split(" "))
        // Read the set back through the broadcast variable.
        .filter(v => broadcast.value.contains(v))
        .collect()
        .foreach(println)
    } finally {
      // Always release the context, even when the job fails.
      sc.stop()
    }
  }

cache

  /**
    * Cache demo: persists an RDD whose elements embed the current time, then runs
    * two actions on it to show that the second action reuses the stored result.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[*]").setAppName(this.getClass.getName)
    val sc = new SparkContext(conf)
    try {
      sc.setLogLevel("WARN")
      // Caching keeps an already-computed result so that later uses of the RDD read it
      // back instead of recomputing the lineage.
      // Without persist() the two actions print different timestamps:
      // val rdd1: RDD[String] = sc.makeRDD(List("张三")).map(_ + System.currentTimeMillis())
      // With persist() both actions print the same value, which shows the cache is used.
      val rdd1: RDD[String] =
        sc.makeRDD(List("张三")).map(_ + System.currentTimeMillis()).persist()
      rdd1.foreach(println)
      rdd1.foreach(println)
    } finally {
      // Always release the context, even when a job fails.
      sc.stop()
    }
  }

checkpoint

  /**
    * Checkpoint demo: writes an RDD to a checkpoint directory on HDFS and runs
    * several actions on it.
    */
  object checkPoint {

    /**
      * @param args command-line arguments (unused)
      */
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf()
      conf.setMaster("local[*]").setAppName(this.getClass.getName)
      val sc = new SparkContext(conf)
      try {
        sc.setLogLevel("WARN")
        // Directory where checkpointed RDD data is written.
        sc.setCheckpointDir("hdfs://hadoop10:9000/rdd-checkpoint")
        val rdd1: RDD[String] = sc.makeRDD(List("张三")).map(_ + System.currentTimeMillis())
        // Persist before checkpointing: the checkpoint is written by a separate job after
        // the first action, and without a cache that job recomputes the lineage, so the
        // first printed timestamp would differ from the checkpointed one.
        rdd1.cache()
        // Only marks the RDD; the data is actually saved when the first action runs.
        rdd1.checkpoint()
        rdd1.foreach(println)
        rdd1.foreach(println)
        rdd1.foreach(println)
      } finally {
        // Always release the context, even when a job fails.
        sc.stop()
      }
    }
  }

累加器

  /**
    * Accumulator demo: adds the elements of two RDDs into one named long accumulator
    * and prints it on the driver.
    */
  object accumulators {

    /**
      * @param args command-line arguments (unused)
      */
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf()
      conf.setMaster("local[*]").setAppName(this.getClass.getName)
      val sc = new SparkContext(conf)
      try {
        sc.setLogLevel("WARN")
        // Executors accumulate within each partition; the partial results are then
        // pulled to the driver and merged across partitions.
        // The reference is never reassigned (only add() is called), so it is a val.
        val count = sc.longAccumulator("c1")
        sc.makeRDD(1 to 5).foreach(v => count.add(v))
        sc.makeRDD(1 to 6).foreach(v => count.add(v))
        // Prints the accumulator object itself; count.value yields just the total (15 + 21 = 36).
        println(count)
      } finally {
        // Stop the context even when one of the jobs above throws.
        sc.stop()
      }
    }
  }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/725046
推荐阅读
相关标签
  

闽ICP备14008679号