Transformation operators
These operators do not trigger job submission; a job is submitted only when it reaches an action operator, and only then does the transformation computation actually run.
Action operators
These operators trigger SparkContext to submit a job (a short sketch of this lazy-then-trigger behavior follows the list):
foreach
saveAsTextFile
saveAsObjectFile
collect
collectAsMap
reduceByKeyLocally
lookup
count
top
reduce
fold
aggregate
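The lazy-then-trigger behavior can be seen directly in the shell. Below is a minimal sketch, assuming spark-shell in local mode (so executor println output reaches the console): the map is merely recorded in the DAG, and nothing executes until the count action submits the job.
- // transformation: only recorded in the DAG, nothing is printed here
- val nums = sc.parallelize(1 to 4)
- val doubled = nums.map { x => println(s"mapping $x"); x * 2 }
- // action: the job is submitted and the map function finally runs
- doubled.count()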
map
Overview
Single input, single output: each element is mapped (i.e., run through a function) and the result is emitted.
Example
- // create a list
- scala> var r1 = sc.parallelize(List("hello","world","antg"))
- r1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24
-
- // append the string -->123 to each element and output
- scala> r1.map(x=>x+"-->123").collect()
- res1: Array[String] = Array(hello-->123, world-->123, antg-->123)
flatMap
Overview
Similar to map, but each input element may produce zero or more outputs; the results are finally flattened into a single one-dimensional collection.
Example
- // create a nested list
- scala> var r2 = sc.parallelize(List(List(1,2,3),List(4,5,6)))
- r2: org.apache.spark.rdd.RDD[List[Int]] = ParallelCollectionRDD[3] at parallelize at <console>:24
- // the output is a one-dimensional array
- scala> r2.flatMap(x=>x).collect
- res2: Array[Int] = Array(1, 2, 3, 4, 5, 6)
- // compare with map
- scala> r2.map(x=>x).collect
- res4: Array[List[Int]] = Array(List(1, 2, 3), List(4, 5, 6))
glom
Overview
Operates per partition: the values of each partition are gathered into one array.
Example
- scala> var r3 = sc.parallelize(List(1,2,3,4,5,6,7),4)
- r3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24
-
- scala> r3.glom.collect
- res5: Array[Array[Int]] = Array(Array(1), Array(2, 3), Array(4, 5), Array(6, 7))
mapPartitions
Overview
Processes data one partition at a time, whereas map processes one element at a time.
When the map function would repeatedly create expensive objects, such as file output streams, JDBC connections, or sockets, use mapPartitions so the object is created once per partition (see the sketch after the example).
Example
- // create a dataset with 3 partitions
- scala> var r4 = sc.parallelize(List(1,2,3,4,5,6),3)
- r4: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
- // mapPartitions computes per partition: "antg" is printed once for each of the 3 partitions
- scala> r4.mapPartitions(partition=>{var init = 10;println("antg");partition.map(x=>x+init);}).collect
- antg
- antg
- antg
- res8: Array[Int] = Array(11, 12, 13, 14, 15, 16)
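Below is a hedged sketch of the stream/JDBC use case mentioned in the overview; createConnection and save are hypothetical placeholders, not real APIs. The expensive resource is built once per partition, and the results are materialized with toList before closing, because the iterator returned by map is lazy.
- r4.mapPartitions { partition =>
-   val conn = createConnection()   // hypothetical: one expensive resource per partition
-   val results = partition.map(x => save(conn, x)).toList   // materialize before closing
-   conn.close()
-   results.iterator
- }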
union
Overview
Merges two RDDs into one, without deduplication.
Example
- scala> val a = sc.parallelize(1 to 4,2)
- a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
-
- scala> val b = sc.parallelize(3 to 6,2)
- b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24
-
- scala> a.union(b).collect
- res0: Array[Int] = Array(1, 2, 3, 4, 3, 4, 5, 6)
-
- scala> (a++b).collect
- res1: Array[Int] = Array(1, 2, 3, 4, 3, 4, 5, 6)
-
- scala> (a union b).collect
- res2: Array[Int] = Array(1, 2, 3, 4, 3, 4, 5, 6)
groupBy
Overview
Regroups elements according to a condition; the input-to-output partition relationship is many-to-many.
Example
- scala> val c = sc.parallelize(Seq(1,2,3,4,5,6,100,101,102),3)
- c: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24
-
- scala> c.groupBy(x=>if(x>=100) ">100" else "<100").collect
- res5: Array[(String, Iterable[Int])] = Array((>100,CompactBuffer(100, 101, 102)), (<100,CompactBuffer(1, 2, 3, 4, 5, 6)))
filter
Overview
Filters elements by a predicate; each output partition is a subset of the corresponding input partition.
Example
- scala> val d = sc.parallelize(1 to 20,4)
- d: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24
-
- scala> d.filter(x=>x>=10).collect
- res6: Array[Int] = Array(10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
-
- scala> d.filter(x=>x>=10).glom.collect
- res7: Array[Array[Int]] = Array(Array(), Array(10), Array(11, 12, 13, 14, 15), Array(16, 17, 18, 19, 20))
distinct
Overview
Deduplicates globally across the whole RDD; output partitions are subsets of the input partitions.
Example
- scala> val e = sc.parallelize(1 to 4,2)
- e: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:24
-
- scala> val f = sc.parallelize(3 to 6,2)
- f: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at <console>:24
-
- scala> (e++f).distinct.collect
- res8: Array[Int] = Array(4, 1, 5, 6, 2, 3)
-
- scala> (e++f).distinct.glom.collect
- res9: Array[Array[Int]] = Array(Array(4), Array(1, 5), Array(6, 2), Array(3))
-
- scala> (e++f).glom.collect
- res10: Array[Array[Int]] = Array(Array(1, 2), Array(3, 4), Array(3, 4), Array(5, 6))
cache
Overview
cache keeps the RDD's elements in memory; it is equivalent to persist(MEMORY_ONLY).
Mainly used when the RDD's data is reused repeatedly.
Example
- val a = sc.parallelize(1 to 4, 2)
- val b = sc.parallelize(3 to 6, 2)
- val c = a.union(b).cache   // mark c for in-memory caching
- c.count                    // the first action materializes c and fills the cache
- c.distinct().collect       // reuses the cached data instead of recomputing the union
mapValues
Overview
Input and output partitions are one-to-one.
Applies a map function to the Value of each (Key, Value) pair, leaving the Key untouched.
Example
- scala> val r3 = sc.parallelize(List(("tom",1),("jack",2),("blus",3),("antg",4)),2)
- r3: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[41] at parallelize at <console>:24
-
- scala> r3.mapValues(x=>x+1).collect
- res21: Array[(String, Int)] = Array((tom,2), (jack,3), (blus,4), (antg,5))
combineByKey
Overview
Combines the values of each key using three functions: createCombiner turns the first value seen for a key into an initial combiner, mergeValue folds each further value into the combiner within a partition, and mergeCombiners merges combiners across partitions. In the example below the values of each key are collected into a List; a second sketch after it computes per-key averages.
Example
- scala> val r1 = sc.parallelize(List(("a",1),("b",2),("c",3),("b",1),("c",2),("d",4)),2)
- r1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24
-
- scala> r1.combineByKey(List(_),(x:List[Int],y:Int)=>y::x,(x:List[Int],y:List[Int])=>x:::y).collect
- res1: Array[(String, List[Int])] = Array((d,List(4)), (b,List(2, 1)), (a,List(1)), (c,List(3, 2)))
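The same three-function structure is commonly used for one-pass per-key aggregates. A hedged sketch computing the average value per key on the same r1:
- val avg = r1.combineByKey(
-   v => (v, 1),                                                   // createCombiner: first value for a key
-   (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),         // mergeValue: within a partition
-   (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)   // mergeCombiners: across partitions
- ).mapValues { case (sum, cnt) => sum.toDouble / cnt }
- avg.collect   // e.g. (b,1.5), (c,2.5)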
reduceByKey
Overview
Groups by key, then reduces each group, e.g., summing or concatenating the values.
Example
- scala> var r2 = sc.parallelize(Array("a","b","c","a","b"))
- r2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:24
-
- scala> r2.map((_,1)).reduceByKey(_+_).collect
- res2: Array[(String, Int)] = Array((a,2), (b,2), (c,1))
join
Overview
Joins two (Key, Value) RDDs by Key; the matching value parts are then flattened with a flatMap (see the cogroup sketch after the example).
Example
- scala> val r3 = sc.parallelize(List(("a",1),("b",2)),2)
- r3: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:24
-
- scala> val r4 = sc.parallelize(List(("a",3),("b",4)),2)
- r4: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:24
-
- scala> r3.join(r4).collect
- res3: Array[(String, (Int, Int))] = Array((b,(2,4)), (a,(1,3)))
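As a sketch of the overview's "join by Key, then flatMap the V part" description, the same result can be built from cogroup plus a flatMap over the value pairs, which mirrors how join is conventionally implemented:
- val joined = r3.cogroup(r4).flatMapValues { case (vs, ws) =>
-   for (v <- vs; w <- ws) yield (v, w)   // one output pair per value combination
- }
- joined.collect   // same pairs as r3.join(r4)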
These operators trigger SparkContext to submit a job, which executes the RDD's DAG.
foreach
Overview
Produces no output RDD; applies a function to every element. (The print order below is not deterministic, since partitions are processed in parallel.)
Example
- scala> var r5 = sc.parallelize(1 to 5)
- r5: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:24
-
- scala> r5.foreach(println)
- 3
- 1
- 2
- 4
- 5
saveAsTextFile
Overview
Writes the RDD as text files to the specified output path, one part file per partition.
Example
- scala> val r6 = sc.parallelize(1 to 10)
- r6: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:24
-
- scala> r6.filter(x=>x>=5).saveAsTextFile("file:///home/job018/fujunhua/data/spark/output01")
-
- [fujunhua@cluster1 spark]$ cd output01/
- [fujunhua@cluster1 output01]$ ls
- part-00000 part-00001 part-00002 part-00003 part-00004 part-00005 part-00006 part-00007 _SUCCESS
- [fujunhua@cluster1 output01]$ cat ./*
- 5
- 6
- 7
- 8
- 9
- 10
collectAsMap
Overview
Collects a (Key, Value) RDD to the driver as a Map; if a key occurs more than once, later values overwrite earlier ones.
Example
- scala> val r7 = sc.parallelize(List(("a",1),("b",2)))
- r7: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[15] at parallelize at <console>:24
-
- scala> r7.collectAsMap
- res7: scala.collection.Map[String,Int] = Map(b -> 2, a -> 1)
lookup
Overview
For a (Key, Value) RDD, returns a Seq of the values associated with the specified Key.
Example
- scala> val r8 = sc.parallelize(List("小米", "华为", "华米", "大米", "苹果","米老鼠"), 2)
-
- scala> r8.map(x=>({if(x.contains("米")) "有米" else "无米"},x)).lookup("有米")
- res9: Seq[String] = WrappedArray(小米, 华米, 大米, 米老鼠)
reduce
Overview
Applies the reduce function to two elements, then to that result and the next element from the iterator, and so on until all elements have been consumed; the function should be commutative and associative, since partitions are reduced in parallel.
Example
- scala> val r10 = sc.parallelize(1 to 10)
- r10: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:24
-
- scala> r10.reduce((x,y)=>x+y)
- res12: Int = 55
fold
Overview
Signature: def fold(zeroValue: T)(op: (T, T) => T): T
Applies op within each partition with zeroValue taking part in the computation, then applies op across all the per-partition results, where zeroValue takes part once more. The result therefore depends on the number of partitions, as the examples show.
Example
- // sum is 41: (1+2+3+4+5+6+10)+10
- sc.parallelize(List(1, 2, 3, 4, 5, 6), 1).fold(10)(_+_)
- // sum is 51: (1+2+3+10)+(4+5+6+10)+10 = 51
- sc.parallelize(List(1, 2, 3, 4, 5, 6), 2).fold(10)(_+_)
- // sum is 61: (1+2+10)+(3+4+10)+(5+6+10)+10 = 61
- sc.parallelize(List(1, 2, 3, 4, 5, 6), 3).fold(10)(_+_)