赞
踩
https://blog.csdn.net/zzh118/article/details/51998163
1、创建RDD的两种方式:
(1)、从HDFS读入数据产生RDD;
(2)、有其他已存在的RDD转换得到新的RDD;
- scala> val textFile = sc.textFile("hdfs://192.169.26.58:9000/home/datamining/zhaozhuohui/workspace/test01.txt")
- scala> val tf2 = textFile.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
- scala> tf2.clollect
2、RDD主要有三种类型的操作:Transformation、Action、Persist。
3、Transformation操作的懒加载机制:避免产生中间结果数据,在Action操作时才进行真正的操作。这样一连串的操作一起执行就有优化的空间。
4、Transformation的map操作:map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。
- scala> val rdd1 = sc.parallelize(1 to 9, 3)
- rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:27
- scala> val rdd2 = rdd1.map(_ * 2)
- rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[13] at map at <console>:29
- scala> rdd2.collect
- res12: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)
5、flatMap:与map类似,flatMap会将经过函数处理的元素生成到一个RDD中。
- scala> val rdd1 = sc.parallelize(1 to 4, 2)
- rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[14] at parallelize at <console>:27
- scala> val rdd2 = rdd1.flatMap(x => 1 to x)
- rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[15] at flatMap at <console>:29
- scala> rdd2.collect
- res14: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4)
6、mapPartitions:map的一个变种。输入函数作用于每个分区,每个分区作为整体来处理。输入函数的参数是迭代器,返回值也是一个迭代器。处理后的合并结果会自动转化成一个新的RDD。
- scala> val rdd1 = sc.parallelize(1 to 9, 3)
- rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at parallelize at <console>:27
- scala> def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
- | var res = List[(T, T)]()
- | var pre = iter.next
- | while (iter.hasNext)
- | {
- | val cur = iter.next;
- | res .::= (pre, cur)
- | pre = cur;
- | }
- | res.iterator
- | }
- myfunc: [T](iter: Iterator[T])Iterator[(T, T)]
-
- scala> rdd1.mapPartitions(myfunc).collect
- res17: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
7、glom:将每个分区中的元素转换成Array,这样每个分区就只有一个数组元素,最终返回一个RDD
- scala> var rdd1 = sc.makeRDD(1 to 10, 3)
- rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at makeRDD at <console>:27
- scala> rdd1.partitions.size
- res18: Int = 3
- scala> rdd1.glom().collect
- res19: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))
8、Transformation的filter操作:筛选出输入函数计算结果为true的元素,放入到一个新的RDD中。
- def funOps2(): Unit = {
- val a = sc.parallelize(1 to 10, 3)
- val b = a.filter(_ % 2 == 0)
- b.collect
- }
9、Transformation的distinct操作
- scala> val a = sc.parallelize(List("tom", "jim", "sherry", "dog", "tom"))
- a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:27
- scala> a.distinct.collect
- res1: Array[String] = Array(jim, tom, dog, sherry)
- scala> val b = sc.parallelize(List(1, 2, 3, 4, 5, 6, 1, 3, 2))
- b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:27
- scala> b.distinct(2).partitions.length
- res2: Int = 2
10、Transformation的cartesian计算两个RDD的笛卡尔积
- scala> val x = sc.parallelize(List(1, 2, 3, 4, 5))
- x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:27
- scala> val y = sc.parallelize(List(6, 7, 8, 9, 10))
- y: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:27
- scala> x.cartesian(y).collect
- res3: Array[(Int, Int)] = Array((1,6), (1,7), (2,6), (2,7), (1,8), (1,9), (1,10), (2,8), (2,9), (2,10), (3,6), (3,7), (4,6), (4,7), (5,6), (5,7), (3,8), (3,9), (3,10), (4,8), (4,9), (4,10), (5,8), (5,9), (5,10))
11、Transformation的union,++操作,两个RDD取并集
- scala> val a = sc.parallelize(3 to 6, 1)
- a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:27
- scala> val b = sc.parallelize(5 to 7, 1)
- b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:27
- scala> (a ++ b).collect
- res4: Array[Int] = Array(3, 4, 5, 6, 5, 6, 7)
12、Transformation的mapValues操作,处理两个元素的tuple构成的RDD。把mapValue参数传入的输入函数应用到每个value上,生成一个由两个元素的tuple构成的新的RDD。
- scala> val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panda", "eagle"))
- a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[14] at parallelize at <console>:27
- scala> val b = a.map(x => (x.length, x))
- b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[15] at map at <console>:31
- scala> b.mapValues("x" + _ + "x").collect
- res6: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx), (3,xcatx), (5,xpandax), (5,xeaglex))
13、Transformation的subtract操作,取两个RDD的差集,返回一个新的RDD。
- scala> val a = sc.parallelize(1 to 9, 1)
- a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at parallelize at <console>:27
- scala> val b = sc.parallelize(1 to 5, 1)
- b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at parallelize at <console>:27
- scala> val c = a.subtract(b)
- c: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[22] at subtract at <console>:31
- scala> c.collect
- res7: Array[Int] = Array(6, 7, 8, 9)
14、Transformation的sample操作,随机从RDD中取出一个片段作为一个新的RDD。
- scala> val a = sc.parallelize(1 to 10, 1)
- a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:27
- scala> a.sample(true, 0.5, 0).count
- res9: Long = 4
- scala> a.sample(true, 0.2, 12).count
- res10: Long = 2
15、takeSample随机取指定数目的元素,返回的是数组不是RDD
- scala> val x = sc.parallelize(1 to 10, 1)
- x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[28] at parallelize at <console>:27
- scala> x.takeSample(true, 5, 1)
- res12: Array[Int] = Array(3, 4, 7, 10, 3)
16、Transformation的groupByKey操作,用于将RDD[K,V]中每个K对应的V值,合并到一个集合Iterable[V]中。
- scala> var rdd1 = sc.makeRDD(Array(("A", 0), ("A", 2), ("B", 1), ("c", 1)))
- rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[30] at makeRDD at <console>:27
- scala> rdd1.groupByKey().collect
- res13: Array[(String, Iterable[Int])] = Array((B,CompactBuffer(1)), (A,CompactBuffer(0, 2)), (c,CompactBuffer(1)))
17、Transformation的partitionBy操作,根据传入的分区器进行分区。
18、Transformation的cogroup操作,相当于SQL中的全外关联full outer join,返回左右RDD中的记录,关联不上的为空。
- scala> var rdd1 = sc.makeRDD(Array(("A", "1"), ("B", "2"), ("C", "3")), 2)
- rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[34] at makeRDD at <console>:27
- scala> var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")), 2)
- rdd2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[35] at makeRDD at <console>:27
- scala> var rdd3 = rdd1.cogroup(rdd2)
- rdd3: org.apache.spark.rdd.RDD[(String, (Iterable[String], Iterable[String]))] = MapPartitionsRDD[37] at cogroup at <console>:31
- scala> rdd3.collect
- res16: Array[(String, (Iterable[String], Iterable[String]))] = Array((B,(CompactBuffer(2),CompactBuffer())), (D,(CompactBuffer(),CompactBuffer(d))), (A,(CompactBuffer(1),CompactBuffer(a))), (C,(CompactBuffer(3),CompactBuffer(c))))
19、Transformation的combineByKey操作,使用用户设置好的聚合函数对每个key中得value进行组合(combine),可以将输入类型为RDD[(k, v)]转成RDD[(k, c)]。
20、Transformation的reduceByKey操作,该函数用于将RDD[K,V]中每个K对应的V值根据映射函数来两两运算。
- scala> var rdd1 = sc.makeRDD(Array(("A", 0), ("A", 2), ("B", 1), ("B", 2),("c", 1)))
- rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[32] at makeRDD at <console>:27
- scala> rdd1.partitions.length
- res14: Int = 2
- scala> var rdd2 = rdd1.reduceByKey(_ + _)
- rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[33] at reduceByKey at <console>:29
- scala> rdd2.collect
- res15: Array[(String, Int)] = Array((B,3), (A,2), (c,1))
21、Transformation的join操作,def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))],要进行join操作的两个RDD的每个元素必须是两个子元素的tuple.
- scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
- rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[38] at makeRDD at <console>:27
- scala> var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2)
- rdd2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[39] at makeRDD at <console>:27
- scala> rdd1.join(rdd2).collect
- res17: Array[(String, (String, String))] = Array((A,(1,a)), (C,(3,c)))
22、Transformation的leftOuterJoin、rightOuterJoin操作。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。