当前位置:   article > 正文

pyspark的常用算子_pysqpark 算子

pysqpark 算子

本文通过pyspark(基于python)演示spark常用算子
官方文档上列举共有32种常见算子,包括Transformation的20种操作和Action的12种操作。

Transformation:

1.map

map的输入变换函数应用于RDD中所有元素

#演示rdd
rdd=sc.parallelize(range(0,10))
rdd.collect()
输出: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
  • 1
  • 2
  • 3
  • 4
rdd.map(lambda x:x+1).collect()
输出:[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  • 1
  • 2

2.filter

过滤操作,满足filter内function函数为true的RDD内所有元素组成一个新的数据集

rdd.filter(lambda x: x>5).collect()
输出: [6, 7, 8, 9]
  • 1
  • 2
rdd.filter(lambda x: x%2==0).collect()
输出: [0, 2, 4, 6, 8]
  • 1
  • 2

3.flatMap

map是对RDD中元素逐一进行函数操作映射为另外一个RDD,而flatMap操作是将函数应用于RDD之中的每一个元素,将返回的迭代器的所有内容构成新的RDD。而flatMap操作是将函数应用于RDD中每一个元素,将返回的迭代器的所有内容构成RDD。

flatMap与map区别在于map为“映射”,而flatMap“先映射,后扁平化”,map对每一次(func)都产生一个元素,返回一个对象,而flatMap多一步就是将所有对象合并为一个对象。
如果还没看懂,就看看下面的代码吧:

#3 flatMap
rdd1=sc.parallelize([[1,2,3],[7,8,9]])
rdd1.collect()
输出: [[1, 2, 3], [7, 8, 9]]
  • 1
  • 2
  • 3
  • 4
rdd1.map(lambda x:x).collect()
输出:[[1, 2, 3], [7, 8, 9]]
  • 1
  • 2
rdd1.flatMap(lambda x:x).collect()
输出: [1, 2, 3, 7, 8, 9]
  • 1
  • 2

4.mapPartitions

针对每一个分区进行map操作
但由于单独运行于RDD的每个分区上(block),所以在一个类型为T的RDD上运行时,(function)必须是Iterator => Iterator类型的方法

rdd3=sc.parallelize(range(0,10),3)
rdd3.glom().collect()
输出:[[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
#glom()可以把每个分区数据列表作为元素,组成新列表
#getNumPartitions()可以查看rdd有几个分区
  • 1
  • 2
  • 3
  • 4
  • 5
rdd3.map(lambda x:x+1).collect()
输出 [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  • 1
  • 2
def func(par):
    yield sum(par)
rdd3.mapPartitions(lambda x:func(x)).collect()
输出: [3, 12, 30]
  • 1
  • 2
  • 3
  • 4

这样map 和mapPartitions的区别就很明显了吧

5.mapPartitionsWithIndex

与mapPartitions类似,但需要提供一个表示分区索引值的整型值作为参数,因此function必须是(int, Iterator)=>Iterator类型的。

def func2(index,par):
    yield index,sum(par)
rdd3.mapPartitionsWithIndex(lambda index,x:func2(index,x)).collect()
输出:[(0, 3), (1, 12), (2, 30)]
  • 1
  • 2
  • 3
  • 4

mapPartitions与mapPartitionsWithIndex的区别也很明显了吧

6.sample(withReplacement, fraction, seed)

采样操作,用于从样本中取出部分数据。withReplacement是否放回,fraction采样比例,seed用于指定的随机数生成器的种子。(是否放回抽样分true和false,fraction取样比例为(0, 1]。seed种子为整型实数。)

rdd.sample(True,0.5,1).collect()
输出: [1, 2, 3, 4, 7, 9, 9, 9, 9]
  • 1
  • 2

其实我也搞不清有什么用…

7.union(otherRdd)

对于源数据集和其他数据集合并,不去重。

rdd4=sc.parallelize([0,1,2,20,21,22])
rdd.union(rdd4).collect()
#去重使用 distinct()
输出:[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 20, 21, 22]
  • 1
  • 2
  • 3
  • 4

8.intersection(otherRDD)

对于源数据集和其他数据集求交集,并去重,且无序返回。

rdd.intersection(rdd4).collect()
输出: [0, 1, 2]
  • 1
  • 2

9.distinct([numTasks])

返回一个在源数据集去重之后的新数据集,即去重,并局部无序而整体有序返回。

rdd5=sc.parallelize([1,1,1,2,2,3,4])
rdd5.distinct().collect()
[2, 4, 1, 3]
  • 1
  • 2
  • 3

10.groupByKey()

在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD
Iterator[V]可以在map中操作

rdd6 = sc.parallelize([("a", 1), ("b", 3), ("a", 2), ("b", 4)])
rdd6.groupByKey().map(lambda x:(x[0],sum(x[1]))).collect()
输出: [('b', 7), ('a', 3)]
  • 1
  • 2
  • 3

11.reduceByKey

将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置

rdd6.reduceByKey(lambda x,y:x+y).collect()
输出: [('b', 7), ('a', 3)]
  • 1
  • 2

12.aggregateByKey(zeroValue, seqOp, combOp)

类似reduceByKey,对pairRDD中想用的key值进行聚合操作,使用初始值(seqOp中使用,而combOpenCL中未使用)对应返回值为pairRDD,而区于aggregate(返回值为非RDD)

rdd = sc.parallelize([1, 2, 3, 4, 5,6],2)
rdd.glom().collect() #[[1, 2, 3], [4, 5, 6]]
rdd.aggregate(0, lambda x,y:y if y>x else x,lambda x,y:x+y)
输出:9
#分区数不同的时候结果也会不同
  • 1
  • 2
  • 3
  • 4
  • 5
  • 每个分区的每个值y与zorevalue x在第一个函数seqOp中操作,返回值做下一个zorevalue 与分区的下一个元素在seqOp操作,完成后在
  • combOp用原 zerovalue 与 seqOp最后的结果操作

13.sortByKey

rdd6.sortByKey(False).collect()
输出: [('b', 3), ('b', 4), ('a', 1), ('a', 2)]
  • 1
  • 2

14.join(otherRDD)

在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD

x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("a", 3)])
x.join(y).collect()
输出: [('a', (1, 2)), ('a', (1, 3))]
  • 1
  • 2
  • 3
  • 4

15.cogroup

合并两个RDD,生成一个新的RDD。实例中包含两个Iterable值,第一个表示RDD1中相同值,第二个表示RDD2中相同值(key值),这个操作需要通过partitioner进行重新分区,因此需要执行一次shuffle操作。(若两个RDD在此之前进行过shuffle,则不需要)

x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
输出:[(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))]
  • 1
  • 2
  • 3

16.cartesian

求笛卡尔乘积。该操作不会执行shuffle操作

x = sc.parallelize([1,2,3])
y = sc.parallelize([4,5,6])
x.cartesian(y).collect()
[(1, 4), (1, 5), (1, 6), (2, 4), (3, 4), (2, 5), (2, 6), (3, 5), (3, 6)]
  • 1
  • 2
  • 3
  • 4

17.pipe(command,[envVars])

通过一个shell命令来对RDD各分区进行“管道化”。通过pipe变换将一些shell命令用于Spark中生成的新RDD

rdd=sc.parallelize(range(0,8),4)
rdd.glom().collect()#[[0, 1], [2, 3], [4, 5], [6, 7]]
rdd.pipe("head -n 1").collect()
输出: ['0', '2', '4', '6']
  • 1
  • 2
  • 3
  • 4

18.coalesce(numPartitions)

重新分区,减少RDD中分区的数量到numPartitions

rdd=sc.parallelize(range(0,8),4)
rdd.getNumPartitions()   #4
rdd.coalesce(2).getNumPartitions()  #2
  • 1
  • 2
  • 3

19.repartition(numPartitions)

repartition是coalesce接口中shuffle为true的简易实现,即Reshuffle RDD并随机分区,使各分区数据量尽可能平衡。若分区之后分区数远大于原分区数,则需要shuffle。

rdd=sc.parallelize(range(0,8),4)
rdd.repartition(2).getNumPartitions()
输出:  2
  • 1
  • 2
  • 3

20.repartitionAndSortWithinPartitions(partitioner)

该方法根据partitioner对RDD进行分区,并且在每个结果分区中按key进行排序。

Action:

1.reduce(function)

reduce将RDD中元素两两传递给输入函数,同时产生一个新值,新值与RDD中下一个元素再被传递给输入函数,直到最后只有一个值为止。

rdd=sc.parallelize([1,2,3,4])
rdd.reduce(lambda x,y:x+y)#即实现 1+2+3+4
输出: 10
  • 1
  • 2
  • 3

2.collect()

将一个RDD以一个Array数组形式返回其中的所有元素

rdd
输出 : ParallelCollectionRDD[322] at parallelize at PythonRDD.scala:195
rdd.collect()
输出  :[1,2,3,4]

  • 1
  • 2
  • 3
  • 4
  • 5

3.count()

返回数据集中元素个数,默认Long类型。

rdd=sc.parallelize([1,2,3,4])
rdd.count()
输出: 4

  • 1
  • 2
  • 3
  • 4

4.first()

返回数据集的第一个元素(类似于take(1))

rdd=sc.parallelize(["a","b","c"])
rdd.first()
输出:   "a"
  • 1
  • 2
  • 3

5.takeSample(withReplacement, num, [seed])

对于一个数据集进行随机抽样,返回一个包含num个随机抽样元素的数组,withReplacement表示是否有放回抽样,参数seed指定生成随机数的种子。

该方法仅在预期结果数组很小的情况下使用,因为所有数据都被加载到driver端的内存中

6.take(n)

返回一个包含数据集前n个元素的数组(从0下标到n-1下标的元素),不排序。

rdd=sc.parallelize(["a","b","c"])
rdd.take(2)
输出:  ['a', 'b']
  • 1
  • 2
  • 3

7.takeOrdered(n,[ordering])

返回RDD中前n个元素,并按默认顺序排序(升序)或者按自定义比较器顺序排序。

rdd=sc.parallelize(["c","b","a","d"])
rdd.takeOrdered(2)#先排序,再取值
输出:  ['a', 'b']
  • 1
  • 2
  • 3

8.saveAsTextFile(path)

将dataSet中元素以文本文件的形式写入本地文件系统或者HDFS等。Spark将对每个元素调用toString方法,将数据元素转换为文本文件中的一行记录。

若将文件保存到本地文件系统,那么只会保存在executor所在机器的本地目录。

9.saveAsSequenceFile(path)(Java and Scala)

将dataSet中元素以Hadoop SequenceFile的形式写入本地文件系统或者HDFS等。(对pairRDD操作)

10.saveAsObjectFile(path)(Java and Scala)

将数据集中元素以ObjectFile形式写入本地文件系统或者HDFS等。

11.countByKey()

用于统计RDD[K,V]中每个K的数量,返回具有每个key的计数的(k,int)pairs的hashMap。

rdd = sc.parallelize([("a", 1), ("b", 3), ("a", 2), ("b", 4)])
rdd.countByKey()
defaultdict(int, {'a': 2, 'b': 2})
  • 1
  • 2
  • 3

12.foreach(function)

对数据集中每一个元素运行函数function,。

其他常用的算子

1 sortBy

返回排序的数据,默认升序

rdd=sc.parallelize(["f","c","b","a","d"])
rdd.sortBy(lambda x:x).collect()
输出: ['a', 'b', 'c', 'd', 'f']
  • 1
  • 2
  • 3

2 top(n)

返回降序的数据前N个元素

rdd=sc.parallelize(["f","c","b","a","d"])
rdd.top(3)
输出: ['f', 'd', 'c']
  • 1
  • 2
  • 3

3 subtract(RDD)

y减去y与x的交集

x = sc.parallelize([1,2,3,4])
y = sc.parallelize([3,4,5,6])
y.subtract(x).collect()
输出: [5, 6]
  • 1
  • 2
  • 3
  • 4

4 groupBy

rdd= sc.parallelize([1, 1, 2, 3, 5, 8])
rdd.groupBy(lambda x:"偶数" if x%2==0 else "奇数").map(lambda x : (x[0],list(x[1])) ).collect()
输出: [('奇数', [1, 1, 3, 5]), ('偶数', [2, 8])]
  • 1
  • 2
  • 3

5 flatMapValues(f)

对键值对RDD中每个值应用返回一个迭代器的函数,然后对每个元素生成一个对应的键值对。

rdd = sc.parallelize([("fruites", ["apple", "banana", "lemon"]), ("vegetables", ["tomato","cabbage"])])
rdd.flatMapValues(lambda x:x).collect()
输出:
[('fruites', 'apple'),
 ('fruites', 'banana'),
 ('fruites', 'lemon'),
 ('vegetables', 'tomato'),
 ('vegetables', 'cabbage')]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

6 aggregateByKey(zeroValue, seqFunc, combFunc, numPartitions=None)

使用给定的combine(组合)函数和一个中性的“零值”来聚合每个键的值

rdd = sc.parallelize([ ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)], 2)
rdd.aggregateByKey(0,lambda x,y:x+y,lambda x,y:x+y).collect()
输出: [('mouse', 6), ('cat', 19), ('dog', 12)]
  • 1
  • 2
  • 3

7 keys()

获取所有key

rdd = sc.parallelize([("a", 1), ("b", 3), ("a", 2), ("b", 4)])
rdd.keys().collect()
输出: ['a', 'b', 'a', 'b']
  • 1
  • 2
  • 3

8 values()

获取所有value

rdd = sc.parallelize([("a", 2), ("b", 3), ("a", 2), ("b", 4)])
rdd.values().collect()
输出: [2, 3, 2, 4]
  • 1
  • 2
  • 3

9 countByKey()

针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。与count类似,但是是以key为单位进行统计。

r1 = sc.parallelize([("fruites", ["apple", "banana", "lemon"]), ("vegetables", ["tomato","cabbage"])]).flatMapValues(lambda x:x) r1.countByKey() 
输出: defaultdict(int, {'fruites': 3, 'vegetables': 2})
  • 1
  • 2

10 collectAsMap

与collect相关的算子是将结果返回到driver端。collectAsMap算子是将Kev-Value结构的RDD收集到driver端,并返回成一个字典

rdd = sc.parallelize([("a",1),("b",2),("b",1),("a", 2)])
rdd.collectAsMap()
输出: {'a': 2, 'b': 1}
  • 1
  • 2
  • 3
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/556902
推荐阅读
相关标签
  

闽ICP备14008679号