
Spark04: Transformation and Action Development


一、Three Ways to Create an RDD

The RDD is the core abstraction in Spark programming. The first task in any Spark program is to create an initial RDD, which effectively defines the input data source of the application. Only after the initial RDD has been created can you use the higher-order functions that Spark provides to operate on it and derive other RDDs from it.

Spark provides three ways to create an RDD: from a collection, from a local file, and from an HDFS file.

  • Creating an RDD from a collection in your program is mainly used for testing: before deploying to a cluster, you can build some test data from a collection to exercise the rest of your Spark application's flow.
  • Creating an RDD from a local file is mainly used for ad-hoc processing of files that hold large amounts of data.
  • Creating an RDD from an HDFS file is the most common approach in production; it is used for offline batch processing of data stored on HDFS.

1. Creating an RDD from a Collection

To create an RDD from a collection, call SparkContext's parallelize() method on the collection in your program. Spark copies the elements of the collection to the cluster to form a distributed dataset, i.e. an RDD. In effect, part of the data ends up on one node and the rest on other nodes, and you can then operate on this distributed dataset in parallel.

When calling parallelize(), one important parameter you can specify is how many partitions the collection should be split into. Spark runs one task for each partition. By default Spark chooses the number of partitions based on the cluster configuration, but you can also pass a second argument to parallelize() to set the RDD's partition count explicitly, e.g. parallelize(arr, 5).
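
As a quick, hedged illustration (it assumes a SparkContext named sc, like the one created in the full examples below), the following sketch requests five partitions and verifies the count:

// Request 5 partitions explicitly; Spark will run one task per partition.
val arr = Array(1, 2, 3, 4, 5)
val rdd = sc.parallelize(arr, 5)
println(rdd.getNumPartitions) // 5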

Add the Maven dependency:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.4.3</version>
    <!-- <scope>provided</scope>-->
</dependency>

Scala code:

package com.sanqian.scala

import org.apache.spark.{SparkConf, SparkContext}

object CreateRddByArrayScala {
  def main(args: Array[String]): Unit = {
    // Create the SparkContext
    val conf = new SparkConf()
    conf.setAppName("CreateRddByArray").setMaster("local")
    val sc = new SparkContext(conf)
    // Create a collection
    val arr = Array(1, 2, 3, 4, 5)
    // Create an RDD from the collection
    val rdd = sc.parallelize(arr)
    // Sum the elements of the collection
    val sum = rdd.reduce(_ + _)
    // Note: this println runs in the driver process
    println(sum)
    sc.stop()
  }
}

Note: val arr = Array(1,2,3,4,5) and println(sum) run in the driver process; they are not executed in parallel. Operations such as parallelize and reduce are executed on the worker nodes.
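
To make this distinction concrete, here is a small, hedged sketch (reusing the sc from above). On a real cluster the first println runs inside the executors, so its output appears in the executor logs rather than in the driver's console; in local mode everything shares one JVM, so both show up in the same console.

val data = sc.parallelize(Array(1, 2, 3, 4, 5))

// This closure is shipped to the executors (worker side).
data.foreach(n => println("executor side: " + n))

// collect() brings the data back to the driver, so this println runs driver side.
data.collect().foreach(n => println("driver side: " + n))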

Java code:

package com.sanqian.java;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;

import java.util.Arrays;
import java.util.List;

public class CreateRddByArrayJava {
    public static void main(String[] args) {
        // Create the JavaSparkContext
        SparkConf conf = new SparkConf();
        conf.setAppName("CreateRddByArrayJava").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Create a collection
        List<Integer> arr = Arrays.asList(1, 2, 3, 4, 5);
        // Create an RDD from the collection
        JavaRDD<Integer> rdd = sc.parallelize(arr);
        // Sum the elements of the collection
        Integer sum = rdd.reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });
        // Note: this println runs in the driver process
        System.out.println(sum);
        sc.stop();
    }
}

2. Creating an RDD from a Local File or an HDFS File

  • SparkContext's textFile() method creates an RDD from a local file or an HDFS file; each element of the RDD is one line of text from the file.
  • textFile() also supports creating RDDs from directories, compressed files, and wildcards.
  • By default Spark creates one partition for each Block of an HDFS file. You can also set the number of partitions manually via textFile()'s second parameter, but only to a value higher than the Block count; a value lower than the Block count will not take effect (see the short sketch after this list).
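
A hedged sketch of these path styles (the paths below are hypothetical examples, and sc is the SparkContext created in the examples that follow):

// A directory: every file inside it is read.
val fromDir = sc.textFile("hdfs://bigdata01:9000/logs/")
// A compressed file: decompressed transparently.
val fromGz = sc.textFile("hdfs://bigdata01:9000/logs/app.log.gz")
// A wildcard, requesting a minimum of 4 partitions.
val fromWildcard = sc.textFile("hdfs://bigdata01:9000/logs/*.log", 4)
println(fromWildcard.getNumPartitions)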

Scala code:

package com.sanqian.scala

import org.apache.spark.{SparkConf, SparkContext}

object CreateRddByFileScala {
  def main(args: Array[String]): Unit = {
    // Create the SparkContext
    val conf = new SparkConf()
    conf.setAppName("CreateRddByArray").setMaster("local")
    val sc = new SparkContext(conf)
    // Local path (overridden below by the HDFS path)
    var path = "D:\\data\\words.txt"
    path = "hdfs://bigdata01:9000/words.txt"
    // Read the file; each element of the RDD is one line
    val rdd = sc.textFile(path)
    // Sum the length of every line
    val length = rdd.map(_.length).reduce(_ + _)
    println(length)
    sc.stop()
  }
}

Java code:

package com.sanqian.java;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;

public class CreateRddByFileJava {
    public static void main(String[] args) {
        // Create the JavaSparkContext
        SparkConf conf = new SparkConf();
        conf.setAppName("CreateRddByArrayJava").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // File path: a local path, overridden below by the HDFS path
        String path = "D:\\data\\words.txt";
        path = "hdfs://bigdata01:9000/words.txt";
        // Read the file, requesting a minimum of 2 partitions
        JavaRDD<String> rdd = sc.textFile(path, 2);
        // Get the length of each line
        JavaRDD<Integer> numRDD = rdd.map(new Function<String, Integer>() {
            @Override
            public Integer call(String s) throws Exception {
                return s.length();
            }
        });
        // Sum the lengths to get the total length of the data in the file
        Integer sum = numRDD.reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });
        System.out.println(sum);
        sc.stop();
    }
}

二、Introduction to Transformations and Actions

Spark operations on RDDs fall into two broad categories: Transformations and Actions.

  • A Transformation transforms the data in an RDD, producing a new RDD from an existing one; common examples are map, flatMap, and filter.
  • An Action triggers job execution; it performs a final operation on an RDD, such as iterating over it, reducing it, or saving it to a file, and it can return a result to the Driver program.

Whether it is a Transformation or an Action, such an operation is usually called an operator, e.g. the map operator or the reduce operator.

Transformation operators have one defining property: they are lazy. Laziness here means that if a Spark job only defines transformation operators, those operators will not run even when the job is executed. In other words, transformations do not trigger job execution by themselves; they only record the operations to be applied to the RDD. Only when an action is executed after the transformations do all of the transformations actually run. Spark uses this lazy behavior to optimize job execution under the hood and to avoid producing too many intermediate results.

Actions have the complementary property: executing an Action triggers a Spark job, which in turn runs all of the Transformations that precede that Action.
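
A minimal sketch of this behavior, assuming a local SparkContext named sc: the map below computes nothing until the count action runs.

// map only records the lineage of the new RDD; nothing is computed here.
val mapped = sc.parallelize(Array(1, 2, 3, 4, 5)).map { n =>
  println("computing " + n) // not printed until an action runs
  n * 10
}

// The action triggers the job; only now are the "computing ..." lines produced
// (on the executor side) and the result returned to the driver.
println(mapped.count())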

三、Common Transformations

Let's first look at the Transformation operators that Spark provides.

1. Official documentation

Start with the official documentation: the RDD Programming Guide in the Spark 2.4.3 docs.

2. Common operators

  • map: processes each element of the RDD, one element in, one element out
  • filter: evaluates each element against a predicate and keeps the elements for which it returns true
  • flatMap: like map, but each input element can produce one or more output elements
  • groupByKey: groups elements by key; each key maps to an Iterable<value>
  • reduceByKey: applies a reduce operation to the values of each key (contrasted with groupByKey in the sketch after this list)
  • sortByKey: sorts an RDD of <key,value> pairs by key (a global sort)
  • join: joins two RDDs of <key,value> pairs
  • distinct: performs a global deduplication of the RDD's elements
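
Before the full examples, a hedged quick contrast between reduceByKey and groupByKey (assuming a SparkContext named sc): both count streamers per region here, but reduceByKey combines values on the map side before the shuffle, so it is generally preferred for simple aggregations.

val pairs = sc.parallelize(Array((150001, "US"), (150002, "CN"), (150003, "CN"), (150004, "IN")))
  .map(tup => (tup._2, 1))

// reduceByKey: pre-aggregates values of the same key locally before shuffling.
pairs.reduceByKey(_ + _).foreach(println(_))

// groupByKey: ships every value through the shuffle, then we count per key.
pairs.groupByKey().map(tup => (tup._1, tup._2.size)).foreach(println(_))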

3. Hands-on Transformation development

  • map: multiply every element of a collection by 2
  • filter: keep only the even numbers in a collection
  • flatMap: split lines of text into words
  • groupByKey: group the streamers in each region
  • reduceByKey: count the number of streamers in each region
  • sortByKey: sort streamers by gift income
  • join: print each streamer's region and gift income
  • distinct: list the distinct regions that had live streams that day

4. Scala code:

package com.sanqian.scala

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Requirement: Transformation practice
 * map: multiply every element of a collection by 2
 * filter: keep only the even numbers in a collection
 * flatMap: split lines of text into words
 * groupByKey: group the streamers in each region
 * reduceByKey: count the number of streamers in each region
 * sortByKey: sort streamers by gift income
 * join: print each streamer's region and gift income
 * distinct: list the distinct regions that had live streams that day
 */
object TransformationOpScala {
  def main(args: Array[String]): Unit = {
    val sc = getSparkContext
    //map: multiply every element of a collection by 2
    // mapOp(sc)
    //filter: keep only the even numbers in a collection
    // filterOp(sc)
    //flatMap: split lines of text into words
    // flatMapOp(sc)
    //groupByKey: group the streamers in each region
    // groupByKeyOp2(sc)
    //reduceByKey: count the number of streamers in each region
    // reduceBykeyOp(sc)
    //sortByKey: sort streamers by gift income
    // sortByKeyOp(sc)
    //join: print each streamer's region and gift income
    // joinOp(sc)
    //distinct: list the distinct regions that had live streams that day
    distinctOp(sc)
  }

  def distinctOp(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(Array((150001, "US"), (150002, "CN"), (15003, "CN"), (15004, "IN")))
    // We only need the distinct regions, so keep the region field and deduplicate it
    rdd.map(_._2).distinct().foreach(println(_))
  }

  def joinOp(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(Array((150001, "US"), (150002, "CN"), (15003, "CN"), (15004, "IN")))
    val rdd2 = sc.parallelize(Array((150001, 400), (150002, 200), (15003, 300), (15004, 100)))
    rdd.join(rdd2).foreach(tup => {
      // user id
      val uid = tup._1
      // (region, gift income)
      val area_gold = tup._2
      println(uid + "\t" + area_gold._1 + "\t" + area_gold._2)
    })
  }

  def sortByKeyOp(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(Array((150001, 400), (150002, 200), (15003, 300), (15004, 100)))
    // rdd.map(tup => (tup._2, tup._1)).sortByKey(ascending = false)
    //   .foreach(println(_))
    // sortBy lets you specify the sort field directly, which is more flexible
    rdd.sortBy(_._2, ascending = false).foreach(println(_))
  }

  def reduceBykeyOp(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(Array((150001, "US"), (150002, "CN"), (15003, "CN"), (15004, "IN")))
    rdd.map(tup => (tup._2, 1)).reduceByKey(_ + _).foreach(println(_))
  }

  def groupByKeyOp(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(Array((150001, "US"), (150002, "CN"), (15003, "CN"), (15004, "IN")))
    rdd.map(tup => (tup._2, tup._1)).groupByKey().foreach(tup => {
      // region
      val area = tup._1
      print(area + ":")
      // all user ids in this region
      val it = tup._2
      for (uid <- it) {
        print(uid + " ")
      }
      println()
    })
  }

  // What if a tuple has more than two fields?
  // Use the field you want as the key for the first element of a Tuple2,
  // and wrap the remaining fields in another Tuple2 as the value.
  // Note: if your data structure is complex, call foreach after each operator
  // to print and confirm the shape of the data.
  def groupByKeyOp2(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(Array((150001, "US", "male"), (150002, "CN", "female"), (15003, "CN", "male"), (15004, "IN", "male")))
    rdd.map(tup => (tup._2, (tup._1, tup._3))).groupByKey().foreach(tup => {
      // region
      val area = tup._1
      print(area + ":")
      // all (user id, sex) pairs in this region
      val it = tup._2
      for ((uid, sex) <- it) {
        print("<" + uid + "," + sex + "> ")
      }
      println()
    })
  }

  def mapOp(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
    rdd.map(_ * 2).foreach(println(_))
  }

  def flatMapOp(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(Array("good good study", "day day up"))
    rdd.flatMap(_.split(" ")).foreach(println(_))
  }

  def filterOp(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
    // keep the elements that satisfy the predicate
    rdd.filter(_ % 2 == 0).foreach(println(_))
  }

  def getSparkContext = {
    // Create the SparkContext
    val conf = new SparkConf()
    conf.setAppName("TransformationOpScala").setMaster("local")
    new SparkContext(conf)
  }
}

5. Java code:

package com.sanqian.java;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import scala.Tuple2;
import scala.Tuple3;

import java.util.Arrays;
import java.util.Iterator;

public class TransformationOpJava {
    public static void main(String[] args) {
        JavaSparkContext sc = getSparkContext();
        //map: multiply every element of a collection by 2
        // mapOp(sc);
        //filter: keep only the even numbers in a collection
        // filterOp(sc);
        //flatMap: split lines of text into words
        // flatMapOp(sc);
        //groupByKey: group the streamers in each region
        // groupByKeyOp(sc);
        // groupByKeyOp2(sc);
        //reduceByKey: count the number of streamers in each region
        // reduceByKeyOp(sc);
        //sortByKey: sort streamers by gift income
        // sortByKeyOp(sc);
        //join: print each streamer's region and gift income
        // joinOp(sc);
        //distinct: list the distinct regions that had live streams that day
        distinctOp(sc);
    }

    private static void distinctOp(JavaSparkContext sc) {
        Tuple2<Integer, String> t5 = new Tuple2<Integer, String>(150001, "US");
        Tuple2<Integer, String> t6 = new Tuple2<Integer, String>(150002, "CN");
        Tuple2<Integer, String> t7 = new Tuple2<Integer, String>(150003, "CN");
        Tuple2<Integer, String> t8 = new Tuple2<Integer, String>(150004, "IN");
        JavaRDD<Tuple2<Integer, String>> rdd2 = sc.parallelize(Arrays.asList(t5, t6, t7, t8));
        // Keep only the region field, then deduplicate it
        rdd2.map(new Function<Tuple2<Integer, String>, String>() {
            @Override
            public String call(Tuple2<Integer, String> v1) throws Exception {
                return v1._2;
            }
        }).distinct().foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });
    }

    private static void joinOp(JavaSparkContext sc) {
        // (uid, gift income)
        Tuple2<Integer, Integer> t1 = new Tuple2<Integer, Integer>(150002, 200);
        Tuple2<Integer, Integer> t3 = new Tuple2<Integer, Integer>(150001, 400);
        Tuple2<Integer, Integer> t2 = new Tuple2<Integer, Integer>(150003, 300);
        Tuple2<Integer, Integer> t4 = new Tuple2<Integer, Integer>(150004, 100);
        JavaRDD<Tuple2<Integer, Integer>> rdd = sc.parallelize(Arrays.asList(t1, t2, t3, t4));
        // (uid, region)
        Tuple2<Integer, String> t5 = new Tuple2<Integer, String>(150001, "US");
        Tuple2<Integer, String> t6 = new Tuple2<Integer, String>(150002, "CN");
        Tuple2<Integer, String> t7 = new Tuple2<Integer, String>(150003, "CN");
        Tuple2<Integer, String> t8 = new Tuple2<Integer, String>(150004, "IN");
        JavaRDD<Tuple2<Integer, String>> rdd2 = sc.parallelize(Arrays.asList(t5, t6, t7, t8));
        JavaPairRDD<Integer, Integer> rddPair = rdd.mapToPair(new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
            @Override
            public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> tup) throws Exception {
                return new Tuple2<Integer, Integer>(tup._1, tup._2);
            }
        });
        JavaPairRDD<Integer, String> rdd2Pair = rdd2.mapToPair(new PairFunction<Tuple2<Integer, String>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<Integer, String> tup) throws Exception {
                return new Tuple2<Integer, String>(tup._1, tup._2);
            }
        });
        rddPair.join(rdd2Pair).foreach(new VoidFunction<Tuple2<Integer, Tuple2<Integer, String>>>() {
            @Override
            public void call(Tuple2<Integer, Tuple2<Integer, String>> tuple) throws Exception {
                // streamer id
                Integer uid = tuple._1;
                // (gift income, region)
                Tuple2<Integer, String> tu = tuple._2();
                System.out.println(uid + "\t" + tu._1 + "\t" + tu._2);
            }
        });
    }

    private static void sortByKeyOp(JavaSparkContext sc) {
        //(150001, 400), (150002, 200), (15003, 300), (15004, 100)
        Tuple2<Integer, Integer> t1 = new Tuple2<Integer, Integer>(150002, 200);
        Tuple2<Integer, Integer> t3 = new Tuple2<Integer, Integer>(150001, 400);
        Tuple2<Integer, Integer> t2 = new Tuple2<Integer, Integer>(150003, 300);
        Tuple2<Integer, Integer> t4 = new Tuple2<Integer, Integer>(150004, 100);
        JavaRDD<Tuple2<Integer, Integer>> rdd = sc.parallelize(Arrays.asList(t1, t2, t3, t4));
        /*
        // sortByKey variant: swap the tuple so the income becomes the key, then sort descending
        rdd.mapToPair(new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
            @Override
            public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> tup) throws Exception {
                return new Tuple2<Integer, Integer>(tup._2, tup._1);
            }
        }).sortByKey(false).foreach(new VoidFunction<Tuple2<Integer, Integer>>() {
            @Override
            public void call(Tuple2<Integer, Integer> tup) throws Exception {
                System.out.println(tup);
            }
        });
        */
        // sortBy lets you specify the sort field directly, which is more flexible
        rdd.sortBy(new Function<Tuple2<Integer, Integer>, Integer>() {
            @Override
            public Integer call(Tuple2<Integer, Integer> v1) throws Exception {
                return v1._2();
            }
        }, false, 1).foreach(new VoidFunction<Tuple2<Integer, Integer>>() {
            @Override
            public void call(Tuple2<Integer, Integer> tup) throws Exception {
                System.out.println(tup);
            }
        });
    }

    private static void reduceByKeyOp(JavaSparkContext sc) {
        Tuple2<Integer, String> t1 = new Tuple2<Integer, String>(150001, "US");
        Tuple2<Integer, String> t2 = new Tuple2<Integer, String>(150002, "CN");
        Tuple2<Integer, String> t3 = new Tuple2<Integer, String>(150003, "CN");
        Tuple2<Integer, String> t4 = new Tuple2<Integer, String>(150004, "IN");
        JavaRDD<Tuple2<Integer, String>> rdd = sc.parallelize(Arrays.asList(t1, t2, t3, t4));
        // Map to (region, 1), then sum the ones per region
        rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tup) throws Exception {
                return new Tuple2<String, Integer>(tup._2, 1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        }).foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> tuple) throws Exception {
                System.out.println(tuple);
            }
        });
    }

    private static void groupByKeyOp2(JavaSparkContext sc) {
        Tuple3<Integer, String, String> t1 = new Tuple3<Integer, String, String>(150001, "US", "male");
        Tuple3<Integer, String, String> t2 = new Tuple3<Integer, String, String>(150002, "CN", "female");
        Tuple3<Integer, String, String> t3 = new Tuple3<Integer, String, String>(150003, "CN", "female");
        Tuple3<Integer, String, String> t4 = new Tuple3<Integer, String, String>(150004, "IN", "male");
        JavaRDD<Tuple3<Integer, String, String>> rdd = sc.parallelize(Arrays.asList(t1, t2, t3, t4));
        rdd.mapToPair(new PairFunction<Tuple3<Integer, String, String>, String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Tuple2<String, Integer>> call(Tuple3<Integer, String, String> tup) throws Exception {
                return new Tuple2<String, Tuple2<String, Integer>>(tup._2(), new Tuple2<String, Integer>(tup._3(), tup._1()));
            }
        }).groupByKey().foreach(new VoidFunction<Tuple2<String, Iterable<Tuple2<String, Integer>>>>() {
            @Override
            public void call(Tuple2<String, Iterable<Tuple2<String, Integer>>> tup) throws Exception {
                // region
                String area = tup._1;
                System.out.print(area + ":");
                // all (sex, uid) pairs for the users in this region
                Iterable<Tuple2<String, Integer>> it = tup._2;
                for (Tuple2<String, Integer> tu : it) {
                    System.out.print("<" + tu._2 + "," + tu._1 + ">");
                }
                System.out.println();
            }
        });
    }

    private static void groupByKeyOp(JavaSparkContext sc) {
        Tuple2<Integer, String> t1 = new Tuple2<Integer, String>(150001, "US");
        Tuple2<Integer, String> t2 = new Tuple2<Integer, String>(150002, "CN");
        Tuple2<Integer, String> t3 = new Tuple2<Integer, String>(150003, "CN");
        Tuple2<Integer, String> t4 = new Tuple2<Integer, String>(150004, "IN");
        JavaRDD<Tuple2<Integer, String>> rdd = sc.parallelize(Arrays.asList(t1, t2, t3, t4));
        rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tup) throws Exception {
                return new Tuple2<String, Integer>(tup._2, tup._1);
            }
        }).groupByKey().foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
            @Override
            public void call(Tuple2<String, Iterable<Integer>> tup) throws Exception {
                // region
                String area = tup._1;
                System.out.print(area + ":");
                // all user ids in this region
                Iterable<Integer> it = tup._2;
                for (Integer uid : it) {
                    System.out.print(uid + " ");
                }
                System.out.println();
            }
        });
    }

    private static void flatMapOp(JavaSparkContext sc) {
        JavaRDD<String> rdd = sc.parallelize(Arrays.asList("good good study", "day day up"));
        rdd.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        }).foreach(new VoidFunction<String>() {
            @Override
            public void call(String word) throws Exception {
                System.out.println(word);
            }
        });
    }

    private static void filterOp(JavaSparkContext sc) {
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
        // keep the elements that satisfy the predicate
        rdd.filter(new Function<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) throws Exception {
                return integer % 2 == 0;
            }
        }).foreach(new VoidFunction<Integer>() {
            @Override
            public void call(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });
    }

    private static void mapOp(JavaSparkContext sc) {
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
        rdd.map(new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer integer) throws Exception {
                return integer * 2;
            }
        }).foreach(new VoidFunction<Integer>() {
            @Override
            public void call(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });
    }

    public static JavaSparkContext getSparkContext() {
        SparkConf conf = new SparkConf();
        conf.setAppName("TransformationOpJava").setMaster("local");
        return new JavaSparkContext(conf);
    }
}

四、Common Actions

1. Official documentation

RDD Programming Guide - Spark 2.4.3 Documentation (apache.org)

2. Common Action operators

  • reduce: aggregates all elements of the RDD
  • collect: pulls all elements of the RDD back to the local client (the Driver)
  • take(n): returns the first n elements of the RDD
  • count: returns the total number of elements in the RDD
  • saveAsTextFile: saves the elements of the RDD to a file, calling toString on each element
  • countByKey: counts the number of elements for each key
  • foreach: iterates over every element of the RDD

3. Scala code

package com.sanqian.scala

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Requirement: Action practice
 * reduce: aggregate the elements
 * collect: fetch all elements as a collection
 * take(n): fetch the first n elements
 * count: get the total number of elements
 * saveAsTextFile: save the RDD to a file
 * countByKey: count how many times each key appears
 * foreach: iterate over the elements
 */
object ActionOpScala {
  def main(args: Array[String]): Unit = {
    val sc = getSparkContext
    //reduce: aggregate the elements
    // reduceOp(sc)
    //collect: fetch all elements as a collection
    // collectOp(sc)
    //take(n): fetch the first n elements
    // takeOp(sc)
    //count: get the total number of elements
    // countOp(sc)
    //saveAsTextFile: save the RDD to a file
    // saveAsTextFileOp(sc)
    //countByKey: count how many times each key appears
    // countByKeyOp(sc)
    //foreach: iterate over the elements
    // foreachOp(sc)
  }

  private def foreachOp(sc: SparkContext) = {
    val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
    rdd.foreach(println(_))
  }

  private def countByKeyOp(sc: SparkContext) = {
    val rdd = sc.parallelize(Array(("A", 1001), ("B", 1002), ("A", 1003), ("C", 1004)))
    val res = rdd.countByKey()
    for ((k, v) <- res) {
      println(k + "," + v)
    }
  }

  private def saveAsTextFileOp(sc: SparkContext) = {
    val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
    // Point this at an HDFS path; the target directory must not exist yet
    rdd.saveAsTextFile("hdfs://bigdata01:9000/out0104")
  }

  private def countOp(sc: SparkContext) = {
    val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
    val count = rdd.count()
    print(count)
  }

  private def takeOp(sc: SparkContext) = {
    val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
    // take(n) is handy for grabbing a few records to inspect the data format
    val res = rdd.take(2)
    for (i <- res) {
      println(i)
    }
  }

  private def collectOp(sc: SparkContext) = {
    val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
    // Note: avoid collect on large RDDs, because all of the data is returned
    // to the node running the Driver process.
    // If you only need a few records to check the data format, use take(n).
    val res = rdd.collect()
    for (i <- res) {
      println(i)
    }
  }

  private def reduceOp(sc: SparkContext) = {
    val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
    val num = rdd.reduce(_ + _)
    println(num)
  }

  private def getSparkContext = {
    val conf = new SparkConf()
    conf.setAppName("ActionOpScala").setMaster("local[*]")
    new SparkContext(conf)
  }
}

4. Java code

package com.sanqian.java;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class ActionOpJava {
    public static void main(String[] args) {
        JavaSparkContext sc = getSparkContext();
        //reduce: aggregate the elements
        // reduceOp(sc);
        //collect: fetch all elements as a collection
        // collectOp(sc);
        //take(n): fetch the first n elements
        // takeOp(sc);
        //count: get the total number of elements
        // countOp(sc);
        //saveAsTextFile: save the RDD to a file
        // saveAsTextFileOp(sc);
        //countByKey: count how many times each key appears
        // countByKeyOp(sc);
        //foreach: iterate over the elements
        foreachOp(sc);
    }

    public static void foreachOp(JavaSparkContext sc) {
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
        rdd.foreach(new VoidFunction<Integer>() {
            @Override
            public void call(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });
    }

    public static void countByKeyOp(JavaSparkContext sc) {
        Tuple2<String, Integer> t1 = new Tuple2<String, Integer>("A", 1001);
        Tuple2<String, Integer> t2 = new Tuple2<String, Integer>("B", 1002);
        Tuple2<String, Integer> t3 = new Tuple2<String, Integer>("A", 1003);
        Tuple2<String, Integer> t4 = new Tuple2<String, Integer>("C", 1004);
        JavaRDD<Tuple2<String, Integer>> rdd = sc.parallelize(Arrays.asList(t1, t2, t3, t4));
        // Convert to a pair RDD, then count the occurrences of each key
        Map<String, Long> res = rdd.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<String, Integer> tup) throws Exception {
                return new Tuple2<String, Integer>(tup._1, tup._2);
            }
        }).countByKey();
        for (Map.Entry<String, Long> entry : res.entrySet()) {
            System.out.println(entry.getKey() + "," + entry.getValue());
        }
    }

    public static void countOp(JavaSparkContext sc) {
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
        long num = rdd.count();
        System.out.println(num);
    }

    public static void saveAsTextFileOp(JavaSparkContext sc) {
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
        // The target directory must not exist yet
        rdd.saveAsTextFile("hdfs://bigdata01:9000/output0104");
    }

    public static void takeOp(JavaSparkContext sc) {
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
        List<Integer> res = rdd.take(2);
        for (Integer i : res) {
            System.out.println(i);
        }
    }

    public static void collectOp(JavaSparkContext sc) {
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
        // Avoid collect on large RDDs: all of the data comes back to the Driver
        List<Integer> res = rdd.collect();
        for (Integer i : res) {
            System.out.println(i);
        }
    }

    public static void reduceOp(JavaSparkContext sc) {
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
        Integer num = rdd.reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        System.out.println(num);
    }

    public static JavaSparkContext getSparkContext() {
        SparkConf conf = new SparkConf();
        conf.setAppName("ActionOpJava").setMaster("local[*]");
        return new JavaSparkContext(conf);
    }
}
