当前位置:   article > 正文

spark中RDD编程(java)_javardd groupby

javardd groupby

目录

一、RDD的概念

二、RDD编程 

1.RDD创建

1.1从集合中创建

1.2 从文件中读取数据集创建

1.3 从其RDD创建 

2.分区规则

2.1从集合创建RDD

2.2 从文件创建RDD

3.Transformation转换算子

3.1Value类型

1.map()映射

2.mapPartitions()以分区为单位执行Map

map()和mapPartitions()区别:

 3.mapPartitionsWithIndex()带分区号

 4.flatMap()扁平化,类似与炸裂函数

5.groupBy()分组   会走shuffle

6. filter()过滤   类似于where

 7.distinct()去重   走shuffle        

8.coalesce()合并分区

 9.repartition()重新分区,默认走shuffle

coalesce和repartition区别

3.2 Key-Value类型

要想使用Key-Value类型的算子首先需要使用特定的方法转换为PairRDD

1. mapValues()只对Value进行操作

 2.partitonBy() 按照key值重新分区

自定义分区器

3.groupByKey()按照K重新分组

4.reduceByKey()   reduceByKey()按照K聚合V,效果等于groupBy  +  mapValue

reduceByKey()和groupByKey()的区别

5.sortByKey()按照K进行排序

6.join()等同于sql里的内连接,关联上的要,关联不上的舍弃

 7.cogroup()类似于sql的全连接,但是在同一个RDD中对key聚合

 4.action行动算子

1.collect() 行动算子 ,以数组的形式返回数据集

2.count()行动算子,返回RDD中元素个数

  first()返回RDD中的第一个元素

take()返回由RDD前n个元素组成的数组

takeOrdered()返回该RDD排序后前n个元素组成的数组

countByKey()统计每种key的个数

 saveAsTextFile(path)保存成Text文件

saveAsObjectFile(path) 序列化成对象保存到文件

 foreach()遍历RDD中每一个元素

5.WordCount案例实操

1,本地调试

2.集群运行

 3.进阶案例

4.RDD序列化

4.1 


一、RDD的概念

1. RDDResilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象。

代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

2.RDD的五大特性

(1)RDD是由一系列partition组成(block块对应partition),textFile底层调用的是MR读取hdfs上的数据的方法默认一个block块对应一个split,split的大小和block大小一致,可以自己调整

(2)函数作用在每一个partition(split)上

(3)RDD之间有一系列的依赖关系(容错机制)

(4)分区器作用在K,V格式的RDD上

(5)RDD 提供一系列最佳的计算位置

二、RDD编程 

1.RDD创建

1.1从集合中创建

  1. package com.atguigu.createrdd;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaRDD;
  4. import org.apache.spark.api.java.JavaSparkContext;
  5. import java.util.Arrays;
  6. import java.util.List;
  7. import java.util.function.Consumer;
  8. public class Test01_List {
  9. public static void main(String[] args) {
  10. //创建配置对象
  11. SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
  12. //创建sparkContext
  13. JavaSparkContext sc = new JavaSparkContext(conf);
  14. //使用sc编写代码
  15. JavaRDD<String> javaRDD = sc.parallelize(Arrays.asList("hello", "spark"));
  16. List<String> result = javaRDD.collect();
  17. //遍历打印
  18. for (String s : result) {
  19. System.out.println(s);
  20. }
  21. //函数式写法
  22. result.forEach(new Consumer<String>() {
  23. @Override
  24. public void accept(String s) {
  25. System.out.println("值为 " + s);
  26. }
  27. });
  28. //lambda表达式:
  29. //1.箭头左边给传入的参数起名字,不用写类型
  30. //2.箭头右边编写方法体,如果方法题有多行,使用{}括起来,
  31. //3. 如果有返回值 不写return 使用最后一行代码的结果作为返回值
  32. result.forEach(s ->{
  33. System.out.println("hi");
  34. System.out.println("值为 " + s);
  35. });
  36. // lambda表达式进一步简化
  37. result.forEach(System.out::println);
  38. //关闭sc
  39. sc.close();
  40. }
  41. }

1.2 从文件中读取数据集创建

  1. package com.atguigu.createrdd;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaRDD;
  4. import org.apache.spark.api.java.JavaSparkContext;
  5. import java.util.List;
  6. import java.util.function.Consumer;
  7. public class Test02_FromFile {
  8. public static void main(String[] args) {
  9. //创建spark配置对象
  10. SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
  11. //创建sc环境
  12. JavaSparkContext sc = new JavaSparkContext(conf);
  13. //编写代码
  14. JavaRDD<String> lineRDD = sc.textFile("input/2.txt");
  15. List<String> collect = lineRDD.collect();
  16. //将结果保存到文件中 有一个文件就是对应一个分区
  17. lineRDD.saveAsTextFile("output");
  18. collect.forEach(System.out::println);
  19. lineRDD.collect().forEach(new Consumer<String>() {
  20. @Override
  21. public void accept(String s) {
  22. System.out.println("---------");
  23. System.out.println(s);
  24. }
  25. });
  26. lineRDD. collect().forEach(System.out::println);
  27. //关闭sc
  28. sc.stop();
  29. }
  30. }

1.3 从其RDD创建 

2.分区规则

关于分区都不会shuffle,一个shuffle会有一个读和写的过程,只有一个写不走shuffle

2.1从集合创建RDD

默认环境的核数,可自行设置参数

  1. package com.atguigu.createrdd;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaRDD;
  4. import org.apache.spark.api.java.JavaSparkContext;
  5. import org.apache.spark.util.Utils;
  6. import java.io.File;
  7. import java.util.Arrays;
  8. public class Test03_ListPartition {
  9. public static void main(String[] args) throws InterruptedException {
  10. //创建spark配置对象
  11. SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
  12. //创建sc环境
  13. JavaSparkContext sc = new JavaSparkContext(conf);
  14. //编写代码
  15. //默认的分区个数 环境核数
  16. //可以手动填写参数修改分区
  17. JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1,2,3,4,5),2);
  18. // 多余的数据会放到后面一个分区,比如现在5个元素,12在一个分区,345在另一个分区
  19. // 将结果保存到文件中 有一个文件就是对应一个分区
  20. Utils.deleteRecursively(new File("output"));
  21. integerJavaRDD.saveAsTextFile("output");
  22. //关闭sc
  23. sc.stop();
  24. }
  25. }

2.2 从文件创建RDD

  1. package com.atguigu.createrdd;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaRDD;
  4. import org.apache.spark.api.java.JavaSparkContext;
  5. public class Test04_FilePartition {
  6. public static void main(String[] args) {
  7. //创建spark配置对象
  8. SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
  9. //创建sc环境
  10. JavaSparkContext sc = new JavaSparkContext(conf);
  11. //编写代码
  12. //默认填写最小分区数为2 min(2,环境的核数)
  13. JavaRDD<String> stringJavaRDD = sc.textFile("input/1.txt",2);
  14. //spark的读文件使用了老版本的hadoop代码
  15. // 具体的分区个数需要经过公式计算
  16. // 首先获取文件的总长度 totalSize
  17. // 计算平均长度 goalSize = totalSize / numSplits
  18. // 获取块大小 128M
  19. // 计算切分大小 splitSize = Math.max(minSize, Math.min(goalSize, blockSize));
  20. // 最后使用splitSize 按照1.1倍原则切分整个文件 得到几个分区就是几个分区
  21. // 实际开发中 只需要看文件总大小 / 填写的分区数 和块大小比较 谁小拿谁进行切分
  22. lineRDD.saveAsTextFile("output");
  23. // 数据会分配到哪个分区
  24. // 如果切分的位置位于一行的中间 会在当前分区读完一整行数据
  25. stringJavaRDD.saveAsTextFile("output");
  26. //关闭sc
  27. sc.stop();
  28. }
  29. }

3.Transformation转换算子

3.1Value类型

1.map()映射

参数f是一个函数可以写作匿名子类,它可以接收一个参数。当某个RDD执行map方法时,会遍历该RDD中的每一个数据项,并依次应用f函数,从而产生一个新的RDD。即,这个新RDD中的每一个元素都是原来RDD中每一个元素依次应用f函数而得到的。

  1. package com.atguigu.value;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaRDD;
  4. import org.apache.spark.api.java.JavaSparkContext;
  5. import org.apache.spark.api.java.function.Function;
  6. import scala.Tuple2;
  7. import java.util.Arrays;
  8. public class Test01_Map {
  9. public static void main(String[] args) {
  10. //创建spark配置对象
  11. SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
  12. //创建sc环境
  13. JavaSparkContext sc = new JavaSparkContext(conf);
  14. //编写代码
  15. JavaRDD<Integer> intRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);
  16. //使用map转换
  17. //匿名函数的写法
  18. //填写的泛型 填一个为传入参数的类型 后一个为返回值的类型(同时作为下游RDD 的元素类型)
  19. JavaRDD<Integer> mapRDD = intRDD.map(new Function<Integer, Integer>() {
  20. @Override
  21. public Integer call(Integer v1) throws Exception {
  22. return v1 * 2;
  23. }
  24. });
  25. mapRDD. collect().forEach(System.out::println);
  26. //map是可以改变元素的类型的
  27. JavaRDD<Tuple2<String, Integer>> tuple2JavaRDD1 = intRDD.map(new Function<Integer, Tuple2<String, Integer>>() {
  28. @Override
  29. public Tuple2<String, Integer> call(Integer v1) throws Exception {
  30. return new Tuple2<>("值为:", v1);
  31. }
  32. });
  33. JavaRDD<Tuple2<String, Integer>> tuple2JavaRDD = intRDD.map(new Function<Integer, Tuple2<String, Integer>>() {
  34. @Override
  35. public Tuple2<String, Integer> call(Integer v1) throws Exception {
  36. return new Tuple2<>("值为:", v1);
  37. }
  38. });
  39. intRDD. collect().forEach(System.out::println);
  40. //关闭sc
  41. sc.stop();
  42. }
  43. }

2.mapPartitions()以分区为单位执行Map

功能和map一样都是处理每个元素,Map是一次处理一个元素,而mapPartitions一次处理一个分区数据。

  1. package com.atguigu.value;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaRDD;
  4. import org.apache.spark.api.java.JavaSparkContext;
  5. import org.apache.spark.api.java.function.FlatMapFunction;
  6. import org.apache.spark.api.java.function.Function;
  7. import org.apache.spark.util.Utils;
  8. import java.io.File;
  9. import java.util.ArrayList;
  10. import java.util.Arrays;
  11. import java.util.Iterator;
  12. public class Test02_MapPartitions {
  13. public static void main(String[] args) throws InterruptedException {
  14. //创建spark配置对象
  15. SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
  16. //创建sc环境
  17. JavaSparkContext sc = new JavaSparkContext(conf);
  18. //编写代码
  19. JavaRDD<Integer> intRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5,6,7,8,9), 4);
  20. //int值 * 2
  21. //第二个泛型返回结果RDD的元素类型
  22. JavaRDD<Double> mapPartitionsRDD = intRDD.mapPartitions(new FlatMapFunction<Iterator<Integer>, Double>() {
  23. @Override
  24. public Iterator<Double> call(Iterator<Integer> integerIterator) throws Exception {
  25. //将一个分区的数据全部收集到一个迭代器中 一次性处理所有的数据
  26. System.out.println("mapPartitions方法调用");
  27. ArrayList<Double> doubles = new ArrayList<>();
  28. while (integerIterator.hasNext()) {
  29. Integer next = integerIterator.next();
  30. doubles.add(next.doubleValue() * 2);
  31. }
  32. return doubles.iterator();
  33. }
  34. });
  35. mapPartitionsRDD. collect().forEach(System.out::println);
  36. //map操作
  37. JavaRDD<Double> mapRDD = intRDD.map(new Function<Integer, Double>() {
  38. @Override
  39. public Double call(Integer v1) throws Exception {
  40. System.out.println("map方法调用");
  41. return v1.doubleValue() * 2;
  42. }
  43. });
  44. System.out.println("----------------------------");
  45. mapRDD. collect().forEach(System.out::println);
  46. //需求是:将奇数的数字删除,偶数*2
  47. JavaRDD<Double> mapRDD1 = intRDD.map(new Function<Integer, Double>() {
  48. @Override
  49. public Double call(Integer v1) throws Exception {
  50. if (v1 % 2 == 1) {
  51. return null;
  52. } else {
  53. return v1.doubleValue() * 2;
  54. }
  55. }
  56. });
  57. mapRDD1. collect().forEach(System.out::println);
  58. JavaRDD<Double> mapPartitionsRDD1 = intRDD.mapPartitions(new FlatMapFunction<Iterator<Integer>, Double>() {
  59. @Override
  60. public Iterator<Double> call(Iterator<Integer> integerIterator) throws Exception {
  61. ArrayList<Double> doubles = new ArrayList<>();
  62. while (integerIterator.hasNext()) {
  63. Integer next = integerIterator.next();
  64. if (next % 2 == 0) {
  65. doubles.add(next.doubleValue() * 2);
  66. }
  67. }
  68. return doubles.iterator();
  69. }
  70. });
  71. System.out.println("--------------------------");
  72. mapPartitionsRDD1. collect().forEach(System.out::println);
  73. JavaRDD<Integer> intMapPartitionsRDD = intRDD.mapPartitions(new FlatMapFunction<Iterator<Integer>, Integer>() {
  74. @Override
  75. public Iterator<Integer> call(Iterator<Integer> integerIterator) throws Exception {
  76. ArrayList<Integer> integers = new ArrayList<>();
  77. while (integerIterator.hasNext()) {
  78. Integer next = integerIterator.next();
  79. if (next % 2 == 0) {
  80. integers.add(next * 2);
  81. }
  82. }
  83. return integers.iterator();
  84. }
  85. });
  86. Utils.deleteRecursively(new File("output"));//如果有output文件夹就会删除
  87. intMapPartitionsRDD.saveAsTextFile("output");
  88. //关闭sc
  89. sc.stop();
  90. }
  91. }

map()和mapPartitions()区别:

 map一次处理一个元素,要求元素个数一点是1对1,不能删除元素
mapPartitions一次处理一个分区的数据,可以删除元素
如果资源充足,可以使用mapPartitions对map进行优化

如下为两个的结果

 3.mapPartitionsWithIndex()带分区号

类似于mapPartitions,比mapPartitions多一个整数参数表示分区号

  1. package com.atguigu.value;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaRDD;
  4. import org.apache.spark.api.java.JavaSparkContext;
  5. import org.apache.spark.api.java.function.Function2;
  6. import scala.Tuple2;
  7. import java.util.ArrayList;
  8. import java.util.Arrays;
  9. import java.util.Iterator;
  10. public class Test03_MapPartitionsWithIndex {
  11. public static void main(String[] args) {
  12. //创建spark配置对象
  13. SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
  14. //创建sc环境
  15. JavaSparkContext sc = new JavaSparkContext(conf);
  16. //编写代码
  17. JavaRDD<Integer> parallelize = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);
  18. //函数逻辑和mapPartitions完全一致,只是传一个参数,多了一个分区号
  19. //第一个参数表示分区号,最后要的类型是
  20. // 需求:int -> (分区号,int)
  21. JavaRDD<Tuple2<Integer, Integer>> tuple2JavaRDD = parallelize.mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<Tuple2<Integer, Integer>>>() {
  22. @Override
  23. public Iterator<Tuple2<Integer, Integer>> call(Integer v1, Iterator<Integer> v2) throws Exception {
  24. //v1表示分区号,v2表示元素值
  25. ArrayList<Tuple2<Integer, Integer>> tuple2s = new ArrayList<>();
  26. while (v2.hasNext()) {
  27. Integer next = v2.next();
  28. tuple2s.add(new Tuple2<>(v1, next));
  29. }
  30. return tuple2s.iterator();
  31. }
  32. }, false);
  33. tuple2JavaRDD. collect().forEach(System.out::println);
  34. //关闭sc
  35. sc.stop();
  36. }
  37. }

 4.flatMap()扁平化,类似与炸裂函数

与map操作类似,将RDD中的每一个元素通过应用f函数依次转换为新的元素,并封装到RDD中。

区别:在flatMap操作中,f函数的返回值是一个集合,并且会将每一个该集合中的元素拆分出来放到新的RDD中。

  1. package com.atguigu.value;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaRDD;
  4. import org.apache.spark.api.java.JavaSparkContext;
  5. import org.apache.spark.api.java.function.FlatMapFunction;
  6. import java.util.ArrayList;
  7. import java.util.Arrays;
  8. import java.util.Iterator;
  9. import java.util.List;
  10. public class Test04_FlatMap {
  11. public static void main(String[] args) {
  12. //创建spark配置对象
  13. SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
  14. //创建sc环境
  15. JavaSparkContext sc = new JavaSparkContext(conf);
  16. //编写代码
  17. //集合嵌套的格式
  18. ArrayList<List<Integer>> lists = new ArrayList<>();
  19. lists.add(Arrays.asList(1, 2, 3));
  20. lists.add(Arrays.asList(4, 5, 6));
  21. lists.add(Arrays.asList(7, 8, 9));
  22. JavaRDD<List<Integer>> listRDD = sc.parallelize(lists,2);
  23. //flatMap()扁平化类似于hql的炸裂函数,就是将
  24. JavaRDD<Integer> flatRDD = listRDD.flatMap(new FlatMapFunction<List<Integer>, Integer>() {
  25. @Override
  26. public Iterator<Integer> call(List<Integer> integers) throws Exception {
  27. return integers.iterator();
  28. }
  29. });
  30. flatRDD. collect().forEach(System.out::println);
  31. //实际场景
  32. //文件里面是hello world hi nike james lakers list
  33. JavaRDD<String> lineRDD = sc.textFile("input/1.txt");
  34. JavaRDD<String> flatRDD1 = lineRDD.flatMap(new FlatMapFunction<String, String>() {
  35. @Override
  36. public Iterator<String> call(String s) throws Exception {
  37. String[] s1 = s.split(" ");//通过字符串切割出来数组
  38. List<String> strings = Arrays.asList(s1);//数组转化为集合
  39. return strings.iterator();
  40. }
  41. });
  42. flatRDD1. collect().forEach(System.out::println);
  43. //关闭sc
  44. sc.stop();
  45. }
  46. }

5.groupBy()分组   会走shuffle

分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器。

  1. package com.atguigu.value;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaPairRDD;
  4. import org.apache.spark.api.java.JavaRDD;
  5. import org.apache.spark.api.java.JavaSparkContext;
  6. import org.apache.spark.api.java.function.Function;
  7. import org.apache.spark.util.Utils;
  8. import java.io.File;
  9. import java.util.Arrays;
  10. public class Test05_GroupBy {
  11. public static void main(String[] args) throws InterruptedException {
  12. //创建spark配置对象
  13. SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
  14. //创建sc环境
  15. JavaSparkContext sc = new JavaSparkContext(conf);
  16. //编写代码
  17. //需求:奇偶进行分组
  18. JavaRDD<Integer> intRDD = sc.parallelize(Arrays.asList(7, 8, 10, 11, 25, 16), 2);
  19. //泛型1为传入参数,泛型2为分组依据
  20. //最终返回类型:
  21. // (根据分区依据计算的值(可以是任意类型的值),[根据分组依据将符合同一种的元素范进一个集合中])
  22. JavaPairRDD<Integer, Iterable<Integer>> groupByRDD = intRDD.groupBy(new Function<Integer, Integer>() {
  23. @Override
  24. public Integer call(Integer integer) throws Exception {
  25. return integer % 3;
  26. }
  27. });
  28. groupByRDD. collect().forEach(System.out::println);
  29. //标记类型是可以改变的
  30. JavaPairRDD<String, Iterable<Integer>> groupByRDD1 = intRDD.groupBy(new Function<Integer, String>() {
  31. @Override
  32. public String call(Integer integer) throws Exception {
  33. return integer % 2 == 0 ? "偶数" : "奇数";
  34. }
  35. });
  36. JavaPairRDD<Integer, Iterable<Integer>> groupByRDD2 = intRDD.groupBy(new Function<Integer, Integer>() {
  37. @Override
  38. public Integer call(Integer integer) throws Exception {
  39. return integer % 2 == 0 ? 2 : 5;
  40. }
  41. });
  42. groupByRDD1. collect().forEach(System.out::println);
  43. intRDD.groupBy((Function<Integer, String>) integer -> integer % 2 == 0 ? "偶数" : "奇数"). collect().forEach(System.out::println);
  44. Utils.deleteRecursively(new File("output"));
  45. groupByRDD.saveAsTextFile("output");
  46. //关闭sc
  47. sc.stop();
  48. }
  49. }

6. filter()过滤   类似于where

接收一个返回值为布尔类型的函数作为参数。当某个RDD调用filter方法时,会对该RDD中每一个元素应用f函数,如果返回值类型为true,则该元素会被添加到新的RDD中

  1. package com.atguigu.value;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaRDD;
  4. import org.apache.spark.api.java.JavaSparkContext;
  5. import org.apache.spark.api.java.function.Function;
  6. import java.util.Arrays;
  7. public class Test06_Filter {
  8. public static void main(String[] args) throws InterruptedException {
  9. //创建spark配置对象
  10. SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
  11. //创建sc环境
  12. JavaSparkContext sc = new JavaSparkContext(conf);
  13. //编写代码
  14. JavaRDD<Integer> intRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);
  15. //一个元素过滤一次
  16. //相当于hql中的where,返回值只能是Boolean类型
  17. JavaRDD<Integer> filterRDD = intRDD.filter(new Function<Integer, Boolean>() {
  18. @Override
  19. public Boolean call(Integer integer) throws Exception {
  20. System.out.println("调用方法了");
  21. return integer % 2 == 0;
  22. }
  23. });
  24. filterRDD. collect().forEach(System.out::println);
  25. //关闭sc
  26. sc.stop();
  27. }
  28. }
  29. //代码结果
  30. //调用方法了
  31. //调用方法了
  32. //调用方法了
  33. //调用方法了
  34. //调用方法了
  35. //2
  36. //4
  37. //6

 7.distinct()去重   走shuffle        

对内部的元素去重,并将去重后的元素放到新的RDD中。

  1. package com.atguigu.value;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaRDD;
  4. import org.apache.spark.api.java.JavaSparkContext;
  5. import java.util.Arrays;
  6. public class Test07_Distinct {
  7. public static void main(String[] args) throws InterruptedException {
  8. //创建spark配置对象
  9. SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
  10. //创建sc环境
  11. JavaSparkContext sc = new JavaSparkContext(conf);
  12. //编写代码
  13. JavaRDD<Integer> intRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 3, 2, 4, 5, 6, 7, 5, 3), 2);
  14. //使用单个的distinct去重走shuffle的,去重的方式就是hash set
  15. // 所有数据要进内存,效率很高但是肯能内存溢出,oom错误
  16. //按照group by 进行分布式去重,效率比较低,需要走shuffle,不会内存溢出
  17. JavaRDD<Integer> distinct = intRDD.distinct();
  18. distinct. collect().forEach(System.out::println);
  19. //关闭sc
  20. sc.stop();
  21. }
  22. }

DAG有向无环图,中走shuffle的话正常是shuffle后有方法,但是这个distinct在shuffle前后都有是因为底层调了别的方法,map(),reduceByKey, 等,前面那个是distinct其实运行的是map()

 

8.coalesce()合并分区

Coalesce算子包括:配置执行Shuffle和配置不执行Shuffle两种方式。

缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。

  1. package com.atguigu.value;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaRDD;
  4. import org.apache.spark.api.java.JavaSparkContext;
  5. import java.util.Arrays;
  6. public class Test08_Coalesce {
  7. public static void main(String[] args) throws InterruptedException {
  8. //创建spark配置对象
  9. SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
  10. //创建sc环境
  11. JavaSparkContext sc = new JavaSparkContext(conf);
  12. //编写代码
  13. JavaRDD<Integer> intRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 7);
  14. //默认不走shuffle,可以设置参数Boolean型参数,false为不走shuffle
  15. JavaRDD<Integer> coalesceRDD = intRDD.coalesce(2);
  16. JavaRDD<Integer> coalesceRDD1 = intRDD.coalesce(2, false);
  17. // Utils.deleteRecursively(new File("output"));
  18. // coalesceRDD.saveAsTextFile("output");
  19. coalesceRDD. collect().forEach(System.out::println);
  20. //关闭sc
  21. sc.stop();
  22. }
  23. }

 9.repartition()重新分区,默认走shuffle

该操作内部其实执行的是coalesce操作,参数shuffle的默认值为true。无论是将分区数多的RDD转换为分区数少的RDD,还是将分区数少的RDD转换为分区数多的RDD,repartition操作都可以完成,因为无论如何都会经shuffle过程。分区规则不是hash,因为平时使用的分区都是按照hash来实现的,repartition一般是对hash的结果不满意,想要打散重新分区。

  1. package com.atguigu.value;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaRDD;
  4. import org.apache.spark.api.java.JavaSparkContext;
  5. import java.util.Arrays;
  6. public class Test09_Repartition {
  7. public static void main(String[] args) throws InterruptedException {
  8. //创建spark配置对象
  9. SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
  10. //创建sc环境
  11. JavaSparkContext sc = new JavaSparkContext(conf);
  12. //编写代码
  13. JavaRDD<Integer> intRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 2);
  14. // 0 -> 1,2 1 -> 3,4 2 -> 5,6,7
  15. // 分区的规则就是打散 尽量把之前同一个分区的数据 放到不同的分区里面
  16. // 如果按照默认的hash分区 会发生数据倾斜 repartition重新打散分区
  17. // 0 -> 1 -> 1 2-> 2,5 3 -> 3,6 4 -> 4,7
  18. JavaRDD<Integer> repartitionRDD = intRDD.repartition(3);
  19. repartitionRDD. collect().forEach(System.out::println);
  20. //关闭sc
  21. sc.stop();
  22. }
  23. }

coalesce和repartition区别

1)coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。

2)repartition实际上是调用的coalesce,进行shuffle。源码如下:

  1. def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  2.     coalesce(numPartitions, shuffle = true)
  3. }

3)coalesce一般为缩减分区,如果扩大分区,不使用shuffle是没有意义的,repartition扩大分区执行shuffle

 10. sortBy()排序    走shuffle

该操作用于排序数据。在排序之前,可以将数据通过f函数进行处理,之后按照f函数处理的结果进行排序,默认为正序排列。排序后新产生的RDD的分区数与原RDD的分区数一致。Spark的排序结果是全局有序。

  1. package com.atguigu.value;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaRDD;
  4. import org.apache.spark.api.java.JavaSparkContext;
  5. import org.apache.spark.api.java.function.Function;
  6. import java.util.Arrays;
  7. public class Test10_SortBy {
  8. public static void main(String[] args) throws InterruptedException {
  9. //创建spark配置对象
  10. SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
  11. //创建sc环境
  12. JavaSparkContext sc = new JavaSparkContext(conf);
  13. //编写代码
  14. JavaRDD<Integer> intRDD = sc.parallelize(Arrays.asList(3, 3, 4, 5, 1, 7, 6, 8, 4, 2), 2);
  15. //(1)泛型为以谁作为标准排序 (2) true为正序 (3) 排序之后的分区个数
  16. //spark是全局排序
  17. //使用特殊分区器,range分区器
  18. //做法:先抽样,抽样中选出最大最小值,
  19. // 当分区为2的时候,取最大值最小值的中位数,为3个的时候三分数
  20. //比中位数大的分一个分区,小的放另一个,然后分桶,分桶后排序
  21. JavaRDD<Integer> sortByRDD = intRDD.sortBy(new Function<Integer, Integer>() {
  22. @Override
  23. public Integer call(Integer v1) throws Exception {
  24. return v1;
  25. }
  26. }, true, 3);
  27. sortByRDD.collect().forEach(System.out::println);
  28. // Utils.deleteRecursively(new File("output"));
  29. // sortByRDD.saveAsTextFile("output");
  30. Thread.sleep(120000);
  31. //关闭sc
  32. sc.stop();
  33. }
  34. }

3.2 Key-Value类型

要想使用Key-Value类型的算子首先需要使用特定的方法转换为PairRDD

  1. package com.atguigu.keyvalue;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaPairRDD;
  4. import org.apache.spark.api.java.JavaRDD;
  5. import org.apache.spark.api.java.JavaSparkContext;
  6. import org.apache.spark.api.java.function.PairFunction;
  7. import scala.Tuple2;
  8. import java.util.Arrays;
  9. public class Test01_PairRDD {
  10. public static void main(String[] args) {
  11. //创建spark配置对象
  12. SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
  13. //创建sc环境
  14. JavaSparkContext sc = new JavaSparkContext(conf);
  15. //编写代码
  16. JavaRDD<Integer> intRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
  17. //使用mapToPair()方法将JavaRDD转化为javaPairRDD
  18. //第一个泛型表示当前RDD的元素类型
  19. //第二个泛型表示k的类型
  20. //第三个表示value的类型
  21. JavaPairRDD<Integer, Integer> pairRDD = intRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
  22. @Override
  23. public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
  24. return new Tuple2<>(integer, integer);
  25. }
  26. });
  27. pairRDD. collect().forEach(System.out::println);
  28. //直接创建JavaPairRDD的对象
  29. JavaPairRDD<String, Integer> pairRDD1 = sc.parallelizePairs(Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("b", 2)));
  30. pairRDD1. collect().forEach(System.out::println);
  31. //关闭sc
  32. sc.stop();
  33. }
  34. }

1. mapValues()只对Value进行操作

针对于(K,V)形式的类型只对V进行操作 

  1. package com.atguigu.keyvalue;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaPairRDD;
  4. import org.apache.spark.api.java.JavaSparkContext;
  5. import org.apache.spark.api.java.function.Function;
  6. import scala.Tuple2;
  7. import java.util.Arrays;
  8. public class Test02_MapValues {
  9. public static void main(String[] args) {
  10. //创建spark配置对象
  11. SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
  12. //创建sc环境
  13. JavaSparkContext sc = new JavaSparkContext(conf);
  14. //编写代码
  15. //javaRDD不能直接调mapValues
  16. JavaPairRDD<String, Integer> pairRDD1 = sc.parallelizePairs(Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("b", 2)));
  17. JavaPairRDD<String, String> mapValuesRDD = pairRDD1.mapValues(new Function<Integer, String>() {
  18. @Override
  19. public String call(Integer integer) throws Exception {
  20. return integer + "|||";
  21. }
  22. });
  23. mapValuesRDD. collect().forEach(System.out::println);
  24. //关闭sc
  25. sc.stop();
  26. }
  27. }

 2.partitonBy() 按照key值重新分区

将RDD<K,V>中的K按照指定Partitioner重新进行分区;

如果原有的RDD和新的RDD分区器是一致的话就不进行分区,否则会产生Shuffle过程。

  1. package com.atguigu.keyvalue;
  2. import org.apache.spark.HashPartitioner;
  3. import org.apache.spark.SparkConf;
  4. import org.apache.spark.api.java.JavaPairRDD;
  5. import org.apache.spark.api.java.JavaSparkContext;
  6. import org.apache.spark.util.Utils;
  7. import scala.Tuple2;
  8. import java.io.File;
  9. import java.util.Arrays;
  10. public class Test03_PartitionBy {
  11. public static void main(String[] args) throws InterruptedException {
  12. //创建spark配置对象
  13. SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
  14. //创建sc环境
  15. JavaSparkContext sc = new JavaSparkContext(conf);
  16. //编写代码
  17. JavaPairRDD<String, Integer> pairRDD1 = sc.parallelizePairs(Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("b", 2)));
  18. //填写分区器进行分区,实现partitioner的类只有三个:
  19. // Hash、Range(一般不用,都是系统自己调用)、Python
  20. //一般都用HashPartitioner
  21. JavaPairRDD<String, Integer> partitionByRDD = pairRDD1.partitionBy(new HashPartitioner(2));
  22. JavaPairRDD<Integer, Integer> pairRDD2 = sc.parallelizePairs(Arrays.asList(new Tuple2<>(2, 1), new Tuple2<>(2, 2),new Tuple2<>(4, 3),new Tuple2<>(6, 4)));
  23. // 如果key为int值 hash值就是他自身 % 分区的个数
  24. JavaPairRDD<Integer, Integer> partitionByRDD1 = pairRDD2.partitionBy(new HashPartitioner(2));
  25. Utils.deleteRecursively(new File("output"));
  26. partitionByRDD1.saveAsTextFile("output");
  27. //关闭sc
  28. sc.stop();
  29. }
  30. }

自定义分区器

要实现自定义分区器,需要继承org.apache.spark.Partitioner类,并实现下面三个方法。

(1)numPartitions: Int:返回创建出来的分区数。

(2)getPartition(key: Any): Int:返回给定键的分区编号(0到numPartitions-1)。

(3)equals():Java 判断相等性的标准方法。这个方法的实现非常重要,Spark需要用这个方法来检查你的分区器对象是否和其他分区器实例相同,这样Spark才可以判断两个RDD的分区方式是否相同。

  1. package com.atguigu.keyvalue;
  2. import org.apache.spark.Partitioner;
  3. public class CustomPartitioner extends Partitioner {
  4. Integer num = 0;
  5. public CustomPartitioner( int n ) {
  6. num = n;
  7. }
  8. @Override
  9. public int numPartitions() {
  10. return num;
  11. }
  12. @Override
  13. public int getPartition(Object key) {
  14. //根据需求,但是必须按照key
  15. // 如果是int值 按照对num取模
  16. // 不为int值 发送到0号分区
  17. if (key instanceof Integer) {
  18. return (Integer) key % num;
  19. }
  20. return 0;
  21. }
  22. }

使用该分区器

  1. package com.atguigu.keyvalue;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaPairRDD;
  4. import org.apache.spark.api.java.JavaSparkContext;
  5. import org.apache.spark.util.Utils;
  6. import scala.Tuple2;
  7. import java.io.File;
  8. import java.util.Arrays;
  9. public class Test04_CustomPartitioner {
  10. public static void main(String[] args) {
  11. //创建spark配置对象
  12. SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
  13. //创建sc环境
  14. JavaSparkContext sc = new JavaSparkContext(conf);
  15. //编写代码
  16. JavaPairRDD<Integer, Integer> pairRDD2 = sc.parallelizePairs(Arrays.asList(new Tuple2<>(1, 1), new Tuple2<>(2, 2),new Tuple2<>(4, 3),new Tuple2<>(6, 4)));
  17. JavaPairRDD<Integer, Integer> partitionByRDD1 = pairRDD2.partitionBy(new CustomPartitioner(3));
  18. Utils.deleteRecursively(new File("output"));
  19. partitionByRDD1.saveAsTextFile("output");
  20. //关闭sc
  21. sc.stop();
  22. }
  23. }

3.groupByKey()按照K重新分组

groupByKey对每个key进行操作,但只生成一个seq,并不进行聚合。

该操作可以指定分区器或者分区数(默认使用HashPartitioner)

  1. package com.atguigu.keyvalue;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaPairRDD;
  4. import org.apache.spark.api.java.JavaSparkContext;
  5. import org.apache.spark.api.java.function.Function;
  6. import scala.Tuple2;
  7. import java.util.Arrays;
  8. public class Test05_GroupByKey {
  9. public static void main(String[] args) throws InterruptedException {
  10. //创建spark配置对象
  11. SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
  12. //创建sc环境
  13. JavaSparkContext sc = new JavaSparkContext(conf);
  14. //编写代码
  15. JavaPairRDD<String, Integer> pairRDD1 = sc.parallelizePairs(Arrays.asList(new Tuple2<>("hello", 1), new Tuple2<>("world", 2),new Tuple2<>("hello", 11), new Tuple2<>("world", 21),new Tuple2<>("hello", 14), new Tuple2<>("world", 22)));
  16. JavaPairRDD<String, Iterable<Integer>> groupByKeyRDD = pairRDD1.groupByKey();
  17. groupByKeyRDD. collect().forEach(System.out::println);
  18. JavaPairRDD<String, Integer> mapValuesRDD = groupByKeyRDD.mapValues(new Function<Iterable<Integer>, Integer>() {
  19. @Override
  20. public Integer call(Iterable<Integer> v1) throws Exception {
  21. Integer sum = 0;
  22. for (Integer integer : v1) {
  23. sum += integer;
  24. }
  25. return sum;
  26. }
  27. });
  28. mapValuesRDD. collect().forEach(System.out::println);
  29. //关闭sc
  30. sc.stop();
  31. }
  32. }

4.reduceByKey()   reduceByKey()按照K聚合V,效果等于groupBy  +  mapValue

该操作可以将RDD[K,V]中的元素按照相同的K对V进行聚合。其存在多种重载形式,还可以设置新RDD的分区数。

  1. package com.atguigu.keyvalue;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaPairRDD;
  4. import org.apache.spark.api.java.JavaSparkContext;
  5. import org.apache.spark.api.java.function.Function2;
  6. import scala.Tuple2;
  7. import java.util.Arrays;
  8. public class Test06_ReduceByKey {
  9. public static void main(String[] args) {
  10. //创建spark配置对象
  11. SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
  12. //创建sc环境
  13. JavaSparkContext sc = new JavaSparkContext(conf);
  14. //编写代码
  15. JavaPairRDD<String, Integer> pairRDD1 = sc.parallelizePairs(Arrays.asList(new Tuple2<>("hello", 1), new Tuple2<>("world", 2),new Tuple2<>("hello", 11), new Tuple2<>("world", 21),new Tuple2<>("hello", 14), new Tuple2<>("world", 22)));
  16. //reduceByKey = groupByKey + mapValues
  17. JavaPairRDD<String, Integer> reduceByKeyRDD = pairRDD1.reduceByKey(new Function2<Integer, Integer, Integer>() {
  18. @Override
  19. //v1为当前累计的结果,v2为当前的元素值
  20. //第一个sum的值取的就是第一个elem,elem是从第二个开始循环的
  21. public Integer call(Integer sum, Integer elem) throws Exception {
  22. // 第一次计算的时候 sum = 第一个元素 elem = 第二个元素
  23. return sum += elem;
  24. }
  25. });
  26. reduceByKeyRDD. collect().forEach(System.out::println);
  27. //关闭sc
  28. sc.stop();
  29. }
  30. }

reduceByKey()和groupByKey()的区别

1)reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[K,V]。

2)groupByKey:按照key进行分组,直接进行shuffle。

3)开发指导:在不影响业务逻辑的前提下,优先选用reduceByKey。求和操作不影响业务逻辑,求平均值影响业务逻辑。影响业务逻辑时建议先对数据类型进行转换再合并。

reduceByKey()求平均值

  1. package com.atguigu.keyvalue;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaPairRDD;
  4. import org.apache.spark.api.java.JavaSparkContext;
  5. import org.apache.spark.api.java.function.Function;
  6. import org.apache.spark.api.java.function.Function2;
  7. import scala.Tuple2;
  8. import java.util.Arrays;
  9. public class Test07_reduceByKeyAvg {
  10. public static void main(String[] args) {
  11. //创建spark配置对象
  12. SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
  13. //创建sc环境
  14. JavaSparkContext sc = new JavaSparkContext(conf);
  15. //编写代码
  16. JavaPairRDD<String, Integer> pairRDD1 = sc.parallelizePairs(Arrays.asList(new Tuple2<>("hello", 1), new Tuple2<>("world", 2),new Tuple2<>("hello", 11), new Tuple2<>("world", 21),new Tuple2<>("hello", 14), new Tuple2<>("world", 22)),2);
  17. //如果需要求平均值 需要两个数据 (sum,count)
  18. //value -> (value,1) 表示当前的数据和为value 值的个数为1
  19. JavaPairRDD<String, Tuple2<Integer, Integer>> tuple2RDD = pairRDD1.mapValues(new Function<Integer, Tuple2<Integer, Integer>>() {
  20. @Override
  21. public Tuple2<Integer, Integer> call(Integer v1) throws Exception {
  22. return new Tuple2<>(v1, 1);
  23. }
  24. });
  25. tuple2RDD. collect().forEach(System.out::println);
  26. System.out.println("---------------------");
  27. // 之后使用reduceByKey聚合数据
  28. JavaPairRDD<String, Tuple2<Integer, Integer>> pairRDD = tuple2RDD.reduceByKey(new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
  29. @Override
  30. public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> res, Tuple2<Integer, Integer> elem) throws Exception {
  31. // 将sum值相加 同时将count值相加
  32. return new Tuple2<>(res._1 + elem._1, res._2 + elem._2);
  33. }
  34. });
  35. pairRDD. collect().forEach(System.out::println);
  36. System.out.println("---------------");
  37. // 对value值使用sum / count
  38. JavaPairRDD<String, Double> resultRDD = pairRDD.mapValues(new Function<Tuple2<Integer, Integer>, Double>() {
  39. @Override
  40. public Double call(Tuple2<Integer, Integer> v1) throws Exception {
  41. return v1._1.doubleValue() / v1._2;
  42. }
  43. });
  44. resultRDD. collect().forEach(System.out::println);
  45. //关闭sc
  46. sc.stop();
  47. }
  48. }

5.sortByKey()按照K进行排序

  1. package com.atguigu.keyvalue;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaPairRDD;
  4. import org.apache.spark.api.java.JavaSparkContext;
  5. import org.apache.spark.api.java.function.PairFunction;
  6. import scala.Tuple2;
  7. import java.util.Arrays;
  8. public class Test08_SortByKey {
  9. public static void main(String[] args) {
  10. //创建spark配置对象
  11. SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
  12. //创建sc环境
  13. JavaSparkContext sc = new JavaSparkContext(conf);
  14. //编写代码
  15. JavaPairRDD<String, Integer> pairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2<>("hello", 1), new Tuple2<>("world", 2),new Tuple2<>("hi", 11), new Tuple2<>("you", 21),new Tuple2<>("i", 14), new Tuple2<>("ok", 22)),2);
  16. JavaPairRDD<String, Integer> sortByKeyRDD = pairRDD.sortByKey();
  17. sortByKeyRDD. collect().forEach(System.out::println);
  18. //需求:按照后面数字大小排序
  19. //把需要排序的标记放在key的位置上
  20. JavaPairRDD<Integer, Tuple2<String, Integer>> pairRDD1 = pairRDD.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, Tuple2<String, Integer>>() {
  21. @Override
  22. public Tuple2<Integer, Tuple2<String, Integer>> call(Tuple2<String, Integer> v1) throws Exception {
  23. return new Tuple2<>(v1._2, v1);
  24. }
  25. });
  26. System.out.println("-----------");
  27. pairRDD1. collect().forEach(System.out::println);
  28. JavaPairRDD<Integer, Tuple2<String, Integer>> pairRDD2 = pairRDD1.sortByKey();
  29. JavaPairRDD<String, Integer> resultRDD = pairRDD2.mapToPair(new PairFunction<Tuple2<Integer, Tuple2<String, Integer>>, String, Integer>() {
  30. @Override
  31. public Tuple2<String, Integer> call(Tuple2<Integer, Tuple2<String, Integer>> v1) throws Exception {
  32. return v1._2;
  33. }
  34. });
  35. System.out.println("-------------");
  36. resultRDD. collect().forEach(System.out::println);
  37. //关闭sc
  38. sc.stop();
  39. }
  40. }

6.join()等同于sql里的内连接,关联上的要,关联不上的舍弃

在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD

  1. package com.atguigu.keyvalue;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaPairRDD;
  4. import org.apache.spark.api.java.JavaSparkContext;
  5. import scala.Tuple2;
  6. import java.util.Arrays;
  7. public class Test09_Join {
  8. public static void main(String[] args) {
  9. //创建spark配置对象
  10. SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
  11. //创建sc环境
  12. JavaSparkContext sc = new JavaSparkContext(conf);
  13. //编写代码
  14. JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2<>(1, "a"), new Tuple2<>(2, "b"), new Tuple2<>(4, "m")));
  15. JavaPairRDD<Integer, String> pairRDD1 = sc.parallelizePairs(Arrays.asList(new Tuple2<>(1, "p"), new Tuple2<>(2, "o"), new Tuple2<>(4, "i"), new Tuple2<>(6, "i")));
  16. // 相当于等值连接 rdd1.k = rdd2.k
  17. JavaPairRDD<Integer, Tuple2<String, String>> joinRDD = pairRDD.join(pairRDD1);
  18. joinRDD. collect().forEach(System.out::println);
  19. //关闭sc
  20. sc.stop();
  21. }
  22. }

 7.cogroup()类似于sql的全连接,但是在同一个RDD中对key聚合

在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD。

操作两个RDD中的KV元素,每个RDD中相同key中的元素分别聚合成一个集合。

  1. package com.atguigu.keyvalue;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaPairRDD;
  4. import org.apache.spark.api.java.JavaSparkContext;
  5. import scala.Tuple2;
  6. import java.util.Arrays;
  7. public class Test10_Cogroup {
  8. public static void main(String[] args) {
  9. //创建spark配置对象
  10. SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
  11. //创建sc环境
  12. JavaSparkContext sc = new JavaSparkContext(conf);
  13. //编写代码
  14. JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2<>(1, "a"), new Tuple2<>(2, "b"), new Tuple2<>(4, "m"),new Tuple2<>(1, "f")));
  15. JavaPairRDD<Integer, String> pairRDD1 = sc.parallelizePairs(Arrays.asList(new Tuple2<>(1, "p"), new Tuple2<>(2, "o"), new Tuple2<>(4, "i"), new Tuple2<>(6, "i")));
  16. JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<String>>> result = pairRDD.cogroup(pairRDD1);
  17. // rdd1.k = rdd2.k
  18. // (k,(集合[rdd1的元素,],集合[rdd2的元素]))
  19. result. collect().forEach(System.out::println);
  20. //关闭sc
  21. sc.stop();
  22. }
  23. }

 4.action行动算子

1.collect() 行动算子 ,以数组的形式返回数据集

在驱动程序中,以数组Array的形式返回数据集的所有元素。所有的数据都会被拉取到Driver端,容易造成内存溢出,慎用。collect是按照分区的编号从0号开始一次拉取到driver

  1. package com.atguigu.action;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaRDD;
  4. import org.apache.spark.api.java.JavaSparkContext;
  5. import java.util.Arrays;
  6. import java.util.List;
  7. public class Test01_Collect {
  8. public static void main(String[] args) {
  9. //创建spark配置对象
  10. SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
  11. //创建sc环境
  12. JavaSparkContext sc = new JavaSparkContext(conf);
  13. //编写代码
  14. JavaRDD<Integer> intRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);
  15. // 按照分区的编号从0号分区开始拉取数据到driver
  16. // 实际开发不会使用collect 因为会将所有的数据都拉取到driver端 容易内存溢出
  17. List<Integer> collect = intRDD.collect();
  18. collect.forEach(a -> System.out.println(a));
  19. //关闭sc
  20. sc.stop();
  21. }
  22. }

2.count()行动算子,返回RDD中元素个数

  first()返回RDD中的第一个元素

take()返回由RDD前n个元素组成的数组

takeOrdered()返回该RDD排序后前n个元素组成的数组

  1. package com.atguigu.action;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaRDD;
  4. import org.apache.spark.api.java.JavaSparkContext;
  5. import java.util.Arrays;
  6. import java.util.List;
  7. public class Test02_Count {
  8. public static void main(String[] args) {
  9. //创建spark配置对象
  10. SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
  11. //创建sc环境
  12. JavaSparkContext sc = new JavaSparkContext(conf);
  13. //编写代码
  14. JavaRDD<Integer> intRDD = sc.parallelize(Arrays.asList(4, 5, 1, 2, 3), 6);
  15. long count = intRDD.count();
  16. System.out.println(count);//结果为5
  17. // first 返回0号分区的第一个元素 如果没有顺位往下
  18. Integer first = intRDD.first();
  19. System.out.println(first);//结果为4
  20. // take取前n个元素
  21. List<Integer> take = intRDD.take(2);
  22. System.out.println(take);//[4,5]
  23. List<Integer> takeOrderedRDD = intRDD.takeOrdered(2);
  24. System.out.println(takeOrderedRDD);//[1,2]正序排序后取前两个
  25. //关闭sc
  26. sc.stop();
  27. }
  28. }

countByKey()统计每种key的个数

 saveAsTextFile(path)保存成Text文件

功能说明:将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本

saveAsObjectFile(path) 序列化成对象保存到文件

功能说明:用于将RDD中的元素序列化成对象,存储到文件中

  1. package com.atguigu.action;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaPairRDD;
  4. import org.apache.spark.api.java.JavaSparkContext;
  5. import scala.Tuple2;
  6. import java.util.Arrays;
  7. import java.util.Map;
  8. public class Test03_CountByKey {
  9. public static void main(String[] args) {
  10. //创建spark配置对象
  11. SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
  12. //创建sc环境
  13. JavaSparkContext sc = new JavaSparkContext(conf);
  14. //编写代码
  15. JavaPairRDD<String, Integer> pairRDD1 = sc.parallelizePairs(Arrays.asList(new Tuple2<>("hello", 10), new Tuple2<>("world", 20),new Tuple2<>("hello", 10), new Tuple2<>("world", 20),new Tuple2<>("hello", 10), new Tuple2<>("world", 20)),2);
  16. Map<String, Long> countByKeyRDD = pairRDD1.countByKey();
  17. System.out.println(countByKeyRDD);//{hello=3, world=3}
  18. // 保存为文本文件 能够直接读懂
  19. pairRDD1.saveAsTextFile("output");
  20. //保存为对象格式,无法读懂
  21. pairRDD1.saveAsObjectFile("output1");
  22. //关闭sc
  23. sc.stop();
  24. }
  25. }

 foreach()遍历RDD中每一个元素

  1. package com.atguigu.action;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaRDD;
  4. import org.apache.spark.api.java.JavaSparkContext;
  5. import org.apache.spark.api.java.function.VoidFunction;
  6. import java.util.Arrays;
  7. public class Test04_Foreach {
  8. public static void main(String[] args) {
  9. //创建spark配置对象
  10. SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
  11. //创建sc环境
  12. JavaSparkContext sc = new JavaSparkContext(conf);
  13. //编写代码
  14. JavaRDD<Integer> parallelize = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 6);
  15. parallelize.foreach(new VoidFunction<Integer>() {
  16. @Override
  17. public void call(Integer integer) throws Exception {
  18. System.out.println(integer);
  19. }
  20. });//3,5,4,6,2,1
  21. //关闭sc
  22. sc.stop();
  23. }
  24. }

5.WordCount案例实操

1,本地调试

本地Spark程序调试需要使用Local提交模式,即将本机当做运行环境,Master和Worker都为本机。运行时直接加断点调试即可。

2.集群运行

  1. package com.atguigu;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaPairRDD;
  4. import org.apache.spark.api.java.JavaRDD;
  5. import org.apache.spark.api.java.JavaSparkContext;
  6. import org.apache.spark.api.java.function.FlatMapFunction;
  7. import org.apache.spark.api.java.function.Function2;
  8. import org.apache.spark.api.java.function.PairFlatMapFunction;
  9. import org.apache.spark.api.java.function.PairFunction;
  10. import scala.Tuple2;
  11. import java.util.ArrayList;
  12. import java.util.Arrays;
  13. import java.util.Iterator;
  14. public class WordCount {
  15. public static void main(String[] args) {
  16. //创建spark配置对象
  17. SparkConf conf = new SparkConf().setAppName("core").setMaster("yarn");
  18. //创建sc环境
  19. JavaSparkContext sc = new JavaSparkContext(conf);
  20. //编写代码
  21. JavaRDD<String> javaRDD = sc.textFile(args[0]);
  22. //将读进来的每一行数据进行切分,然后同过flatMap进行打散
  23. JavaRDD<String> wordRDD = javaRDD.flatMap(new FlatMapFunction<String, String>() {
  24. @Override
  25. public Iterator<String> call(String s) throws Exception {
  26. String[] s1 = s.split(" ");
  27. return Arrays.asList(s1).iterator();
  28. }
  29. });
  30. //将javaRDD转化为JavaPairRDD
  31. JavaPairRDD<String, Integer> wordPairRDD = wordRDD.mapToPair(new PairFunction<String, String, Integer>() {
  32. @Override
  33. public Tuple2<String, Integer> call(String s) throws Exception {
  34. return new Tuple2<>(s, 1);
  35. }
  36. });
  37. //等同于flatMap + mapToPair
  38. JavaPairRDD<String, Integer> wordPairRDD1 = wordRDD.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {
  39. @Override
  40. public Iterator<Tuple2<String, Integer>> call(String s) throws Exception {
  41. ArrayList<Tuple2<String, Integer>> tuple2s = new ArrayList<>();
  42. String[] words = s.split(" ");
  43. for (String word : words) {
  44. tuple2s.add(new Tuple2<>(word, 1));
  45. }
  46. return tuple2s.iterator();
  47. }
  48. });
  49. //使用reduceByKey()聚合
  50. JavaPairRDD<String, Integer> result = wordPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
  51. @Override
  52. public Integer call(Integer v1, Integer v2) throws Exception {
  53. return v1 + v2;
  54. }
  55. });
  56. result.saveAsTextFile(args[1]);
  57. //关闭sc
  58. sc.stop();
  59. }
  60. }

进行打包

在HDFS上创建上传的路径,将需要WordCount的文件上传上去

然后执行(记得启动Hadoop)

[atguigu@hadoop102 spark-yarn]$ bin/spark-submit \

--class com.atguigu.spark.WordCount \

--master yarn \

./WordCount.jar \

/input \

/output

 3.进阶案例

(1)需求:省份广告被点击Top3

(2)数据准备:时间戳,省份,城市,用户,广告,中间字段使用空格分割。

  1. package com.atguigu;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaPairRDD;
  4. import org.apache.spark.api.java.JavaRDD;
  5. import org.apache.spark.api.java.JavaSparkContext;
  6. import org.apache.spark.api.java.function.Function;
  7. import org.apache.spark.api.java.function.Function2;
  8. import org.apache.spark.api.java.function.PairFunction;
  9. import scala.Tuple2;
  10. import java.util.ArrayList;
  11. import java.util.Map;
  12. import java.util.TreeMap;
  13. public class Top3 {
  14. public static void main(String[] args) {
  15. //创建spark配置对象
  16. SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
  17. //创建sc环境
  18. JavaSparkContext sc = new JavaSparkContext(conf);
  19. //编写代码
  20. JavaRDD<String> lineRDD = sc.textFile("input/agent.log");
  21. // 将一整行字符串 1516609143867 山西 7 64 A
  22. // 转换为 key为(省份,广告) value 为 1
  23. JavaPairRDD<Tuple2<String, String>, Integer> tuple2RDD = lineRDD.mapToPair(new PairFunction<String, Tuple2<String, String>, Integer>() {
  24. @Override
  25. public Tuple2<Tuple2<String, String>, Integer> call(String s) throws Exception {
  26. String[] s1 = s.split(" ");
  27. Tuple2<String, String> key = new Tuple2<>(s1[1], s1[4]);
  28. return new Tuple2<>(key, 1);
  29. }
  30. });
  31. //将省份广告id相同的聚合
  32. JavaPairRDD<Tuple2<String, String>, Integer> reduceByKeyRDD = tuple2RDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
  33. @Override
  34. public Integer call(Integer v1, Integer v2) throws Exception {
  35. return v1 + v2;
  36. }
  37. });// 已经完成省份+广告的点击次数统计 ((省份,广告),sum)
  38. // 转换格式 将省份作为key value为 (广告,sum) 然后聚合
  39. JavaPairRDD<String, Tuple2<String, Integer>> stringTuple2JavaPairRDD = reduceByKeyRDD.mapToPair(new PairFunction<Tuple2<Tuple2<String, String>, Integer>, String, Tuple2<String, Integer>>() {
  40. @Override
  41. public Tuple2<String, Tuple2<String, Integer>> call(Tuple2<Tuple2<String, String>, Integer> v1) throws Exception {
  42. String province = v1._1._1;
  43. String id = v1._1._2;
  44. Integer sum = v1._2;
  45. return new Tuple2<>(province, new Tuple2<>(id, sum));
  46. }
  47. });
  48. //将同一个省份的数据聚合
  49. JavaPairRDD<String, Iterable<Tuple2<String, Integer>>> provinceRDD = stringTuple2JavaPairRDD.groupByKey();
  50. //找出单个省份点击次数的top3
  51. JavaPairRDD<String, ArrayList<Tuple2<String, Integer>>> result = provinceRDD.mapValues(new Function<Iterable<Tuple2<String, Integer>>, ArrayList<Tuple2<String, Integer>>>() {
  52. @Override
  53. public ArrayList<Tuple2<String, Integer>> call(Iterable<Tuple2<String, Integer>> v1) throws Exception {
  54. //将二元组的数据按照value进行排序
  55. //创建treeMap,把需要排序的当作key
  56. TreeMap<Integer, String> treeMap = new TreeMap<>();
  57. for (Tuple2<String, Integer> tuple2 : v1) {
  58. Integer key = tuple2._2;
  59. if (treeMap.containsKey(key)) {
  60. //当treeMap中存在相同次数的广告id
  61. //将广告id进行拼接
  62. treeMap.put(key, treeMap.get(key) + "_" + tuple2._1);
  63. } else {
  64. //当前map中不存在相同次数的广告id
  65. treeMap.put(key, tuple2._1);
  66. }
  67. }
  68. // 放入之后已经进行排序了 只需要找出前三即可
  69. ArrayList<Tuple2<String, Integer>> result = new ArrayList<>();
  70. // 如果总数据没有了 或者 结果够3个了 停止循环
  71. while (treeMap.size() > 0 && result.size() < 3) {
  72. //treeMap的lastEntry()方法会返回key值最大的键值对
  73. Map.Entry<Integer, String> entry = treeMap.lastEntry();
  74. //将次数相同的广告id遍历
  75. String[] ids = entry.getValue().split("_");
  76. for (String id : ids) {
  77. result.add(new Tuple2<>(id, entry.getKey()));
  78. }
  79. // 取出一个数据 用完之后删除
  80. treeMap.remove(entry.getKey());
  81. }
  82. return result;
  83. }
  84. });
  85. result. collect().forEach(System.out::println);
  86. //关闭sc
  87. sc.stop();
  88. }
  89. }

4.RDD序列化

在实际开发中我们往往需要自己定义一些对于RDD的操作,那么此时需要注意的是,初始化工作是在Driver端进行的,而实际运行程序是在Executor端进行的,这就涉及到了跨进程通信,是需要序列化的。

4.1 序列化异常

  1. package com.atguigu.serializable;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaRDD;
  4. import org.apache.spark.api.java.JavaSparkContext;
  5. import org.apache.spark.api.java.function.Function;
  6. import java.util.Arrays;
  7. public class Test01_Ser {
  8. public static void main(String[] args) {
  9. //创建spark配置对象
  10. SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
  11. //创建sc环境
  12. JavaSparkContext sc = new JavaSparkContext(conf);
  13. //编写代码
  14. //如果User不继承Serializable该代码会报错
  15. //自定义的rdd初始化是在driver端进行的,
  16. // 实际运行程序实在executor端进行,就涉及到网络通信,所以需要序列化
  17. User zhangsan = new User("zhangsan", 19);
  18. User lisi = new User("lisi", 18);
  19. JavaRDD<User> userRDD = sc.parallelize(Arrays.asList(zhangsan, lisi));
  20. JavaRDD<User> result = userRDD.map(new Function<User, User>() {
  21. @Override
  22. public User call(User v1) throws Exception {
  23. return new User(v1.getName(), v1.getAge() + 1);
  24. }
  25. });
  26. result. collect().forEach(System.out::println);
  27. //关闭sc
  28. sc.stop();
  29. }
  30. }

4.2Kryo序列化框架

Java的序列化能够序列化任何的类。但是比较重,序列化后对象的体积也比较大。

Spark出于性能的考虑,Spark2.0开始支持另外一种Kryo序列化机制。Kryo速度是Serializable的10倍。当RDD在Shuffle数据的时候,简单数据类型、数组和字符串类型已经在Spark内部使用Kryo来序列化。

5.RDD的依赖关系

5.1查看血缘关系

RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。

 WordCount的  .toDebugString  方法调用后的结果

 (2) ShuffledRDD[5] at reduceByKey at WordCount1.java:57 []
 +-(2) MapPartitionsRDD[3] at mapToPair at WordCount1.java:36 []
    |  MapPartitionsRDD[2] at flatMap at WordCount1.java:27 []
    |  input/2.txt MapPartitionsRDD[1] at textFile at WordCount1.java:24 []
    |  input/2.txt HadoopRDD[0] at textFile at WordCount1.java:24 []

 5.2窄依赖

窄依赖表示每一个父RDDPartition最多被子RDD的一个Partition使用(一对一or多对一),窄依赖我们形象的比喻为独生子女。

5.3宽依赖

宽依赖表示同一个父RDDPartition被多个子RDDPartition依赖(只能是一对多),会引起Shuffle,总结:宽依赖我们形象的比喻为超生。

具有宽依赖的transformations包括:sortreduceByKeygroupByKeyjoin和调用rePartition函数的任何操作。

宽依赖对Spark去评估一个transformations有更加重要的影响,比如对性能的影响。

在不影响业务要求的情况下,要尽量避免使用有宽依赖的转换算子,因为有宽依赖,就一定会走shuffle,影响性能。

5.4Stage任务划分

1)DAG有向无环图

DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。例如,DAG记录了RDD的转换过程和任务的阶段。

 

 2)任务运行的整体流程

 

3)RDD任务切分中间分为:Application、Job、Stage和Task

(1)Application:初始化一个SparkContext即生成一个Application;

(2)Job:一个Action算子就会生成一个Job;

(3)Stage:Stage等于宽依赖的个数加1;

(4)Task:一个Stage阶段中,最后一个RDD的分区个数就是Task的个数。

注意:Application->Job->Stage->Task每一层都是1对n的关系

5.5RDD  Cache缓存

RDD通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会把数据以序列化的形式缓存在JVM的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的action算子时,该RDD将会被缓存在计算节点的内存中,并供后面重用。

Cache底层调用的就是Persist方法,缓存级别默认用的是MEMORY_ONLY

RDD.cache()

Persist方法可以更改存储级别,使用Persist方法有以下的参数,disk表示存储再磁盘,memory表示存储再内存中,最后的2表示两份数据

存储再内存中的数据,有可能会因为运行算子内存不够的时候,会被清掉,

两个方法都不会影响血缘关系

NONE

DISK_ONLY

DISK_ONLY_2

MEMORY_ONLY

 MEMORY_ONLY_2 

 MEMORY_ONLY_SER 

MEMORY_ONLY_SER_2 

 MEMORY_AND_DISK

MEMORY_AND_DISK_2

 MEMORY_AND_DISK_SER 

MEMORY_AND_DISK_SER_2 

OFF_HEAP

  1. package com.atguigu.cache;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaPairRDD;
  4. import org.apache.spark.api.java.JavaRDD;
  5. import org.apache.spark.api.java.JavaSparkContext;
  6. import org.apache.spark.api.java.function.Function2;
  7. import org.apache.spark.api.java.function.PairFlatMapFunction;
  8. import org.apache.spark.storage.StorageLevel;
  9. import scala.Tuple2;
  10. import java.util.ArrayList;
  11. import java.util.Iterator;
  12. public class Test01_Cache {
  13. public static void main(String[] args) throws InterruptedException {
  14. //创建spark配置对象
  15. SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
  16. //创建sc环境
  17. JavaSparkContext sc = new JavaSparkContext(conf);
  18. //编写代码
  19. JavaRDD<String> lineRDD = sc.textFile("input/2.txt");
  20. JavaPairRDD<String, Integer> wordPairRDD = lineRDD.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {
  21. @Override
  22. public Iterator<Tuple2<String, Integer>> call(String s) throws Exception {
  23. ArrayList<Tuple2<String, Integer>> tuple2s = new ArrayList<>();
  24. String[] words = s.split(" ");
  25. for (String word : words) {
  26. tuple2s.add(new Tuple2<>(word, 1));
  27. }
  28. return tuple2s.iterator();
  29. }
  30. });
  31. // wordPairRDD.cache();
  32. wordPairRDD.persist(StorageLevel.MEMORY_ONLY());
  33. wordPairRDD.persist(StorageLevel.MEMORY_ONLY());
  34. JavaPairRDD<String, Integer> resultRDD = wordPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
  35. @Override
  36. public Integer call(Integer v1, Integer v2) throws Exception {
  37. return v1 + v2;
  38. }
  39. });
  40. resultRDD. collect().forEach(System.out::println);
  41. System.out.println(resultRDD.toDebugString());
  42. resultRDD. collect().forEach(System.out::println);
  43. // Thread.sleep(120000);
  44. //关闭sc
  45. sc.stop();
  46. }
  47. }

5.6 RDD   CheckPoint检查点

1)检查点:是通过将RDD中间结果写入磁盘。

2)为什么要做检查点?

由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。

3)检查点存储路径:Checkpoint的数据通常是存储在HDFS等容错、高可用的文件系统

4)检查点数据存储格式为:二进制的文件

5)检查点切断血缘:在Checkpoint的过程中,该RDD的所有依赖于父RDD中的信息将全部被移除。

6)检查点触发时间:对RDD进行Checkpoint操作并不会马上被执行,必须执行Action操作才能触发。但是检查点为了数据安全,会从血缘关系的最开始执行一遍。

  1. package com.atguigu.cache;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaPairRDD;
  4. import org.apache.spark.api.java.JavaRDD;
  5. import org.apache.spark.api.java.JavaSparkContext;
  6. import org.apache.spark.api.java.function.PairFlatMapFunction;
  7. import scala.Tuple2;
  8. import java.util.ArrayList;
  9. import java.util.Iterator;
  10. public class Test02_CheckPoint {
  11. public static void main(String[] args) throws InterruptedException {
  12. //创建spark配置对象
  13. SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
  14. //创建sc环境
  15. JavaSparkContext sc = new JavaSparkContext(conf);
  16. sc.setCheckpointDir("ck");
  17. //编写代码
  18. JavaRDD<String> lineRDD = sc.textFile("input/2.txt");
  19. JavaPairRDD<String, Integer> wordPairRDD = lineRDD.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {
  20. @Override
  21. public Iterator<Tuple2<String, Integer>> call(String s) throws Exception {
  22. ArrayList<Tuple2<String, Integer>> tuple2s = new ArrayList<>();
  23. String[] words = s.split(" ");
  24. for (String word : words) {
  25. tuple2s.add(new Tuple2<>(word, 1));
  26. }
  27. return tuple2s.iterator();
  28. }
  29. });
  30. //加上cache()方法发后,就是3个job
  31. wordPairRDD.cache();
  32. //如果只使用checkpoint()方法的话,会有4个job
  33. //因为checkpoint会自己再计算一边
  34. wordPairRDD.checkpoint();
  35. wordPairRDD. collect().forEach(System.out::println);
  36. wordPairRDD. collect().forEach(System.out::println);
  37. wordPairRDD. collect().forEach(System.out::println);
  38. Thread.sleep(120000);
  39. //关闭sc
  40. sc.stop();
  41. }

5.7 键值对RDD数据分区

Spark目前支持Hash分区、Range分区和用户自定义分区。Hash分区为当前的默认分区。分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle后进入哪个分区和Reduce的个数。

1)注意:

1)只有Key-Value类型的RDD才有分区器,非Key-Value类型的RDD分区的值是None

2)每个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于那个分区的。

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/466659
推荐阅读
相关标签
  

闽ICP备14008679号