赞
踩
IDEA运行spark攻略:https://blog.csdn.net/qq_40343117/article/details/101158794
本篇博客知识介绍了许多简单时常用的算子,具体复杂一些的,会单独开章,这样看理解的更清晰一些。
transformation翻译过来就是转换的意思,作用是将一个RDD重新构建成一个新的RDD。
接受函数,将其应用到RDD的每一个元素上,返回一个新的RDD,返回的同时可以指定相应的逻辑,,例如下面的例子,我们将原来numRDD中的每个元素的平方传到reslutRDD形成一个新的RDD对象。
object MapTest {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setAppName("map function").setMaster("local")
val sc=new SparkContext(conf)
// 初始化数组
val arr=Array(1,2,3,4,5,6)
val numRDD=sc.parallelize(arr)
//返回一个新的RDD
//这里的collect是一个action算子,后面会有介绍
val reslutRDD=numRDD.map(x=>x*x).collect().foreach(println)
}
}
结果:
1
4
9
16
25
36
接收函数,返回只包含满足了filter中判定条件的元素。
object FilterTest {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setAppName("filter").setMaster("local")
val sc=new SparkContext(conf)
val arr=Array(1,2,3,4,5,6)
val nRDD=sc.parallelize(arr)
//将符合计算结果的返回给一个新的RDD
val rRDD=nRDD.filter(_%2==0).collect.foreach(println)
}
}
结果:
2
4
6
将RDD中的元素按照规则拍扁成一个新的RDD对象。
object FlatMapTest {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setAppName("flatmap").setMaster("local")
val sc=new SparkContext(conf)
val words=Array("hello lxr","i love","you you")
//将words中的元素按照“ ”分割后,将每个分割出来的元素存入新的RDD
val wordRDD=sc.parallelize(words).flatMap(_.split(" ")).collect.foreach(println)
}
}
结果:
hello
lxr
i
love
you
you
对两个RDD求交集,并将结果放入一个新的RDD中。
//交集
object IntersectionTest {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setAppName("intersection").setMaster("local")
val sc=new SparkContext(conf)
val r1=sc.parallelize(Array(2,4,6,8,10))
val r2=sc.parallelize(Array(6,8,10,12))
val reslutRDD=r1.intersection(r2).collect.foreach(println)
}
}
结果:
6
8
10
求两个集合的并集,并将结果放入新的RDD中。
//并集
object UnionTest {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setMaster("local").setAppName("union")
val sc=new SparkContext(conf)
//1 to 5就是取1,2,3,4,5
val r1=sc.parallelize(1 to 5)
val r2=sc.parallelize(10 to 13)
val reslutRDD=r1.union(r2).collect.foreach(println)
}
}
结果:
1
2
3
4
5
10
11
12
13
对指定索引的元素进行去重处理,但是只是对指定的索引,例如下面的例子,的意思就像是java中的*通配符,可以指代集合中的元素,而我们制定的(._2)就代表Array中的每个元组中的第二个元素,所以去重操作也只针对第二个元素,结果是按照学科去重,要想整行进行处理,会用到groupbykey等算子进行操作,后面我们会讲到,这列只是针对distinct的一个解释。
object DistinctTest {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setAppName("distinct").setMaster("local")
val sc=new SparkContext(conf)
val arr=Array(("lxr","math",100),("wlp","math",60),("lxr","english",100))
val arRDD=sc.parallelize(arr)
val reslutRDD=arRDD.map(_._2).distinct().collect().foreach(println)
}
}
结果:
math
english
跟reduce操作差不多,不过他是针对KEY来计算的,比如下面的例子我们会得到,相同key的value按照我们设定逻辑累加得到结果。
import org.apache.spark.{SparkConf, SparkContext}
//相同key的value形成集合,可自定义逻辑
object ReduceByKeyTest {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setAppName("group by key").setMaster("local")
val sc=new SparkContext(conf)
val numRDD=Array(Tuple2("wlp",80),Tuple2("wlp",100),Tuple2("lxr",101))
val reslut=sc.parallelize(numRDD)
reslut.reduceByKey(_+_).collect().foreach(println)
}
}
结果:
(wlp,180)
(lxr,101)
将相同key的的value值放入一个集合内显示,其中map(x=>(x._1,x._2))
表示用一个map接受RDD的内容,x._1表示第一个keyvalue对中的key,x._2表示value
//按照新的逻辑形成一个新的的RDD
object GroupByKeyTest {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setAppName("group by key").setMaster("local")
val sc=new SparkContext(conf)
val numRDD=Array(("wlp",80),("wlp",100),("lxr",101))
val reslut=sc.parallelize(numRDD).map(x=>(x._1,x._2)).groupByKey.collect.foreach(println)
// reslut.groupByKey.collect.foreach(_._2.foreach(println))
}
}
结果:
(wlp,CompactBuffer(80, 100))
(lxr,CompactBuffer(101))
针对KV对的V做累加或者其他操作,K不变。
examole:Array((1,2),(3,4),(5,6))
//执行
rdd.mapValues(x=>x+1)
//结果 :((1,3),(3,5),(5,7))
只返回key/value的值。
examole:Array((1,2),(3,4),(5,6))
//执行
rdd.keys
//结果 :{1,3,5}
rdd.values
//结果{2,4,6}
将RDD中的内容放入map排序输出。
object SortByKeyTest {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setAppName("group by key").setMaster("local")
val sc=new SparkContext(conf)
val numRDD=Array(Tuple2("wlp",80),Tuple2("wlp",100),Tuple2("lxr",101))
val reslutRDD=sc.parallelize(numRDD)
// false降序,true升序
// x._2,x._1表示按照第二个元素排序
// 变换两次,所以第二次._2的时候又变成原来的顺序
val sortRDD=reslutRDD.map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1))
// val sortRDD=reslutRDD.sortByKey(false).map(x=>(x._2,x._1))
sortRDD.collect().foreach(println)
}
}
结果:
(lxr,101)
(wlp,100)
(wlp,80)
spark中所有的transformation操作都是lazy的,也就是一个懒操作,当我们不去指定的时候,他是不会运行的,所以我们引入action这个概念,他就是用来在RDD上计算出一个结果,并将结果返回到Driver program显示或保存在文件系统中。
遍历整个RDD,向driver program返回该RDD的内容,是一个很重要的action算子,但需要是以单机内存可以容纳为前提,因为collect在集群运行的时候会将原来存储在其他节点的RDD数据一下汇聚到driver的节点上,并且是存储在数组中的,会占用大量的JVM内存,这是非常容易造成内存溢出问题的,所以一般都用于测试操作,可以完整的看到RDD内容,对于大数据的文件,我们还是使用saveAsTextFile这个action,将结果保存在硬盘的一个文件中。
这个就不举例了,因为基本我们的每个测试例题都用到了他,就是起到一个遍历作用。
提一点,或许有的朋友会问,在IDEA中我直接用foreach就可以输出,为什么还要加一个collect呢?
这个是有本质区别的,我们用collect会将数据收集在一起,但是你可以不打印输出出来,用collect.foreach是将collect集合好的数据遍历输出,而直接foreach是遍历输出你RDD中的集合元素,或许结果是一样的,但是他本质是不一样的,一定要注意我们collect的作用是遍历RDD,而不是遍历输出RDD。
接收一个函数,作用在RDD两个类型相同的元素上,返回一个新的元素,这个作用可以是累加、计数或者其他类型的聚集操作。
下面的例子我们就求出Array中相同类型的元素累加和。
object ReduceTest {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setAppName("group by key").setMaster("local")
val sc=new SparkContext(conf)
val numRDD=Array(1,5,6,8,9,10)
val reslut=sc.parallelize(numRDD)
println(reslut.reduce(_+_))
}
}
结果:
39
返回RDD的前n个元素,就是RDD中从0到n之间的元素,并尝试访问最少的partitions分区,意思就是分区时尽量不每个分区都访问一次那种。
例如:
val conf=new SparkConf().setAppName("action").setMaster("local")
val sc=new SparkContext(conf)
val RDD=sc.parallelize(Array(5,8,2,6,7,2,1))
RDD.take(2).foreach(println)
结果:
5
8
按照RDD默认的比较器进行排序输出top(n),当然也可以自定义一个比较器来实现。(RDD中默认的是从大到小排列)与take不同的是,take只取前n个,top是取排序后的前n个。
object actiontest {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setAppName("action").setMaster("local")
val sc=new SparkContext(conf)
val RDD=sc.parallelize(Array(5,8,2,6,7,2,1))
RDD.top(3).foreach(println)
}
}
结果:
8
7
6
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。