赞
踩
RDD,全称为 Resilient Distributed Datasets,是一个容错的、并行的数据结构,可以让用户显式地将数据存储到磁盘和内存中,并能控制数据的分区。同时,RDD 还提供了一组丰富的操作来操作这些数据。在这些操作中,诸如 map、flatMap、filter 等转换操作实现了 monad 模式,很好地契合了 Scala 的集合操作。除此之外,RDD 还提供了诸如 join、groupBy、reduceByKey 等更为方便的操作(注意,reduceByKey 是 action,而非 transformation),以支持常见的数据运算
RDD 支持两种操作:转化操作和行动操作。RDD 的转化操作是返回一
个新的 RDD 的操作,比如 map() 和 filter(),而行动操作则是向驱动器程序返回结果或
把结果写入外部系统的操作,会触发实际的计算,比如 count() 和 first()。Spark 对待
转化操作和行动操作的方式很不一样,因此理解你正在进行的操作的类型是很重要的。如
果对于一个特定的函数是属于转化操作还是行动操作感到困惑,你可以看看它的返回值类
型:转化操作返回的是 RDD,而行动操作返回的是其他的数据类型
。
通常来讲,针对数据处理有几种常见模型,包括:Iterative Algorithms,Relational Queries,MapReduce,Stream Processing。例如 Hadoop MapReduce 采用了 MapReduces 模型,Storm 则采用了 Stream Processing 模型。RDD 混合了这四种模型,使得 Spark 可以应用于各种大数据处理场景。
RDD 作为数据结构,本质上是一个只读的分区记录集合。一个 RDD 可以包含多个分区,每个分区就是一个 dataset 片段。RDD 可以相互依赖。如果 RDD 的每个分区最多只能被一个 Child RDD 的一个分区使用,则称之为 narrow dependency;若多个 Child RDD 分区都可以依赖,则称之为 wide dependency。不同的操作依据其特性,可能会产生不同的依赖。例如 map 操作会产生 narrow dependency,而 join 操作则产生 wide dependency。
创建RDD
1.使用程序中的集合创建RDD
2.使用本地文件系统创建RDD
3.使用HDS创建RDD
4.基于DB创建RDD
5.基于NoSQL,例如HBase
6.基于s3创建RDD
7.基于数据流创建RDD
调用SparkContext 的 parallelize(),将一个存在的集合,变成一个RDD
- package com.rdd.spark
-
- import org.apache.spark.SparkConf
- import org.apache.spark.SparkContext
-
- /**
- * 通过集合获取rdd
- * */
- object RDDBaseCollections {
- def main(args: Array[String] ){
- /**
- * 第1步,创建Spark的配置对象SparkConf ,设置Spark程序的运行时的配置信息。
- * */
- val conf = new SparkConf() ;//创建SparkConf 对象
- conf.setAppName("Wow,my first spark app"); // 设置应用程序的名称,在程序运行的监控面可以看到名称
- conf.setMaster("local");//此时,程序在本地运行,不需要安装Spark集群
- /**
- * 第2步:创建SparkContext 对象
- * Sparkcontext 是spark 程序所有功能的唯一入口,无论是采用Scala ,Java 、Python, R等都必须有一个Spark
- * Sparkcontext 核心作用:初始spark 应用程序 运行所有需要的核心组件,DAGScheduker,TaskScheduker
- * 同时还会负责Spark程序Master注册程序
- * Sparkcontext 是整个 应用中最为至关重要的一个对象
- *
- * */
- val sc = new SparkContext(conf);//创建sparkContext对象,通过sparkConf实例来定制spark运行
-
- var numbers = 1 to 100 //创建一个scala的集合
- var rdd = sc.parallelize( numbers)
- var sum = rdd.reduce(_+_)//1+2 =3 3+3=6 6+4=10 ....
- println("sum="+sum)
- }
- }
- package com.rdd.spark
-
- import org.apache.spark.SparkConf
- import org.apache.spark.SparkContext
-
- /**
- * 通过文件获取rdd
- * */
- object RDDBaseLocalFile {
- def main(args: Array[String] ){
- /**
- * 第1步,创建Spark的配置对象SparkConf ,设置Spark程序的运行时的配置信息。
- * */
- val conf = new SparkConf() ;//创建SparkConf 对象
- conf.setAppName("RDDBaseLocalFile"); // 设置应用程序的名称,在程序运行的监控面可以看到名称
- conf.setMaster("local");//此时,程序在本地运行,不需要安装Spark集群
- /**
- * 第2步:创建SparkContext 对象
- * Sparkcontext 是spark 程序所有功能的唯一入口,无论是采用Scala ,Java 、Python, R等都必须有一个Spark
- * Sparkcontext 核心作用:初始spark 应用程序 运行所有需要的核心组件,DAGScheduker,TaskScheduker
- * 同时还会负责Spark程序Master注册程序
- * Sparkcontext 是整个 应用中最为至关重要的一个对象
- *
- * */
- val sc = new SparkContext(conf);//创建sparkContext对象,通过sparkConf实例来定制spark运行
- val rdd = sc.textFile("G://work//64bit//scala//spark-1.6.0-bin-hadoop2.6//README.md")
- var linesLength = rdd.map(line => line.length())//map(func):数据集中的每个元素经过用户自定义的函数转换形成一个新的RDD,新的RDD叫MappedRDD
- var sum = linesLength.reduce(_+_)
- println("文件总的长度:"+sum)
-
- }
- }
是真正触发计算的地方。Spark程序执行到行动操作时,才会执行真正的计算,从文件中加载数据,完成一次又一次转换操作,最终,完成行动操作得到结果。
下面列出一些常见的行动操作(Action API):
* count() 返回数据集中的元素个数
* collect() 以数组的形式返回数据集中的所有元素
* first() 返回数据集中的第一个元素
* take(n) 以数组的形式返回数据集中的前n个元素
* reduce(func) 通过函数func(输入两个参数并返回一个值)聚合数据集中的元素
* foreach(func) 将数据集中的每个元素传递到函数func中运行*
- scala> val list = List("Hadoop","Spark","Hive","Spark")
- list: List[String] = List(Hadoop, Spark, Hive, Spark)
-
- scala> val rdd = sc.parallelize(list)
- rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[11] at parallelize at <console>:29
-
- scala> val pairRDD = rdd.map(word => (word,1))
- pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[12] at map at <console>:31
-
- scala> pairRDD.foreach(println)
- (Hadoop,1)
- (Spark,1)
- (Hive,1)
- (Spark,1)
常用的键值对转换操作包括reduceByKey()、groupByKey()、sortByKey()、join()、cogroup()等,下面我们通过实例来介绍。
reduceByKey(func)的功能是,使用func函数合并具有相同键的值。比如,reduceByKey((a,b) => a+b),有四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5),对具有相同key的键值对进行合并后的结果就是:(“spark”,3)、(“hadoop”,8)。可以看出,(a,b) => a+b这个Lamda表达式中,a和b都是指value,比如,对于两个具有相同key的键值对(“spark”,1)、(“spark”,2),a就是1,b就是2。
我们对上面第二种方式创建得到的pairRDD进行reduceByKey()操作,代码如下:
- def reduceByKeyTranformation(sc: SparkContext){
- val lines = sc.textFile("G://work//64bit//scala//spark-1.6.0-bin-hadoop2.6//README.md", 1);//读取本地文件,并设置一个Partition
-
- /**
- * 第4步:对初始的RDD进行Transformation级别的处理,例如 map filter 等高阶函数等的编程,来进行具体的数据计算
- * 4.1步 :将每一行的字符串拆分分成单个的单词
- * */
- val words = lines.flatMap{line => line.split(" ")}; //对每一行的字符串进行拆分 并把所有的拆分结果通过flat 合并成一个
-
-
- /**
- * 第4步:对初始的RDD进行Transformation级别的处理,例如 map filter 等高阶函数等的编程,来进行具体的数据计算
- * 4.2步 : 在单词拆分的基础上对每个单词实例计数为1,也是word=>(word,1)
- * */
-
- val pairs = words.map{word => (word,1)};
-
- /**
- * 第4步:对初始的RDD进行Transformation级别的处理,例如 map filter 等高阶函数等的编程,来进行具体的数据计算
- * 4.3步 : 在每个单词实例计数为1基础之上统计每个单词在文件中出现的总次数
- * */
-
- val wordCounts = pairs.reduceByKey(_+_);//对相同的key,进行value的累计(包括Local和Reducer级别同时Reduce)
- wordCounts.foreach(wordNumberPair => println(wordNumberPair._1+":"+wordNumberPair._2));
-
- }
reduceByKey( )是对相同的key进行操作
groupByKey()的功能是,对具有相同键的值进行分组。比如,对四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5),采用groupByKey()后得到的结果是:(“spark”,(1,2))和(“hadoop”,(3,5))。
我们对上面第二种方式创建得到的pairRDD进行groupByKey()操作,代码如下:
- def groupByKeyTranformation(sc: SparkContext){
- var data =Array(Tuple2(100,"Spark"),Tuple2(100,"scala"),Tuple2(90,"hadoop"),Tuple2(90,"kafka"),Tuple2(80,"java"),Tuple2(80,"Habase"))
- var rddData = sc.parallelize(data)
- var mapkey = rddData.groupByKey()
- mapkey.collect().foreach(print)
- }
(Spark, scala))
(80,CompactBuffer(java, Habase))
(90,CompactBuffer(hadoop, kafka))
scala
keys只会把键值对RDD中的key返回形成一个新的RDD。比如,对四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)构成的RDD,采用keys后得到的结果是一个RDD[Int],内容是{“spark”,”spark”,”hadoop”,”hadoop”}。
我们对上面第二种方式创建得到的pairRDD进行keys操作,代码如下:
scala
values只会把键值对RDD中的value返回形成一个新的RDD。比如,对四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)构成的RDD,采用keys后得到的结果是一个RDD[Int],内容是{1,2,3,5}。
我们对上面第二种方式创建得到的pairRDD进行values操作,代码如下:
scala
sortByKey()的功能是返回一个根据键排序的RDD。
我们对上面第二种方式创建得到的pairRDD进行keys操作,代码如下:
scala
我们经常会遇到一种情形,我们只想对键值对RDD的value部分进行处理,而不是同时对key和value进行处理。对于这种情形,Spark提供了mapValues(func),它的功能是,对键值对RDD中的每个value都应用一个函数,但是,key不会发生变化。比如,对四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)构成的pairRDD,如果执行pairRDD.mapValues(x => x+1),就会得到一个新的键值对RDD,它包含下面四个键值对(“spark”,2)、(“spark”,3)、(“hadoop”,4)和(“hadoop”,6)。
我们对上面第二种方式创建得到的pairRDD进行keys操作,代码如下:
scala
join(连接)操作是键值对常用的操作。“连接”(join)这个概念来自于关系数据库领域,因此,join的类型也和关系数据库中的join一样,包括内连接(join)、左外连接(leftOuterJoin)、右外连接(rightOuterJoin)等。最常用的情形是内连接,所以,join就表示内连接。
对于内连接,对于给定的两个输入数据集(K,V1)和(K,V2),只有在两个数据集中都存在的key才会被输出,最终得到一个(K,(V1,V2))类型的数据集。
比如,pairRDD1是一个键值对集合{(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)},pairRDD2是一个键值对集合{(“spark”,”fast”)},那么,pairRDD1.join(pairRDD2)的结果就是一个新的RDD,这个新的RDD是键值对集合{(“spark”,1,”fast”),(“spark”,2,”fast”)}。对于这个实例,我们下面在spark-shell中运行一下:
进行job操作
- def jobTranformation(sc: SparkContext){//进行job转换操作
- val studentNames = Array(
- Tuple2(1,"Spark"),
- Tuple2(2,"Techyon"),
- Tuple2(3,"Hadoop"),
- Tuple2(4,"Java")
- )
- val studentScores = Array(
- Tuple2(1,100),
- Tuple2(2,90),
- Tuple2(3,65),
- Tuple2(4,80)
- )
- val names = sc.parallelize(studentNames)
- val scores = sc.parallelize(studentScores)
- val studentInfo = names.join(scores)
- studentInfo.collect().foreach(println)
-
- }
运行结果:
- (4,(Java,80))
- (1,(Spark,100))
- (3,(Hadoop,65))
- (2,(Techyon,90))
题目:给定一组键值对(“spark”,2),(“hadoop”,6),(“hadoop”,4),(“spark”,6),键值对的key表示图书名称,value表示某天图书销量,请计算每个键对应的平均值,也就是计算每种图书的每天平均销量。
很显然,对于上面的题目,结果是很显然的,(“spark”,4),(“hadoop”,5)。
下面,我们在spark-shell中演示代码执行过程:
scala
要注意,上面语句中,mapValues(x => (x,1))中出现了变量x,reduceByKey((x,y) => (x._1+y._1,x._2 + y._2))中也出现了变量x,mapValues(x => (x._1 / x._2))也出现了变量x。但是,必须要清楚,这三个地方出现的x,虽然都具有相同的变量名称x,但是,彼此之间没有任何关系,它们都处在不同的变量作用域内。如果你觉得这样会误导自己,造成理解上的掌握,实际上,你可以把三个出现x的地方分别替换成x1、x2、x3也是可以的,但是,很显然没有必要这么做。
上面是完整的语句和执行过程,可能不太好理解,下面我们进行逐条语句分解给大家介绍。每条语句执行后返回的屏幕信息,可以帮助大家更好理解语句的执行效果,比如生成了什么类型的RDD。
(1)首先构建一个数组,数组里面包含了四个键值对,然后,调用parallelize()方法生成RDD,从执行结果反馈信息,可以看出,rdd类型是RDD[(String, Int)]。
scala
(2)针对构建得到的rdd,我们调用mapValues()函数,把rdd中的每个每个键值对(key,value)的value部分进行修改,把value转换成键值对(value,1),其中,数值1表示这个key在rdd中出现了1次,为什么要记录出现次数呢?因为,我们最终要计算每个key对应的平均值,所以,必须记住这个key出现了几次,最后用value的总和除以key的出现次数,就是这个key对应的平均值。比如,键值对(“spark”,2)经过mapValues()函数处理后,就变成了(“spark”,(2,1)),其中,数值1表示“spark”这个键的1次出现。下面就是rdd.mapValues()操作在spark-shell中的执行演示:
scala> rdd.mapValues(x => (x,1)).collect()
res23: Array[(String, (Int, Int))] = Array((spark,(2,1)), (hadoop,(6,1)), (hadoop,(4,1)), (spark,(6,1)))
上面语句中,collect()是一个行动操作,功能是以数组的形式返回数据集中的所有元素,当我们要实时查看一个RDD中的元素内容时,就可以调用collect()函数。
(3)然后,再对上一步得到的RDD调用reduceByKey()函数,在spark-shell中演示如下:
scala
这里,必须要十分准确地理解reduceByKey()函数的功能。可以参考上面我们对该函数的介绍,reduceByKey(func)的功能是使用func函数合并具有相同键的值。这里的func函数就是Lamda表达式(x,y) => (x._1+y._1,x._2 + y._2),这个表达式中,x和y都是value,而且是具有相同key的两个键值对所对应的value,比如,在这个例子中, (“hadoop”,(6,1))和(“hadoop”,(4,1))这两个键值对具有相同的key,所以,对于函数中的输入参数(x,y)而言,x就是(6,1),x._1表示这个键值对中的第1个元素6,x._2表示这个键值对中的第二个元素1,y就是(4,1),y._1表示这个键值对中的第1个元素4,y._2表示这个键值对中的第二个元素1,所以,函数体(x._1+y._1,x._2 + y._2),相当于生成一个新的键值对(key,value),其中,key是x._1+y._1,也就是6+4=10,value是x._2 + y._2,也就是1+1=2,因此,函数体(x._1+y._1,x._2 + y._2)执行后得到的value是(10,2),但是,要注意,这个(10,2)是reduceByKey()函数执行后,”hadoop”这个key对应的value,也就是,实际上reduceByKey()函数执行后,会生成一个键值对(“hadoop”,(10,2)),其中,10表示hadoop书籍的总销量,2表示两天。同理,reduceByKey()函数执行后会生成另外一个键值对(“spark”,(8,2))。
(4)最后,就可以求出最终结果。我们可以对上面得到的两个键值对(“hadoop”,(10,2))和(“spark”,(8,2))所构成的RDD执行mapValues()操作,得到每种书的每天平均销量。当第一个键值对(“hadoop”,(10,2))输入给mapValues(x => (x._1 / x._2))操作时,key是”hadoop”,保持不变,value是(10,2),会被赋值给Lamda表达式x => (x._1 / x._2中的x,因此,x的值就是(10,2),x._1就是10,表示hadoop书总销量是10,x._2就是2,表示2天,因此,hadoop书籍的每天平均销量就是x._1 / x._2,也就是5。mapValues()输出的一个键值对就是(“hadoop”,5)。同理,当把(“spark”,(8,2))输入给mapValues()时,会计算得到另外一个键值对(“spark”,4)。在spark-shell中演示如下:
第一次接触王老师的大数据课程是在2014年底,当时在51CTO上有了spark六阶段,当时真的太吸引我了,但是由于是学生,所以没那么多钱去买教程,真的太后悔了,但是呢!后来看到了《大数据不眠夜:Spark内核天机解密(共100讲)》:http://pan.baidu.com/s/1eQsHZAq和《Scala深入浅出实战经典》http://pan.baidu.com/s/1sjDWG25 ,觉得希望来了,于是自己开始了spark的学习,从scala的一窍不通,到现在可以写一些scala的函数,实现一些业务逻辑,真的太感谢王老师了!也从spark的一窍不通,到现在虽然不算高手吧,但还是算得上半个高手的,真的发现spark太强大了,完全超乎自己的想像!王老师,我一定一定会好好跟着您的脚步,把spark学到手,然后分享给身边的朋友!
现在给大家分享王老师的一些免费视频,希望的大家能好好学一下,中国的下一代人才就从王老师这里开始了!
王家林的第一个中国梦:免费为全社会培养100万名优秀的大数据从业人员!
您可以通过王家林老师的微信号18610086859发红包捐助大数据、互联网+、O2O、工业4.0、微营销、移动互联网等系列免费实战课程, 目前已经发布的王家林免费视频全集如下:
1,《大数据不眠夜:Spark内核天机解密(共100讲)》:http://pan.baidu.com/s/1eQsHZAq
2,《Hadoop深入浅出实战经典》http://pan.baidu.com/s/1mgpfRPu
3,《Spark纯实战公益大讲坛》http://pan.baidu.com/s/1jGpNGwu
4,《Scala深入浅出实战经典》http://pan.baidu.com/s/1sjDWG25
5,《Docker公益大讲坛》http://pan.baidu.com/s/1kTpL8UF
6,《Spark亚太研究院Spark公益大讲堂》http://pan.baidu.com/s/1i30Ewsd
7,DT大数据梦工厂Spark、Scala、Hadoop的所有视频、PPT和代码在百度云网盘的链接:
http://pan.baidu.com/share/home?uk=4013289088#category/type=0&qq-pf-to=pcqq.group
王家林免费在51CTO发布的1000集合大数据spark、hadoop、scala、docker视频:
1,《Scala深入浅出实战初级入门经典视频课程》http://edu.51cto.com/lesson/id-66538.html
2,《Scala深入浅出实战中级进阶经典视频课程》http://edu.51cto.com/lesson/id-67139.html
3,《Akka深入浅出实战经典视频课程》http://edu.51cto.com/lesson/id-77672.html
4,《Spark亚太研究院决胜大数据时代公益大讲堂》http://edu.51cto.com/lesson/id-30815.html
5,《云计算Docker虚拟化公益大讲坛 》http://edu.51cto.com/lesson/id-61776.html
6,《Spark 大讲堂(纯实战手动操作)》http://edu.51cto.com/lesson/id-78653.html
7,《Hadoop深入浅出实战经典视频课程-集群、HDFS、Yarn、MapReduce》http://edu.51cto.com/lesson/id-77141.html
8,《从技术角度思考Hadoop到底是什么》http://edu.51cto.com/course/course_id-1151.html
“DT大数据梦工厂”团队第一个中国梦:免费为社会培养100万名优秀的大数据从业人员。每天早上4点起持续分享大数据、互联网+、O2O、工业4.0、微营销、移动互联网等领域的
精华内容,帮助您和公司在DT时代打造智慧大脑,将生产力提高百倍以上!
DT大数据梦工厂微信公众号:DT_Spark
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。