当前位置:   article > 正文

Rdd操作partitionBy、mapValues、flatMapValues_rdd.partitionby(int, hashfunc).

rdd.partitionby(int, hashfunc).

combineByKey
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]

该函数用于将RDD[K,V]转换成RDD[K,C],这里的V类型和C类型可以相同也可以不同。

其中的参数:

createCombiner:组合器函数,用于将V类型转换成C类型,输入参数为RDD[K,V]中的V,输出为C

mergeValue:合并值函数,将一个C类型和一个V类型值合并成一个C类型,输入参数为(C,V),输出为C

mergeCombiners:合并组合器函数,用于将两个C类型值合并成一个C类型,输入参数为(C,C),输出为C

numPartitions:结果RDD分区数,默认保持原有的分区数

partitioner:分区函数,默认为HashPartitioner

mapSideCombine:是否需要在Map端进行combine操作,类似于MapReduce中的combine,默认为true

看下面例子:

在这里插入代码片
```scala> var rdd1 = sc.makeRDD(Array(("A",1),("A",2),("B",1),("B",2),("C",1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[64] at makeRDD at :21
 
scala> rdd1.combineByKey(
     |       (v : Int) => v + "_",   
     |       (c : String, v : Int) => c + "@" + v,  
     |       (c1 : String, c2 : String) => c1 + "$" + c2
     |     ).collect
res60: Array[(String, String)] = Array((A,2_$1_), (B,1_$2_), (C,1_)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/466790
推荐阅读
相关标签
  

闽ICP备14008679号