赞
踩
groupByKey([numTasks])
在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD
reduceByKey(func, [numTasks])
在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置
groupByKey
全局聚合算子,将所有map task中的数据都拉取到shuffle中将key相同的数据进行聚合,它存在很多弊端,例如:将大量的数据进行网络传输,浪费大量的资源,最重要的是如果数据量太大还会出现GC和OutOfMemoryError的错误,如果数据某个key的数据量远大于其他key的数据,在进行全局聚合的时候还会出现数据倾斜的问题。
reduceByKey
在map阶段进行本地聚合以后才会到shuffle中进行全局聚合,相当于是进入shuffle之前已经做了一部分聚合,那么它的网络传输速度会比groupbykey快很多而且占用资源也会减少很多,但是算子本身就如它的名字一样,主要是进行计算的将相同key的数据进行计算,返回计算结果。
reduceByKey相当于groupByKey+mapValues
mapValues
mapValues顾名思义就是输入函数应用于RDD中Kev-Value的Value,原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素。因此,该函数只适用于元素为KV对的RDD
场景:按照key分组求对应的value之和
# 数据准备rdd1 scala> val rdd1 = sc.parallelize(List(("Tom",1),("Jerry",3),("Kitty",2))) rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24 # 数据准备rdd2 scala> val rdd2 = sc.parallelize(List(("Jerry",2),("Tom",2),("Lily",4))) rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:24 # 数据准备多个相同key的对应不同value scala> val rdd3 = rdd1 union rdd2 rdd3: org.apache.spark.rdd.RDD[(String, Int)] = UnionRDD[2] at union at <console>:27 scala> rdd3.collect res0: Array[(String, Int)] = Array((Tom,1), (Jerry,3), (Kitty,2), (Jerry,2), (Tom,2), (Lily,4)) # 对数据进行groupByKey获取相同key下的Value的集合 scala> val rdd4 = rdd3.groupByKey() rdd4: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[3] at groupByKey at <console>:25 scala> rdd4.collect res1: Array[(String, Iterable[Int])] = Array((Jerry,CompactBuffer(3, 2)), (Lily,CompactBuffer(4)), (Tom,CompactBuffer(1, 2)), (Kitty,CompactBuffer(2))) # mapValues对<K,V>结构数据中的V进行操作 scala> val rdd5 = rdd4.mapValues(x=>x.sum) rdd5: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[10] at mapValues at <console>:25 # 通过groupByKey+mapValues获取key对应的value之和 scala> rdd5.collect res13: Array[(String, Int)] = Array((Jerry,5), (Lily,4), (Tom,3), (Kitty,2)) # 通过reduceByKey直接操作相同key下的不同value获取key对应的value之和 scala> val rdd6 = rdd3.reduceByKey(_+_) rdd6: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[8] at reduceByKey at <console>:25 scala> rdd6.collect res9: Array[(String, Int)] = Array((Jerry,5), (Lily,4), (Tom,3), (Kitty,2))
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。