import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object broadcastVariable {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[*]").setAppName(this.getClass.getName)
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    // broadcast a small read-only collection: each executor receives one copy
    // instead of the set being shipped with every task closure
    val set = Set("hello")
    val broadcast: Broadcast[Set[String]] = sc.broadcast(set)

    val rdd1: RDD[String] = sc.textFile("hdfs://hadoop10:9000/a.txt")
    rdd1.flatMap(_.split(" ")).filter(v => {
      // read the set through the broadcast variable
      broadcast.value.contains(v)
    }).collect().foreach(println)

    sc.stop()
  }
}
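Once the lookup set is no longer needed, the copies cached on the executors can be released. This is not part of the original example; the sketch below just uses the standard unpersist()/destroy() methods of Broadcast, continuing from the broadcast value created above.

// drop the cached copies on the executors; the value is re-broadcast lazily if it is used again
broadcast.unpersist()
// or release it permanently once no further job will read it
//broadcast.destroy()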
cache
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object cache {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[*]").setAppName(this.getClass.getName)
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    // cache/persist keeps a computed result so that when the RDD is used again it is
    // read from the cache instead of being recomputed, which improves efficiency
    //val rdd1: RDD[String] = sc.makeRDD(List("张三")).map(_ + System.currentTimeMillis()) // the two outputs differ: the RDD is recomputed
    val rdd1: RDD[String] = sc.makeRDD(List("张三")).map(_ + System.currentTimeMillis()).persist() // the two outputs are identical, showing the result was cached
    rdd1.foreach(println)
    rdd1.foreach(println)

    sc.stop()
  }
}
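For reference, cache() on an RDD is shorthand for persist() with the default MEMORY_ONLY storage level; other levels can be requested explicitly. A minimal sketch under the same SparkContext as above (the rdd2 name is only illustrative):

import org.apache.spark.storage.StorageLevel

// keep the data in memory and spill to disk when it does not fit
val rdd2: RDD[String] = sc.makeRDD(List("张三"))
  .map(_ + System.currentTimeMillis())
  .persist(StorageLevel.MEMORY_AND_DISK)
rdd2.foreach(println)
rdd2.foreach(println)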
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object checkPoint {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[*]").setAppName(this.getClass.getName)
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    // directory (on HDFS here) where the checkpoint data is written
    sc.setCheckpointDir("hdfs://hadoop10:9000/rdd-checkpoint")
    val rdd1: RDD[String] = sc.makeRDD(List("张三")).map(_ + System.currentTimeMillis())
    rdd1.checkpoint() // mark the RDD for checkpointing; the data is saved when the first action runs
    rdd1.foreach(println)
    rdd1.foreach(println)
    rdd1.foreach(println)

    sc.stop()
  }
}
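Writing the checkpoint launches a separate job that recomputes the RDD from its lineage, so Spark recommends persisting the RDD before checkpointing it, so that it is only computed once. A minimal sketch under the same setup as above (rdd2 is only illustrative):

val rdd2: RDD[String] = sc.makeRDD(List("张三")).map(_ + System.currentTimeMillis())
rdd2.cache()       // keep the computed partitions so the checkpoint job can reuse them
rdd2.checkpoint()  // the data is written to the checkpoint directory after the first action
rdd2.foreach(println)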
import org.apache.spark.{SparkConf, SparkContext}

object accumulators {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[*]").setAppName(this.getClass.getName)
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    // accumulator: each executor adds up the values inside its partitions, then the
    // partial results are merged on the Driver across partitions
    val count = sc.longAccumulator("c1")
    sc.makeRDD(1 to 5).foreach(v => count.add(v))
    sc.makeRDD(1 to 6).foreach(v => count.add(v))
    println(count.value)
    sc.stop()
  }
}
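Besides the built-in longAccumulator, doubleAccumulator and collectionAccumulator, a custom accumulator can be written by extending AccumulatorV2 and registering it with the SparkContext. A minimal sketch; the WordSetAccumulator class and the word-set use case are illustrative, not from the original post.

import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable

// collects the distinct words seen on the executors; IN = String, OUT = Set[String]
class WordSetAccumulator extends AccumulatorV2[String, Set[String]] {
  private val words = mutable.Set[String]()

  override def isZero: Boolean = words.isEmpty
  override def copy(): AccumulatorV2[String, Set[String]] = {
    val acc = new WordSetAccumulator
    acc.words ++= words
    acc
  }
  override def reset(): Unit = words.clear()
  override def add(v: String): Unit = words += v
  override def merge(other: AccumulatorV2[String, Set[String]]): Unit = words ++= other.value
  override def value: Set[String] = words.toSet
}

// usage: register it, update it inside actions on the executors, read it on the Driver
// val acc = new WordSetAccumulator
// sc.register(acc, "words")
// sc.makeRDD(List("a", "b", "a")).foreach(acc.add)
// println(acc.value)   // Set(a, b)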