当前位置:   article > 正文

Spark大数据分析与实战笔记(第三章 Spark RDD 弹性分布式数据集-02)

Spark大数据分析与实战笔记(第三章 Spark RDD 弹性分布式数据集-02)

每日一句正能量

人生很长,不必慌张。你未长大,我要担当。

第3章 Spark RDD弹性分布式数据集

章节概要

传统的MapReduce虽然具有自动容错、平衡负载和可拓展性的优点,但是其最大缺点是采用非循环式的数据流模型,使得在迭代计算式要进行大量的磁盘IO操作。Spark中的RDD可以很好的解决这一缺点。

RDD是Spark提供的最重要的抽象概念,我们可以将RDD理解为一个分布式存储在集群中的大型数据集合,不同RDD之间可以通过转换操作形成依赖关系实现管道化,从而避免了中间结果的I/O操作,提高数据处理的速度和性能。接下来,本章将针对RDD进行详细讲解。

3.3 RDD的处理过程

Spark用Scala语言实现了RDD的API,程序开发者可以通过调用API对RDD进行操作处理。RDD经过一系列的“转换”操作,每一次转换都会产生不同的RDD,以供给下一次“转换”操作使用,直到最后一个RDD经过“行动”操作才会被真正计算处理,并输出到外部数据源中,若是中间的数据结果需要复用,则可以进行缓存处理,将数据缓存到内存中。

Spark用Scala语言实现了RDD的API,程序开发者可以通过调用API对RDD进行操作处理。下面,通过一张图来描述RDD的处理过程。
在这里插入图片描述
RDD经过一系列的"转换”操作,每一次转换都会产生不同的RDD,以供给下一次转换”操作使用,直到最后一个RDD经过“行动”操作才会被真正计算处理。

需要注意的是,RDD采用了惰性调用,即在RDD的处理过程中,真正的计算发生在RDD的"行动”操作,对于"行动"之前的所有"转换"操作,Spark只是记录下“转换”操作应用的一些基础数据集以及RDD相互之间的依赖关系,而不会触发真正的计算处理。

3.3.1 转换算子

RDD处理过程中的“转换”操作主要用于根据已有RDD创建新的RDD,每一次通过Transformation算子计算后都会返回一个新RDD,供给下一个转换算子使用。下面,通过一张表来列举一些常用转换算子操作的API,具体如下。
在这里插入图片描述
下面,我们通过结合具体的示例对这些转换算子API进行详细讲解。

  • filter(func)
    filter(func)操作会筛选出满足函数func的元素,并返回一个新的数据集。假设,有一个文件test.txt(内容如前面所示),下面,通过一张图来描述如何通过filter算子操作,筛选出包含单词“spark”的元素。
    在这里插入图片描述
    通过从test.txt文件中加载数据的方式创建RDD,然后通过map操作将文件的每一行内容都拆分成一个个的单词元素,这些元素组成的集合是一个新的RDD。接下来,通过代码来进行演示,具体代码如下:
scala> val lines=sc.textFile("file:///export/data/test.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///export/data/test.txt MapPartitionsRDD[6] at textFile at <console>:24

scala> val linesWithSpark=lines.filter(line=>line.contains("spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at filter at <console>:25

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

具体步骤如下:
1.进入到hadoop01,进入/export/data目录,命令如下
cd /export/data
2.修改test.txt文件的内容与源数据保持一致(vi test.txt)。

hadoop spark
itcast     heima
scala      spark
spark     itcast
iscast     hadoop
  • 1
  • 2
  • 3
  • 4
  • 5

3.进入到spark shell(参考之前的启动)。

cd export/servers/spark/
bin /spark-shell --master local[2]
  • 1
  • 2

4.加载文件并产生RDD,代码如下。

scala> val lines=sc.textFile("file:///export/data/test.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///export/data/test.txt MapPartitionsRDD[6] at textFile at <console>:24

scala> val linesWithSpark=lines.filter(line=>line.contains("spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at filter at <console>:25
  • 1
  • 2
  • 3
  • 4
  • 5

结果如下图所示
在这里插入图片描述

  • map(func)
    map(func)操作将每个元素传递到函数func中,并将结果返回为一个新的数据集。假设,有一个文件test.txt,接下来,通过一张图来描述如何通过map算子操作把文件内容拆分成一个个的单词并封装在数组对象中,具体过程如下图所示。
    在这里插入图片描述
    通过从test.txt文件中加载数据的方式创建RDD,然后通过map操作将文件的每一行内容都拆分成一个个的单词元素,这些元素组成的集合是一个新的RDD。接下来,通过代码来进行演示,具体代码如下:
scala> val lines=sc.textFile("file:///export/data/test.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///export/data/test.txt MapPartitionsRDD[9] at textFile at <console>:24

scala> var words=lines.map(line=>line.split(" "))
words: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[10] at map at <console>:25
  • 1
  • 2
  • 3
  • 4
  • 5

结果如下所示:
在这里插入图片描述

  • flatMap(func)
    flatMap(func)与map(func)相似,但是每个输入的元素都可以映射到0或者多个输出的结果。有一个文件test.txt,接下来,通过一张图来描述如何通过flatMap算子操作,把文件内容拆分成一个个的单词。具体过程如下图所示。
    在这里插入图片描述
    通过从test.txt文件中加载数据的方式创建RDD,然后通过flatMap操作将文件的每一行内容都拆分成一个个的单词元素,这些元素组成的集合是一个新的RDD。接下来,通过代码来进行演示,具体如下:
scala> val lines=sc.textFile("file:///export/data/test.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///export/data/test.txt MapPartitionsRDD[12] at textFile at <console>:24

scala> val words=lines.flatMap(line=>line.split(" "))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at flatMap at <console>:25
  • 1
  • 2
  • 3
  • 4
  • 5

结果如下所示:
在这里插入图片描述

  • groupByKey()
    groupByKey()主要用于(Key,Value)键值对的数据集,将具有相同Key的Value进行分组,会返回一个新的(Key ,lterable)形式的数据集。同样以文件test.txt为例,接下来,通过一张图来描述如何通过groupByKey算子操作,将文件内容中的所有单词进行分组。具体过程如下图所示。
    在这里插入图片描述
    通过groupByKey操作把(Key,Value))键值对类型的RDD,按单词将单词出现的次数进行分组,这些元素组成的集合是一个新的RDD。接下来,通过代码来进行演示,具体代码如下:
scala> val lines=sc.textFile("file:///export/data/test.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///export/data/test.txt MapPartitionsRDD[15] at textFile at <console>:24

scala> val words=lines.flatMap(line=>line.split(" ")).map(word=>(word,1))
words: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[17] at map at <console>:25

scala> val groupWords=words.groupByKey()
groupWords: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[18] at groupByKey at <console>:25
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

结果如下所示:
在这里插入图片描述

  • reduceByKey(func)
    reduceByKey()主要用于(Key,Value)键值对的数据集,返回的是一个新的(Key,Iterable)形式的数据集,该数据集是每个Key传递给函数func进行聚合运算后得到的结果。同样以文件test.txt,接下来,通过一张图来描述如何通过reduceByKey算子操作统计单词出现的次数。具体过程如下图所示。
    在这里插入图片描述
    通过groupByKey操作把(Key,Value)键值对类型的RDD,按单词将单词出现的次数进行分组,这些元素组成的集合是一个新的RDD。接下来,通过代码来进行演示,具体代码如下:
scala> val lines=sc.textFile("file:///export/data/test.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///export/data/test.txt MapPartitionsRDD[20] at textFile at <console>:24

scala> val words=lines.flatMap(line=>line.split(" ")).map(word=>(word,1))
words: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[22] at map at <console>:25

scala> var reduceWords=words.reduceByKey((a,b)=>a+b)
reduceWords: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[23] at reduceByKey at <console>:25
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

结果如下所示:
在这里插入图片描述

3.3.2 行动算子

行动算子主要是将在数据集上运行计算后的数值返回到驱动程序,从而触发真正的计算。下面,通过一张表来列举一些常用行动算子操作的API,具体如下。
在这里插入图片描述
下面,结合具体的示例对这些行动算子API进行详细讲解。

  • count ()
    count()主要用于返回数据集中的元素个数。假设,现有一个arrRdd,如果要统计arrRdd元素的个数,示例代码如下:
scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24

scala> arrRdd.count()
res0: Long = 5
  • 1
  • 2
  • 3
  • 4
  • 5
  • first()
    first()主要用于返回教组的第一个元素。现有一个arrRdd,如果要获取arrRdd中第一个元素,示例代码如下:
scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24

scala> arrRdd.first()
res1: Int = 1
  • 1
  • 2
  • 3
  • 4
  • 5

从上述结果可以看出,当执行arrRdd.first()操作后返回的结果是1,说明成功获取到了第1个元素。

  • take()
    take()主要用于以数组的形式返回数组集中的前n个元素。现有一个arrRdd,如果要获取arrRdd中的前三个元素,示例代码如下:
scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24

scala> arrRdd.take(3)
res2: Array[Int] = Array(1, 2, 3)
  • 1
  • 2
  • 3
  • 4
  • 5

从上述代码可以看出,执行arrRdd.take(3)操作后返回的结果是Array(1,2,3),说明成功获取到了RDD数据集的前3个元素。

  • reduce(func)
    reduce()主要用于通过函数func(输入两个参数并返回一个值)聚合数据集中的元素。现有一个arrRdd,如果要对arrRdd中的元素进行聚合,示例代码如下:
scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24

scala> arrRdd.reduce((a,b)=>a+b)
res3: Int = 15
  • 1
  • 2
  • 3
  • 4
  • 5
  • collect()
    collect()主要用于以数组的形式返回数据集中的所有元素。现有一个rdd,如果希望rdd中的元素以数组的形式输出,示例代码如下:
scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24

scala> arrRdd.collect()
res4: Array[Int] = Array(1, 2, 3, 4, 5)
  • 1
  • 2
  • 3
  • 4
  • 5
  • foreach(func)
    foreach()主要用于将数据集中的每个元素传递到函数func中运行。现有一个arrRdd,如果希望遍历输出arrRdd中的元素,示例代码如下:
scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24

scala> arrRdd.foreach(x=>println(x))
1
2
3
4
5
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

3.3.3 编写WordCount词频统计案例

在Linux本地系统的/export/data目录下,有一个test.txt文件,文件里有多行文本,每行文本都是由2个单词构成,且单词之间都是用空格分隔。现在,我们需要通过RDD统计每个单词出现的次数(即词频),具体操作过程如下。
在这里插入图片描述
具体参见书本内容

转载自:https://blog.csdn.net/u014727709/article/details/136032993
欢迎

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/151965
推荐阅读
相关标签