当前位置:   article > 正文

Spark源码解析之By (groupBy,reduceByKey等)_spark group by源码

spark group by源码

一.groupByKey

  1. def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
  2. // groupByKey shouldn't use map side combine because map side combine does not
  3. // reduce the amount of data shuffled and requires all map side data be inserted
  4. // into a hash table, leading to more objects in the old gen.
  5. val createCombiner = (v: V) => CompactBuffer(v)
  6. val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
  7. val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
  8. val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
  9. createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
  10. bufs.asInstanceOf[RDD[(K, Iterable[V])]]
  11. }
  12. /**
  13. 底层调用的是combineByKeyWithClassTag,这里边其实是创建了一个ShuffledRDD。
  14. 这里的三个函数就是这些By算子的核心,第一个函数createCombiner就是对应每一个分区内的每一个key
  15. 创建一个集合,用来存储value,并将第一个value存储进去。
  16. mergeValue就是将每一个分区内每一个key对应的除第一个值以外的值依次放进前边创建的集合。
  17. mergeCombiners是在shuffleRead以后,将每一个key对应的集合合并到一起。
  18. */

既然知道了groupByKey底层调用的是 combineByKeyWithClassTag,而 combineByKeyWithClassTag底层创建了一个ShuffledRDD来完成计算,那么就可以由浅入深的使用底层的代码实现groupByKey的逻辑。

一.1 使用combineByKey实现groupByKey

combineByKey和combineByKeyWithClassTag是一回事。

  1. import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
  2. import scala.collection.mutable.ArrayBuffer
  3. /**
  4. * def combineByKey[C](
  5. * createCombiner: V => C,
  6. * mergeValue: (C, V) => C,
  7. * mergeCombiners: (C, C) => C,
  8. * partitioner: Partitioner,
  9. * mapSideCombine: Boolean = true,
  10. * serializer: Serializer = null): RDD[(K, C)] = self.withScope {
  11. * combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
  12. * partitioner, mapSideCombine, serializer)(null)
  13. * }
  14. *
  15. * 看一下这个方法传入的参数,前四个是要传入的,后边两个根据需求来判断
  16. */
  17. object CombineByKeyDemo1 {
  18. def main(args: Array[String]): Unit = {
  19. val conf = new SparkConf().setMaster("local[*]").setAppName("by")
  20. val sc = new SparkContext(conf)
  21. val rdd1 = sc.makeRDD(List(("a", 1), ("b", 2), ("a", 3), ("a", 4), ("b", 56), ("d", 6)))
  22. val createCombiner = (fv: Int) => ArrayBuffer[Int](fv)
  23. val mergeValue = (ab1: ArrayBuffer[Int], lv: Int) => ab1 += lv
  24. val mergeCombiners = (ab1: ArrayBuffer[Int], ab2: ArrayBuffer[Int]) => ab1 ++= ab2
  25. // groupByKey不能在map端进行聚合
  26. val rdd2 = rdd1.combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(rdd1.partitions.length), false)
  27. rdd2.foreach(println)
  28. sc.stop()
  29. }
  30. }

一.2 使用ShuffledRDD实现groupByKey

  1. import org.apache.spark.rdd.ShuffledRDD
  2. import org.apache.spark.{Aggregator, HashPartitioner, SparkConf, SparkContext}
  3. import scala.collection.mutable.ArrayBuffer
  4. /**
  5. * class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
  6. *
  7. * var prev: RDD[_ <: Product2[K, V]],
  8. * part: Partitioner)
  9. *
  10. * ShuffledRDD在创建的时候只需要传入父RDD和分区器,然后调用该类中的方法来实现我们的逻辑
  11. **/
  12. object ShuffledRddDemo1 {
  13. def main(args: Array[String]): Unit = {
  14. val conf = new SparkConf().setMaster("local[*]").setAppName("by")
  15. val sc = new SparkContext(conf)
  16. val rdd1 = sc.makeRDD(List(("a", 1), ("b", 2), ("a", 3), ("a", 4), ("b", 56), ("d", 6)))
  17. val rdd2 = new ShuffledRDD[String, Int, ArrayBuffer[Int]](rdd1, new HashPartitioner(rdd1.partitions.length))
  18. rdd2.setMapSideCombine(false)
  19. val createCombiner = (fv: Int) => ArrayBuffer[Int](fv)
  20. val mergeValue = (ab1: ArrayBuffer[Int], lv: Int) => ab1 += lv
  21. val mergeCombiners = (ab1: ArrayBuffer[Int], ab2: ArrayBuffer[Int]) => ab1 ++= ab2
  22. rdd2.setAggregator(new Aggregator[String, Int, ArrayBuffer[Int]](createCombiner, mergeValue, mergeCombiners))
  23. rdd2.foreach(println)
  24. sc.stop()
  25. }
  26. }

未完待续......

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号