当前位置:   article > 正文

RDD算子操作(基本算子和常见算子)_rdd 操作

rdd 操作

目录

一、基本算子

        1.map算子

        2.flatMap算子

        3.filter算子

         4.foreach算子

        5.saveAsTextFile算子

        6.redueceByKey算子

二、常用Transformation算子       

         1.mapValues算子

        2.groupBy算子

        3.distinct算子

        4.union算子

        5.join算子

        6.intersection算子

        7.glom算子

        8.groupByKey算子

        9.sortBy算子

        10.sortByKey算子

三、常用Action算子

        1.countByKey算子

        2.collect算子

        3.reduce算子

        4.takeSample算子

        5.takeOrdered算子

四、分区操作算子

        1.mapPartitions算子

        2.foreachPartition算子

        3.partitionBy算子

        4.repartition算子和coalesce算子


一、基本算子

        RDD中map、filter、flatMap及foreach等函数为最基本算子,都是都RDD中每个元素进行操作,将元素传递到函数中进行转换。

        1.map算子

        map(f:T=>U): RDD[T]=>RDD[U],表示将RDD经由某一函数f后,转变为另一个RDD。

        功能:map算子,是将RDD的数据一条条处理(处理的逻辑 基于map算子中接受的处理函数),返回新的RDD。

  1. #cording:utf-8
  2. from pyspark import SparkConf,SparkContext
  3. if __name__ == "__main__":
  4. # 构建SparkContext对象
  5. conf = SparkConf().setAppName('test').setMaster("local[*]")
  6. sc = SparkContext(conf=conf)
  7. rdd = sc.parallelize([1,2,3,4,5,6],3)
  8. # 定义方法,作为算子的传入函数体
  9. def add(data):
  10. return data * 10
  11. print(rdd.map(add).collect())
  12. # 更简单的方式 是定义lambda表达式来写匿名函数
  13. print(rdd.map(lambda data:data * 10).collect())
  14. '''
  15. 对于算子的接受函数来说,两种方法都可以
  16. lambda表达式 适用于 一行代码就搞定的函数体,如果是多行,需要定义独立的方法
  17. '''
        2.flatMap算子

        flatMap(f:T=>Seq[U]): RDD[T]=>RDD[U]),表示将RDD经由某一函数f后,转变为一个新的 RDD,但是与map 不同,RDD中的每一个元素会被映射成新的0到多个元素(f 函数返回的是一个序列Seq)。

        功能:对RDD执行map操作,然后进行解除嵌套操作

  1. #cording:utf-8
  2. from pyspark import SparkConf,SparkContext
  3. if __name__ == "__main__":
  4. # 构建SparkContext对象
  5. conf = SparkConf().setAppName('test').setMaster("local[*]")
  6. sc = SparkContext(conf=conf)
  7. rdd = sc.parallelize(["hadoop hadoop spark","spark hadoop hadoop","hadoop flink spark"])
  8. #得到所有的单词,组成RDD
  9. rdd2 = rdd.map(lambda line: line.split(" "))
  10. rdd3 = rdd.flatMap(lambda line: line.split(" "))
  11. print(rdd2.collect())
  12. print(rdd3.collect())
        3.filter算子

        filter(f.T=>Bool): RDD[T]=>RDD[T],表示将 RDD经由某一函数f后,只保留f返回True的数据,组成新的RDD。

        功能:过滤想要的数据进行保留,返回值是True的数据保留,返回值是False的数据则会被丢弃。

  1. #corfding:utf8
  2. from pyspark import SparkConf,SparkContext
  3. if __name__ == '__main__':
  4. conf = SparkConf().setAppName('test').setMaster('local[*]')
  5. sc = SparkContext(conf=conf)
  6. # 通过filter算子过滤奇数
  7. rdd = sc.parallelize((1,2,3,4,5,6,7,8,9,10))
  8. result_rdd = rdd.filter(lambda x: x % 2 == 1)
  9. print(result_rdd.collect())

         4.foreach算子

       foreach(func),将函数 func应用在数据集的每一个元素上,通常用于更新一个累加器,或者和外部存储系统进行交互,例如 Redis。

        功能:对RDD的每一个元素,执行你提供的逻辑的操作(和map一个意思),但是这个方法没有返回值。

        ps:该算子是分区(Executor)直接执行的,跳过Driver,由分区所在的Executor直接执行。

  1. #cording:utf8
  2. from pyspark import SparkConf,SparkContext
  3. if __name__ == '__main__':
  4. conf = SparkConf().setMaster('local[*]').setAppName('test')
  5. sc = SparkContext(conf=conf)
  6. rdd = sc.parallelize([1, 5, 4, 2, 3, 6])
  7. print(rdd.foreach(lambda x: 10 * x))
  8. print('----------------------------------')
  9. print(rdd.foreach(lambda x: print(10 * x)))
        5.saveAsTextFile算子

        saveAsTextFile(path:String),数据集内部的元素会调用其 toString方法,转换为字符串形式,然后根据传入的路径保存成文本文件,既可以是本地文件系统,也可以是HDFS等。

        ps:该算子是分区(Executor)直接执行的,跳过Driver,由分区所在的Executor直接执行。

  1. #cording:utf8
  2. from pyspark import SparkConf,SparkContext
  3. if __name__ == '__main__':
  4. conf = SparkConf().setMaster('local[*]').setAppName('test')
  5. sc = SparkContext(conf=conf)
  6. rdd = sc.parallelize([1, 5, 4, 2, 3, 6])
  7. rdd.saveAsTextFile('hdfs://pyspark01/output/out1')

        6.redueceByKey算子

        功能:针对KV型RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成组内数据(value)的聚合操作。

  1. #cording:utf-8
  2. from pyspark import SparkConf,SparkContext
  3. if __name__ == "__main__":
  4. # 构建SparkContext对象
  5. conf = SparkConf().setAppName('test').setMaster("local[*]")
  6. sc = SparkContext(conf=conf)
  7. rdd = sc.parallelize([('a',1),('b',1),('a',1),('a',1),('b',1),('c',1),('a',1)])
  8. #使用reduceByKey函数进行聚合
  9. reduce_rdd = rdd.reduceByKey(lambda a,b : a + b).collect()
  10. print("聚合结果:",reduce_rdd)

二、常用Transformation算子       

         1.mapValues算子

        功能:针对二元元组RDD,对其内部的二元元组的Value执行map操作。

  1. #cording:utf-8
  2. from pyspark import SparkConf,SparkContext
  3. if __name__ == "__main__":
  4. # 构建SparkContext对象
  5. conf = SparkConf().setAppName('test').setMaster("local[*]")
  6. sc = SparkContext(conf=conf)
  7. rdd = sc.parallelize([('a',2),('b',11),('a',1)])
  8. #使用map函数
  9. map_rdd = rdd.map(lambda x: (x[0],x[1]*10)).collect()
  10. print("结果:",map_rdd)
  11. # 使用mapValue函数
  12. value_rdd = rdd.mapValues(lambda value: value*10).collect()
  13. print("结果:",value_rdd)

        2.groupBy算子

        功能:将RDD数据进行分组。

  1. #cording:utf8
  2. from pyspark import SparkConf,SparkContext
  3. if __name__ == '__main__':
  4. conf = SparkConf().setMaster('local[*]').setAppName('test')
  5. sc = SparkContext(conf=conf)
  6. # 创建数据
  7. test_rdd = sc.parallelize([('a',1),('b',1),('a',2),('b',2),('b',3)])
  8. # 通过groupBy函数对数据进行分组
  9. # groupBy函数传入函数的意思是:通过这个函数,来确定按照谁来分组(返回谁即可)
  10. # 分组规则和SQL一致:也就是相同的在同一个组(Hash分组)
  11. result_1 = test_rdd.groupBy(lambda t: t[0])
  12. result_2 = result_1.map(lambda t: (t[0],list(t[1])))
  13. print(result_1.collect())
  14. print(result_2.collect())

        3.distinct算子

        功能:对RDD数据进行去重复,返回新的RDD。

  1. #cording:utf8
  2. from pyspark import SparkConf,SparkContext
  3. if __name__ == '__main__':
  4. conf = SparkConf().setAppName('test').setMaster('local[*]')
  5. sc = SparkContext(conf=conf)
  6. rdd_1 = sc.parallelize((1,2,1,2,3,4,5,6))
  7. rdd_2 = sc.parallelize([('a',1),('b',1),('a',1),('a',1),('b',1),('c',1),('a',1)])
  8. # 使用distinct算子进行去重
  9. print('数字:',rdd_1.distinct().collect())
  10. print('元组:',rdd_2.distinct().collect())

        4.union算子

        功能:将两个RDD合并成一个RDD返回。只合并不去重,RDD的类型不同也是可以合并的。

  1. #corfding:utf8
  2. from pyspark import SparkConf,SparkContext
  3. if __name__ == '__main__':
  4. conf = SparkConf().setAppName('test').setMaster('local[*]')
  5. sc = SparkContext(conf=conf)
  6. # 通过union算子合并RDD
  7. rdd_1 = sc.parallelize((1,2,3,4,5))
  8. rdd_2 = sc.parallelize((6,7,8,9,10))
  9. print(rdd_1.union(rdd_2).collect())

        5.join算子

        功能:对两个RDD执行join操作(可实现SQL外/内连接),join算子只能用于二元元组。

  1. #corfding:utf8
  2. from pyspark import SparkConf,SparkContext
  3. if __name__ == '__main__':
  4. conf = SparkConf().setAppName('test').setMaster('local[*]')
  5. sc = SparkContext(conf=conf)
  6. rdd1 = sc.parallelize([(1001,'zhangsan'),(1002,'lisi'),(1003,'wangwu'),(1004,'zhaoliu')])
  7. rdd2 = sc.parallelize([(1001,'销售部'),(1002,'科技部')])
  8. # 通过join算子来进行rdd之间的关联
  9. # 对于join算子来说,关联条件按照二元元组的key来进行关联
  10. print(rdd1.join(rdd2).collect())
  11. # 左外连接,右外连接可以更换一下rdd的顺序或者调用rightOuterJoin即可
  12. print(rdd1.leftOuterJoin(rdd2).collect())

        6.intersection算子

        功能:求两个RDD的交集,返回一个新的RDD。

  1. #corfding:utf8
  2. from pyspark import SparkConf,SparkContext
  3. if __name__ == '__main__':
  4. conf = SparkConf().setAppName('test').setMaster('local[*]')
  5. sc = SparkContext(conf=conf)
  6. rdd1 = sc.parallelize([('a',1),('b',3)])
  7. rdd2 = sc.parallelize([('a',1),('c',1)])
  8. # 通过intersection算子求出RDD的交集 取出并返回新的RDD
  9. print(rdd1.intersection(rdd2).collect())

        7.glom算子

        功能:将RDD的数据,加上嵌套,这个嵌套按照分区来进行,比如RDD数据[1,2,3,4,5]有两个分区,那么glom后,数据变成:[[1,2,3],[4,5]]。

  1. #corfding:utf8
  2. from pyspark import SparkConf,SparkContext
  3. if __name__ == '__main__':
  4. conf = SparkConf().setAppName('test').setMaster('local[*]')
  5. sc = SparkContext(conf=conf)
  6. rdd1 = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
  7. print(rdd1.glom().collect())
  8. # 解嵌套操作
  9. print(rdd1.glom().flatMap(lambda x: x).collect())

        8.groupByKey算子

        功能:针对KV型RDD,自动按照key分组。

  1. #cording:utf8
  2. from pyspark import SparkConf,SparkContext
  3. if __name__ == '__main__':
  4. conf = SparkConf().setMaster('local[*]').setAppName('test')
  5. sc = SparkContext(conf=conf)
  6. # 创建数据
  7. test_rdd = sc.parallelize([('a',1),('b',1),('a',2),('b',2),('b',3)])
  8. # 使用groupByKey算子
  9. result_1 = test_rdd.groupByKey()
  10. #查看结果
  11. result_2 = result_1.map(lambda t: (t[0],list(t[1])))
  12. print(result_1.collect())
  13. print(result_2.collect())

        9.sortBy算子

        功能:对RDD数据进行排序,基于你指定的排序依据。

  1. #cording:utf8
  2. from pyspark import SparkConf,SparkContext
  3. if __name__ == '__main__':
  4. conf = SparkConf().setMaster('local[*]').setAppName('test')
  5. sc = SparkContext(conf=conf)
  6. rdd = sc.parallelize([('c',3),('f',1),('b',11),('c',3),('e',1),('n',9),('a',1)],3)
  7. # 使用sortBy对RDD执行排序
  8. # 按照value 数字进行排序
  9. # 参数1函数:表示的是,告知spark,按照数据的哪个列进行排序
  10. # 参数2:True表示升序 False表示降序
  11. # 参数3:排序的分区数
  12. '''注意:如果要全局有序,排序分区数设置为1'''
  13. print('按照value排序:',rdd.sortBy(lambda x: x[1], ascending=True, numPartitions=3).collect())
  14. # 按照key进行排序
  15. print('按照key排序:',rdd.sortBy(lambda x: x[0], ascending=True, numPartitions=3).collect())
        10.sortByKey算子

        功能:针对KV型RDD,按照Key进行排序

  1. #cording:utf8
  2. from pyspark import SparkConf,SparkContext
  3. if __name__ == '__main__':
  4. conf = SparkConf().setMaster('local[*]').setAppName('test')
  5. sc = SparkContext(conf=conf)
  6. rdd = sc.parallelize([('a',1),('E',1),('C',1),('D',1),('b',1),('g',1),('h',1),
  7. ( "y" ,1),('u',1),('i',1),('o',1),('p',1),
  8. ( 'm',1),('n',1),('L',1),('k',1),('f',1)],3)
  9. # 根据字母的小写排序
  10. print(rdd.sortByKey(ascending=True, numPartitions=1, keyfunc=lambda key: key.lower()).collect())

三、常用Action算子

        1.countByKey算子

        功能:统计key出现的次数(一般适用于KV型RDD)

  1. import json
  2. from pyspark import SparkConf,SparkContext
  3. if __name__ == '__main__':
  4. conf = SparkConf().setMaster('local[*]').setAppName('test')
  5. sc = SparkContext(conf=conf)
  6. rdd = sc.textFile('../input/words.txt')
  7. rdd2 = rdd.flatMap(lambda x: x.split(' ')).map(lambda x: (x,1))
  8. # 通过countByKey来对key进行计数,这是一个Action算子
  9. result = rdd2.countByKey()
  10. print(result)
  11. print(type(result))

        2.collect算子

        功能:将RDD各个分区的数据,统一收集到Driver中,形成一个list对象。这个算子,是将RDD各个分区数据都拉取到Driver,注意的是,RDD是分布式对象,其数据量可以很大,所以用这个算子之前,要心知肚明的了解结果数据集不会太大,不然,会把Driver内存撑爆。

        3.reduce算子

        功能:对RDD数据集按照你传入的逻辑进行聚合。

  1. import json
  2. from pyspark import SparkConf,SparkContext
  3. if __name__ == '__main__':
  4. conf = SparkConf().setMaster('local[*]').setAppName('test')
  5. sc = SparkContext(conf=conf)
  6. rdd = sc.parallelize([1,2,3,4,5,6])
  7. print(rdd.reduce(lambda a,b: a+b))

        4.takeSample算子

        功能:随机抽样RDD数据,随机数种子数字可以随便传,如果传同一个数字,那么取出的结果是一致的。一般参数三不传,spark会自动给与一个随机的种子。

  1. from pyspark import SparkConf,SparkContext
  2. if __name__ == '__main__':
  3. conf = SparkConf().setMaster('local[*]').setAppName('test')
  4. sc = SparkContext(conf=conf)
  5. rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10,1,2])
  6. print('True:',rdd.takeSample(True,22))
  7. print('False:',rdd.takeSample(False,22))
  8. print('无随机种子1:',rdd.takeSample(True,5))
  9. print('无随机种子2:', rdd.takeSample(True, 5))
  10. print('有随机种子1:',rdd.takeSample(True,5,1))
  11. print('有随机种子2:', rdd.takeSample(True, 5, 1))

        5.takeOrdered算子

        功能:对RDD进行排序取前N个。

  1. #cording:utf8
  2. from pyspark import SparkConf,SparkContext
  3. if __name__ == '__main__':
  4. conf = SparkConf().setMaster('local[*]').setAppName('test')
  5. sc = SparkContext(conf=conf)
  6. rdd = sc.parallelize([1,5,4,2,3,6])
  7. print('普通:',rdd.takeOrdered(3))
  8. # 函数操作只会对结果产生影响,不会影响数据本身
  9. print("传入函数:",rdd.takeOrdered(3, lambda x: -x))

四、分区操作算子

        1.mapPartitions算子

        功能:与map功能相似,但区别是,mapPartition一次被传递的是一整个分区的数据,是作为一个迭代器(一次性list)对象传入过来,而map是一个一个数据的传递。

  1. #cording:utf8
  2. from pyspark import SparkConf,SparkContext
  3. if __name__ == '__main__':
  4. conf = SparkConf().setMaster('local[*]').setAppName('test')
  5. sc = SparkContext(conf=conf)
  6. rdd = sc.parallelize([1, 5, 4, 2, 3, 6],3)
  7. def process(iter):
  8. result = list()
  9. for it in iter:
  10. result.append(it * 10)
  11. return result
  12. # mapPartitions算子相比于map算子,节省了大量打IO操作,每一个分区只需要进行一次IO操作即可
  13. print('输出结果:',rdd.mapPartitions(process).collect())

        2.foreachPartition算子

        功能:和普通的foreach一致,一次处理的是一整个分区的数据。

  1. #cording:utf8
  2. from pyspark import SparkConf,SparkContext
  3. if __name__ == '__main__':
  4. conf = SparkConf().setMaster('local[*]').setAppName('test')
  5. sc = SparkContext(conf=conf)
  6. rdd = sc.parallelize([1, 5, 4, 2, 3, 6],3)
  7. def process(iter):
  8. result = list()
  9. for it in iter:
  10. result.append(it * 10)
  11. print(result)
  12. rdd.foreachPartition(process)

        3.partitionBy算子

        功能:对RDD进行自定义分区操作。

  1. #cording:utf8
  2. from pyspark import SparkConf,SparkContext
  3. if __name__ == '__main__':
  4. conf = SparkConf().setMaster('local[*]').setAppName('test')
  5. sc = SparkContext(conf=conf)
  6. rdd = sc.parallelize([('hadoop',1),('hadoop',1),('hello',1),('spark',1),('flink',1),('spark',1)])
  7. # 使用partitionBy自定义分区
  8. def process(x):
  9. if 'hadoop' == x or 'hello' == x:return 0
  10. if 'spark' == x:return 1
  11. return 2
  12. # 使用glom算子将每个分区的数据进行嵌套
  13. print('显示分区:',rdd.partitionBy(3, process).glom().collect())

        4.repartition算子和coalesce算子

        功能:对RDD的分区执行重新分区(仅数量)

        ps:对分区的数量进行操作,一定要慎重,一般情况下,我们写Spark代码除了要求全局排序设置为1个分区外多数时候,所有API中关于分区相关的代码我们都不太理会。因为,如果你改分区了,会影响并行计算(内存迭代的并行管道数量)后面学分区如果增加,极大可能导致shuffle。

  1. #cording:utf8
  2. from pyspark import SparkConf,SparkContext
  3. if __name__ == '__main__':
  4. conf = SparkConf().setMaster('local[*]').setAppName('test')
  5. sc = SparkContext(conf=conf)
  6. rdd = sc.parallelize([1, 5, 4, 2, 3, 6],3)
  7. # repartition 修改分区
  8. # 减少分区
  9. print("减少分区为1:",rdd.repartition(1).getNumPartitions())
  10. # 增加分区
  11. print("增加分区为5:", rdd.repartition(5).getNumPartitions())
  12. # coalesce 修改分区
  13. # 减少分区
  14. print("减少分区为1:",rdd.coalesce(1).getNumPartitions())
  15. # 增加分区 ps:coalesce增加分区数量需要指定参数shuffle为True才能1成功修改
  16. print("减少分区为5:", rdd.coalesce(5).getNumPartitions())
  17. print("减少分区为5:",rdd.coalesce(5, shuffle=True).getNumPartitions())

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/466777
推荐阅读
相关标签
  

闽ICP备14008679号