当前位置:   article > 正文

[Spark]PySpark入门学习教程---RDD介绍(2)_dfvderdddfssdxfdakpooq

dfvderdddfssdxfdakpooq

 一 RDD

pyspark.RDD        SparkRDD

RDD指的是弹性分布式数据集(Resilient Distributed Dataset),它是spark计算的核心。尽管现在都使用 DataFrame、Dataset 进行编程,但是它们的底层依旧是依赖于RDD的。我们来解释一下 RDD 的这几个单词含义。

  • 弹性:在计算上具有容错性,spark是一个计算框架,如果某一个节点挂了,可以自动进行计算之间血缘关系的跟踪
  • 分布式:很好理解,hdfs上数据是跨节点的,那么spark的计算也是要跨节点的
  • 数据集:可以将数组、文件等一系列数据的集合转换为RDD

RDD 是 Spark 的一个最基本的抽象 (如果你看一下源码的话,你会发现RDD在底层是一个抽象类,抽象类显然不能直接使用,必须要继承它然后实现它内部的一些方法后才可以使用),它代表了不可变的、元素的分区(partition)集合,这些分区可以被并行操作。假设我们有一个包含 300 万个元素的数组,那么我们就可以将这个数组分成 3 份,每一份对应一个分区,每个分区都可以在不同的机器上进行运算,这样就能提高运算效率。

RDD 支持很多操作,比如:map、filter 等等,我们后面会慢慢介绍。当然,RDD在 Spark 的源码是一个类,但是我们后面有时候会把 RDD 和 RDD实例对象 都叫做 RDD,没有刻意区分,心里面清楚就可以啦。

大致上可分三大类算子:

  1、Value数据类型的Transformation算子,这种变换不触发提交作业,针对处理的数据项是Value型的数据。

  2、Key-Value数据类型的Transformation算子,这种变换不触发提交作业,针对处理的数据项是Key-Value型的数据。

  3、Action算子,这类算子会触发SparkContext提交作业。

二 创建RDD

创建RDD主要有两种方式

一个是textFile加载本地或者集群文件系统中的数据,

第二个是用parallelize方法将Driver中的数据结构并行化成RDD。

  1. #从本地文件系统中加载数据
  2. file = "/home/data/hello.txt"
  3. #从集群文件系统中加载数据
  4. #file = "hdfs://localhost:9000/user/hadoop/data.txt"
  5. #也可以省去hdfs://localhost:9000
  6. rdd = sc.textFile(file,3)
  7. rdd.collect()
  1. ['hello world',
  2. 'hello spark',
  3. 'spark love jupyter',
  4. 'spark love pandas',
  5. 'spark love sql']
  1. #parallelize将Driver中的数据结构生成RDD,第二个参数指定分区数
  2. rdd = sc.parallelize(range(1,11),2)
  3. rdd.collect()
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

三 常用Transformation操作

Transformation转换操作具有懒惰执行的特性,它只指定新的RDD和其父RDD的依赖关系,只有当Action操作触发到该依赖的时候,它才被计算。

 

3.1 Value型Transformation算子 

  1. import pyspark
  2. conf = pyspark.SparkConf().setMaster("local[4]").setAppName("PySpark_Transformation1")
  3. sc = pyspark.SparkContext(conf=conf)

[1] map

  1. # 以下的操作由于是Transform操作,因为我们需要在最后加上一个collect算子用来触发计算。
  2. # 1. map: 和python差不多,map转换就是对每一个元素进行一个映射
  3. rdd = sc.parallelize(range(1, 11), 4)
  4. rdd_map = rdd.map(lambda x: x*2)
  5. print("原始数据:", rdd.collect())
  6. print("扩大2倍:", rdd_map.collect())
  7. # 原始数据: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  8. # 扩大2倍: [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]

[2] flatMap 

  1. # 2. flatMap: 这个相比于map多一个flat(压平)操作,顾名思义就是要把高维的数组变成一维
  2. rdd2 = sc.parallelize(["hello SamShare", "hello PySpark"])
  3. print("原始数据:", rdd2.collect())
  4. print("直接split之后的map结果:", rdd2.map(lambda x: x.split(" ")).collect())
  5. print("直接split之后的flatMap结果:", rdd2.flatMap(lambda x: x.split(" ")).collect())
  6. # 直接split之后的map结果: [['hello', 'SamShare'], ['hello', 'PySpark']]
  7. # 直接split之后的flatMap结果: ['hello', 'SamShare', 'hello', 'PySpark']

[3] mapPartitions 

  1. # mapPartitions: 根据分区内的数据进行映射操作
  2. rdd = sc.parallelize([1, 2, 3, 4], 2)
  3. def f(iterator):
  4.     yield sum(iterator)
  5. print(rdd.collect())
  6. print(rdd.mapPartitions(f).collect())
  7. # [1, 2, 3, 4]
  8. # [3, 7]

[4] union 

  1. # 9. union: 合并两个RDD
  2. rdd = sc.parallelize([1, 1, 2, 3])
  3. print(rdd.union(rdd).collect())
  4. # [1, 1, 2, 3, 1, 1, 2, 3]

[5] cartesian  

  1. # cartesian: 生成笛卡尔积
  2. rdd = sc.parallelize([1, 2])
  3. print(sorted(rdd.cartesian(rdd).collect()))
  4. # [(1, 1), (1, 2), (2, 1), (2, 2)] 

[6] groupByKey  

  1. # groupByKey: 按照key来聚合数据
  2. rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
  3. print(rdd.collect())
  4. print(sorted(rdd.groupByKey().mapValues(len).collect()))
  5. print(sorted(rdd.groupByKey().mapValues(list).collect()))
  6. # [('a', 1), ('b', 1), ('a', 1)]
  7. # [('a', 2), ('b', 1)]
  8. # [('a', [1, 1]), ('b', [1])]

[7] filter  

  1. # 3. filter: 过滤数据
  2. rdd = sc.parallelize(range(1, 11), 4)
  3. print("原始数据:", rdd.collect())
  4. print("过滤奇数:", rdd.filter(lambda x: x % 2 == 0).collect())
  5. # 原始数据: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  6. # 过滤奇数: [2, 4, 6, 8, 10]

[8] distinct  

  1. # distinct: 去重元素
  2. rdd = sc.parallelize([2, 2, 4, 8, 8, 8, 8, 16, 32, 32])
  3. print("原始数据:", rdd.collect())
  4. print("去重数据:", rdd.distinct().collect())
  5. # 原始数据: [2, 2, 4, 8, 8, 8, 8, 16, 32, 32]
  6. # 去重数据: [4, 8, 16, 32, 2] 

 [9] distinct  

  1. # subtract: 数据集相减, Return each value in self that is not contained in other.
  2. x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
  3. y = sc.parallelize([("a", 3), ("c", None)])
  4. print(sorted(x.subtract(y).collect()))
  5. # [('a', 1), ('b', 4), ('b', 5)]

[10] sample

  1. rdd = sc.parallelize(range(1,11),2) # 这里的 2 指的是分区数量
  2. print("原始数据:", rdd.collect())
  3. print("Sample数据:", rdd.sample(False, 0.5, 9).collect())
  4. # 原始数据: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  5. # Sample数据: [3, 4, 6, 7, 8, 9]

 [11] takeSample

  1. rdd = sc.parallelize(range(1,15),2) # 这里的 2 指的是分区数量
  2. print("原始数据:", rdd.collect())
  3. print("taksSample数据:", rdd.takeSample(True, 4, 9))
  4. # 原始数据: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  5. # takeSample数据: [3, 4, 6, 7, 8, 9]

 

[12] cache、persist 

 3.2 Key-Value型Transformation算子 

[1] mapValues

  1. rdd1 = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
  2. print("mapValues:", rdd.mapValues(len).collect())
  3. #mapValues  len: [('a', 3), ('b', 1)]
  4. rdd2 = sc.parallelize([("a", [1,2]), ("b", [3,4,5])])
  5. print("mapValues:", rdd2.mapValues(len).collect())
  6. #mapValues  sum: [('a', 3), ('b', 1)]

[2] reduceByKey

  1. # reduceByKey: 根据key来映射数据
  2. from operator import add
  3. rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
  4. print("原始数据:", rdd.collect())
  5. print("原始数据:", rdd.reduceByKey(add).collect())
  6. # 原始数据: [('a', 1), ('b', 1), ('a', 1)]
  7. # 原始数据: [('b', 1), ('a', 2)]

 [3] join | leftOuterJoin | ightOuterJoin

  1. # 16. join:
  2. x = sc.parallelize([("a", 1), ("b", 4)])
  3. y = sc.parallelize([("a", 2), ("a", 3)])
  4. print(sorted(x.join(y).collect()))
  5. # [('a', (1, 2)), ('a', (1, 3))]
  6. # 17. leftOuterJoin/rightOuterJoin
  7. x = sc.parallelize([("a", 1), ("b", 4)])
  8. y = sc.parallelize([("a", 2)])
  9. print(sorted(x.leftOuterJoin(y).collect()))
  10. # [('a', (1, 2)), ('b', (4, None))]

 其他

  1. # 7. sortBy: 根据规则进行排序
  2. tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
  3. print(sc.parallelize(tmp).sortBy(lambda x: x[0]).collect())
  4. print(sc.parallelize(tmp).sortBy(lambda x: x[1]).collect())
  5. # [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
  6. # [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
  7. # subtractByKey 去除x中那些key也在y中的元素
  8. x = sc.parallelize([("a",1),("b",2),("c",3)])
  9. y = sc.parallelize([("a",2),("b",(1,2))])
  10. x.subtractByKey(y).collect()
  11. #[('c', 3)]
  12. # 10. intersection: 取两个RDD的交集,同时有去重的功效
  13. rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5, 2, 3])
  14. rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
  15. print(rdd1.intersection(rdd2).collect())
  16. # [1, 2, 3]
  17. # 12. zip: 拉链合并,需要两个RDD具有相同的长度以及分区数量
  18. x = sc.parallelize(range(0, 5))
  19. y = sc.parallelize(range(1000, 1005))
  20. print(x.collect())
  21. print(y.collect())
  22. print(x.zip(y).collect())
  23. # [0, 1, 2, 3, 4]
  24. # [1000, 1001, 1002, 1003, 1004]
  25. # [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
  26. # 13. zipWithIndex: 将RDD和一个从0开始的递增序列按照拉链方式连接。
  27. rdd_name = sc.parallelize(["LiLei", "Hanmeimei", "Lily", "Lucy", "Ann", "Dachui", "RuHua"])
  28. rdd_index = rdd_name.zipWithIndex()
  29. print(rdd_index.collect())
  30. # [('LiLei', 0), ('Hanmeimei', 1), ('Lily', 2), ('Lucy', 3), ('Ann', 4), ('Dachui', 5), ('RuHua', 6)]
  31. # 15. sortByKey:
  32. tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
  33. print(sc.parallelize(tmp).sortByKey(True, 1).collect())
  34. # [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]

四  常用Action操作

Action操作将触发基于RDD依赖关系的计算。

  1. import os
  2. import pyspark
  3. from pyspark import SparkContext, SparkConf
  4. conf = SparkConf().setAppName("test_SamShare").setMaster("local[4]")
  5. sc = SparkContext(conf=conf)
  6. # 使用 parallelize方法直接实例化一个RDD
  7. rdd = sc.parallelize(range(1,11),4) # 这里的 4 指的是分区数量
  8. rdd.take(100)
  9. # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  10. ----------------------------------------------
  11. Action算子解析
  12. ----------------------------------------------
  13. # 1. collect: 指的是把数据都汇集到driver端,便于后续的操作
  14. rdd = sc.parallelize(range(0, 5))
  15. rdd_collect = rdd.collect()
  16. print(rdd_collect)
  17. # [0, 1, 2, 3, 4]
  18. # 2. first: 取第一个元素
  19. sc.parallelize([2, 3, 4]).first()
  20. # 2
  21. # 3. collectAsMap: 转换为dict,使用这个要注意了,不要对大数据用,不然全部载入到driver端会爆内存
  22. m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
  23. m
  24. # {1: 2, 3: 4}
  25. # 4. reduce: 逐步对两个元素进行操作
  26. rdd = sc.parallelize(range(10),5)
  27. print(rdd.reduce(lambda x,y:x+y))
  28. # 45
  29. # 5. countByKey/countByValue:
  30. rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
  31. print(sorted(rdd.countByKey().items()))
  32. print(sorted(rdd.countByValue().items()))
  33. # [('a', 2), ('b', 1)]
  34. # [(('a', 1), 2), (('b', 1), 1)]
  35. # 6. take: 相当于取几个数据到driver端
  36. rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
  37. print(rdd.take(5))
  38. # [('a', 1), ('b', 1), ('a', 1)]
  39. # 7. saveAsTextFile: 保存rdd成text文件到本地
  40. text_file = "./data/rdd.txt"
  41. rdd = sc.parallelize(range(5))
  42. rdd.saveAsTextFile(text_file)
  43. # 8. takeSample: 随机取数
  44. rdd = sc.textFile("./test/data/hello_samshare.txt", 4) # 这里的 4 指的是分区数量
  45. rdd_sample = rdd.takeSample(True, 2, 0) # withReplacement 参数1:代表是否是有放回抽样
  46. rdd_sample
  47. # 9. foreach: 对每一个元素执行某种操作,不生成新的RDD
  48. rdd = sc.parallelize(range(10), 5)
  49. accum = sc.accumulator(0)
  50. rdd.foreach(lambda x: accum.add(x))
  51. print(accum.value)

全面解析Spark,以及和Python的对接 - 古明地盆 - 博客园 (cnblogs.com)icon-default.png?t=M0H8https://www.cnblogs.com/traditional/p/11724876.html

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/556891
推荐阅读
相关标签
  

闽ICP备14008679号