赞
踩
累加器是分布式共享只写变量
累加器可以用来把 Executor 端的变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在 Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后,传回 Driver 端进行 merge
/** 常见的系统累加器:longAccumulator/doubleAccumulator/collectionAccumulator 说明:累加器一般放在行动算子中进行操作 */ object TestRDDAcc { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[*]").setAppName("Acc") val sc = new SparkContext(conf) val rdd = sc.makeRDD(List(1,2,3,4), 2) // 创建累加器 val accSum = sc.longAccumulator("sum") rdd.foreach(num => { accSum.add(num) }) println(accSum.value) sc.stop() } }
自定义累加器实现 WordCount 案例,避免 shuffle 操作
/** 1.继承 AccumulatorV2[IN, OUT] 抽象类,定义输入输出的泛型类型 1.1 IN 表述累加器 add 的数据的类型 1.2 OUT 表示累加器 value 的返回类型 2.重写累加器的抽象方法 */ object TestAccWordCount { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[*]").setAppName("WCAcc") val sc = new SparkContext(conf) val rdd = sc.makeRDD(List( "hello", "hive", "hello", "spark" )) // 创建自定义累加器 val wcAcc = new MyAccumulator() // 向 spark 进行注册 sc.register(wcAcc, "wordCountAcc") // 循环遍历 rdd rdd.foreach(word => { // 使用累加器 wcAcc.add(word) }) // 输出累加器的值 println(wcAcc.value) sc.stop() } } /* 自定义累加器 */ class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] { // 定义累加器的返回结果 Map private var resultMap = mutable.Map[String, Long]() // 判断是否为初始状态 override def isZero: Boolean = resultMap.isEmpty() // 复制累加器 override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = { this } // 重置累加器 override def reset(): Unit = resultMap.clear() // 获取累加器输入的数据进行操作 override def add(word: String): Unit = { // 向 resultMap 中添加新值或累加旧值 val count = resultMap.getOrElse(word, 0L) + 1 resultMap.update(word, count) } // 合并多个累加器的结果 override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = { other.value.foreach({ case (word, count) => { val newCount = this.resultMap.getOrElse(word, 0L) + 1 this.resultMap.update(word, newCount) } }) } // 返回累加器的结果 override def value: mutable.Map[String, Long] = resultMap }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。