def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
  // groupByKey shouldn't use map side combine because map side combine does not
  // reduce the amount of data shuffled and requires all map side data be inserted
  // into a hash table, leading to more objects in the old gen.
  val createCombiner = (v: V) => CompactBuffer(v)
  val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
  val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
  val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
  bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
/**
 * Under the hood this calls combineByKeyWithClassTag, which in turn creates a ShuffledRDD.
 * The three functions here are the core of these *ByKey operators:
 * createCombiner creates, for the first occurrence of each key within a partition, a collection
 *   that will hold that key's values, and puts the first value into it.
 * mergeValue appends every subsequent value of that key within the same partition to the
 *   collection created above.
 * mergeCombiners merges, after the shuffle read, the per-partition collections of the same key.
 */
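To make the roles of these three functions concrete, here is a minimal plain-Scala trace (no Spark; the split of key "a"'s values across two partitions is purely hypothetical) showing how the values 1, 3 and 4 end up in a single buffer:

import scala.collection.mutable.ArrayBuffer

object CombinerTrace {
  def main(args: Array[String]): Unit = {
    val createCombiner = (v: Int) => ArrayBuffer[Int](v)
    val mergeValue = (buf: ArrayBuffer[Int], v: Int) => buf += v
    val mergeCombiners = (b1: ArrayBuffer[Int], b2: ArrayBuffer[Int]) => b1 ++= b2

    // Hypothetical map-side split of key "a"'s values across two partitions
    val partition0 = Seq(1, 3) // the first value creates the combiner, the rest are merged in
    val partition1 = Seq(4)

    val buf0 = partition0.tail.foldLeft(createCombiner(partition0.head))(mergeValue)
    val buf1 = partition1.tail.foldLeft(createCombiner(partition1.head))(mergeValue)

    // After the shuffle read, the per-partition buffers of the same key are merged
    println(mergeCombiners(buf0, buf1)) // ArrayBuffer(1, 3, 4)
  }
}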
Knowing that groupByKey is built on combineByKeyWithClassTag, and that combineByKeyWithClassTag in turn creates a ShuffledRDD to do the actual work, we can reproduce groupByKey's logic step by step with this lower-level code. combineByKey and combineByKeyWithClassTag are essentially the same thing.
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

/**
 * def combineByKey[C](
 *     createCombiner: V => C,
 *     mergeValue: (C, V) => C,
 *     mergeCombiners: (C, C) => C,
 *     partitioner: Partitioner,
 *     mapSideCombine: Boolean = true,
 *     serializer: Serializer = null): RDD[(K, C)] = self.withScope {
 *   combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
 *     partitioner, mapSideCombine, serializer)(null)
 * }
 *
 * Looking at the parameters of this method: the first four must be supplied,
 * and the last two are set as needed.
 */

object CombineByKeyDemo1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("by")
    val sc = new SparkContext(conf)
    val rdd1 = sc.makeRDD(List(("a", 1), ("b", 2), ("a", 3), ("a", 4), ("b", 56), ("d", 6)))

    // createCombiner: build a buffer from each key's first value within a partition
    val createCombiner = (fv: Int) => ArrayBuffer[Int](fv)
    // mergeValue: append each subsequent value of the key within the partition
    val mergeValue = (ab1: ArrayBuffer[Int], lv: Int) => ab1 += lv
    // mergeCombiners: merge the per-partition buffers of the same key after the shuffle
    val mergeCombiners = (ab1: ArrayBuffer[Int], ab2: ArrayBuffer[Int]) => ab1 ++= ab2

    // groupByKey must not aggregate on the map side, so mapSideCombine = false
    val rdd2 = rdd1.combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(rdd1.partitions.length), false)

    rdd2.foreach(println)

    sc.stop()
  }
}
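As a quick sanity check (a sketch, not part of the original demo), the lines below could be added right before sc.stop() in CombineByKeyDemo1; apart from the collection type and the order of values inside each group, the result should match the built-in groupByKey:

    // Compare with the built-in groupByKey: same keys, same multiset of values
    val grouped = rdd1.groupByKey()
    val left = rdd2.collect().map { case (k, buf) => (k, buf.toList.sorted) }.toMap
    val right = grouped.collect().map { case (k, it) => (k, it.toList.sorted) }.toMap
    println(left == right) // expected: true

Going one level deeper, the same grouping can be built directly on a ShuffledRDD, as the next demo shows.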
import org.apache.spark.rdd.ShuffledRDD
import org.apache.spark.{Aggregator, HashPartitioner, SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

/**
 * class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
 *     var prev: RDD[_ <: Product2[K, V]],
 *     part: Partitioner)
 *
 * A ShuffledRDD only needs the parent RDD and a partitioner when it is created;
 * our logic is then supplied through the methods of this class.
 **/
object ShuffledRddDemo1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("by")
    val sc = new SparkContext(conf)
    val rdd1 = sc.makeRDD(List(("a", 1), ("b", 2), ("a", 3), ("a", 4), ("b", 56), ("d", 6)))

    val rdd2 = new ShuffledRDD[String, Int, ArrayBuffer[Int]](rdd1, new HashPartitioner(rdd1.partitions.length))
    // groupByKey semantics: no map-side combine
    rdd2.setMapSideCombine(false)
    val createCombiner = (fv: Int) => ArrayBuffer[Int](fv)
    val mergeValue = (ab1: ArrayBuffer[Int], lv: Int) => ab1 += lv
    val mergeCombiners = (ab1: ArrayBuffer[Int], ab2: ArrayBuffer[Int]) => ab1 ++= ab2

    // The Aggregator bundles the three functions that drive the combine
    rdd2.setAggregator(new Aggregator[String, Int, ArrayBuffer[Int]](createCombiner, mergeValue, mergeCombiners))

    rdd2.foreach(println)
    sc.stop()
  }
}
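The groupByKey source at the top of this post finishes by casting the combined buffers to RDD[(K, Iterable[V])]. The same final step can be mimicked here; the lines below are a sketch that would go right before sc.stop() in ShuffledRddDemo1:

    // Mirror groupByKey's last step: expose the buffers as Iterable[Int]
    val grouped = rdd2.asInstanceOf[org.apache.spark.rdd.RDD[(String, Iterable[Int])]]
    grouped.foreach(println)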
To be continued...