Category | Functions |
---|---|
Mapping | map(), mapPartitions(), flatMap() |
Selection | distinct(), sample(), filter() |
Set operations | cartesian(), union(), intersection(), cogroup() |
Joins | join(), leftOuterJoin(), rightOuterJoin(), fullOuterJoin() |
ByKey | groupByKey(), reduceByKey(), aggregateByKey(), combineByKey() |
Repartitioning | repartition(), coalesce() |
1、map(f, preservesPartitioning=False)
rdd = sc.parallelize(["b", "a", "c"])
rdd.map(lambda x: (x, 1)).collect()
# [('b', 1), ('a', 1), ('c', 1)]
2、mapPartitions(f, preservesPartitioning=False)
map applies its transformation function to each element of the RDD, while mapPartitions applies its function to each partition as a whole. Note that the function should generally consume the partition lazily through its iterator (e.g. via yield); materializing an entire partition in memory at once can easily cause an OOM.
rdd = sc.parallelize([1, 2, 3, 4], 2)
def f(iterator): yield sum(iterator)
rdd.mapPartitions(f).collect()
# [3, 7]
data = [('James','Smith','M',3000),
        ('Anna','Rose','F',4100),
        ('Robert','Williams','M',6200)]
columns = ["firstname","lastname","gender","salary"]
df = spark.createDataFrame(data=data, schema=columns)

# Example 1: mapPartitions() with a generator function
def reformat1(partitionData):
    for row in partitionData:
        yield [row.firstname+","+row.lastname, row.salary*10/100]

df1 = df.rdd.mapPartitions(reformat1).toDF(["name","bonus"])
df1.show()

# Example 2: mapPartitions() returning an iterator over a list
def reformat2(partitionData):
    updatedData = []
    for row in partitionData:
        name = row.firstname+","+row.lastname
        bonus = row.salary*10/100
        updatedData.append([name,bonus])
    return iter(updatedData)

df2 = df.rdd.mapPartitions(reformat2).toDF(["name","bonus"])
df2.show()
3、flatMap(f, preservesPartitioning=False)
The difference between flatMap and map: map is a plain "mapping" that produces exactly one output object per input element, whereas flatMap "maps first, then flattens", merging all of the resulting sequences into a single flat collection.
rdd = sc.parallelize([2, 3, 4])
rdd.map(lambda x: [(x, x), (x, x)]).collect()
# [[(2, 2), (2, 2)], [(3, 3), (3, 3)], [(4, 4), (4, 4)]]
rdd.flatMap(lambda x: [(x, x), (x, x)]).collect()
# [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
4、distinct(numPartitions=None)
sc.parallelize([1, 1, 2, 3]).distinct().collect()
# [1, 2, 3]
5、sample(withReplacement, fraction, seed=None)
rdd = sc.parallelize(range(100), 4)
rdd.sample(False, 0.1, 615).collect()
# [14, 16, 24, 34, 45, 69, 76]
6、filter(f)
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.filter(lambda x: x % 2 == 0).collect()
# [2, 4]
7、cartesian(other)
rdd = sc.parallelize([1, 2])
rdd.cartesian(rdd).collect()
# [(1, 1), (1, 2), (2, 1), (2, 2)]
8、union(other)
Computes the union of the two RDDs. Note that duplicates are not removed.
rdd = sc.parallelize([1, 1, 2, 3])
rdd.union(rdd).collect()
# [1, 1, 2, 3, 1, 1, 2, 3]
9、intersection(other)
Computes the intersection of the two RDDs. Note that the result is deduplicated.
rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
rdd1.intersection(rdd2).collect()
# [1, 2, 3]
10、cogroup(other, numPartitions=None)
x = sc.parallelize([("a", 1), ("b", 4),("a",2)])
y = sc.parallelize([("a", 2)])
[(k, tuple(map(list, v))) for k, v in x.cogroup(y).collect()]
# [('b', ([4], [])), ('a', ([1, 2], [2]))]
11、join(other, numPartitions=None)
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("a", 3)])
x.join(y).collect()
# [('a', (1, 2)), ('a', (1, 3))]
12、leftOuterJoin(other, numPartitions=None)
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
x.leftOuterJoin(y).collect()
# [('a', (1, 2)), ('b', (4, None))]
13、rightOuterJoin(other, numPartitions=None)
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
y.rightOuterJoin(x).collect()
# [('a', (2, 1)), ('b', (None, 4))]
14、fullOuterJoin(other, numPartitions=None)
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("c", 8)])
x.fullOuterJoin(y).collect()
# [('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))]
15、groupByKey(numPartitions=None)
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
rdd.groupByKey().collect()
# [('a', ResultIterable), ('b', ResultIterable)]
rdd.groupByKey().mapValues(list).collect()
# [('a', [1, 1]), ('b', [1])]
rdd.groupByKey().mapValues(len).collect()
# [('a', 2), ('b', 1)]
16、reduceByKey(func, numPartitions=None)
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
rdd.reduceByKey(lambda a,b:a+b).collect()
# [('a', 2), ('b', 1)]
17、aggregateByKey(zeroValue, seqFunc, combFunc, numPartitions=None)
zeroValue is used as the initial accumulator once per key in each partition; seqFunc merges values into the accumulator within a partition, and combFunc merges the per-partition accumulators, which is why changing zeroValue to 5 changes the result below.
rdd = sc.parallelize([(1,1),(1,2),(2,1),(2,3),(2,4),(1,7)], 2)
rdd.aggregateByKey(3, lambda a,b:max(a,b), lambda x,y:x + y).collect()
# [(2, 7), (1, 10)]
# [(2, 10), (1, 12)] if zeroValue=5
18、combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions=None, partitionFunc=f)
The first function, createCombiner, is applied to the first value seen for each key in a partition and turns it into the initial accumulator; mergeValue then folds each further value for that key in the same partition into the accumulator, and mergeCombiners merges the accumulators of the same key across partitions.
rdd = sc.parallelize([("A",1),("B",2),("B",3),("B",4),("B",5),("B",6),("C",7),("A",8)], 2)
rdd.combineByKey(lambda x:"%d_" %x, lambda a,b:"%s@%s" %(a,b), lambda a,b:"%s$%s" %(a,b)).collect()
# [('C', '7_'), ('A', '1_$8_'), ('B', '2_@3@4$5_@6')]
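As a complementary sketch (not part of the original article, data values are illustrative), the classic per-key average pattern shows the three functions more concretely:
rdd = sc.parallelize([("a", 1), ("a", 3), ("b", 2)])
# the accumulator per key is a (sum, count) pair
sums = rdd.combineByKey(lambda v: (v, 1),                         # createCombiner
                        lambda acc, v: (acc[0] + v, acc[1] + 1),  # mergeValue
                        lambda a, b: (a[0] + b[0], a[1] + b[1]))  # mergeCombiners
sums.mapValues(lambda t: t[0] / t[1]).collect()
# [('a', 2.0), ('b', 2.0)]  (order may vary)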
19、repartition(numPartitions)
20、coalesce(numPartitions, shuffle=False)
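Neither operator is illustrated in the original; as a rough sketch, repartition() always performs a full shuffle and can either increase or decrease the number of partitions, while coalesce() defaults to shuffle=False and is typically used to cheaply reduce the number of partitions:
rdd = sc.parallelize(range(10), 4)
rdd.getNumPartitions()
# 4
rdd.repartition(8).getNumPartitions()   # full shuffle, can grow the partition count
# 8
rdd.coalesce(2).getNumPartitions()      # no shuffle by default, shrink only
# 2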
Actions |
---|
reduce() |
take(), takeOrdered(), takeSample(), first() |
collect(), foreach() |
count(), countByKey(), countByValue() |
1、reduce(f)
Note that no collect() follows: reduce() is an action and returns its result directly to the driver.
sc.parallelize([1, 2, 3, 4, 5]).reduce(lambda a,b:a+b)
# 15
2、take(num)
sc.parallelize(range(100), 100).filter(lambda x: x > 90).take(3)
# [91, 92, 93]
3、takeOrdered(num, key=None)
sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x)
# [10, 9, 7, 6, 5, 4]
4、takeSample(withReplacement, num, seed=None)
rdd = sc.parallelize(range(0, 10))
rdd.takeSample(False, 5, 2)
# [5, 9, 3, 4, 6]
5、first()
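The original gives no example here; a minimal one (first() returns the first element of the RDD):
sc.parallelize([2, 3, 4]).first()
# 2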
6、collect()
Returns all elements of the RDD to the driver as a list.
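A minimal example (not in the original):
sc.parallelize([1, 2, 3]).collect()
# [1, 2, 3]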
7、foreach(f)
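foreach() applies a function to every element purely for its side effects and returns nothing. A small sketch (not from the original; on a cluster the printed output appears in the executors' logs rather than on the driver):
def f(x):
    print(x)
sc.parallelize([1, 2, 3]).foreach(f)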
8、count()
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
rdd.count()
# 3
9、countByKey()
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
rdd.countByKey()
# defaultdict(<class 'int'>, {'a': 2, 'b': 1})
10、countByValue()
rdd = sc.parallelize([1, 2, 1, 2, 2])
rdd.countByValue()
# defaultdict(<class 'int'>, {1: 2, 2: 3})