当前位置:   article > 正文

Spark中RDD概述及RDD算子详解

Spark中RDD概述及RDD算子详解

一、RDD概述

1、RDD: 弹性的分布式数据集

弹性:RDD 中的数据即可以缓存在内存中, 也可以缓存在磁盘中, 也可以缓存在外部存储中

分布式:数据可以分布在多台服务器中,RDD中的分区来自于block块,而block块会来自不同的datanode

 数据集:(1)RDD自身可以不存储数据的,只存放代码计算逻辑,触发作业执行的时候,数据会在RDD之间流动

               (2)RDD 也可以缓存起来, 相当于存储具体数据

2、RDD 是一个分布式计算框架, 所以, 一定是要能够进行分区计算的, 只有分区了, 才能利用集群的并行计算能力

二、shuffle

spark的运行过程中如果出现了相同的键被拉取到对应的分区,这个过程称之为shuffle。

注:只有 Key-Value 型的 RDD 才会有 Shuffle 操作,Spark的shuffle和mapreduce的shuffle原理是一样,都是要进行落盘。

  1. object Demo2Partition {
  2. def main(args: Array[String]): Unit = {
  3. //1、创建Spark环境
  4. //1.1 创建配置文件对象
  5. val conf: SparkConf = new SparkConf()
  6. //1.2 指定运行的模式(local Standalone Mesos YARN)
  7. conf.setMaster("local") //可以执行所运行需要核数资源local[2],不指定的话默认使用所有的资源执行程序
  8. //1.3 给spark作业起一个名字
  9. conf.setAppName("wc")
  10. //2、创建spark运行时的上下文对象
  11. //SparkContext是spark-core的入口组件, 是一个 Spark 程序的入口,主要作用是连接集群, 创建 RDD, 累加器, 广播变量等
  12. val sparkContext: SparkContext = new SparkContext(conf)
  13. //3、读取文件数据
  14. // val wordsLine: RDD[String] = sparkContext.textFile("spark/data/ws/*", minPartitions = 7)
  15. val wordsLine: RDD[String] = sparkContext.textFile("spark/data/ws/*")
  16. println(s"wordsLineRDD分区数是:${wordsLine.getNumPartitions}")
  17. //4、每一行根据|分隔符进行切分
  18. val words: RDD[String] = wordsLine.flatMap(_.split("\\|"))
  19. println(s"wordsRDD分区数是:${words.getNumPartitions}")
  20. val wordsTuple2: RDD[(String, Int)] = words.map((_, 1))
  21. println(s"wordsTuple2RDD分区数是:${wordsTuple2.getNumPartitions}")
  22. //产生shuffle的算子上可以单独设置分区数
  23. val wordsTuple2Group: RDD[(String, Iterable[(String, Int)])] = wordsTuple2.groupBy(_._1, 5)
  24. println(s"wordsTuple2GroupRDD分区数是:${wordsTuple2Group.getNumPartitions}")
  25. val wordCount: RDD[(String, Int)] = wordsTuple2Group.map((kv: (String, Iterable[(String, Int)])) => (kv._1, kv._2.size))
  26. println(s"wordCountRDD分区数是:${wordCount.getNumPartitions}")
  27. wordCount.saveAsTextFile("spark/data/word_count2")
  28. }
  29. }

SparkContext是spark功能的主要入口其代表与spark集群的连接,能够用来在集群上创建RDD、累加器、广播变量。每个JVM里只能存在一个处于激活状态的SparkContext,在创建新的SparkContext之前必须调用stop()来关闭之前的SparkContext。

每一个Spark应用都是一个SparkContext实例,可以理解为一个SparkContext就是一个spark application的生命周期,一旦SparkContext创建之后,就可以用这个SparkContext来创建RDD、累加器、广播变量,并且可以通过SparkContext访问Spark的服务,运行任务。spark context设置内部服务,并建立与spark执行环境的连接。

sparkContext在Spark应用程序的执行过程中起着主导作用,它负责与程序和spark集群进行交互,包括申请集群资源、创建RDD、accumulators及广播变量等。

三、RDD五大特性

1、RDD是由一些分区构成的,读取文件时有多少个block块,RDD中就会有多少个分区
注:默认情况下,所有的RDD中的分区数是一样的,无论是shuffle之前还是shuffle之后的,在最开始加载数据的时候决定的

  2、函数实际上是作用在RDD中的分区上的,一个分区是由一个task处理,有多少个分区,总共就有多少个task

 注:函数在spark中称之为算子(转换transformation算子 RDD-->RDD,行动action算子 RDD->Other数据类型)

  3、RDD之间存在一些依赖关系,后一个RDD中的数据是依赖与前一个RDD的计算结果,数据像水流一样在RDD之间流动

 注:

3.1 RDD之间有两种依赖关系

a. 窄依赖 后一个RDD中分区数据对应前一个RDD中的一个分区数据 1对1的关系

b. 宽依赖 后一个RDD中分区数据来自前一个RDD中的多个分区数据 1对多的关系(shuffle)

3.2 因为有了依赖关系,将整个作业划分了一个一个stage阶段 sumNum(stage) = Num(宽依赖) + 1

3.3 窄依赖的分区数是不可以改变,取决于第一个RDD分区数,宽依赖可以在产生shuffle的算子上设置分区数

4、分区类的算子只能作用在键值对格式的RDD上,groupByKey、reduceByKey

5、spark为task计算提供了精确的计算位置,移动计算而不移动数据

四、RDD算子

RDD 中的算子从功能上分为两大类

  1. Transformation(转换) :它会在一个已经存在的 RDD 上创建一个新的 RDD, 将旧的 RDD 的数据转换为另外一种形式后放入新的 RDD

  2. Action(行动): 执行各个分区的计算任务, 将的到的结果返回到 Driver 中

注意:RDD具有懒执行的特点,一个spark作业,由action算子来触发执行的,若没有action算子,整个作业不执行

转换算子:Transformation

1、Map算子

       将rdd中的数据,一条一条的取出来传入到map函数中,map会返回一个新的rdd,map不会改变总数据条数

  1. object Demo3Map {
  2. def main(args: Array[String]): Unit = {
  3. val conf = new SparkConf()
  4. conf.setMaster("local")
  5. conf.setAppName("map算子演示")
  6. val context = new SparkContext(conf)
  7. //====================================================
  8. val studentRDD: RDD[String] = context.textFile("spark/data/students.csv")
  9. /**
  10. * map算子:将rdd中的数据,一条一条的取出来传入到map函数中,map会返回一个新的rdd,map不会改变总数据条数
  11. */
  12. val splitRDD: RDD[List[String]] = studentRDD.map((s: String) => {
  13. println("============好好学习================")
  14. s.split(",").toList
  15. })
  16. // splitRDD.foreach(println) //foreach是action算子
  17. }
  18. }

2、filter算子

       filter: 过滤,将RDD中的数据一条一条取出传递给filter后面的函数,如果函数的结果是true,该条数据就保留,否则丢弃

  1. import org.apache.spark.{SparkConf, SparkContext}
  2. import org.apache.spark.rdd.RDD
  3. object Demo4Filter {
  4. def main(args: Array[String]): Unit = {
  5. val conf = new SparkConf()
  6. conf.setMaster("local")
  7. conf.setAppName("map算子演示")
  8. val context = new SparkContext(conf)
  9. //====================================================
  10. val studentRDD: RDD[String] = context.textFile("spark/data/students.csv")
  11. /**
  12. * filter: 过滤,将RDD中的数据一条一条取出传递给filter后面的函数,如果函数的结果是true,该条数据就保留,否则丢弃
  13. *
  14. * filter一般情况下会减少数据的条数
  15. */
  16. val filterRDD: RDD[String] = studentRDD.filter((s: String) => {
  17. val strings: Array[String] = s.split(",")
  18. "男".equals(strings(3))
  19. })
  20. filterRDD.foreach(println)
  21. }
  22. }

3、flatMap算子

       flatMap算子:将RDD中的数据一条一条的取出传递给后面的函数,函数的返回值必须是一个集合。最后会将集合展开构成一个新的RDD

  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object Demo5flatMap {
  4. def main(args: Array[String]): Unit = {
  5. val conf = new SparkConf()
  6. conf.setMaster("local")
  7. conf.setAppName("flatMap算子演示")
  8. val context = new SparkContext(conf)
  9. //====================================================
  10. val linesRDD: RDD[String] = context.textFile("spark/data/words.txt")
  11. /**
  12. * flatMap算子:将RDD中的数据一条一条的取出传递给后面的函数,函数的返回值必须是一个集合。最后会将集合展开构成一个新的RDD
  13. */
  14. val wordsRDD: RDD[String] = linesRDD.flatMap((line: String) => line.split("\\|"))
  15. wordsRDD.foreach(println)
  16. }
  17. }

4、sample算子

        从前一个RDD的数据中抽样一部分数据,抽取的比例不是正好对应的,在抽取的比例上下浮动 比如1000条抽取10% 抽取的结果在100条左右

  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object Demo6sample {
  4. def main(args: Array[String]): Unit = {
  5. val conf = new SparkConf()
  6. conf.setMaster("local")
  7. conf.setAppName("flatMap算子演示")
  8. val context = new SparkContext(conf)
  9. //====================================================
  10. val studentRDD: RDD[String] = context.textFile("spark/data/students.csv")
  11. /**
  12. * sample算子:从前一个RDD的数据中抽样一部分数据
  13. *
  14. * 抽取的比例不是正好对应的,在抽取的比例上下浮动 比如1000条抽取10% 抽取的结果在100条左右
  15. */
  16. val sampleRDD: RDD[String] = studentRDD.sample(withReplacement = true, 0.1)
  17. sampleRDD.foreach(println)
  18. }
  19. }

5、groupBy算子

        groupBy:按照指定的字段进行分组,返回的是一个键是分组字段,值是一个存放原本数据的迭代器的键值对 返回的是kv格式的RDD

  1. object Demo7GroupBy {
  2. def main(args: Array[String]): Unit = {
  3. val conf = new SparkConf()
  4. conf.setMaster("local")
  5. conf.setAppName("groupBy算子演示")
  6. val context = new SparkContext(conf)
  7. //====================================================
  8. val studentRDD: RDD[String] = context.textFile("spark/data/students.csv")
  9. val splitRDD: RDD[Array[String]] = studentRDD.map((s: String) => s.split(","))
  10. //需求:求出每个班级平均年龄
  11. //使用模式匹配的方式取出班级和年龄
  12. val clazzWithAgeRDD: RDD[(String, Int)] = splitRDD.map {
  13. case Array(_, _, age: String, _, clazz: String) => (clazz, age.toInt)
  14. }
  15. /**
  16. * groupBy:按照指定的字段进行分组,返回的是一个键是分组字段,值是一个存放原本数据的迭代器的键值对 返回的是kv格式的RDD
  17. *
  18. * key: 是分组字段
  19. * value: 是spark中的迭代器
  20. * 迭代器中的数据,不是完全被加载到内存中计算,迭代器只能迭代一次
  21. *
  22. * groupBy会产生shuffle
  23. */
  24. //按照班级进行分组
  25. //val stringToStudents: Map[String, List[Student]] = stuList.groupBy((s: Student) => s.clazz)
  26. val kvRDD: RDD[(String, Iterable[(String, Int)])] = clazzWithAgeRDD.groupBy(_._1)
  27. val clazzAvgAgeRDD: RDD[(String, Double)] = kvRDD.map {
  28. case (clazz: String, itr: Iterable[(String, Int)]) =>
  29. //CompactBuffer((理科二班,21), (理科二班,23), (理科二班,21), (理科二班,23), (理科二班,21), (理科二班,21), (理科二班,24))
  30. //CompactBuffer(21,23,21,23,21,21,24)
  31. val allAge: Iterable[Int] = itr.map((kv: (String, Int)) => kv._2)
  32. val avgAge: Double = allAge.sum.toDouble / allAge.size
  33. (clazz, avgAge)
  34. }
  35. clazzAvgAgeRDD.foreach(println)
  36. while (true){
  37. }
  38. }
  39. }

6、groupByKey算子

        groupByKey: 按照键进行分组,将value值构成迭代器返回,在spark中看到RDD[(xx, xxx)] 这样的RDD就是kv键值对类型的RDD,只有kv类型键值对RDD才可以调用groupByKey算子。

  1. import org.apache.spark.{SparkConf, SparkContext}
  2. import org.apache.spark.rdd.RDD
  3. object Demo8GroupByKey {
  4. def main(args: Array[String]): Unit = {
  5. val conf = new SparkConf()
  6. conf.setMaster("local")
  7. conf.setAppName("groupByKey算子演示")
  8. val context = new SparkContext(conf)
  9. //====================================================
  10. val studentRDD: RDD[String] = context.textFile("spark/data/students.csv")
  11. val splitRDD: RDD[Array[String]] = studentRDD.map((s: String) => s.split(","))
  12. //需求:求出每个班级平均年龄
  13. //使用模式匹配的方式取出班级和年龄
  14. val clazzWithAgeRDD: RDD[(String, Int)] = splitRDD.map {
  15. case Array(_, _, age: String, _, clazz: String) => (clazz, age.toInt)
  16. }
  17. /**
  18. * groupByKey: 按照键进行分组,将value值构成迭代器返回
  19. * 将来你在spark中看到RDD[(xx, xxx)] 这样的RDD就是kv键值对类型的RDD
  20. * 只有kv类型键值对RDD才可以调用groupByKey算子
  21. *
  22. */
  23. val kvRDD: RDD[(String, Iterable[Int])] = clazzWithAgeRDD.groupByKey()
  24. val clazzAvgAgeRDD: RDD[(String, Double)] = kvRDD.map {
  25. case (clazz: String, ageItr: Iterable[Int]) =>
  26. (clazz, ageItr.sum.toDouble / ageItr.size)
  27. }
  28. clazzAvgAgeRDD.foreach(println)
  29. while (true){
  30. }
  31. /**
  32. * groupBy与groupByKey的区别(spark的面试题)
  33. * 1、代码上的区别:任意一个RDD都可以调用groupBy算子,只有kv类型的RDD才可以调用groupByKey
  34. * 2、groupByKey之后产生的RDD的结构比较简单,方便后续处理
  35. * 3、groupByKey的性能更好,执行速度更快,因为groupByKey相比较与groupBy算子来说,shuffle所需要的数据量较少
  36. */
  37. }
  38. }

7、reduceByKey算子

       按照键key对value值直接进行聚合,需要传入聚合的方式,reduceByKey算子也是只有kv类型的RDD才能调用。

  1. import org.apache.spark.{SparkConf, SparkContext}
  2. import org.apache.spark.rdd.RDD
  3. object Demo9ReduceByKey {
  4. def main(args: Array[String]): Unit = {
  5. val conf = new SparkConf()
  6. conf.setMaster("local")
  7. conf.setAppName("reduceByKey算子演示")
  8. val context = new SparkContext(conf)
  9. //====================================================
  10. val studentRDD: RDD[String] = context.textFile("spark/data/students.csv")
  11. val splitRDD: RDD[Array[String]] = studentRDD.map((s: String) => s.split(","))
  12. //求每个班级的人数
  13. val clazzKVRDD: RDD[(String, Int)] = splitRDD.map {
  14. case Array(_, _, _, _, clazz: String) => (clazz, 1)
  15. }
  16. /**
  17. * 利用groupByKey实现
  18. */
  19. // val kvRDD: RDD[(String, Iterable[Int])] = clazzKVRDD.groupByKey()
  20. // val clazzAvgAgeRDD: RDD[(String, Double)] = kvRDD.map {
  21. // case (clazz: String, n: Iterable[Int]) =>
  22. // (clazz, n.sum)
  23. // }
  24. // clazzAvgAgeRDD.foreach(println)
  25. /**
  26. * 利用reduceByKey实现:按照键key对value值直接进行聚合,需要传入聚合的方式
  27. * reduceByKey算子也是只有kv类型的RDD才能调用
  28. *
  29. *
  30. */
  31. val countRDD: RDD[(String, Int)] = clazzKVRDD.reduceByKey((x: Int, y: Int) => x + y)
  32. countRDD.foreach(println)
  33. // clazzKVRDD.groupByKey()
  34. // .map(kv=>(kv._1,kv._2.sum))
  35. // .foreach(println)
  36. while (true){
  37. }
  38. /**
  39. * reduceByKey与groupByKey的区别
  40. * 1、reduceByKey比groupByKey在map端多了一个预聚合的操作,预聚合之后的shuffle数据量肯定是要少很多的,性能上比groupByKey要好
  41. * 2、从灵活角度来看,reduceByKey并没有groupByKey灵活
  42. * 比如reduceByKey无法做方差,groupByKey后续可以完成
  43. *
  44. */
  45. }
  46. }

8、union算子

union:上下合并两个RDD,前提是两个RDD中的数据类型要一致,合并后不会对结果进行去重。这里的合并只是逻辑层面上的合并,物理层面其实是没有合并

  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object Demo10Union {
  4. def main(args: Array[String]): Unit = {
  5. val conf = new SparkConf()
  6. conf.setMaster("local")
  7. conf.setAppName("Union算子演示")
  8. val context = new SparkContext(conf)
  9. //====================================================
  10. val w1RDD: RDD[String] = context.textFile("spark/data/ws/w1.txt") // 1
  11. val w2RDD: RDD[String] = context.textFile("spark/data/ws/w2.txt") // 1
  12. /**
  13. * union:上下合并两个RDD,前提是两个RDD中的数据类型要一致,合并后不会对结果进行去重
  14. *
  15. * 注:这里的合并只是逻辑层面上的合并,物理层面其实是没有合并
  16. */
  17. val unionRDD: RDD[String] = w1RDD.union(w2RDD)
  18. println(unionRDD.getNumPartitions) // 2
  19. unionRDD.foreach(println)
  20. while (true){
  21. }
  22. }
  23. }

9、join算子

  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. /**
  4. * 内连接:join
  5. * 左连接:leftJoin
  6. * 右连接:rightJoin
  7. * 全连接:fullJoin
  8. */
  9. object Demo11Join {
  10. def main(args: Array[String]): Unit = {
  11. val conf = new SparkConf()
  12. conf.setMaster("local")
  13. conf.setAppName("Join算子演示")
  14. val context = new SparkContext(conf)
  15. //====================================================
  16. //两个kv类型的RDD之间的关联
  17. //通过scala中的集合构建RDD
  18. val rdd1: RDD[(String, String)] = context.parallelize(
  19. List(
  20. ("1001", "尚平"),
  21. ("1002", "丁义杰"),
  22. ("1003", "徐昊宇"),
  23. ("1004", "包旭"),
  24. ("1005", "朱大牛"),
  25. ("1006","汪权")
  26. )
  27. )
  28. val rdd2: RDD[(String, String)] = context.parallelize(
  29. List(
  30. ("1001", "崩坏"),
  31. ("1002", "原神"),
  32. ("1003", "王者"),
  33. ("1004", "修仙"),
  34. ("1005", "学习"),
  35. ("1007", "敲代码")
  36. )
  37. )
  38. //内连接
  39. // val innerJoinRDD: RDD[(String, (String, String))] = rdd1.join(rdd2)
  40. // //加工一下RDD
  41. // val innerJoinRDD2: RDD[(String, String, String)] = innerJoinRDD.map {
  42. // case (id: String, (name: String, like: String)) => (id, name, like)
  43. // }
  44. // innerJoinRDD2.foreach(println)
  45. //左连接
  46. val leftJoinRDD: RDD[(String, (String, Option[String]))] = rdd1.leftOuterJoin(rdd2)
  47. //加工一下RDD
  48. val leftJoinRDD2: RDD[(String, String, String)] = leftJoinRDD.map {
  49. case (id: String, (name: String, Some(like))) => (id, name, like)
  50. case (id: String, (name: String, None)) => (id, name, "无爱好")
  51. }
  52. leftJoinRDD2.foreach(println)
  53. println("=================================")
  54. //右连接与左连接相差不多,不在赘述
  55. //全连接
  56. val fullJoinRDD: RDD[(String, (Option[String], Option[String]))] = rdd1.fullOuterJoin(rdd2)
  57. //加工一下RDD
  58. val fullJoinRDD2: RDD[(String, String, String)] = fullJoinRDD.map {
  59. case (id: String, (Some(name), Some(like))) => (id, name, like)
  60. case (id: String, (Some(name), None)) => (id, name, "无爱好")
  61. case (id: String, (None, Some(like))) => (id, "无姓名", like)
  62. }
  63. fullJoinRDD2.foreach(println)
  64. }
  65. }

10、sortby算子

  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object Demo12Student {
  4. def main(args: Array[String]): Unit = {
  5. val conf = new SparkConf()
  6. conf.setMaster("local")
  7. conf.setAppName("sortby算子演示")
  8. val context = new SparkContext(conf)
  9. //====================================================
  10. //需求:统计总分年级排名前10的学生的各科分数
  11. //读取分数文件数据
  12. val scoreRDD: RDD[(String, String, String)] = context.textFile("spark/data/score.txt") // 读取数据文件
  13. .map((s: String) => s.split(",")) // 切分数据
  14. .filter((arr: Array[String]) => arr.length == 3) // 过滤掉脏数据
  15. .map {
  16. //整理数据,进行模式匹配取出数据
  17. case Array(sid: String, subject_id: String, score: String) => (sid, subject_id, score)
  18. }
  19. //计算每个学生的总分
  20. val sumScoreWithSidRDD: RDD[(String, Int)] = scoreRDD.map {
  21. case (sid: String, _: String, score: String) => (sid, score.toInt)
  22. }.reduceByKey((x: Int, y: Int) => x + y)
  23. //按照总分排序
  24. val sumScoreTop10: Array[(String, Int)] = sumScoreWithSidRDD.sortBy(-_._2).take(10)
  25. //取出前10的学生学号
  26. val ids: Array[String] = sumScoreTop10.map(_._1)
  27. //取出每个学生各科分数
  28. val top10StuScore: RDD[(String, String, String)] = scoreRDD.filter {
  29. case (id: String, _, _) => ids.contains(id)
  30. }
  31. top10StuScore.foreach(println)
  32. }
  33. }

11、mapValues算子

          也是作用在kv类型的RDD上,主要的作用键不变,处理值

  1. import org.apache.spark.{SparkConf, SparkContext}
  2. import org.apache.spark.rdd.RDD
  3. object Demo13MapValues {
  4. def main(args: Array[String]): Unit = {
  5. val conf = new SparkConf()
  6. conf.setMaster("local")
  7. conf.setAppName("Join算子演示")
  8. val context = new SparkContext(conf)
  9. //====================================================
  10. //需求:统计总分年级排名前10的学生的各科分数
  11. //读取分数文件数据
  12. val scoreRDD: RDD[(String, String, String)] = context.textFile("spark/data/score.txt") // 读取数据文件
  13. .map((s: String) => s.split(",")) // 切分数据
  14. .filter((arr: Array[String]) => arr.length == 3) // 过滤掉脏数据
  15. .map {
  16. //整理数据,进行模式匹配取出数据
  17. case Array(sid: String, subject_id: String, score: String) => (sid, subject_id, score)
  18. }
  19. //计算每个学生的总分
  20. val sumScoreWithSidRDD: RDD[(String, Int)] = scoreRDD.map {
  21. case (sid: String, _: String, score: String) => (sid, score.toInt)
  22. }.reduceByKey((x: Int, y: Int) => x + y)
  23. /**
  24. * mapValues算子:也是作用在kv类型的RDD上
  25. * 主要的作用键不变,处理值
  26. */
  27. val resRDD: RDD[(String, Int)] = sumScoreWithSidRDD.mapValues(_ + 1000)
  28. resRDD.foreach(println)
  29. //等同于
  30. val res2RDD: RDD[(String, Int)] = sumScoreWithSidRDD.map((kv: (String, Int)) => (kv._1, kv._2 + 1000))
  31. }
  32. }

12、mapPartition算子

         mapPartition: 主要作用是一次处理一个分区的数据,将一个分区的数据一个一个传给后面的函数进行处理

  1. import org.apache.spark.{SparkConf, SparkContext}
  2. import org.apache.spark.rdd.RDD
  3. object Demo14mapPartition {
  4. def main(args: Array[String]): Unit = {
  5. val conf = new SparkConf()
  6. conf.setMaster("local")
  7. conf.setAppName("mapPartition算子演示")
  8. val context = new SparkContext(conf)
  9. //====================================================
  10. //需求:统计总分年级排名前10的学生的各科分数
  11. //读取分数文件数据
  12. val scoreRDD: RDD[String] = context.textFile("spark/data/ws/*") // 读取数据文件
  13. println(scoreRDD.getNumPartitions)
  14. /**
  15. * mapPartition: 主要作用是一次处理一个分区的数据,将一个分区的数据一个一个传给后面的函数进行处理
  16. *
  17. * 迭代器中存放的是一个分区的数据
  18. */
  19. // val mapPartitionRDD: RDD[String] = scoreRDD.mapPartitions((itr: Iterator[String]) => {
  20. //
  21. // println(s"====================当前处理的分区====================")
  22. // //这里写的逻辑是作用在一个分区上的所有数据
  23. // val words: Iterator[String] = itr.flatMap(_.split("\\|"))
  24. // words
  25. // })
  26. // mapPartitionRDD.foreach(println)
  27. scoreRDD.mapPartitionsWithIndex{
  28. case (index:Int,itr: Iterator[String]) =>
  29. println(s"当前所处理的分区编号是:${index}")
  30. itr.flatMap(_.split("\\|"))
  31. }.foreach(println)
  32. }
  33. }

行动算子:Action

13、collect算子

          以数组的形式返回数据集中所有元素

  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object Demo15Actions {
  4. def main(args: Array[String]): Unit = {
  5. val conf = new SparkConf()
  6. conf.setMaster("local")
  7. conf.setAppName("Action算子演示")
  8. val context = new SparkContext(conf)
  9. //====================================================
  10. val studentRDD: RDD[String] = context.textFile("spark/data/students.csv")
  11. /**
  12. * 转换算子:transformation 将一个RDD转换成另一个RDD,转换算子是懒执行的,需要一个action算子触发执行
  13. *
  14. * 行动算子(操作算子):action算子,触发任务执行。一个action算子就会触发一次任务执行
  15. */
  16. println("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$")
  17. val studentsRDD: RDD[(String, String, String, String, String)] = studentRDD.map(_.split(","))
  18. .map {
  19. case Array(id: String, name: String, age: String, gender: String, clazz: String) =>
  20. println("**************************** ^_^ ********************************")
  21. (id, name, age, gender, clazz)
  22. }
  23. println("$$$$$$$$$$$$$$$$$$$$$$***__***$$$$$$$$$$$$$$$$$$$$$$$$$")
  24. // foreach其实就是一个action算子
  25. // studentsRDD.foreach(println)
  26. // println("="*100)
  27. // studentsRDD.foreach(println)
  28. // while (true){
  29. //
  30. // }
  31. /**
  32. * collect()行动算子 主要作用是将RDD转成scala中的数据结构
  33. *
  34. */
  35. val tuples: Array[(String, String, String, String, String)] = studentsRDD.collect()
  36. }
  37. }
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/640204
推荐阅读
相关标签
  

闽ICP备14008679号