赞
踩
目录
一、基本算子
RDD中map、filter、flatMap及foreach等函数为最基本算子,都是都RDD中每个元素进行操作,将元素传递到函数中进行转换。
map(f:T=>U): RDD[T]=>RDD[U],表示将RDD经由某一函数f后,转变为另一个RDD。
功能:map算子,是将RDD的数据一条条处理(处理的逻辑 基于map算子中接受的处理函数),返回新的RDD。
- #cording:utf-8
- from pyspark import SparkConf,SparkContext
-
- if __name__ == "__main__":
- # 构建SparkContext对象
- conf = SparkConf().setAppName('test').setMaster("local[*]")
- sc = SparkContext(conf=conf)
-
- rdd = sc.parallelize([1,2,3,4,5,6],3)
-
- # 定义方法,作为算子的传入函数体
- def add(data):
- return data * 10
-
- print(rdd.map(add).collect())
-
- # 更简单的方式 是定义lambda表达式来写匿名函数
- print(rdd.map(lambda data:data * 10).collect())
-
- '''
- 对于算子的接受函数来说,两种方法都可以
- lambda表达式 适用于 一行代码就搞定的函数体,如果是多行,需要定义独立的方法
- '''
flatMap(f:T=>Seq[U]): RDD[T]=>RDD[U]),表示将RDD经由某一函数f后,转变为一个新的 RDD,但是与map 不同,RDD中的每一个元素会被映射成新的0到多个元素(f 函数返回的是一个序列Seq)。
功能:对RDD执行map操作,然后进行解除嵌套操作。
- #cording:utf-8
- from pyspark import SparkConf,SparkContext
-
- if __name__ == "__main__":
- # 构建SparkContext对象
- conf = SparkConf().setAppName('test').setMaster("local[*]")
- sc = SparkContext(conf=conf)
-
- rdd = sc.parallelize(["hadoop hadoop spark","spark hadoop hadoop","hadoop flink spark"])
-
- #得到所有的单词,组成RDD
- rdd2 = rdd.map(lambda line: line.split(" "))
- rdd3 = rdd.flatMap(lambda line: line.split(" "))
- print(rdd2.collect())
- print(rdd3.collect())
filter(f.T=>Bool): RDD[T]=>RDD[T],表示将 RDD经由某一函数f后,只保留f返回True的数据,组成新的RDD。
功能:过滤想要的数据进行保留,返回值是True的数据保留,返回值是False的数据则会被丢弃。
- #corfding:utf8
- from pyspark import SparkConf,SparkContext
- if __name__ == '__main__':
- conf = SparkConf().setAppName('test').setMaster('local[*]')
- sc = SparkContext(conf=conf)
- # 通过filter算子过滤奇数
- rdd = sc.parallelize((1,2,3,4,5,6,7,8,9,10))
- result_rdd = rdd.filter(lambda x: x % 2 == 1)
- print(result_rdd.collect())
foreach(func),将函数 func应用在数据集的每一个元素上,通常用于更新一个累加器,或者和外部存储系统进行交互,例如 Redis。
功能:对RDD的每一个元素,执行你提供的逻辑的操作(和map一个意思),但是这个方法没有返回值。
ps:该算子是分区(Executor)直接执行的,跳过Driver,由分区所在的Executor直接执行。
- #cording:utf8
- from pyspark import SparkConf,SparkContext
- if __name__ == '__main__':
- conf = SparkConf().setMaster('local[*]').setAppName('test')
- sc = SparkContext(conf=conf)
- rdd = sc.parallelize([1, 5, 4, 2, 3, 6])
- print(rdd.foreach(lambda x: 10 * x))
- print('----------------------------------')
- print(rdd.foreach(lambda x: print(10 * x)))
saveAsTextFile(path:String),数据集内部的元素会调用其 toString方法,转换为字符串形式,然后根据传入的路径保存成文本文件,既可以是本地文件系统,也可以是HDFS等。
ps:该算子是分区(Executor)直接执行的,跳过Driver,由分区所在的Executor直接执行。
- #cording:utf8
- from pyspark import SparkConf,SparkContext
- if __name__ == '__main__':
- conf = SparkConf().setMaster('local[*]').setAppName('test')
- sc = SparkContext(conf=conf)
- rdd = sc.parallelize([1, 5, 4, 2, 3, 6])
-
- rdd.saveAsTextFile('hdfs://pyspark01/output/out1')
功能:针对KV型RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成组内数据(value)的聚合操作。
- #cording:utf-8
- from pyspark import SparkConf,SparkContext
-
- if __name__ == "__main__":
- # 构建SparkContext对象
- conf = SparkConf().setAppName('test').setMaster("local[*]")
- sc = SparkContext(conf=conf)
- rdd = sc.parallelize([('a',1),('b',1),('a',1),('a',1),('b',1),('c',1),('a',1)])
- #使用reduceByKey函数进行聚合
- reduce_rdd = rdd.reduceByKey(lambda a,b : a + b).collect()
- print("聚合结果:",reduce_rdd)
功能:针对二元元组RDD,对其内部的二元元组的Value执行map操作。
- #cording:utf-8
- from pyspark import SparkConf,SparkContext
-
- if __name__ == "__main__":
- # 构建SparkContext对象
- conf = SparkConf().setAppName('test').setMaster("local[*]")
- sc = SparkContext(conf=conf)
- rdd = sc.parallelize([('a',2),('b',11),('a',1)])
- #使用map函数
- map_rdd = rdd.map(lambda x: (x[0],x[1]*10)).collect()
- print("结果:",map_rdd)
- # 使用mapValue函数
- value_rdd = rdd.mapValues(lambda value: value*10).collect()
- print("结果:",value_rdd)
功能:将RDD数据进行分组。
- #cording:utf8
-
- from pyspark import SparkConf,SparkContext
-
- if __name__ == '__main__':
- conf = SparkConf().setMaster('local[*]').setAppName('test')
- sc = SparkContext(conf=conf)
- # 创建数据
- test_rdd = sc.parallelize([('a',1),('b',1),('a',2),('b',2),('b',3)])
- # 通过groupBy函数对数据进行分组
- # groupBy函数传入函数的意思是:通过这个函数,来确定按照谁来分组(返回谁即可)
- # 分组规则和SQL一致:也就是相同的在同一个组(Hash分组)
- result_1 = test_rdd.groupBy(lambda t: t[0])
- result_2 = result_1.map(lambda t: (t[0],list(t[1])))
- print(result_1.collect())
- print(result_2.collect())
功能:对RDD数据进行去重复,返回新的RDD。
- #cording:utf8
-
- from pyspark import SparkConf,SparkContext
-
- if __name__ == '__main__':
- conf = SparkConf().setAppName('test').setMaster('local[*]')
- sc = SparkContext(conf=conf)
- rdd_1 = sc.parallelize((1,2,1,2,3,4,5,6))
- rdd_2 = sc.parallelize([('a',1),('b',1),('a',1),('a',1),('b',1),('c',1),('a',1)])
- # 使用distinct算子进行去重
- print('数字:',rdd_1.distinct().collect())
- print('元组:',rdd_2.distinct().collect())
功能:将两个RDD合并成一个RDD返回。只合并不去重,RDD的类型不同也是可以合并的。
- #corfding:utf8
- from pyspark import SparkConf,SparkContext
- if __name__ == '__main__':
- conf = SparkConf().setAppName('test').setMaster('local[*]')
- sc = SparkContext(conf=conf)
- # 通过union算子合并RDD
- rdd_1 = sc.parallelize((1,2,3,4,5))
- rdd_2 = sc.parallelize((6,7,8,9,10))
- print(rdd_1.union(rdd_2).collect())
功能:对两个RDD执行join操作(可实现SQL外/内连接),join算子只能用于二元元组。
- #corfding:utf8
- from pyspark import SparkConf,SparkContext
- if __name__ == '__main__':
- conf = SparkConf().setAppName('test').setMaster('local[*]')
- sc = SparkContext(conf=conf)
-
- rdd1 = sc.parallelize([(1001,'zhangsan'),(1002,'lisi'),(1003,'wangwu'),(1004,'zhaoliu')])
- rdd2 = sc.parallelize([(1001,'销售部'),(1002,'科技部')])
-
- # 通过join算子来进行rdd之间的关联
- # 对于join算子来说,关联条件按照二元元组的key来进行关联
- print(rdd1.join(rdd2).collect())
-
- # 左外连接,右外连接可以更换一下rdd的顺序或者调用rightOuterJoin即可
- print(rdd1.leftOuterJoin(rdd2).collect())
功能:求两个RDD的交集,返回一个新的RDD。
- #corfding:utf8
- from pyspark import SparkConf,SparkContext
- if __name__ == '__main__':
- conf = SparkConf().setAppName('test').setMaster('local[*]')
- sc = SparkContext(conf=conf)
-
- rdd1 = sc.parallelize([('a',1),('b',3)])
- rdd2 = sc.parallelize([('a',1),('c',1)])
- # 通过intersection算子求出RDD的交集 取出并返回新的RDD
- print(rdd1.intersection(rdd2).collect())
功能:将RDD的数据,加上嵌套,这个嵌套按照分区来进行,比如RDD数据[1,2,3,4,5]有两个分区,那么glom后,数据变成:[[1,2,3],[4,5]]。
- #corfding:utf8
- from pyspark import SparkConf,SparkContext
- if __name__ == '__main__':
- conf = SparkConf().setAppName('test').setMaster('local[*]')
- sc = SparkContext(conf=conf)
-
- rdd1 = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
- print(rdd1.glom().collect())
- # 解嵌套操作
- print(rdd1.glom().flatMap(lambda x: x).collect())
功能:针对KV型RDD,自动按照key分组。
- #cording:utf8
-
- from pyspark import SparkConf,SparkContext
-
- if __name__ == '__main__':
- conf = SparkConf().setMaster('local[*]').setAppName('test')
- sc = SparkContext(conf=conf)
- # 创建数据
- test_rdd = sc.parallelize([('a',1),('b',1),('a',2),('b',2),('b',3)])
- # 使用groupByKey算子
- result_1 = test_rdd.groupByKey()
- #查看结果
- result_2 = result_1.map(lambda t: (t[0],list(t[1])))
- print(result_1.collect())
- print(result_2.collect())
功能:对RDD数据进行排序,基于你指定的排序依据。
- #cording:utf8
- from pyspark import SparkConf,SparkContext
- if __name__ == '__main__':
- conf = SparkConf().setMaster('local[*]').setAppName('test')
- sc = SparkContext(conf=conf)
-
- rdd = sc.parallelize([('c',3),('f',1),('b',11),('c',3),('e',1),('n',9),('a',1)],3)
- # 使用sortBy对RDD执行排序
- # 按照value 数字进行排序
- # 参数1函数:表示的是,告知spark,按照数据的哪个列进行排序
- # 参数2:True表示升序 False表示降序
- # 参数3:排序的分区数
- '''注意:如果要全局有序,排序分区数设置为1'''
- print('按照value排序:',rdd.sortBy(lambda x: x[1], ascending=True, numPartitions=3).collect())
- # 按照key进行排序
- print('按照key排序:',rdd.sortBy(lambda x: x[0], ascending=True, numPartitions=3).collect())
功能:针对KV型RDD,按照Key进行排序
- #cording:utf8
- from pyspark import SparkConf,SparkContext
- if __name__ == '__main__':
- conf = SparkConf().setMaster('local[*]').setAppName('test')
- sc = SparkContext(conf=conf)
- rdd = sc.parallelize([('a',1),('E',1),('C',1),('D',1),('b',1),('g',1),('h',1),
- ( "y" ,1),('u',1),('i',1),('o',1),('p',1),
- ( 'm',1),('n',1),('L',1),('k',1),('f',1)],3)
- # 根据字母的小写排序
- print(rdd.sortByKey(ascending=True, numPartitions=1, keyfunc=lambda key: key.lower()).collect())
功能:统计key出现的次数(一般适用于KV型RDD)
- import json
- from pyspark import SparkConf,SparkContext
- if __name__ == '__main__':
- conf = SparkConf().setMaster('local[*]').setAppName('test')
- sc = SparkContext(conf=conf)
-
- rdd = sc.textFile('../input/words.txt')
- rdd2 = rdd.flatMap(lambda x: x.split(' ')).map(lambda x: (x,1))
-
- # 通过countByKey来对key进行计数,这是一个Action算子
- result = rdd2.countByKey()
- print(result)
- print(type(result))
功能:将RDD各个分区的数据,统一收集到Driver中,形成一个list对象。这个算子,是将RDD各个分区数据都拉取到Driver,注意的是,RDD是分布式对象,其数据量可以很大,所以用这个算子之前,要心知肚明的了解结果数据集不会太大,不然,会把Driver内存撑爆。
功能:对RDD数据集按照你传入的逻辑进行聚合。
- import json
- from pyspark import SparkConf,SparkContext
- if __name__ == '__main__':
- conf = SparkConf().setMaster('local[*]').setAppName('test')
- sc = SparkContext(conf=conf)
-
- rdd = sc.parallelize([1,2,3,4,5,6])
- print(rdd.reduce(lambda a,b: a+b))
功能:随机抽样RDD数据,随机数种子数字可以随便传,如果传同一个数字,那么取出的结果是一致的。一般参数三不传,spark会自动给与一个随机的种子。
-
- from pyspark import SparkConf,SparkContext
- if __name__ == '__main__':
- conf = SparkConf().setMaster('local[*]').setAppName('test')
- sc = SparkContext(conf=conf)
-
- rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10,1,2])
- print('True:',rdd.takeSample(True,22))
- print('False:',rdd.takeSample(False,22))
- print('无随机种子1:',rdd.takeSample(True,5))
- print('无随机种子2:', rdd.takeSample(True, 5))
- print('有随机种子1:',rdd.takeSample(True,5,1))
- print('有随机种子2:', rdd.takeSample(True, 5, 1))
功能:对RDD进行排序取前N个。
- #cording:utf8
- from pyspark import SparkConf,SparkContext
- if __name__ == '__main__':
- conf = SparkConf().setMaster('local[*]').setAppName('test')
- sc = SparkContext(conf=conf)
-
- rdd = sc.parallelize([1,5,4,2,3,6])
- print('普通:',rdd.takeOrdered(3))
- # 函数操作只会对结果产生影响,不会影响数据本身
- print("传入函数:",rdd.takeOrdered(3, lambda x: -x))
功能:与map功能相似,但区别是,mapPartition一次被传递的是一整个分区的数据,是作为一个迭代器(一次性list)对象传入过来,而map是一个一个数据的传递。
- #cording:utf8
- from pyspark import SparkConf,SparkContext
- if __name__ == '__main__':
- conf = SparkConf().setMaster('local[*]').setAppName('test')
- sc = SparkContext(conf=conf)
- rdd = sc.parallelize([1, 5, 4, 2, 3, 6],3)
- def process(iter):
- result = list()
- for it in iter:
- result.append(it * 10)
- return result
-
- # mapPartitions算子相比于map算子,节省了大量打IO操作,每一个分区只需要进行一次IO操作即可
- print('输出结果:',rdd.mapPartitions(process).collect())
功能:和普通的foreach一致,一次处理的是一整个分区的数据。
- #cording:utf8
- from pyspark import SparkConf,SparkContext
- if __name__ == '__main__':
- conf = SparkConf().setMaster('local[*]').setAppName('test')
- sc = SparkContext(conf=conf)
- rdd = sc.parallelize([1, 5, 4, 2, 3, 6],3)
- def process(iter):
- result = list()
- for it in iter:
- result.append(it * 10)
- print(result)
-
- rdd.foreachPartition(process)
功能:对RDD进行自定义分区操作。
- #cording:utf8
- from pyspark import SparkConf,SparkContext
- if __name__ == '__main__':
- conf = SparkConf().setMaster('local[*]').setAppName('test')
- sc = SparkContext(conf=conf)
- rdd = sc.parallelize([('hadoop',1),('hadoop',1),('hello',1),('spark',1),('flink',1),('spark',1)])
-
- # 使用partitionBy自定义分区
- def process(x):
- if 'hadoop' == x or 'hello' == x:return 0
- if 'spark' == x:return 1
- return 2
- # 使用glom算子将每个分区的数据进行嵌套
- print('显示分区:',rdd.partitionBy(3, process).glom().collect())
功能:对RDD的分区执行重新分区(仅数量)
ps:对分区的数量进行操作,一定要慎重,一般情况下,我们写Spark代码除了要求全局排序设置为1个分区外多数时候,所有API中关于分区相关的代码我们都不太理会。因为,如果你改分区了,会影响并行计算(内存迭代的并行管道数量)后面学分区如果增加,极大可能导致shuffle。
- #cording:utf8
- from pyspark import SparkConf,SparkContext
- if __name__ == '__main__':
- conf = SparkConf().setMaster('local[*]').setAppName('test')
- sc = SparkContext(conf=conf)
- rdd = sc.parallelize([1, 5, 4, 2, 3, 6],3)
-
- # repartition 修改分区
- # 减少分区
- print("减少分区为1:",rdd.repartition(1).getNumPartitions())
- # 增加分区
- print("增加分区为5:", rdd.repartition(5).getNumPartitions())
- # coalesce 修改分区
- # 减少分区
- print("减少分区为1:",rdd.coalesce(1).getNumPartitions())
- # 增加分区 ps:coalesce增加分区数量需要指定参数shuffle为True才能1成功修改
- print("减少分区为5:", rdd.coalesce(5).getNumPartitions())
- print("减少分区为5:",rdd.coalesce(5, shuffle=True).getNumPartitions())
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。