当前位置:   article > 正文

PySpark基础 —— RDD_pyspark rdd

pyspark rdd

一、Spark环境测试

1.导入相关库

  1. # import os
  2. # os.environ['JAVA_HOME'] = 'D:\ProgramData\Spark\jdk1.8.0_302'
  3. # os.environ['HADOOP_HOME'] = 'D:\ProgramData\Spark\winutils-master\hadoop-2.7.1'
  4. # os.environ['SPARK_HOME'] = 'D:\ProgramData\Spark\spark-3.1.2-bin-hadoop2.7'
  5. from pyspark.sql import SparkSession
  6. import findspark
  7. findspark.init()

2.创建SparkSession实例

  1. # local本地模式
  2. # [*], 最大的线程数量
  3. # [4], 线程数量设置为4
  4. spark = SparkSession.Builder().master("local[*]").getOrCreate()
  5. spark
  6. # http://localhost:4040/

 3.创建Spark的DataFrame

  1. df = spark.createDataFrame(
  2. data=[['python', '数据分析'],
  3. ['pyspark', '大数据']],
  4. schema=('name', 'type'))
  5. df.show()
  6. # 关闭SparkSession
  7. # spark.stop()
+-------+--------+
|   name|    type|
+-------+--------+
| python|数据分析|
|pyspark|  大数据|
+-------+--------+

4.创建Pandas的DataFrame

  1. import numpy as np
  2. import pandas as pd
  3. pd_df = pd.DataFrame(np.random.rand(100, 3))
  4. pd_df.head(10)

 5.从Pandas的DataFrame创建Spark的DataFrame

  1. spark_df = spark.createDataFrame(pd_df)
  2. spark_df.show(10)
+-------------------+-------------------+--------------------+
|                  0|                  1|                   2|
+-------------------+-------------------+--------------------+
| 0.7734370300584474|0.42283178859893444|  0.8257498529298667|
|0.44575544415993906|0.49245180252222975|0.014261692547622662|
| 0.3420733794127957| 0.8822635169563398| 0.35380553666355063|
|0.31045724993989887|0.12137972216632553| 0.08901413277815406|
| 0.7241060466628902| 0.6316423526465608|  0.3991496071189753|
|0.22678194237871974| 0.9869818222587557|  0.6060528459473943|
|0.22495181866362846| 0.4185845149128945| 0.47356977129591526|
| 0.7396151249153267| 0.7804451983660282|  0.9502911251018666|
|0.15263591158972922| 0.8882795838843202|  0.3790204587517769|
| 0.9089614551221472| 0.2663836523951706|  0.8517316157986443|
+-------------------+-------------------+--------------------+
only showing top 10 rows

6.将Spark的DataFrame转为Pandas的DataFrame

  1. pd_df = spark_df.select("*").toPandas()
  2. pd_df.head(10)

 二、RDD

1.RDD——创建RDD

  1. import pyspark
  2. from pyspark import SparkContext, SparkConf
  3. import findspark
  4. findspark.init()
  5. # 或sc = SparkContext(master='local[*]', appName='test')
  6. # SparkContext,无法同时运行多个SparkContext环境
  7. conf = SparkConf().setAppName('test').setMaster('local[*]')
  8. sc = SparkContext(conf=conf)

1.查看Spark环境信息

  1. # 查看Python版本
  2. sc.pythonVer
  3. # '3.8'
  4. # 查看Spark版本
  5. sc.version
  6. # 或pyspark.__version__
  7. # '3.1.2'
  8. # 查看主机URL
  9. sc.master
  10. # 'local[*]'
  11. # 查看运行Spark的用户名称
  12. sc.sparkUser()
  13. # 'joe'
  14. # 查看应用程序ID
  15. sc.applicationId
  16. # 'local-1665974057511'
  17. # 查看应用程序名称
  18. sc.appName
  19. # 'test'
  20. # 查看默认的并行级别(线程数量)
  21. sc.defaultParallelism
  22. # 4
  23. # 查看默认的最小分区数量
  24. sc.defaultMinPartitions
  25. # 2
  26. # 查看Spark Web URL
  27. sc.uiWebUrl
  28. # 'http://DESKTOP-H03ONKG:4041'
  29. # 停止运行Spark
  30. # sc.stop()
  31. # '3.8'

2.创建RDD

  • 创建RDD主要有两种方式
  • 第一种:textFile方法
  • 第二种:parallelize方法

 2.1.textFile方法

  • 本地文件系统加载数据

  1. # 第2个参数,指定分区数量
  2. file = "./data/hello.txt"
  3. rdd = sc.textFile(file, 3)
  4. # 展示所有元素
  5. rdd.collect()
  6. # ['python', 'numpy', 'pandas', 'matplotlib', 'pyspark']

 2.2.parallelize方法

  1. # 第2个参数,指定分区数量
  2. rdd = sc.parallelize(range(1, 11), 2)
  3. rdd.collect()
  4. # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  5. # 查看RDD的id
  6. rdd.id()
  7. # 3
  8. # 查看分区数量
  9. rdd.getNumPartitions()
  10. # 2

 2.3.wholeTextFiles方法

  1. # 读取文件夹下所有文件
  2. folder = './data/folder/'
  3. rdd = sc.wholeTextFiles(folder)
  4. rdd.collect()
  5. '''
  6. [('file:/C:/课程/PySpark/data/folder/1.txt', '第1个text文件内容'),
  7. ('file:/C:/课程/PySpark/data/folder/2.txt', '第2个text文件内容'),
  8. ('file:/C:/课程/PySpark/data/folder/3.txt', '第3个text文件内容'),
  9. ('file:/C:/课程/PySpark/data/folder/4.txt', '第4个text文件内容'),
  10. ('file:/C:/课程/PySpark/data/folder/5.txt', '第5个text文件内容'),
  11. ('file:/C:/课程/PySpark/data/folder/6.txt', '第6个text文件内容')]
  12. '''

2.RDD——动作算子

  1. import pyspark
  2. from pyspark import SparkContext, SparkConf
  3. import findspark
  4. findspark.init()
  5. conf = SparkConf().setAppName('test').setMaster('local[*]')
  6. sc = SparkContext(conf=conf)
  • Action动作算子/行动操作

1.collect

  1. rdd = sc.parallelize(range(10))
  2. # 查看所有的元素
  3. rdd.collect()
  4. # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

2.take

  1. rdd = sc.parallelize(range(10))
  2. # 查看指定数量的元素
  3. rdd.take(4)
  4. # [0, 1, 2, 3]

 3.first

  1. rdd = sc.parallelize(range(10))
  2. # 获取第1个元素
  3. rdd.first()
  4. # 0

4.top

  1. rdd = sc.parallelize(range(10))
  2. # 获取top n的元素
  3. rdd.top(3)
  4. # [9, 8, 7]

5.takeOrdered

  1. rdd = sc.parallelize([10, 7, 6, 9, 4, 3, 5, 2, 1])
  2. # 按指定规则排序后,再抽取指定数量的元素
  3. # 升序后抽取
  4. rdd.takeOrdered(num=5)
  5. # [1, 2, 3, 4, 5]
  6. # 降序后抽取
  7. rdd.takeOrdered(num=5, key=lambda x: -x)
  8. # [10, 9, 7, 6, 5]

6.takeSample

  1. rdd = sc.parallelize(range(10))
  2. # 随机抽取指定数量的元素
  3. # 第1个参数,是否重复抽样
  4. # 第2个参数,抽样数量
  5. # 第3个参数,随机种子
  6. rdd.takeSample(False, 5, 0)
  7. # [7, 8, 1, 5, 3]

7.count

  1. rdd = sc.parallelize(range(10))
  2. # 查看元素数量
  3. rdd.count()
  4. # 10

8.sum

  1. rdd = sc.parallelize(range(10))
  2. rdd.sum() # 求和
  3. rdd.max() # 最大值
  4. rdd.min() # 最小值
  5. rdd.mean() # 平均值
  6. rdd.stdev() # 总体标准差
  7. rdd.variance() # 总体方差
  8. rdd.sampleStdev() # 样本标准差
  9. rdd.sampleVariance() # 样本方差
  10. rdd.stats() # 描述统计
  11. # (count: 10, mean: 4.5, stdev: 2.8722813232690143, max: 9.0, min: 0.0)

9.histogram

  1. rdd = sc.parallelize(range(51))
  2. rdd.count()
  3. # 51
  4. # 按指定箱数,分组统计频数
  5. rdd.histogram(2)
  6. # ([0, 25, 50], [25, 26])
  7. # 第1组[0, 25): 25
  8. # 第2组[25, 50]: 26
  9. # ([0, 25, 50], [25, 26])
  10. # 按指定区间,分组统计频数
  11. rdd.histogram([0, 10, 40, 50])’
  12. # ([0, 10, 40, 50], [10, 30, 11])

10.fold

  1. rdd = sc.parallelize(range(10))
  2. # 按指定函数(add加法)对元素折叠
  3. from operator import add
  4. rdd.fold(0, add)
  5. # 45

11.reduce

  1. rdd = sc.parallelize(range(10))
  2. # 二元归并操作,如累加
  3. # 逐步对两个元素进⾏操作
  4. rdd.reduce(lambda x, y: x + y)
  5. from operator import add
  6. rdd.reduce(add)
  7. # 45

12.foreach

  1. rdd = sc.parallelize(range(10))
  2. # 对每个元素执行一个函数操作
  3. # accumulator累加器
  4. acc = sc.accumulator(value=0)
  5. rdd.foreach(lambda x: acc.add(x))
  6. acc.value
  7. # 45

13.collectAsMap

  1. rdd = sc.parallelize([("a", 1), ("b", 2), ("c", 3)])
  2. # 将RDD转换为字典
  3. rdd.collectAsMap()
  4. # {'a': 1, 'b': 2, 'c': 3}

14.saveAsTextFile

  1. rdd = sc.parallelize(range(5))
  2. # 保存rdd为text文件到本地
  3. # 如文件已存在, 将报错
  4. rdd.saveAsTextFile("./data/rdd.txt")

15.textFile

  1. # 加载text文件
  2. rdd = sc.textFile("./data/rdd.txt")
  3. # 判断是否为空
  4. rdd.isEmpty()
  5. # False
  6. rdd.collect()
  7. # ['0', '1', '2', '3', '4']

3.RDD——变换算子

  1. import pyspark
  2. from pyspark import SparkContext, SparkConf
  3. import findspark
  4. findspark.init()
  5. conf = SparkConf().setAppName('test').setMaster('local[*]')
  6. sc = SparkContext(conf=conf)
  • Transformation变换算子/转换操作

1.map

  1. rdd = sc.parallelize(range(10))
  2. rdd.collect()
  3. # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
  4. # 对每个元素映射一个函数操作,如求平方
  5. rdd.map(lambda x: x**2).collect()
  6. # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

2.filter

  1. # 筛选数据,如筛选大于5的元素
  2. rdd.filter(lambda x: x > 5).collect()
  3. # [6, 7, 8, 9]

3.flatMap

  1. rdd = sc.parallelize(["hello world", "hello python"])
  2. rdd.collect()
  3. # flat展平
  4. # ['hello world', 'hello python']
  5. # 先以空格拆分为二维结构
  6. rdd.map(lambda x: x.split(" ")).collect()
  7. # [['hello', 'world'], ['hello', 'python']]
  8. # 对每个元素映射一个函数操作
  9. # 并将结果数据进行扁平化(展平)
  10. rdd.flatMap(lambda x: x.split(" ")).collect()
  11. # ['hello', 'world', 'hello', 'python']

4.sample

  1. rdd = sc.parallelize(range(10))
  2. # 每个分区按比例抽样
  3. # 第1个参数,是否重复抽样
  4. # 第2个参数,抽样概率
  5. # 第3个参数,随机种子
  6. rdd.sample(False, 0.5, 666).collect()
  7. # [1, 2, 3, 8]

5.distinct

  1. rdd = sc.parallelize([1, 1, 2, 2, 3, 3, 4, 5])
  2. # 去重
  3. rdd.distinct().collect()
  4. # [4, 1, 5, 2, 3]

6.subtract

  1. a = sc.parallelize(range(10))
  2. a.collect()
  3. # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
  4. b = sc.parallelize(range(5, 15))
  5. b.collect()
  6. # [5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
  7. # 差集,a-b
  8. a.subtract(b).collect()
  9. # [0, 1, 2, 3, 4]

7.union

  1. # 并集,a+b
  2. a.union(b).collect()
  3. # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]

8.intersection

  1. # 交集
  2. a.intersection(b).collect()
  3. # [8, 9, 5, 6, 7]

9.cartesian

  1. a = sc.parallelize([1, 2])
  2. b = sc.parallelize(["python", "pyspark"])
  3. # 笛卡尔积
  4. a.cartesian(b).collect()
  5. # [(1, 'python'), (1, 'pyspark'), (2, 'python'), (2, 'pyspark')]

10.sortBy

  1. rdd = sc.parallelize([(1, 2, 3), (3, 2, 2), (4, 1, 1)])
  2. # 按第3列排序,默认升序
  3. rdd.sortBy(
  4. keyfunc=lambda x: x[2],
  5. ascending=True
  6. ).collect()
  7. # [(4, 1, 1), (3, 2, 2), (1, 2, 3)]

11.zip

  1. rdd1 = sc.parallelize([1, 2, 3])
  2. rdd2 = sc.parallelize(["python", "pandas", "pyspark"])
  3. # 两个RDD必须具有相同的分区,每个分区元素数量相同
  4. # 类似于python内置函数zip
  5. rdd1.zip(rdd2).collect()
  6. # [(1, 'python'), (2, 'pandas'), (3, 'pyspark')]

12.zipWithIndex

  1. rdd = sc.parallelize(["python", "pandas", "pyspark"])
  2. # 将RDD和索引压缩, 类似于python内置函数enumerate
  3. rdd.zipWithIndex().collect()
  4. '''
  5. 0 python
  6. 1 pandas
  7. 2 pyspark
  8. [('python', 0), ('pandas', 1), ('pyspark', 2)]
  9. '''
  10. lst = ["python", "pandas", "pyspark"]
  11. for i, v in enumerate(lst):
  12. print(i, v)
  13. '''
  14. 0 python
  15. 1 pandas
  16. 2 pyspark
  17. '''

4.RDD——PairRDD变换算子

  1. import pyspark
  2. from pyspark import SparkContext, SparkConf
  3. import findspark
  4. findspark.init()
  5. conf = SparkConf().setAppName('test').setMaster('local[*]')
  6. sc = SparkContext(conf=conf)
  • PairRDD变换算子
  • 包含key和value的RDD,类似python的字典

1.KeyBy

  1. rdd = sc.parallelize(["a", "b", "c"])
  2. # 创建一个键值对RDD
  3. # 以函数返回值作为key,原有元素作为value
  4. rdd.keyBy(lambda x: 1).collect()
  5. # [(1, 'a'), (1, 'b'), (1, 'c')]

2.lookup

  1. rdd = sc.parallelize(
  2. [("python", 1),
  3. ("python", 2),
  4. ("pandas", 3),
  5. ("pandas", 4)])
  6. # 获取RDD的键
  7. rdd.keys().collect()
  8. # ['python', 'python', 'pandas', 'pandas']
  9. # 获取RDD的值
  10. rdd.values().collect()
  11. # [1, 2, 3, 4]
  12. dct = {"python": 1, "pandas": 3}
  13. dct.keys()
  14. # dict_keys(['python', 'pandas'])
  15. dct.values()
  16. # dict_values([1, 3])
  17. dct.items()
  18. # dict_items([('python', 1), ('pandas', 3)])
  19. dct['python']
  20. # 1
  21. # 通过key访问value,动作算子
  22. rdd.lookup("python")
  23. # [1, 2]

3.reduceByKey

  1. rdd.collect()
  2. # [('python', 1), ('python', 2), ('pandas', 3), ('pandas', 4)]
  3. # 以key分组对value执行二元归并操作,比如求和
  4. rdd.reduceByKey(lambda x, y: x+y).collect()
  5. # [('python', 3), ('pandas', 7)]

4.reduceByKeyLocally

  1. # 以key分组并按指定函数合并value,返回python字典
  2. from operator import add
  3. dct = rdd.reduceByKeyLocally(add)
  4. dct
  5. # {'python': 3, 'pandas': 7}

5.foldByKey

  1. # 以key分组并按指定函数(add加法)合并value
  2. # 类似reduceByKey(分组求和)
  3. # fold折叠,必须传递zeroValue的初始值
  4. from operator import add
  5. rdd.foldByKey(0, add).collect()
  6. # [('python', 3), ('pandas', 7)]

6.combineByKey

  1. # 以key分组按指定函数合并value,合并后返回列表
  2. # createCombiner,将value转换为列表
  3. # mergeValue,将value添加至列表
  4. # mergeCombiners,将多个列表合并为一个列表
  5. def to_list(x):
  6. return [x]
  7. def append(x, y):
  8. x.append(y)
  9. return x
  10. def extend(x, y):
  11. x.extend(y)
  12. return x
  13. rdd.combineByKey(to_list, append, extend).collect()
  14. # [('python', [1, 2]), ('pandas', [3, 4])]

7.subtractByKey

  1. x = sc.parallelize([("a", 1), ("b", 2), ("c", 3)])
  2. y = sc.parallelize([("a", 2), ("b", 2)])
  3. # 按key求差集
  4. x.subtractByKey(y).collect()
  5. # [('c', 3)]

8.groupBy

  1. rdd = sc.parallelize(range(10))
  2. # 将RDD转换为迭代器
  3. iterator = rdd.toLocalIterator()
  4. type(iterator)
  5. # generator
  6. # groupBy:以函数返回值分组合并,合并后返回迭代器
  7. # 如奇数为一个迭代器,偶数为一个迭代器
  8. rdd_new = rdd.groupBy(lambda x: x % 2).collect()
  9. rdd_new
  10. '''
  11. [(0, <pyspark.resultiterable.ResultIterable at 0x241bc7b8820>),
  12. (1, <pyspark.resultiterable.ResultIterable at 0x241bc2c2370>)]
  13. '''
  14. [[x, list(y)] for x, y in rdd_new]
  15. # [[0, [0, 2, 4, 6, 8]], [1, [1, 3, 5, 7, 9]]]

9.groupByKey

  1. rdd = sc.parallelize(
  2. [("python", 1),
  3. ("python", 2),
  4. ("pandas", 3),
  5. ("pandas", 4)])
  6. # 以key分组合并value,合并后返回迭代器
  7. rdd_new = rdd.groupByKey().collect()
  8. [[x, list(y)] for x, y in rdd_new]
  9. # [['python', [1, 2]], ['pandas', [3, 4]]]

10.mapValues

  1. rdd = sc.parallelize(
  2. [("python", [1, 2]),
  3. ("pandas", [3, 4])])
  4. # 对value应用一个函数操作,比如求和
  5. rdd.mapValues(sum).collect()
  6. # [('python', 3), ('pandas', 7)]

11.groupBy+mapValues

  1. rdd = sc.parallelize(range(10))
  2. # 以函数返回值分组合并,合并后返回列表
  3. # 如奇数为一个列表,偶数为一个列表
  4. rdd.groupBy(lambda x: x % 2).mapValues(list).collect()
  5. # [(0, [0, 2, 4, 6, 8]), (1, [1, 3, 5, 7, 9])]

12.groupByKey+mapValues

  1. rdd = sc.parallelize(
  2. [("python", 1),
  3. ("python", 2),
  4. ("pandas", 3),
  5. ("pandas", 4)])
  6. # 以key分组合并value为列表
  7. rdd.groupByKey().mapValues(list).collect()
  8. # [('python', [1, 2]), ('pandas', [3, 4])]
  9. # 以key分组求value之和
  10. rdd.groupByKey().mapValues(sum).collect()
  11. # [('python', 3), ('pandas', 7)]
  12. # 以key分组求value最大值
  13. rdd.groupByKey().mapValues(max).collect()
  14. # [('python', 2), ('pandas', 4)]

13.countByKey

  1. # 以key分组计数,返回字典
  2. rdd.countByKey().items()
  3. # dict_items([('python', 2), ('pandas', 2)])

14.countByValue

  1. rdd1 = sc.parallelize([(1, 1), (1, 1), (3, 4), (2, 1)])
  2. rdd2 = sc.parallelize([1, 2, 2, 3, 3, 3])
  3. # 如为键值对RDD,则以键值对(k-v)分组计数,返回字典
  4. rdd1.countByValue().items()
  5. # {(1, 1): 2, (3, 4): 1, (2, 1): 1}
  6. # 如为单元素RDD,则以值(v)分组计数,返回字典
  7. rdd2.countByValue().items()
  8. # [(1, 1), (2, 2), (3, 3)]

15.cogroup

  1. x = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
  2. y = sc.parallelize([("a", 4), ("b", 5), ("b", 6)])
  3. # groupWith等价于cogroup(combine group)
  4. # 以key分组合并value,合并后返回迭代器
  5. # 先对两个RDD分别goupByKey,再对合并结果groupByKey
  6. rdd = x.cogroup(y).collect()
  7. [[x, [list(z) for z in y]] for x, y in rdd]
  8. # [['a', [[1, 3], [4]]], ['b', [[2], [5, 6]]]]

16.sortByKey

  1. rdd = sc.parallelize(
  2. [("python", 1),
  3. ("python", 2),
  4. ("pandas", 3),
  5. ("pandas", 4)])
  6. # 按key排序
  7. rdd.sortByKey().collect()
  8. # [('pandas', 3), ('pandas', 4), ('python', 1), ('python', 2)]

17.sampleByKey

  1. fruit = sc.parallelize(["apple", "banana"])
  2. number = sc.parallelize(range(10))
  3. # cartesian笛卡尔积
  4. rdd = fruit.cartesian(number)
  5. rdd.collect()
  6. '''
  7. [('apple', 0),
  8. ('apple', 1),
  9. ('apple', 2),
  10. ('apple', 3),
  11. ('apple', 4),
  12. ('apple', 5),
  13. ('apple', 6),
  14. ('apple', 7),
  15. ('apple', 8),
  16. ('apple', 9),
  17. ('banana', 0),
  18. ('banana', 1),
  19. ('banana', 2),
  20. ('banana', 3),
  21. ('banana', 4),
  22. ('banana', 5),
  23. ('banana', 6),
  24. ('banana', 7),
  25. ('banana', 8),
  26. ('banana', 9)]
  27. '''
  28. # 以key分组按比例随机抽样
  29. # withReplacement是否放回抽样
  30. # fractions抽样比例
  31. # seed随机种子
  32. frac = {"apple": 0.3, "banana": 0.5}
  33. rdd.sampleByKey(False, frac, 999).collect()
  34. '''
  35. [('apple', 1),
  36. ('apple', 4),
  37. ('apple', 5),
  38. ('apple', 7),
  39. ('apple', 8),
  40. ('banana', 0),
  41. ('banana', 1),
  42. ('banana', 2),
  43. ('banana', 4),
  44. ('banana', 5),
  45. ('banana', 8),
  46. ('banana', 9)]
  47. '''

18.flatMapValues

  1. rdd = sc.parallelize([("a", [1, 2, 3]),
  2. ("b", [4, 5, 6])])
  3. # 将value进行扁平化(展平),类似pandas的explode
  4. rdd.flatMapValues(lambda x: x).collect()
  5. # [('a', 1), ('a', 2), ('a', 3), ('b', 4), ('b', 5), ('b', 6)]

19.join

  1. age = sc.parallelize(
  2. [("jack", 20),
  3. ("rose", 18),
  4. ("tony", 20)])
  5. gender = sc.parallelize(
  6. [("jack", "male"),
  7. ("rose", "female"),
  8. ("tom", "male")])
  9. # 按key内连接
  10. age.join(gender).collect()
  11. # [('jack', (20, 'male')), ('rose', (18, 'female'))]

20.leftOuterJoin

  1. # 按key左连接
  2. age.leftOuterJoin(gender).collect()
  3. # [('jack', (20, 'male')), ('tony', (20, None)), ('rose', (18, 'female'))]

21.rightOuterJoin

  1. # 按key右连接
  2. age.rightOuterJoin(gender).collect()
  3. # [('tom', (None, 'male')), ('jack', (20, 'male')), ('rose', (18, 'female'))]

22.fullOuterJoin

  1. # 按key全连接
  2. age.fullOuterJoin(gender).collect()
  3. '''
  4. [('tom', (None, 'male')),
  5. ('jack', (20, 'male')),
  6. ('tony', (20, None)),
  7. ('rose', (18, 'female'))]
  8. '''

5.RDD——分区

  1. import pyspark
  2. from pyspark import SparkContext, SparkConf
  3. import findspark
  4. findspark.init()
  5. conf = SparkConf().setAppName('test').setMaster('local[*]')
  6. sc = SparkContext(conf=conf)

1.glom

  1. rdd = sc.parallelize(range(10), 2)
  2. rdd.collect()
  3. # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
  4. # 将每个分区的元素转换为列表
  5. rdd.glom().collect()
  6. # [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]

2.coalesce

  1. # hive: coalesce空值处理
  2. rdd = sc.parallelize(range(10), 3)
  3. rdd.glom().collect()
  4. # [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
  5. # 重置分区数量
  6. # shuffle=True,增加至指定分区数量
  7. # shuffle=False,减少至指定分区数量
  8. rdd_new = rdd.coalesce(2, shuffle=False)
  9. rdd_new.glom().collect()
  10. # [[0, 1, 2], [3, 4, 5, 6, 7, 8, 9]]

3.repartition

  1. # 单元素RDD重置分区数量
  2. rdd1 = sc.parallelize(range(10), 3)
  3. # 键值对RDD重置分区数量
  4. rdd2 = sc.parallelize(
  5. [("a", 1),
  6. ("a", 2),
  7. ("a", 3),
  8. ("c", 4)])
  9. # 增加分区数量,实际上调用coalesce(shuffle=True)
  10. # 减少分区数量,实际上调用coalesce(shuffle=False)
  11. rdd1.repartition(4).glom().collect()
  12. # [[6, 7, 8, 9], [3, 4, 5], [], [0, 1, 2]]
  13. # 按key打乱,相同key不一定在同一分区
  14. rdd2.repartition(2).glom().collect()
  15. # [[('a', 1), ('a', 3), ('c', 4)], [('a', 2)]]

4.partitionBy

  1. # 键值对RDD重置分区数量
  2. rdd2 = sc.parallelize(
  3. [("a", 1),
  4. ("a", 2),
  5. ("a", 3),
  6. ("c", 4)])
  7. # 相同key一定在同一个分区
  8. rdd2.partitionBy(2).glom().collect()
  9. # [[('c', 4)], [('a', 1), ('a', 2), ('a', 3)]]

5.mapPartitions

  1. rdd = sc.parallelize(range(10), 2)
  2. # 对每个分区分别应用一个函数,如求和
  3. # 函数必须使用yield关键字(即生成器), 生成器返回迭代器
  4. def func(x): yield sum(x)
  5. rdd.mapPartitions(func).collect()
  6. # [10, 35]

6.mapPartitionsWithIndex

  1. rdd = sc.parallelize(range(10), 2)
  2. # 对每个分区分别应用一个函数,如求和
  3. # 并且对每个分区添加索引
  4. # 函数必须使用yield关键字(即生成器)
  5. def func(i, x): yield i, sum(x)
  6. rdd.mapPartitionsWithIndex(func).collect()
  7. # [(0, 10), (1, 35)]

7.repartitionAndSortWithinPartitions

  1. rdd = sc.parallelize(
  2. [(0, 1),
  3. (3, 2),
  4. (1, 3),
  5. (0, 4),
  6. (3, 5),
  7. (2, 6)])
  8. # 按指定函数进行重新分区repartition
  9. # 并在每个分区内按key排序SortWithinPartitions
  10. rdd_new = rdd.repartitionAndSortWithinPartitions(
  11. numPartitions=2,
  12. partitionFunc=lambda x: x % 2,
  13. ascending=True)
  14. rdd_new.glom().collect()
  15. # [[(0, 1), (0, 4), (2, 6)], [(1, 3), (3, 2), (3, 5)]]

8.foreachPartition

  1. rdd = sc.parallelize(range(10), 2)
  2. rdd.glom().collect()
  3. # [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]
  4. # 对每个分区分别执行一个函数操作
  5. # 先对每个分区求和
  6. # 再对每个分区的执行结果求和
  7. acc = sc.accumulator(value=0)
  8. def func(x): acc.add(sum(x))
  9. rdd.foreachPartition(func)
  10. acc.value
  11. # 45

9.aggregate

aggregate函数

  1. # aggregate(zeroValue, seqOp, combOp)
  2. # zeroValue:必须传递初始值
  3. # seqOp:先对每个分区分别执行一个函数操作
  4. # combOp:再对每个分区的执行结果,执行另一个函数操作
  5. # 求元素之和及元素个数
  6. rdd = sc.parallelize(range(1, 10), 3)
  7. print(rdd.glom().collect())
  8. # 第1个分区的元素:[1, 2, 3]
  9. # 第2个分区的元素:[4, 5, 6]
  10. # 第3个分区的元素:[7, 8, 9]
  11. seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
  12. combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
  13. rdd.aggregate(
  14. zeroValue=(0, 0),
  15. seqOp=seqOp,
  16. combOp=combOp)
  17. # (45, 9)

seqOp执行过程

  1. # seqOp:先对每个分区分别执行一个函数操作
  2. # 第1个分区的元素:[1, 2, 3]
  3. # 第2个分区的元素:[4, 5, 6]
  4. # 第3个分区的元素:[7, 8, 9]
  5. seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
  6. x[0] + y:累计求和;x[1] + 1:累计计数
  7. x等于zeroValue初始值
  8. 1个分区的执行结果:(6, 3)
  9. x=(0, 0), y=[1, 2, 3]
  10. (0+1, 0+1):x与y的第1个元素
  11. (1+2, 1+1):上一步结果与y的第2个元素
  12. (3+3, 2+1):上一步结果与y的第3个元素
  13. 2个分区的执行结果:(15, 3)
  14. x=(0, 0), y=[4, 5, 6]
  15. (0+4, 0+1):x与y的第1个元素
  16. (4+5, 1+1):上一步结果与y的第2个元素
  17. (9+6, 2+1):上一步结果与y的第3个元素
  18. 3个分区的执行结果:(24, 3)
  19. x=(0, 0), y=[7, 8, 9]
  20. (0+7, 0+1):x与y的第1个元素
  21. (7+8, 1+1):上一步结果与y的第2个元素
  22. (15+9, 2+1):上一步结果与y的第3个元素

 combOp执行过程  

  1. # combOp:再对每个分区的执行结果,执行另一个函数操作
  2. # 第1个分区的执行结果:(6, 3)
  3. # 第2个分区的执行结果:(15, 3)
  4. # 第3个分区的执行结果:(24, 3)
  5. combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
  6. x[0] + y[0]:累计求和,x[1] + y[1]:累计求和
  7. x等于zeroValue初始值
  8. 第一步:zeroValue初始值与第1个分区的执行结果相加
  9. x=(0, 0), y=(6, 3)
  10. (0+6, 0+3)
  11. 第二步:上一步结果与第2个分区的执行结果相加
  12. x=(6, 3), y=(15, 3)
  13. (6+15, 3+3)
  14. 第三步:上一步结果与第3个分区的执行结果相加
  15. x=(21, 6), y=(24, 3)
  16. (21+24, 6+3)
  17. # 最终结果
  18. (45, 9)
  1. rdd = sc.parallelize(range(1, 10), 3)
  2. rdd.glom().collect()
  3. # [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
  4. # 求元素之和及元素个数
  5. # seqOp:先对每个分区分别执行一个函数操作
  6. # combOp:再对每个分区的执行结果,执行另一个函数操作
  7. # zeroValue:必须传递初始值
  8. # x累计求和,y累计计数
  9. seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
  10. combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
  11. # aggregate是动作算子
  12. rdd.aggregate(
  13. zeroValue=(0, 0),
  14. seqOp=seqOp,
  15. combOp=combOp)
  16. # (45, 9)

10.aggregateByKey

  1. rdd = sc.parallelize(
  2. [("orange", 1),
  3. ("orange", 2),
  4. ("banana", 3),
  5. ("orange", 4),
  6. ("banana", 5),
  7. ("banana", 6)], 2)
  8. rdd.glom().collect()
  9. '''
  10. [[('orange', 1), ('orange', 2), ('banana', 3)],
  11. [('orange', 4), ('banana', 5), ('banana', 6)]]
  12. '''
  13. # seqFunc:先对每个分区按key执行一个函数
  14. # combFunc:再对每个分区的执行结果,按key执行另一个函数
  15. # zeroValue:必须传递初始值
  16. # 按key分组求value的最大值
  17. # 高性能算子,执行效率高
  18. rdd_new = rdd.aggregateByKey(
  19. zeroValue=0,
  20. seqFunc=lambda x, y: max(x, y),
  21. combFunc=lambda x, y: max(x, y))
  22. rdd_new.collect()
  23. # [('orange', 4), ('banana', 6)]

6.RDD——缓存

  1. import pyspark
  2. from pyspark import SparkContext, SparkConf
  3. import findspark
  4. findspark.init()
  5. conf = SparkConf().setAppName('test').setMaster('local[*]')
  6. sc = SparkContext(conf=conf)

1.缓存的好处

# 什么缓存?
# 缓存是一种可以实现内存与CPU之间高速交换数据的存储器
# 工作原理: 当CPU要读取一个数据, 优先从缓存中查找, 找到就立即读取并发给CPU处理

# 如果一个RDD被多个任务调用, 那么可以缓存到内存中, 提高计算效率
# 如果一个RDD后续不再被调用, 那么可以立即释放缓存, 避免资源浪费

2.缓存到内存

  1. rdd = sc.parallelize(range(10000), 5)
  2. rdd.cache()
  3. # PythonRDD[1] at RDD at PythonRDD.scala:53
  4. rdd.getStorageLevel()
  5. # 常见的两种存储级别
  6. # 第1种: 缓存到内存
  7. # 第2种: 缓存到内存和磁盘
  8. # StorageLevel(False, True, False, False, 1)
  9. # 是否使用磁盘, False
  10. # 是否使用内存, True
  11. # 是否使用堆外内存, False
  12. # - java虚拟机概念(jvm)
  13. # - 堆外内存受操作系统管理
  14. # - 堆内内存受jvm管理
  15. # 是否以java反序列化格式存储, False
  16. # - 序列化: 将对象转换为可传输的字节序列的过程
  17. # - 反序列化: 将字节序列还原为对象的过程
  18. # 备份数量, 1
  19. # StorageLevel(False, True, False, False, 1)
  20. rdd_cnt = rdd.count()
  21. rdd_sum = rdd.reduce(lambda x, y: x+y)
  22. rdd_mean = rdd_sum/rdd_cnt
  23. print(rdd_mean)
  24. # 立即释放缓存
  25. rdd.unpersist()
  26. # 4999.5
  27. # PythonRDD[1] at RDD at PythonRDD.scala:53

3.缓存到内存和磁盘

  1. rdd = sc.parallelize(range(10000), 5)
  2. from pyspark.storagelevel import StorageLevel
  3. # 缓存到内存和磁盘中, MEMORY_AND_DISK
  4. # 如果内存存储不了, 其余部分存储至磁盘中
  5. rdd.persist(StorageLevel.MEMORY_AND_DISK)
  6. # 缓存到内存中
  7. # 等价于rdd.cache()
  8. # rdd.persist(StorageLevel.MEMORY_ONLY)
  9. # PythonRDD[3] at RDD at PythonRDD.scala:53
  10. rdd.getStorageLevel()
  11. # StorageLevel(True, True, False, False, 1)
  12. rdd_sum = rdd.reduce(lambda x, y: x+y)
  13. rdd_cnt = rdd.count()
  14. rdd_mean = rdd_sum/rdd_cnt
  15. print(rdd_mean)
  16. # 立即释放缓存
  17. rdd.unpersist()
  18. # 4999.5
  19. # PythonRDD[3] at RDD at PythonRDD.scala:53

7.RDD——共享变量

  1. import pyspark
  2. from pyspark import SparkContext, SparkConf
  3. import findspark
  4. findspark.init()
  5. conf = SparkConf().setAppName('test').setMaster('local[*]')
  6. sc = SparkContext(conf=conf)

1.广播变量

  1. # 设置广播变量, 提高计算效率
  2. rdd = sc.parallelize(range(10))
  3. rdd.collect()
  4. # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
  5. broad = sc.broadcast(100)
  6. broad.value
  7. # 100
  8. rdd.map(lambda x: x+broad.value).collect()
  9. # [100, 101, 102, 103, 104, 105, 106, 107, 108, 109]
  10. # 立即释放
  11. broad.unpersist()

2.累加器-求和

  1. rdd = sc.parallelize(range(10))
  2. rdd.collect()
  3. # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
  4. acc = sc.accumulator(0)
  5. rdd.foreach(lambda x: acc.add(x))
  6. acc.value
  7. # 45

3.累加器求均值

  1. rdd = sc.parallelize(range(10000))
  2. # 累计求和
  3. acc_sum = sc.accumulator(0)
  4. # 累计计数
  5. acc_cnt = sc.accumulator(0)
  6. def func(x):
  7. acc_sum.add(x)
  8. acc_cnt.add(1)
  9. rdd.foreach(func)
  10. acc_sum.value/acc_cnt.value
  11. # 4999.5
  12. rdd.count()
  13. # 10000
  14. rdd.sum()
  15. # 49995000
  16. rdd.sum() / rdd.count()
  17. # 4999.5
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/556894
推荐阅读
相关标签
  

闽ICP备14008679号