当前位置:   article > 正文

Spark编程模型(三):Spark算子(3)-Key-Value型(键值对)算子_spark针对[('a', [1, 1, 1]), ('b', [1, 1, 1])]列表中的数据进

spark针对[('a', [1, 1, 1]), ('b', [1, 1, 1])]列表中的数据进行求和,最终呈现结

5-2.1  rdd.partitionBy

partitionBy(numPartitionspartitionFunc=<function portable_hash>)

作用:对pairRDD进行分区操作,如果原有的partitionRDD和现有的partitionRDD是一致的话就不进行分区,否则会生成shuffleRDD,即会产生shuffle过程。

  1. >>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
  2. >>> sets = pairs.partitionBy(2).glom().collect()
  3. >>> len(set(sets[0]).intersection(set(sets[1])))
  4. 0

5-2.2  rdd.groupByKey

groupByKey(numPartitions=NonepartitionFunc=<function portable_hash>)

作用:groupByKey也是对每个key进行操作,但只生成一个sequence。类似groupBy方法,作用是把每一个相同Key的Value聚集起来形成一个序列。可以使用默认分区器和自定义分区器,但是这个方法开销比较大,如果想对相同Key的Value聚合或求平均,则推荐使用aggregateByKey或者reduceByKey。

  1. >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
  2. >>> sorted(rdd.groupByKey().mapValues(len).collect())
  3. [('a', 2), ('b', 1)]
  4. >>> sorted(rdd.groupByKey().mapValues(list).collect())
  5. [('a', [1, 1]), ('b', [1])]

这个例子先创建包含List集合对象的RDD,然后调用groupByKey方法将相同的Key的Value集合,最后调用collect方法以数组形式输出。

5-2.3  rdd.reduceByKey

reduceByKey(funcnumPartitions=NonepartitionFunc=<function portable_hash>)

作用:在一个(K,V)的RDD上调用,返回一个(K, V)的RDD,使用指定的reduce函数,将相同的key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。

  1. >>> from operator import add
  2. >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
  3. >>> sorted(rdd.reduceByKey(add).collect())
  4. [('a', 2), ('b', 1)]

reduceByKey和groupByKey的区别:

①reduceByKey按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k, v],如果shuflle过程之前有预聚合操作,则性能会得到提升;

②groupByKey:按照Key进行分组,直接进行shuffle;

③并发指导:reduceByKey比groupByKey建议使用,但是需要注意是否会影响业务逻辑。

5-2.4  rdd.aggregateByKey

aggregateByKey(zeroValueseqFunc, combFuncnumPartitions=NonepartitionFunc=<function portable_hash>)

作用:在kv对的RDD中,按key将value进行分组合并,合并时,将每个value和初始值作为seq函数的参数,进行计算,返回的结果作为一个新的kv对,然后将结果按照key进行合并,最后将每个分组的value传递给combine函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给combine函数,以此类推),将key与计算结果作为一个新的kv对输出。

参数:zeroValue:给每个分区中的每一个key一个初始值,只会在seqFunc中使用,若是aggregate算子,在combFunc中也会被使用;

seqOp:函数用于在每一个分区中按照key分别进行用初始值逐步迭代value的函数定义的操作

combOp:对经过seqFunc处理过的数据按照key分别进行函数定义,函数用于合并每个分区中的结果

5-2.5  rdd.foldByKey

foldByKey(zeroValuefuncnumPartitions=NonepartitionFunc=<function portable_hash>)

作用:aggregateByKey的简化操作,seqop和combop相同

  1. >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
  2. >>> from operator import add
  3. >>> sorted(rdd.foldByKey(0, add).collect())
  4. [('a', 2), ('b', 1)]

5-2.6  rdd.combineByKey

combineByKey(createCombinermergeValuemergeCombiners, numPartitions=NonepartitionFunc=<function portable_hash>)

作用:对于相同的key值,把value合并成一个集合

参数:①createCombiner:创建初始规则,把元素V转换到另个一类型元素C,combineByKey()会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素相同。如果这是一个新的元素,combineByKey()会使用一个叫作createCombiner()的函数来创建那个键对应的累加器的初始值。

②mergeValue:把元素V合并到元素C中,如果这是一个在处理当前分区之前已经遇到的键,它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并。

③mergeCombiners:将两个C元素合并,由于每个分区都是独立处理的,因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器,就需要使用用户提供的mergeCombiners()方法将各个分区的结果进行合并。

  1. >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])
  2. >>> def to_list(a):  # 把key的value转换为列表
  3. return [a]  
  4. >>> def append(a, b):  # 把相同key的value合并到列表中
  5. a.append(b)    
  6. return a    
  7. >>> def extend(a, b):  # 把不同key进行合并  
  8. a.extend(b)    
  9. return a    
  10. >>> sorted(x.combineByKey(to_list, append, extend).collect())
  11. [('a', [1, 2]), ('b', [1])]

5-2.7  rdd.sortByKey

sortByKey(ascending=True, numPartitions=Nonekeyfunc=<function RDD.<lambda>>)

作用:在一个(K, V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K, V)的RDD。

参数:ascending=True,按照升序排列,False为降序排列。

  1. >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
  2. >>> sc.parallelize(tmp).sortByKey().first()
  3. ('1', 3)
  4. >>> sc.parallelize(tmp).sortByKey(True, 1).collect()
  5. [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
  6. >>> sc.parallelize(tmp).sortByKey(True, 2).collect()
  7. [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
  8. >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
  9. >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
  10. >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
  11. [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5),...('white', 9), ('whose', 6)]

5-2.8  rdd.mapValues

作用:针对(K, V)形式的类型,只对V进行操作

  1. >>> x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
  2. >>> def f(x):  
  3. return len(x)  
  4. >>> x.mapValues(f).collect()
  5. [('a', 3), ('b', 1)]

5-2.9  rdd.join

join(othernumPartitions=None)

作用:在类型为(K, V)和(K, W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K, (V, W))的RDD

  1. >>> x = sc.parallelize([("a", 1), ("b", 4)])
  2. >>> y = sc.parallelize([("a", 2), ("a", 3)])
  3. >>> sorted(x.join(y).collect())
  4. [('a', (1, 2)), ('a', (1, 3))]

5-2.10  rdd.cogroup

cogroup(othernumPartitions=None)

cogroup方法是一个比较高级的函数,能根据Key值聚集最多三个键值对的RDD,并把相同Key值对应的Value聚集起来。

  1. >>> x = sc.parallelize([("a", 1), ("b", 4)])
  2. >>> y = sc.parallelize([("a", 2)])
  3. >>> [(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))]
  4. [('a', ([1], [2])), ('b', ([4], []))]

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/寸_铁/article/detail/819829
推荐阅读
相关标签
  

闽ICP备14008679号