赞
踩
partitionBy(numPartitions, partitionFunc=<function portable_hash>)
作用:对pairRDD进行分区操作,如果原有的partitionRDD和现有的partitionRDD是一致的话就不进行分区,否则会生成shuffleRDD,即会产生shuffle过程。
- >>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
- >>> sets = pairs.partitionBy(2).glom().collect()
- >>> len(set(sets[0]).intersection(set(sets[1])))
- 0
groupByKey(numPartitions=None, partitionFunc=<function portable_hash>)
作用:groupByKey也是对每个key进行操作,但只生成一个sequence。类似groupBy方法,作用是把每一个相同Key的Value聚集起来形成一个序列。可以使用默认分区器和自定义分区器,但是这个方法开销比较大,如果想对相同Key的Value聚合或求平均,则推荐使用aggregateByKey或者reduceByKey。
- >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
- >>> sorted(rdd.groupByKey().mapValues(len).collect())
- [('a', 2), ('b', 1)]
- >>> sorted(rdd.groupByKey().mapValues(list).collect())
- [('a', [1, 1]), ('b', [1])]
这个例子先创建包含List集合对象的RDD,然后调用groupByKey方法将相同的Key的Value集合,最后调用collect方法以数组形式输出。
reduceByKey(func, numPartitions=None, partitionFunc=<function portable_hash>)
作用:在一个(K,V)的RDD上调用,返回一个(K, V)的RDD,使用指定的reduce函数,将相同的key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。
- >>> from operator import add
- >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
- >>> sorted(rdd.reduceByKey(add).collect())
- [('a', 2), ('b', 1)]
reduceByKey和groupByKey的区别:
①reduceByKey按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k, v],如果shuflle过程之前有预聚合操作,则性能会得到提升;
②groupByKey:按照Key进行分组,直接进行shuffle;
③并发指导:reduceByKey比groupByKey建议使用,但是需要注意是否会影响业务逻辑。
aggregateByKey(zeroValue, seqFunc, combFunc, numPartitions=None, partitionFunc=<function portable_hash>)
作用:在kv对的RDD中,按key将value进行分组合并,合并时,将每个value和初始值作为seq函数的参数,进行计算,返回的结果作为一个新的kv对,然后将结果按照key进行合并,最后将每个分组的value传递给combine函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给combine函数,以此类推),将key与计算结果作为一个新的kv对输出。
参数:zeroValue:给每个分区中的每一个key一个初始值,只会在seqFunc中使用,若是aggregate算子,在combFunc中也会被使用;
seqOp:函数用于在每一个分区中按照key分别进行用初始值逐步迭代value的函数定义的操作
combOp:对经过seqFunc处理过的数据按照key分别进行函数定义,函数用于合并每个分区中的结果
foldByKey(zeroValue, func, numPartitions=None, partitionFunc=<function portable_hash>)
作用:aggregateByKey的简化操作,seqop和combop相同
- >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
- >>> from operator import add
- >>> sorted(rdd.foldByKey(0, add).collect())
- [('a', 2), ('b', 1)]
combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions=None, partitionFunc=<function portable_hash>)
作用:对于相同的key值,把value合并成一个集合
参数:①createCombiner:创建初始规则,把元素V转换到另个一类型元素C,combineByKey()会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素相同。如果这是一个新的元素,combineByKey()会使用一个叫作createCombiner()的函数来创建那个键对应的累加器的初始值。
②mergeValue:把元素V合并到元素C中,如果这是一个在处理当前分区之前已经遇到的键,它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并。
③mergeCombiners:将两个C元素合并,由于每个分区都是独立处理的,因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器,就需要使用用户提供的mergeCombiners()方法将各个分区的结果进行合并。
- >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])
-
- >>> def to_list(a): # 把key的value转换为列表
- return [a]
-
- >>> def append(a, b): # 把相同key的value合并到列表中
- a.append(b)
- return a
-
- >>> def extend(a, b): # 把不同key进行合并
- a.extend(b)
- return a
-
- >>> sorted(x.combineByKey(to_list, append, extend).collect())
- [('a', [1, 2]), ('b', [1])]
sortByKey(ascending=True, numPartitions=None, keyfunc=<function RDD.<lambda>>)
作用:在一个(K, V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K, V)的RDD。
参数:ascending=True,按照升序排列,False为降序排列。
- >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
- >>> sc.parallelize(tmp).sortByKey().first()
- ('1', 3)
- >>> sc.parallelize(tmp).sortByKey(True, 1).collect()
- [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
- >>> sc.parallelize(tmp).sortByKey(True, 2).collect()
- [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
- >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
- >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
- >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
- [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5),...('white', 9), ('whose', 6)]
作用:针对(K, V)形式的类型,只对V进行操作
- >>> x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
- >>> def f(x):
- return len(x)
- >>> x.mapValues(f).collect()
- [('a', 3), ('b', 1)]
join(other, numPartitions=None)
作用:在类型为(K, V)和(K, W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K, (V, W))的RDD
- >>> x = sc.parallelize([("a", 1), ("b", 4)])
- >>> y = sc.parallelize([("a", 2), ("a", 3)])
- >>> sorted(x.join(y).collect())
- [('a', (1, 2)), ('a', (1, 3))]
cogroup(other, numPartitions=None)
cogroup方法是一个比较高级的函数,能根据Key值聚集最多三个键值对的RDD,并把相同Key值对应的Value聚集起来。
- >>> x = sc.parallelize([("a", 1), ("b", 4)])
- >>> y = sc.parallelize([("a", 2)])
- >>> [(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))]
- [('a', ([1], [2])), ('b', ([4], []))]
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。