赞
踩
人生很长,不必慌张。你未长大,我要担当。
传统的MapReduce虽然具有自动容错、平衡负载和可拓展性的优点,但是其最大缺点是采用非循环式的数据流模型,使得在迭代计算式要进行大量的磁盘IO操作。Spark中的RDD可以很好的解决这一缺点。
RDD是Spark提供的最重要的抽象概念,我们可以将RDD理解为一个分布式存储在集群中的大型数据集合,不同RDD之间可以通过转换操作形成依赖关系实现管道化,从而避免了中间结果的I/O操作,提高数据处理的速度和性能。接下来,本章将针对RDD进行详细讲解。
Spark用Scala语言实现了RDD的API,程序开发者可以通过调用API对RDD进行操作处理。RDD经过一系列的“转换”操作,每一次转换都会产生不同的RDD,以供给下一次“转换”操作使用,直到最后一个RDD经过“行动”操作才会被真正计算处理,并输出到外部数据源中,若是中间的数据结果需要复用,则可以进行缓存处理,将数据缓存到内存中。
Spark用Scala语言实现了RDD的API,程序开发者可以通过调用API对RDD进行操作处理。下面,通过一张图来描述RDD的处理过程。
RDD经过一系列的"转换”操作,每一次转换都会产生不同的RDD,以供给下一次转换”操作使用,直到最后一个RDD经过“行动”操作才会被真正计算处理。
需要注意的是,RDD采用了惰性调用,即在RDD的处理过程中,真正的计算发生在RDD的"行动”操作,对于"行动"之前的所有"转换"操作,Spark只是记录下“转换”操作应用的一些基础数据集以及RDD相互之间的依赖关系,而不会触发真正的计算处理。
RDD处理过程中的“转换”操作主要用于根据已有RDD创建新的RDD,每一次通过Transformation算子计算后都会返回一个新RDD,供给下一个转换算子使用。下面,通过一张表来列举一些常用转换算子操作的API,具体如下。
下面,我们通过结合具体的示例对这些转换算子API进行详细讲解。
scala> val lines=sc.textFile("file:///export/data/test.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///export/data/test.txt MapPartitionsRDD[6] at textFile at <console>:24
scala> val linesWithSpark=lines.filter(line=>line.contains("spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at filter at <console>:25
具体步骤如下:
1.进入到hadoop01,进入/export/data目录,命令如下
cd /export/data
2.修改test.txt文件的内容与源数据保持一致(vi test.txt
)。
hadoop spark
itcast heima
scala spark
spark itcast
iscast hadoop
3.进入到spark shell(参考之前的启动)。
cd export/servers/spark/
bin /spark-shell --master local[2]
4.加载文件并产生RDD,代码如下。
scala> val lines=sc.textFile("file:///export/data/test.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///export/data/test.txt MapPartitionsRDD[6] at textFile at <console>:24
scala> val linesWithSpark=lines.filter(line=>line.contains("spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at filter at <console>:25
结果如下图所示
scala> val lines=sc.textFile("file:///export/data/test.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///export/data/test.txt MapPartitionsRDD[9] at textFile at <console>:24
scala> var words=lines.map(line=>line.split(" "))
words: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[10] at map at <console>:25
结果如下所示:
scala> val lines=sc.textFile("file:///export/data/test.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///export/data/test.txt MapPartitionsRDD[12] at textFile at <console>:24
scala> val words=lines.flatMap(line=>line.split(" "))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at flatMap at <console>:25
结果如下所示:
scala> val lines=sc.textFile("file:///export/data/test.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///export/data/test.txt MapPartitionsRDD[15] at textFile at <console>:24
scala> val words=lines.flatMap(line=>line.split(" ")).map(word=>(word,1))
words: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[17] at map at <console>:25
scala> val groupWords=words.groupByKey()
groupWords: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[18] at groupByKey at <console>:25
结果如下所示:
scala> val lines=sc.textFile("file:///export/data/test.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///export/data/test.txt MapPartitionsRDD[20] at textFile at <console>:24
scala> val words=lines.flatMap(line=>line.split(" ")).map(word=>(word,1))
words: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[22] at map at <console>:25
scala> var reduceWords=words.reduceByKey((a,b)=>a+b)
reduceWords: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[23] at reduceByKey at <console>:25
结果如下所示:
行动算子主要是将在数据集上运行计算后的数值返回到驱动程序,从而触发真正的计算。下面,通过一张表来列举一些常用行动算子操作的API,具体如下。
下面,结合具体的示例对这些行动算子API进行详细讲解。
scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24
scala> arrRdd.count()
res0: Long = 5
scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24
scala> arrRdd.first()
res1: Int = 1
从上述结果可以看出,当执行arrRdd.first()操作后返回的结果是1,说明成功获取到了第1个元素。
scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24
scala> arrRdd.take(3)
res2: Array[Int] = Array(1, 2, 3)
从上述代码可以看出,执行arrRdd.take(3)操作后返回的结果是Array(1,2,3),说明成功获取到了RDD数据集的前3个元素。
scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24
scala> arrRdd.reduce((a,b)=>a+b)
res3: Int = 15
scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24
scala> arrRdd.collect()
res4: Array[Int] = Array(1, 2, 3, 4, 5)
scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24
scala> arrRdd.foreach(x=>println(x))
1
2
3
4
5
在Linux本地系统的/export/data目录下,有一个test.txt文件,文件里有多行文本,每行文本都是由2个单词构成,且单词之间都是用空格分隔。现在,我们需要通过RDD统计每个单词出现的次数(即词频),具体操作过程如下。
具体参见书本内容
转载自:https://blog.csdn.net/u014727709/article/details/136032993
欢迎
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。