当前位置:   article > 正文

Pyspark: RDD及其常用算子_pyspark dstream 计算两个rdd

pyspark dstream 计算两个rdd

本文为Pyspark代码

Spark版本:Spark-3.2.1

1. RDD的定义

Spark提供了一种对数据的核心抽象,称为弹性分布式数据集(Resilient Distributed Dataset, RDD)。这个数据集的全部或部分可以缓存在内存中,并且可以在多次计算时重用。RDD其实就是一个分布在多个节点上的数据集合(一个数据集存储在不同的节点上,每个节点存储数据集的一部分)。RDD的主要特征如下: 

  • RDD是不可变的,但可以将RDD转换成新的RDD进行操作;
  • RDD是可分区的。RDD由很多分区组成,每个分区对应一个Task任务来执行。对RDD进行操作,相当于对RDD的每个分区分别进行操作;
  • RDD拥有一系列对分区进行计算的函数,称为算子;
  • RDD之间存在依赖关系,可以实现管道化,避免中间数据的存储;

2. 创建RDD

2.1 Spyder连接PySpark

使用Spyder连接PySpark的代码如下:

  1. from pyspark import SparkContext, SparkConf
  2. import os
  3. os.environ['SPARK_HOME'] ='/Users/sherry/documents/spark/spark-3.2.1-bin-hadoop3.2'
  4. spark_conf = SparkConf().setAppName('Python_Spark_WordCount')\
  5. .setMaster('local[2]')
  6. sc = SparkContext(conf=spark_conf)
  7. sc.setLogLevel('WARN')

这里要注意定义sc时spark_conf要明确赋值给conf变量,否则会报如下错误:py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.

2.2 创建RDD

RDD的数据源可以是程序中的对象集合,也可以是外部存储系统中的数据集。例如共享文件系统、HDFS、HBase等。常用的创建RDD的方法主要有以下:

  • parallelize()方法可以将一个对象集合转化为RDD
  • textFile()方法可以读取本地文件系统或外部其他系统中的数据

其用法如下:

  1. #1.创建RDD
  2. rdd_1=sc.parallelize([1,2,3,4,5,6])
  3. rdd_2=sc.textFile(r'test_spark.txt')

另外,在Spark的过往版本中还可以使用makeRDD()方法将对象转化为RDD,但在Spark3.2版本中并没有看到该方法(或者该方法换了位置?)。

在使用parallelize()方法和textFile()方法将数据转化成RDD时,会自动对数据进行分区。parallelize()方法中的numSlices参数和textFile()方法中的minPartitions参数都可以用来指定分区数。若未设置这两个参数,则默认分区数为sc.defaultParallelism(Spark Standalone模式下该值为2)。具体可以使用如下方法查看RDD分区数及各个分区的数据:

  1. a=range(1,11)
  2. rdd_1=sc.parallelize(a)
  3. print('rdd_1分区数量:',rdd_1.getNumPartitions())
  4. print('rdd_1分区结果:',rdd_1.glom().collect())
  5. rdd_2=sc.parallelize(a,numSlices=3)
  6. print('rdd_2分区数量:',rdd_2.getNumPartitions())
  7. print('rdd_2分区结果:',rdd_2.glom().collect())

其执行结果如下:

rdd_1分区数量: 2
rdd_1分区结果: [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]
rdd_2分区数量: 3
rdd_2分区结果: [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]

3.常用算子

Spark中提供了丰富的操作RDD的方法,这些方法被称为算子。目前RDD支持两种算子:转化(Transformation)算子和行动(Action)算子。这两种算子的区别如下:

  • 转化算子:负责对RDD中的数据进行计算并转化为新的RDD。但所有的转化算子都是惰性的,即这些算子不会立即计算结果,而只是记住对某个RDD的具体操作过程,直到遇到行动算子才会与行动算子一起执行。
  • 行动算子:行动算子会触发Spark的任务调度。

3.1 行动算子

  • reduce()算子、fold()算子和aggregate()算子

reduce(func)算子可以对RDD中的元素进行聚合运算。其执行过程如下:先对每个分区的数据aList执行reduce(func,aList),将各个分区获得的中间结果组合形成bList,再次执行reduce(func,bList),其结果即为最终的reduce(func)算子结果。假如aList=[9,10,11,12],func=lambda x,y:x+y,那么reduce(func,aList)的执行过程如下:现将aList的第1个元素值9和第2个元素值10传给func,将其计算的中间结果与第3个元素值11再次传给func,直至得到最后一个结果。举例如下:

  1. a=range(1,11)
  2. #只有1个分区时的结果
  3. rdd_1=sc.parallelize(a,numSlices=1)
  4. red_1=rdd_1.reduce(lambda x,y:x+y)
  5. red_2=rdd_1.reduce(lambda x,y:x-y)
  6. print("1个分区时的结果:{} {}".format(red_1,red_2))
  7. #2个分区时的结果
  8. rdd_2=sc.parallelize(a)
  9. red_1=rdd_2.reduce(lambda x,y:x+y)
  10. red_2=rdd_2.reduce(lambda x,y:x-y)
  11. print("2个分区时的结果:{} {}".format(red_1,red_2))
  12. #3个分区时的结果
  13. rdd_3=sc.parallelize(a,numSlices=3)
  14. red_1=rdd_3.reduce(lambda x,y:x+y)
  15. red_2=rdd_3.reduce(lambda x,y:x-y)
  16. print("3个分区时的结果:{} {}".format(red_1,red_2))

其结果如下,当func=lambda x,y:x-y时,分区数不同时其结果也不同。

1个分区时的结果:55 -53
2个分区时的结果:55 15
3个分区时的结果:55 23

fold(zeroValue,op)算子的执行过程与reduce(func)算子基本相同,区别在于fold()算子给每一次的reduce()计算都提供了一个初始值zeroValue。fold()源码如下:

  1. def fold(self, zeroValue, op):
  2. op = fail_on_stopiteration(op)
  3. def func(iterator):
  4. acc = zeroValue
  5. for obj in iterator:
  6. acc = op(acc, obj)
  7. yield acc
  8. vals = self.mapPartitions(func).collect()
  9. return reduce(op, vals, zeroValue)

fold()用法举例如下:

  1. a=range(1,11)
  2. rdd_1=sc.parallelize(a)
  3. fold_1=rdd_1.fold(2,lambda x,y:x+y)
  4. fold_2=rdd_1.fold(2,lambda x,y:x-y)
  5. print(fold_1,fold_2)

其结果fold_1=61,fold_2=53。这里只说一下fold_2的计算过程:第1个分区的计算结果=2(即为zeroValue)-1-2-3-4-5=-13;第2个分区的计算结果=2-6-7-8-9-10=-38;最后的结果=2-(-13)-(-38)=53

aggregate(zeroValue, seqOp, combOp)其计算过程如下:先对每个分区执行seqOp函数,然后再对所有分区的结果执行combOp函数。zeroValue仍然是每次计算的初始值。其用法举例如下:

  1. rdd_1=sc.parallelize(range(1,11))
  2. agg_1=rdd_1.aggregate(2,seqOp=lambda x,y:x+y,combOp=lambda x,y:x+y)
  3. agg_2=rdd_2.aggregate((0,0),seqOp=lambda x,y:(x[0]+y,x[1]+1),
  4. combOp=lambda x,y:(x[0]+y[0],x[1]+y[1]))
  5. print(agg_1,agg_2)

 其计算结果如下(agg_2可以同时统计rdd_1中元素的值的总和及元素个数)

61  (55, 10)

  • count()算子

count()算子可以统计RDD集合中元素的数量。具体代码如下:

  1. rdd_1=sc.parallelize(range(1,11))
  2. print(rdd_1.count())#输出10
  3. rdd_2=sc.parallelize([[1,2],[3,4]])
  4. print(rdd_2.count())#输出2

除了count()算子之外,spark还为数值型的RDD提供了如下描述性统计算子。

方法含义
mean()元素的平均值
sum()总和
max()最大值
min()最小值
variance()方差
sampleVariance()采样方差
stdev()标准差
sampleStdev()采样的标准差
  • collect()算子

collect()算子会把所有元素返回给驱动程序,然后由驱动程序序列化成一个list。collect()要求所有数据必须同一同放入单台机器的内存中,所以一般只会在单元测试中使用。

  1. rdd_1=sc.parallelize(range(1,11))
  2. print("rdd_1中的元素:",rdd_1.collect())

其执行结果如下:

rdd_1中的元素: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

  • take()算子、first()算子和top()算子

take(n)算子返回包含数据集前n个元素组成的数组。first()算子是take()算子的特例,其执行结果与take(1)相同。take()算子用法举例如下:

  1. rdd_1=sc.parallelize(range(1,11),numSlices=5)
  2. print(rdd_1.glom().collect())
  3. print(rdd_1.take(5))

其执行结果如下:

[[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]]
[1, 2, 3, 4, 5]

top(num,key)算子的用法与Python中的sorted()用法相同,但top()算子只支持逆序排序。其具体用法举例如下:

  1. rdd_1=sc.parallelize([5,3,7,19,6,2])
  2. print(rdd_1.top(2))
  3. rdd_2=sc.parallelize([('a',19),('z',4),('c',7),('f',24)])
  4. print(rdd_2.top(3))
  5. print(rdd_2.top(3,key=lambda x:x[1]))

其执行结果如下:

[19, 7]
[('z', 4), ('f', 24), ('c', 7)]
[('f', 24), ('a', 19), ('c', 7)]

  • forecah()算子

foreach(func)算子可以对RDD中的每一个元素运行指定的函数func。但是这个算子不会返回任何结果给驱动器程序。通常情况下,使用该算子可以将RDD中的数据以JSON格式把数据发送到一个网络服务器上,或者把数据存到数据库中。

  • takeSample()算子

takeSample(withReplacement,num,seed)算子可以对RDD中的数据进行采样。withReplacement参数控制是否需要重复采用。num控制采样数量。其用法如下:

  1. rdd_1=sc.parallelize(range(1,20),numSlices=3)
  2. result=rdd_1.takeSample(withReplacement=True,num=10)
  3. print("可放回采样时的结果:",result)
  4. result=rdd_1.takeSample(withReplacement=False, num=10)
  5. print("不可放回采样时的结果:",result)

其结果如下:

可放回采样时的结果: [19, 15, 14, 9, 9, 19, 16, 6, 7, 16]
不可放回采样时的结果: [4, 1, 12, 5, 19, 8, 16, 11, 9, 10]

3.2 转化算子

因为转化算子都是惰性的,所以下文中通过转化算子形成的新的RDD的结果主要是通过collect()算子和glom().collect()算子来展示。后者主要是为了展示新旧RDD之间分区的变化情况。

  • map()算子和flatMap()算子

map()算子接收一个函数做为参数,并把这个函数应用于RDD中的每个元素,最后将函数的返回结果作为结果RDD中对应元素的值。flatMap()算子与map()算子类似,但是每个传入给函数func的RDD元素会返回0到多个元素,最终会将返回的所有元素合并到一个RDD中。其具体用法如下:

  1. rdd_1=sc.parallelize(['hello hadoop world','spark helllo','hello world'])
  2. map_1=rdd_1.map(lambda x:x.split())
  3. map_2=rdd_1.flatMap(lambda x:x.split())
  4. print(rdd_1.collect())
  5. print(map_1.collect())
  6. print(map_2.collect())
  7. print(rdd_1.glom().collect())
  8. print(map_1.glom().collect())
  9. print(map_2.glom().collect())

其计算结果如下:

['hello hadoop world', 'spark helllo', 'hello world']
[['hello', 'hadoop', 'world'], ['spark', 'helllo'], ['hello', 'world']]
['hello', 'hadoop', 'world', 'spark', 'helllo', 'hello', 'world']
[['hello hadoop world'], ['spark helllo', 'hello world']]
[[['hello', 'hadoop', 'world']], [['spark', 'helllo'], ['hello', 'world']]]
[['hello', 'hadoop', 'world'], ['spark', 'helllo', 'hello', 'world']]

  • filter()算子

filter()算子通过函数func对源RDD中的元素进行过滤,并返回一个新的RDD。其用法如下:

  1. rdd_1=sc.parallelize(range(1,11))
  2. rdd_2=rdd_1.filter(lambda x:x>=5)
  3. print(rdd_1.glom().collect())
  4. print(rdd_2.glom().collect())

其计算结果如下:

[[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]
[[5], [6, 7, 8, 9, 10]]

  • distinct()算子

disinct()算子对RDD中的数据进行去重操作并返回一个新的RDD。当需要验证数据集时非常有用,但开销较大,因为该方法需要将所有数据通过网络进行混洗。其用法如下:

  1. rdd_1=sc.parallelize(([2,3,4,4,5,6]))
  2. rdd_2=rdd_1.distinct()
  3. print(rdd_1.glom().collect())
  4. print(rdd_2.glom().collect())

其结果如下。distinct()算子不仅对数据进行了去重,而且数据的分区也发生了变化。

[[2, 3, 4], [4, 5, 6]]
[[2, 4, 6], [3, 5]]

  • union()算子、intersection()算子、subtract()算子和cartesian()算子

这四个算子是集合类操作。union()算子可以将两个RDD合并成一个新的RDD,该算子主要用于对不同的数据来源进行合并(只做简单的合并不去重)。但合并的两个RDD的数据类型必须一致(因为Python是动态语言,对这一点的要求没有scala这么高)。其用法举例如下:

  1. rdd_1=sc.parallelize([[1,2],[3,4],2])
  2. rdd_2=sc.parallelize([1,2,4])
  3. rdd_3=rdd_1.union(rdd_2)
  4. print(rdd_3.collect())
  5. print(rdd_3.glom().collect())

其结果如下(union操作没有破坏源RDD的数据分区):

[[1, 2], [3, 4], 2, 1, 2, 4]
[[[1, 2]], [[3, 4], 2], [1], [2, 4]]

intersection()算子对两个RDD进行交集操作,即将两个RDD中共有的元素去重之后形成新的RDD。但是RDD中的数据的类型必须是可Hash的,即整型、浮点型、字符串、tuple和不可变集合,否则程序报错(遇到行动算子时才会显示出报错信息)。其用法如下:

  1. rdd_1=sc.parallelize([1,2,4,2,(1,2),'hello',frozenset([1,3])])
  2. rdd_2=sc.parallelize([1.0,3.6,2,(1,2),'hello',frozenset([1,3])])
  3. rdd_3=rdd_1.intersection(rdd_2)
  4. print(rdd_3.collect())

其结果如下:

[1, (1, 2), 'hello', 2, frozenset({1, 3})]

cartesian()算子对两个RDD进行笛卡尔积操作,该算子返回的结果为(a,b)对,其中a来自第一个RDD中,而b来自第二个RDD中,结果不去重。该操作的开销较大。其用法具体如下:

  1. rdd_1=sc.parallelize([1,2,4])
  2. rdd_2=sc.parallelize([3,3])
  3. rdd_3=rdd_1.cartesian(rdd_2)
  4. print(rdd_3.collect())

其结果如下:

[(1, 3), (1, 3), (2, 3), (4, 3), (2, 3), (4, 3)]

subtract()算子对两个RDD进行求差集操作,该算子将第1个RDD中有的元素而第2个RDD中没有的元素组成一个新的RDD。其用法如下:

  1. rdd_1=sc.parallelize([1,2,4,2,(1,2)])
  2. rdd_2=sc.parallelize([1.0,3.6,(1,2)])
  3. rdd_3=rdd_1.subtract(rdd_2)
  4. print(rdd_3.collect())

其结果如下:

[4, 2, 2]

  • repartition()算子

该算子会对RDD重新进行分组分区。该操作会对数据重新混洗,所以开销比较大。另外,该操作可能会产生空白分区。其具体用法如下:

  1. rdd_1=sc.parallelize(range(1,20),numSlices=3)
  2. print(rdd_1.glom().collect())
  3. rdd_2=rdd_1.repartition(4)
  4. print(rdd_2.glom().collect())
  5. rdd_3=rdd_1.repartition(2)
  6. print(rdd_3.glom().collect())

其结果如下:

[[1, 2, 3, 4, 5, 6], [7, 8, 9, 10, 11, 12], [13, 14, 15, 16, 17, 18, 19]]
[[13, 14, 15, 16, 17, 18, 19], [7, 8, 9, 10, 11, 12], [], [1, 2, 3, 4, 5, 6]]
[[1, 2, 3, 4, 5, 6, 13, 14, 15, 16, 17, 18, 19], [7, 8, 9, 10, 11, 12]]

  • sample()算子

sample()算子与takeSample()算子效果相同,但sample()算子为转化算子,且使用参数fraction控制采样样本数。其用法如下:

  1. rdd_1=sc.parallelize(range(1,20),numSlices=3)
  2. rdd_2=rdd_1.sample(withReplacement=True,fraction=0.3)
  3. print("可放回采样时的结果:",rdd_2.collect())
  4. rdd_3=rdd_1.sample(withReplacement=False, fraction=0.3)
  5. print("不可放回采样时的结果:",rdd_3.collect())

其结果如下:

可放回采样时的结果: [8, 12, 12, 12, 14, 15, 19]
不可放回采样时的结果: [1, 3, 4, 8, 12, 19]

  • sortBy()算子

sortBy()算子可以将RDD中的元素按照某个规则进行排序,第一个参数keyfunc为排序函数,第二个参数指定升序(默认)或降序。其用法如下:

  1. rdd_1=sc.parallelize([[1,2],[4,2],[3,9]])
  2. rdd_2=rdd_1.sortBy(keyfunc=lambda x:x)
  3. print(rdd_2.collect())
  4. rdd_3=rdd_1.sortBy(keyfunc=lambda x:x[1],ascending=False)
  5. print(rdd_3.collect())

其结果如下:

[[1, 2], [3, 9], [4, 2]]
[[3, 9], [1, 2], [4, 2]]

  • groupBy()算子

groupBy()算子可以对RDD中的数据重新进行分组。其具体用法如下:

  1. rdd_1=sc.parallelize(range(1,20))
  2. rdd_2=rdd_1.groupBy(lambda x:x//5)
  3. res_1=[[item[0],list(item[1])]for item in rdd_2.collect()]
  4. print(res_1)

其结果如下:

[[0, [1, 2, 3, 4]], [2, [10, 11, 12, 13, 14]], [1, [5, 6, 7, 8, 9]], [3, [15, 16, 17, 18, 19]]]

  • mapPartitions()算子

mapPartitions()算子与其他算子的不同之处在于,其他算子是作用在RDD的每一个分区的每一个元素上的,而mapPartitions()算子则是作用在RDD的每个分区上的,并将各个分区的计算结果组合成一个新的RDD。其具体用法如下:

  1. rdd_1=sc.parallelize(range(1,10))
  2. def f(x): yield sum(x)
  3. rdd_2=rdd_1.mapPartitions(f)
  4. print(rdd_2.collect())

其结果如下:

[10, 35]

这里要注意的是传入mapPartitions()函数f的写法,这里要使用yield,这样可以将这个函数看作是一个生成器。

参考文献

  1. 《Spark大数据分析实战》
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/670645
推荐阅读
相关标签
  

闽ICP备14008679号