当前位置:   article > 正文

SparkCore中groupByKey和reduceByKey_groupbykey([numtasks]

groupbykey([numtasks]

一、概念

groupByKey([numTasks])
在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD

reduceByKey(func, [numTasks])
在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置

二、性能

groupByKey
全局聚合算子,将所有map task中的数据都拉取到shuffle中将key相同的数据进行聚合,它存在很多弊端,例如:将大量的数据进行网络传输,浪费大量的资源,最重要的是如果数据量太大还会出现GC和OutOfMemoryError的错误,如果数据某个key的数据量远大于其他key的数据,在进行全局聚合的时候还会出现数据倾斜的问题。

reduceByKey
在map阶段进行本地聚合以后才会到shuffle中进行全局聚合,相当于是进入shuffle之前已经做了一部分聚合,那么它的网络传输速度会比groupbykey快很多而且占用资源也会减少很多,但是算子本身就如它的名字一样,主要是进行计算的将相同key的数据进行计算,返回计算结果。

reduceByKey相当于groupByKey+mapValues

mapValues
mapValues顾名思义就是输入函数应用于RDD中Kev-Value的Value,原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素。因此,该函数只适用于元素为KV对的RDD

三、应用

场景:按照key分组求对应的value之和

# 数据准备rdd1
scala> val rdd1 = sc.parallelize(List(("Tom",1),("Jerry",3),("Kitty",2)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24

# 数据准备rdd2
scala> val rdd2 = sc.parallelize(List(("Jerry",2),("Tom",2),("Lily",4)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:24

# 数据准备多个相同key的对应不同value
scala> val rdd3 = rdd1 union rdd2
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = UnionRDD[2] at union at <console>:27

scala> rdd3.collect
res0: Array[(String, Int)] = Array((Tom,1), (Jerry,3), (Kitty,2), (Jerry,2), (Tom,2), (Lily,4))

# 对数据进行groupByKey获取相同key下的Value的集合
scala> val rdd4 = rdd3.groupByKey()
rdd4: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[3] at groupByKey at <console>:25

scala> rdd4.collect
res1: Array[(String, Iterable[Int])] = Array((Jerry,CompactBuffer(3, 2)), (Lily,CompactBuffer(4)), (Tom,CompactBuffer(1, 2)), (Kitty,CompactBuffer(2)))

# mapValues对<K,V>结构数据中的V进行操作
scala> val rdd5 = rdd4.mapValues(x=>x.sum)
rdd5: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[10] at mapValues at <console>:25

# 通过groupByKey+mapValues获取key对应的value之和
scala> rdd5.collect
res13: Array[(String, Int)] = Array((Jerry,5), (Lily,4), (Tom,3), (Kitty,2))

# 通过reduceByKey直接操作相同key下的不同value获取key对应的value之和
scala> val rdd6 = rdd3.reduceByKey(_+_)
rdd6: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[8] at reduceByKey at <console>:25

scala> rdd6.collect
res9: Array[(String, Int)] = Array((Jerry,5), (Lily,4), (Tom,3), (Kitty,2))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/466756
推荐阅读
相关标签
  

闽ICP备14008679号