
Spark RDD Transformation Operations

What transformation operations does Spark provide on RDDs?

 

https://blog.csdn.net/zzh118/article/details/51998163

1. There are two ways to create an RDD:
(1) read data (for example from HDFS) to produce an RDD;
(2) transform an existing RDD to obtain a new RDD.

  scala> val textFile = sc.textFile("hdfs://192.169.26.58:9000/home/datamining/zhaozhuohui/workspace/test01.txt")
  scala> val tf2 = textFile.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
  scala> tf2.collect

2. RDDs support three main kinds of operations: transformations, actions, and persistence.

3. Transformations are lazily evaluated: no intermediate results are materialized, and the real computation only happens when an action is invoked. Executing a whole chain of operations together gives Spark room to optimize it, as the sketch below illustrates.
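A minimal sketch (not from the original post; it assumes a spark-shell where sc is already defined): the map and filter calls only record the lineage, and nothing runs until the count action is called.

  // transformations: only build up the lineage, no data is touched yet
  val nums    = sc.parallelize(1 to 1000000, 4)
  val doubled = nums.map(_ * 2)
  val evens   = doubled.filter(_ % 4 == 0)
  // action: the whole map -> filter chain executes as a single job
  val n = evens.count()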

4. The map transformation: map applies the supplied function to every element of the RDD and produces a new RDD.

  scala> val rdd1 = sc.parallelize(1 to 9, 3)
  rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:27
  scala> val rdd2 = rdd1.map(_ * 2)
  rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[13] at map at <console>:29
  scala> rdd2.collect
  res12: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)

5. flatMap: similar to map, but each element can be mapped to a sequence of output elements, and all of the results are flattened into a single new RDD.

  scala> val rdd1 = sc.parallelize(1 to 4, 2)
  rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[14] at parallelize at <console>:27
  scala> val rdd2 = rdd1.flatMap(x => 1 to x)
  rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[15] at flatMap at <console>:29
  scala> rdd2.collect
  res14: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4)

6. mapPartitions: a variant of map. The supplied function is applied to each partition as a whole: it receives an iterator over the partition's elements and returns an iterator. The per-partition results are then combined into a new RDD.

  scala> val rdd1 = sc.parallelize(1 to 9, 3)
  rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at parallelize at <console>:27
  scala> def myfunc[T](iter: Iterator[T]): Iterator[(T, T)] = {
       |   var res = List[(T, T)]()
       |   var pre = iter.next
       |   while (iter.hasNext) {
       |     val cur = iter.next
       |     res ::= (pre, cur)
       |     pre = cur
       |   }
       |   res.iterator
       | }
  myfunc: [T](iter: Iterator[T])Iterator[(T, T)]
  scala> rdd1.mapPartitions(myfunc).collect
  res17: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))

7. glom: turns the elements of each partition into an Array, so each partition contributes exactly one array element, and returns the result as a new RDD.

  scala> var rdd1 = sc.makeRDD(1 to 10, 3)
  rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at makeRDD at <console>:27
  scala> rdd1.partitions.size
  res18: Int = 3
  scala> rdd1.glom().collect
  res19: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))

8. The filter transformation: keeps the elements for which the supplied predicate returns true and puts them into a new RDD.

  def funOps2(): Unit = {
    val a = sc.parallelize(1 to 10, 3)
    val b = a.filter(_ % 2 == 0)
    b.collect
  }

9. The distinct transformation: removes duplicate elements; an optional argument sets the number of partitions of the resulting RDD.

  scala> val a = sc.parallelize(List("tom", "jim", "sherry", "dog", "tom"))
  a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:27
  scala> a.distinct.collect
  res1: Array[String] = Array(jim, tom, dog, sherry)
  scala> val b = sc.parallelize(List(1, 2, 3, 4, 5, 6, 1, 3, 2))
  b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:27
  scala> b.distinct(2).partitions.length
  res2: Int = 2

10. The cartesian transformation computes the Cartesian product of two RDDs.

  scala> val x = sc.parallelize(List(1, 2, 3, 4, 5))
  x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:27
  scala> val y = sc.parallelize(List(6, 7, 8, 9, 10))
  y: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:27
  scala> x.cartesian(y).collect
  res3: Array[(Int, Int)] = Array((1,6), (1,7), (2,6), (2,7), (1,8), (1,9), (1,10), (2,8), (2,9), (2,10), (3,6), (3,7), (4,6), (4,7), (5,6), (5,7), (3,8), (3,9), (3,10), (4,8), (4,9), (4,10), (5,8), (5,9), (5,10))

11. The union (or ++) transformation returns the union of two RDDs; note that duplicate elements are kept, as the example shows.

  scala> val a = sc.parallelize(3 to 6, 1)
  a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:27
  scala> val b = sc.parallelize(5 to 7, 1)
  b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:27
  scala> (a ++ b).collect
  res4: Array[Int] = Array(3, 4, 5, 6, 5, 6, 7)

12. The mapValues transformation works on an RDD of two-element tuples (key-value pairs). The supplied function is applied to each value while the keys are left unchanged, producing a new pair RDD.

  scala> val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panda", "eagle"))
  a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[14] at parallelize at <console>:27
  scala> val b = a.map(x => (x.length, x))
  b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[15] at map at <console>:31
  scala> b.mapValues("x" + _ + "x").collect
  res6: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx), (3,xcatx), (5,xpandax), (5,xeaglex))

13. The subtract transformation returns a new RDD containing the elements of one RDD that do not appear in the other (the set difference).

  scala> val a = sc.parallelize(1 to 9, 1)
  a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at parallelize at <console>:27
  scala> val b = sc.parallelize(1 to 5, 1)
  b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at parallelize at <console>:27
  scala> val c = a.subtract(b)
  c: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[22] at subtract at <console>:31
  scala> c.collect
  res7: Array[Int] = Array(6, 7, 8, 9)

14. The sample transformation randomly samples a fraction of the RDD's elements and returns them as a new RDD; its arguments are whether to sample with replacement, the sampling fraction, and a random seed.

  scala> val a = sc.parallelize(1 to 10, 1)
  a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:27
  scala> a.sample(true, 0.5, 0).count
  res9: Long = 4
  scala> a.sample(true, 0.2, 12).count
  res10: Long = 2

15. takeSample draws a specified number of elements at random; it returns an Array rather than an RDD.

  scala> val x = sc.parallelize(1 to 10, 1)
  x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[28] at parallelize at <console>:27
  scala> x.takeSample(true, 5, 1)
  res12: Array[Int] = Array(3, 4, 7, 10, 3)

16. The groupByKey transformation collects, for each key K in an RDD[K, V], all of the corresponding values into a single Iterable[V].

  scala> var rdd1 = sc.makeRDD(Array(("A", 0), ("A", 2), ("B", 1), ("c", 1)))
  rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[30] at makeRDD at <console>:27
  scala> rdd1.groupByKey().collect
  res13: Array[(String, Iterable[Int])] = Array((B,CompactBuffer(1)), (A,CompactBuffer(0, 2)), (c,CompactBuffer(1)))

17. The partitionBy transformation repartitions a pair RDD according to the partitioner passed in, as sketched below.
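A minimal sketch (not from the original post; it assumes a spark-shell with sc defined) using the built-in HashPartitioner:

  import org.apache.spark.HashPartitioner

  // redistribute the pairs into 4 partitions by hashing the keys
  val pairs         = sc.makeRDD(Array(("A", 1), ("B", 2), ("C", 3), ("D", 4)), 2)
  val repartitioned = pairs.partitionBy(new HashPartitioner(4))
  repartitioned.partitions.length   // 4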

18. The cogroup transformation is roughly the full outer join of SQL: it returns the records of both RDDs grouped by key, with an empty collection on the side where a key has no match.

  scala> var rdd1 = sc.makeRDD(Array(("A", "1"), ("B", "2"), ("C", "3")), 2)
  rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[34] at makeRDD at <console>:27
  scala> var rdd2 = sc.makeRDD(Array(("A", "a"), ("C", "c"), ("D", "d")), 2)
  rdd2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[35] at makeRDD at <console>:27
  scala> var rdd3 = rdd1.cogroup(rdd2)
  rdd3: org.apache.spark.rdd.RDD[(String, (Iterable[String], Iterable[String]))] = MapPartitionsRDD[37] at cogroup at <console>:31
  scala> rdd3.collect
  res16: Array[(String, (Iterable[String], Iterable[String]))] = Array((B,(CompactBuffer(2),CompactBuffer())), (D,(CompactBuffer(),CompactBuffer(d))), (A,(CompactBuffer(1),CompactBuffer(a))), (C,(CompactBuffer(3),CompactBuffer(c))))

19. The combineByKey transformation combines the values of each key using user-supplied aggregation functions, turning an RDD[(K, V)] into an RDD[(K, C)], where C is the combined type. A sketch follows.
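A minimal sketch (not from the original post; it assumes a spark-shell with sc defined) that computes a per-key average with combineByKey:

  val scores = sc.makeRDD(Array(("A", 1.0), ("A", 3.0), ("B", 2.0)), 2)
  val sumCount = scores.combineByKey(
    (v: Double) => (v, 1),                                             // createCombiner: first value seen for a key
    (acc: (Double, Int), v: Double) => (acc._1 + v, acc._2 + 1),       // mergeValue: fold another value into the combiner
    (a: (Double, Int), b: (Double, Int)) => (a._1 + b._1, a._2 + b._2) // mergeCombiners: merge combiners across partitions
  )
  val avg = sumCount.mapValues { case (sum, count) => sum / count }
  avg.collect()   // Array((A,2.0), (B,2.0)), order may vary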

20. The reduceByKey transformation combines, for each key K in an RDD[K, V], the corresponding values pairwise using the supplied function.

  scala> var rdd1 = sc.makeRDD(Array(("A", 0), ("A", 2), ("B", 1), ("B", 2), ("c", 1)))
  rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[32] at makeRDD at <console>:27
  scala> rdd1.partitions.length
  res14: Int = 2
  scala> var rdd2 = rdd1.reduceByKey(_ + _)
  rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[33] at reduceByKey at <console>:29
  scala> rdd2.collect
  res15: Array[(String, Int)] = Array((B,3), (A,2), (c,1))

21. The join transformation, def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]: the two RDDs being joined must both consist of two-element tuples (key-value pairs); for each key that appears in both, the result pairs up the two values.

  scala> var rdd1 = sc.makeRDD(Array(("A", "1"), ("B", "2"), ("C", "3")), 2)
  rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[38] at makeRDD at <console>:27
  scala> var rdd2 = sc.makeRDD(Array(("A", "a"), ("C", "c"), ("D", "d")), 2)
  rdd2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[39] at makeRDD at <console>:27
  scala> rdd1.join(rdd2).collect
  res17: Array[(String, (String, String))] = Array((A,(1,a)), (C,(3,c)))

22. The leftOuterJoin and rightOuterJoin transformations keep every key from the left (respectively right) RDD; values with no match on the other side come back as None. A sketch follows.
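A minimal sketch (not from the original post), reusing rdd1 and rdd2 from the join example above; unmatched values are wrapped in Option:

  rdd1.leftOuterJoin(rdd2).collect
  // Array((B,(2,None)), (A,(1,Some(a))), (C,(3,Some(c)))), order may vary
  rdd1.rightOuterJoin(rdd2).collect
  // Array((D,(None,d)), (A,(Some(1),a)), (C,(Some(3),c))), order may vary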
