赞
踩
目录
3.mapPartitionsWithIndex()带分区号
9.repartition()重新分区,默认走shuffle
要想使用Key-Value类型的算子首先需要使用特定的方法转换为PairRDD
4.reduceByKey() reduceByKey()按照K聚合V,效果等于groupBy + mapValue
6.join()等同于sql里的内连接,关联上的要,关联不上的舍弃
7.cogroup()类似于sql的全连接,但是在同一个RDD中对key聚合
takeOrdered()返回该RDD排序后前n个元素组成的数组
saveAsObjectFile(path) 序列化成对象保存到文件
1. RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象。
代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
2.RDD的五大特性
(1)RDD是由一系列partition组成(block块对应partition),textFile底层调用的是MR读取hdfs上的数据的方法默认一个block块对应一个split,split的大小和block大小一致,可以自己调整
(2)函数作用在每一个partition(split)上
(3)RDD之间有一系列的依赖关系(容错机制)
(4)分区器作用在K,V格式的RDD上
(5)RDD 提供一系列最佳的计算位置
- package com.atguigu.createrdd;
-
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
-
- import java.util.Arrays;
- import java.util.List;
- import java.util.function.Consumer;
-
- public class Test01_List {
- public static void main(String[] args) {
- //创建配置对象
- SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
- //创建sparkContext
- JavaSparkContext sc = new JavaSparkContext(conf);
- //使用sc编写代码
- JavaRDD<String> javaRDD = sc.parallelize(Arrays.asList("hello", "spark"));
-
- List<String> result = javaRDD.collect();
- //遍历打印
- for (String s : result) {
- System.out.println(s);
- }
- //函数式写法
- result.forEach(new Consumer<String>() {
- @Override
- public void accept(String s) {
- System.out.println("值为 " + s);
- }
- });
-
-
- //lambda表达式:
- //1.箭头左边给传入的参数起名字,不用写类型
- //2.箭头右边编写方法体,如果方法题有多行,使用{}括起来,
- //3. 如果有返回值 不写return 使用最后一行代码的结果作为返回值
- result.forEach(s ->{
- System.out.println("hi");
- System.out.println("值为 " + s);
- });
-
- // lambda表达式进一步简化
- result.forEach(System.out::println);
- //关闭sc
- sc.close();
- }
- }
- package com.atguigu.createrdd;
-
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
-
- import java.util.List;
- import java.util.function.Consumer;
-
- public class Test02_FromFile {
- public static void main(String[] args) {
- //创建spark配置对象
- SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
- //创建sc环境
- JavaSparkContext sc = new JavaSparkContext(conf);
- //编写代码
- JavaRDD<String> lineRDD = sc.textFile("input/2.txt");
-
- List<String> collect = lineRDD.collect();
- //将结果保存到文件中 有一个文件就是对应一个分区
- lineRDD.saveAsTextFile("output");
-
- collect.forEach(System.out::println);
-
- lineRDD.collect().forEach(new Consumer<String>() {
- @Override
- public void accept(String s) {
- System.out.println("---------");
- System.out.println(s);
- }
- });
-
- lineRDD. collect().forEach(System.out::println);
- //关闭sc
- sc.stop();
- }
- }
关于分区都不会shuffle,一个shuffle会有一个读和写的过程,只有一个写不走shuffle
默认环境的核数,可自行设置参数
- package com.atguigu.createrdd;
-
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.util.Utils;
-
- import java.io.File;
- import java.util.Arrays;
-
- public class Test03_ListPartition {
- public static void main(String[] args) throws InterruptedException {
- //创建spark配置对象
- SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
- //创建sc环境
- JavaSparkContext sc = new JavaSparkContext(conf);
- //编写代码
- //默认的分区个数 环境核数
- //可以手动填写参数修改分区
- JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1,2,3,4,5),2);
-
- // 多余的数据会放到后面一个分区,比如现在5个元素,12在一个分区,345在另一个分区
- // 将结果保存到文件中 有一个文件就是对应一个分区
- Utils.deleteRecursively(new File("output"));
- integerJavaRDD.saveAsTextFile("output");
-
- //关闭sc
- sc.stop();
- }
- }
- package com.atguigu.createrdd;
-
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
-
- public class Test04_FilePartition {
- public static void main(String[] args) {
- //创建spark配置对象
- SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
- //创建sc环境
- JavaSparkContext sc = new JavaSparkContext(conf);
- //编写代码
- //默认填写最小分区数为2 min(2,环境的核数)
- JavaRDD<String> stringJavaRDD = sc.textFile("input/1.txt",2);
-
- //spark的读文件使用了老版本的hadoop代码
- // 具体的分区个数需要经过公式计算
- // 首先获取文件的总长度 totalSize
- // 计算平均长度 goalSize = totalSize / numSplits
- // 获取块大小 128M
- // 计算切分大小 splitSize = Math.max(minSize, Math.min(goalSize, blockSize));
- // 最后使用splitSize 按照1.1倍原则切分整个文件 得到几个分区就是几个分区
-
- // 实际开发中 只需要看文件总大小 / 填写的分区数 和块大小比较 谁小拿谁进行切分
- lineRDD.saveAsTextFile("output");
-
- // 数据会分配到哪个分区
- // 如果切分的位置位于一行的中间 会在当前分区读完一整行数据
-
- stringJavaRDD.saveAsTextFile("output");
-
- //关闭sc
- sc.stop();
- }
- }
参数f是一个函数可以写作匿名子类,它可以接收一个参数。当某个RDD执行map方法时,会遍历该RDD中的每一个数据项,并依次应用f函数,从而产生一个新的RDD。即,这个新RDD中的每一个元素都是原来RDD中每一个元素依次应用f函数而得到的。
- package com.atguigu.value;
-
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.Function;
- import scala.Tuple2;
-
- import java.util.Arrays;
-
- public class Test01_Map {
- public static void main(String[] args) {
- //创建spark配置对象
- SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
- //创建sc环境
- JavaSparkContext sc = new JavaSparkContext(conf);
- //编写代码
- JavaRDD<Integer> intRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);
-
- //使用map转换
- //匿名函数的写法
- //填写的泛型 填一个为传入参数的类型 后一个为返回值的类型(同时作为下游RDD 的元素类型)
- JavaRDD<Integer> mapRDD = intRDD.map(new Function<Integer, Integer>() {
- @Override
- public Integer call(Integer v1) throws Exception {
- return v1 * 2;
- }
- });
- mapRDD. collect().forEach(System.out::println);
-
- //map是可以改变元素的类型的
- JavaRDD<Tuple2<String, Integer>> tuple2JavaRDD1 = intRDD.map(new Function<Integer, Tuple2<String, Integer>>() {
- @Override
- public Tuple2<String, Integer> call(Integer v1) throws Exception {
- return new Tuple2<>("值为:", v1);
- }
- });
-
- JavaRDD<Tuple2<String, Integer>> tuple2JavaRDD = intRDD.map(new Function<Integer, Tuple2<String, Integer>>() {
- @Override
- public Tuple2<String, Integer> call(Integer v1) throws Exception {
- return new Tuple2<>("值为:", v1);
- }
- });
-
- intRDD. collect().forEach(System.out::println);
-
- //关闭sc
- sc.stop();
- }
- }
功能和map一样都是处理每个元素,Map是一次处理一个元素,而mapPartitions一次处理一个分区数据。
- package com.atguigu.value;
-
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.FlatMapFunction;
- import org.apache.spark.api.java.function.Function;
- import org.apache.spark.util.Utils;
-
- import java.io.File;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.Iterator;
-
- public class Test02_MapPartitions {
- public static void main(String[] args) throws InterruptedException {
- //创建spark配置对象
- SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
- //创建sc环境
- JavaSparkContext sc = new JavaSparkContext(conf);
- //编写代码
- JavaRDD<Integer> intRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5,6,7,8,9), 4);
- //int值 * 2
- //第二个泛型返回结果RDD的元素类型
- JavaRDD<Double> mapPartitionsRDD = intRDD.mapPartitions(new FlatMapFunction<Iterator<Integer>, Double>() {
- @Override
- public Iterator<Double> call(Iterator<Integer> integerIterator) throws Exception {
- //将一个分区的数据全部收集到一个迭代器中 一次性处理所有的数据
- System.out.println("mapPartitions方法调用");
- ArrayList<Double> doubles = new ArrayList<>();
- while (integerIterator.hasNext()) {
- Integer next = integerIterator.next();
- doubles.add(next.doubleValue() * 2);
- }
- return doubles.iterator();
- }
- });
- mapPartitionsRDD. collect().forEach(System.out::println);
- //map操作
- JavaRDD<Double> mapRDD = intRDD.map(new Function<Integer, Double>() {
- @Override
- public Double call(Integer v1) throws Exception {
- System.out.println("map方法调用");
- return v1.doubleValue() * 2;
- }
- });
- System.out.println("----------------------------");
- mapRDD. collect().forEach(System.out::println);
- //需求是:将奇数的数字删除,偶数*2
- JavaRDD<Double> mapRDD1 = intRDD.map(new Function<Integer, Double>() {
- @Override
- public Double call(Integer v1) throws Exception {
- if (v1 % 2 == 1) {
- return null;
- } else {
- return v1.doubleValue() * 2;
- }
- }
- });
- mapRDD1. collect().forEach(System.out::println);
-
- JavaRDD<Double> mapPartitionsRDD1 = intRDD.mapPartitions(new FlatMapFunction<Iterator<Integer>, Double>() {
- @Override
- public Iterator<Double> call(Iterator<Integer> integerIterator) throws Exception {
- ArrayList<Double> doubles = new ArrayList<>();
- while (integerIterator.hasNext()) {
- Integer next = integerIterator.next();
- if (next % 2 == 0) {
- doubles.add(next.doubleValue() * 2);
- }
- }
- return doubles.iterator();
- }
- });
- System.out.println("--------------------------");
- mapPartitionsRDD1. collect().forEach(System.out::println);
- JavaRDD<Integer> intMapPartitionsRDD = intRDD.mapPartitions(new FlatMapFunction<Iterator<Integer>, Integer>() {
- @Override
- public Iterator<Integer> call(Iterator<Integer> integerIterator) throws Exception {
- ArrayList<Integer> integers = new ArrayList<>();
- while (integerIterator.hasNext()) {
- Integer next = integerIterator.next();
- if (next % 2 == 0) {
- integers.add(next * 2);
- }
- }
- return integers.iterator();
- }
- });
-
- Utils.deleteRecursively(new File("output"));//如果有output文件夹就会删除
- intMapPartitionsRDD.saveAsTextFile("output");
- //关闭sc
- sc.stop();
- }
- }
map一次处理一个元素,要求元素个数一点是1对1,不能删除元素
mapPartitions一次处理一个分区的数据,可以删除元素
如果资源充足,可以使用mapPartitions对map进行优化
如下为两个的结果
类似于mapPartitions,比mapPartitions多一个整数参数表示分区号
- package com.atguigu.value;
-
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.Function2;
- import scala.Tuple2;
-
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.Iterator;
-
- public class Test03_MapPartitionsWithIndex {
- public static void main(String[] args) {
- //创建spark配置对象
- SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
- //创建sc环境
- JavaSparkContext sc = new JavaSparkContext(conf);
- //编写代码
- JavaRDD<Integer> parallelize = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);
- //函数逻辑和mapPartitions完全一致,只是传一个参数,多了一个分区号
- //第一个参数表示分区号,最后要的类型是
- // 需求:int -> (分区号,int)
- JavaRDD<Tuple2<Integer, Integer>> tuple2JavaRDD = parallelize.mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<Tuple2<Integer, Integer>>>() {
- @Override
- public Iterator<Tuple2<Integer, Integer>> call(Integer v1, Iterator<Integer> v2) throws Exception {
- //v1表示分区号,v2表示元素值
- ArrayList<Tuple2<Integer, Integer>> tuple2s = new ArrayList<>();
- while (v2.hasNext()) {
- Integer next = v2.next();
- tuple2s.add(new Tuple2<>(v1, next));
- }
- return tuple2s.iterator();
- }
- }, false);
-
- tuple2JavaRDD. collect().forEach(System.out::println);
- //关闭sc
- sc.stop();
- }
- }
与map操作类似,将RDD中的每一个元素通过应用f函数依次转换为新的元素,并封装到RDD中。
区别:在flatMap操作中,f函数的返回值是一个集合,并且会将每一个该集合中的元素拆分出来放到新的RDD中。
- package com.atguigu.value;
-
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.FlatMapFunction;
-
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.Iterator;
- import java.util.List;
-
- public class Test04_FlatMap {
- public static void main(String[] args) {
- //创建spark配置对象
- SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
- //创建sc环境
- JavaSparkContext sc = new JavaSparkContext(conf);
- //编写代码
- //集合嵌套的格式
- ArrayList<List<Integer>> lists = new ArrayList<>();
- lists.add(Arrays.asList(1, 2, 3));
- lists.add(Arrays.asList(4, 5, 6));
- lists.add(Arrays.asList(7, 8, 9));
- JavaRDD<List<Integer>> listRDD = sc.parallelize(lists,2);
- //flatMap()扁平化类似于hql的炸裂函数,就是将
- JavaRDD<Integer> flatRDD = listRDD.flatMap(new FlatMapFunction<List<Integer>, Integer>() {
- @Override
- public Iterator<Integer> call(List<Integer> integers) throws Exception {
- return integers.iterator();
- }
- });
- flatRDD. collect().forEach(System.out::println);
-
- //实际场景
- //文件里面是hello world hi nike james lakers list
- JavaRDD<String> lineRDD = sc.textFile("input/1.txt");
-
- JavaRDD<String> flatRDD1 = lineRDD.flatMap(new FlatMapFunction<String, String>() {
-
- @Override
- public Iterator<String> call(String s) throws Exception {
- String[] s1 = s.split(" ");//通过字符串切割出来数组
- List<String> strings = Arrays.asList(s1);//数组转化为集合
- return strings.iterator();
- }
- });
- flatRDD1. collect().forEach(System.out::println);
- //关闭sc
- sc.stop();
- }
- }
分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器。
- package com.atguigu.value;
-
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaPairRDD;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.Function;
- import org.apache.spark.util.Utils;
-
- import java.io.File;
- import java.util.Arrays;
-
- public class Test05_GroupBy {
- public static void main(String[] args) throws InterruptedException {
- //创建spark配置对象
- SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
- //创建sc环境
- JavaSparkContext sc = new JavaSparkContext(conf);
- //编写代码
- //需求:奇偶进行分组
- JavaRDD<Integer> intRDD = sc.parallelize(Arrays.asList(7, 8, 10, 11, 25, 16), 2);
- //泛型1为传入参数,泛型2为分组依据
- //最终返回类型:
- // (根据分区依据计算的值(可以是任意类型的值),[根据分组依据将符合同一种的元素范进一个集合中])
- JavaPairRDD<Integer, Iterable<Integer>> groupByRDD = intRDD.groupBy(new Function<Integer, Integer>() {
- @Override
- public Integer call(Integer integer) throws Exception {
- return integer % 3;
- }
- });
- groupByRDD. collect().forEach(System.out::println);
- //标记类型是可以改变的
- JavaPairRDD<String, Iterable<Integer>> groupByRDD1 = intRDD.groupBy(new Function<Integer, String>() {
- @Override
- public String call(Integer integer) throws Exception {
- return integer % 2 == 0 ? "偶数" : "奇数";
- }
- });
- JavaPairRDD<Integer, Iterable<Integer>> groupByRDD2 = intRDD.groupBy(new Function<Integer, Integer>() {
- @Override
- public Integer call(Integer integer) throws Exception {
- return integer % 2 == 0 ? 2 : 5;
- }
- });
- groupByRDD1. collect().forEach(System.out::println);
- intRDD.groupBy((Function<Integer, String>) integer -> integer % 2 == 0 ? "偶数" : "奇数"). collect().forEach(System.out::println);
- Utils.deleteRecursively(new File("output"));
- groupByRDD.saveAsTextFile("output");
- //关闭sc
- sc.stop();
- }
- }
接收一个返回值为布尔类型的函数作为参数。当某个RDD调用filter方法时,会对该RDD中每一个元素应用f函数,如果返回值类型为true,则该元素会被添加到新的RDD中
- package com.atguigu.value;
-
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.Function;
-
- import java.util.Arrays;
-
- public class Test06_Filter {
- public static void main(String[] args) throws InterruptedException {
- //创建spark配置对象
- SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
- //创建sc环境
- JavaSparkContext sc = new JavaSparkContext(conf);
- //编写代码
- JavaRDD<Integer> intRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);
- //一个元素过滤一次
- //相当于hql中的where,返回值只能是Boolean类型
- JavaRDD<Integer> filterRDD = intRDD.filter(new Function<Integer, Boolean>() {
- @Override
- public Boolean call(Integer integer) throws Exception {
- System.out.println("调用方法了");
- return integer % 2 == 0;
- }
- });
- filterRDD. collect().forEach(System.out::println);
- //关闭sc
- sc.stop();
- }
- }
-
- //代码结果
- //调用方法了
- //调用方法了
- //调用方法了
- //调用方法了
- //调用方法了
- //2
- //4
- //6
对内部的元素去重,并将去重后的元素放到新的RDD中。
- package com.atguigu.value;
-
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
-
- import java.util.Arrays;
-
- public class Test07_Distinct {
- public static void main(String[] args) throws InterruptedException {
- //创建spark配置对象
- SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
- //创建sc环境
- JavaSparkContext sc = new JavaSparkContext(conf);
- //编写代码
- JavaRDD<Integer> intRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 3, 2, 4, 5, 6, 7, 5, 3), 2);
- //使用单个的distinct去重走shuffle的,去重的方式就是hash set
- // 所有数据要进内存,效率很高但是肯能内存溢出,oom错误
- //按照group by 进行分布式去重,效率比较低,需要走shuffle,不会内存溢出
- JavaRDD<Integer> distinct = intRDD.distinct();
- distinct. collect().forEach(System.out::println);
-
- //关闭sc
- sc.stop();
- }
- }
DAG有向无环图,中走shuffle的话正常是shuffle后有方法,但是这个distinct在shuffle前后都有是因为底层调了别的方法,map(),reduceByKey, 等,前面那个是distinct其实运行的是map()
Coalesce算子包括:配置执行Shuffle和配置不执行Shuffle两种方式。
缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。
- package com.atguigu.value;
-
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
-
- import java.util.Arrays;
-
- public class Test08_Coalesce {
- public static void main(String[] args) throws InterruptedException {
- //创建spark配置对象
- SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
- //创建sc环境
- JavaSparkContext sc = new JavaSparkContext(conf);
- //编写代码
- JavaRDD<Integer> intRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 7);
- //默认不走shuffle,可以设置参数Boolean型参数,false为不走shuffle
- JavaRDD<Integer> coalesceRDD = intRDD.coalesce(2);
- JavaRDD<Integer> coalesceRDD1 = intRDD.coalesce(2, false);
- // Utils.deleteRecursively(new File("output"));
- // coalesceRDD.saveAsTextFile("output");
- coalesceRDD. collect().forEach(System.out::println);
- //关闭sc
- sc.stop();
- }
- }
该操作内部其实执行的是coalesce操作,参数shuffle的默认值为true。无论是将分区数多的RDD转换为分区数少的RDD,还是将分区数少的RDD转换为分区数多的RDD,repartition操作都可以完成,因为无论如何都会经shuffle过程。分区规则不是hash,因为平时使用的分区都是按照hash来实现的,repartition一般是对hash的结果不满意,想要打散重新分区。
- package com.atguigu.value;
-
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
-
- import java.util.Arrays;
-
- public class Test09_Repartition {
- public static void main(String[] args) throws InterruptedException {
- //创建spark配置对象
- SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
- //创建sc环境
- JavaSparkContext sc = new JavaSparkContext(conf);
- //编写代码
- JavaRDD<Integer> intRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 2);
- // 0 -> 1,2 1 -> 3,4 2 -> 5,6,7
- // 分区的规则就是打散 尽量把之前同一个分区的数据 放到不同的分区里面
- // 如果按照默认的hash分区 会发生数据倾斜 repartition重新打散分区
- // 0 -> 1 -> 1 2-> 2,5 3 -> 3,6 4 -> 4,7
- JavaRDD<Integer> repartitionRDD = intRDD.repartition(3);
- repartitionRDD. collect().forEach(System.out::println);
- //关闭sc
- sc.stop();
- }
- }
1)coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。
2)repartition实际上是调用的coalesce,进行shuffle。源码如下:
- def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
- coalesce(numPartitions, shuffle = true)
- }
3)coalesce一般为缩减分区,如果扩大分区,不使用shuffle是没有意义的,repartition扩大分区执行shuffle。
10. sortBy()排序 走shuffle
该操作用于排序数据。在排序之前,可以将数据通过f函数进行处理,之后按照f函数处理的结果进行排序,默认为正序排列。排序后新产生的RDD的分区数与原RDD的分区数一致。Spark的排序结果是全局有序。
- package com.atguigu.value;
-
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.Function;
-
- import java.util.Arrays;
-
- public class Test10_SortBy {
- public static void main(String[] args) throws InterruptedException {
- //创建spark配置对象
- SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
- //创建sc环境
- JavaSparkContext sc = new JavaSparkContext(conf);
- //编写代码
- JavaRDD<Integer> intRDD = sc.parallelize(Arrays.asList(3, 3, 4, 5, 1, 7, 6, 8, 4, 2), 2);
- //(1)泛型为以谁作为标准排序 (2) true为正序 (3) 排序之后的分区个数
- //spark是全局排序
- //使用特殊分区器,range分区器
- //做法:先抽样,抽样中选出最大最小值,
- // 当分区为2的时候,取最大值最小值的中位数,为3个的时候三分数
- //比中位数大的分一个分区,小的放另一个,然后分桶,分桶后排序
- JavaRDD<Integer> sortByRDD = intRDD.sortBy(new Function<Integer, Integer>() {
- @Override
- public Integer call(Integer v1) throws Exception {
- return v1;
- }
- }, true, 3);
- sortByRDD.collect().forEach(System.out::println);
- // Utils.deleteRecursively(new File("output"));
- // sortByRDD.saveAsTextFile("output");
- Thread.sleep(120000);
- //关闭sc
- sc.stop();
- }
- }
- package com.atguigu.keyvalue;
-
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaPairRDD;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.PairFunction;
- import scala.Tuple2;
-
- import java.util.Arrays;
-
- public class Test01_PairRDD {
- public static void main(String[] args) {
- //创建spark配置对象
- SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
- //创建sc环境
- JavaSparkContext sc = new JavaSparkContext(conf);
- //编写代码
- JavaRDD<Integer> intRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
-
- //使用mapToPair()方法将JavaRDD转化为javaPairRDD
- //第一个泛型表示当前RDD的元素类型
- //第二个泛型表示k的类型
- //第三个表示value的类型
- JavaPairRDD<Integer, Integer> pairRDD = intRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
- @Override
- public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
- return new Tuple2<>(integer, integer);
- }
- });
- pairRDD. collect().forEach(System.out::println);
- //直接创建JavaPairRDD的对象
- JavaPairRDD<String, Integer> pairRDD1 = sc.parallelizePairs(Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("b", 2)));
- pairRDD1. collect().forEach(System.out::println);
- //关闭sc
- sc.stop();
- }
- }
针对于(K,V)形式的类型只对V进行操作
- package com.atguigu.keyvalue;
-
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaPairRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.Function;
- import scala.Tuple2;
-
- import java.util.Arrays;
-
- public class Test02_MapValues {
- public static void main(String[] args) {
- //创建spark配置对象
- SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
- //创建sc环境
- JavaSparkContext sc = new JavaSparkContext(conf);
- //编写代码
- //javaRDD不能直接调mapValues
- JavaPairRDD<String, Integer> pairRDD1 = sc.parallelizePairs(Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("b", 2)));
- JavaPairRDD<String, String> mapValuesRDD = pairRDD1.mapValues(new Function<Integer, String>() {
- @Override
- public String call(Integer integer) throws Exception {
- return integer + "|||";
- }
- });
- mapValuesRDD. collect().forEach(System.out::println);
- //关闭sc
- sc.stop();
- }
- }
将RDD<K,V>中的K按照指定Partitioner重新进行分区;
如果原有的RDD和新的RDD分区器是一致的话就不进行分区,否则会产生Shuffle过程。
- package com.atguigu.keyvalue;
-
- import org.apache.spark.HashPartitioner;
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaPairRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.util.Utils;
- import scala.Tuple2;
-
- import java.io.File;
- import java.util.Arrays;
-
- public class Test03_PartitionBy {
- public static void main(String[] args) throws InterruptedException {
- //创建spark配置对象
- SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
- //创建sc环境
- JavaSparkContext sc = new JavaSparkContext(conf);
- //编写代码
- JavaPairRDD<String, Integer> pairRDD1 = sc.parallelizePairs(Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("b", 2)));
- //填写分区器进行分区,实现partitioner的类只有三个:
- // Hash、Range(一般不用,都是系统自己调用)、Python
- //一般都用HashPartitioner
- JavaPairRDD<String, Integer> partitionByRDD = pairRDD1.partitionBy(new HashPartitioner(2));
- JavaPairRDD<Integer, Integer> pairRDD2 = sc.parallelizePairs(Arrays.asList(new Tuple2<>(2, 1), new Tuple2<>(2, 2),new Tuple2<>(4, 3),new Tuple2<>(6, 4)));
- // 如果key为int值 hash值就是他自身 % 分区的个数
- JavaPairRDD<Integer, Integer> partitionByRDD1 = pairRDD2.partitionBy(new HashPartitioner(2));
- Utils.deleteRecursively(new File("output"));
- partitionByRDD1.saveAsTextFile("output");
- //关闭sc
- sc.stop();
- }
- }
要实现自定义分区器,需要继承org.apache.spark.Partitioner类,并实现下面三个方法。
(1)numPartitions: Int:返回创建出来的分区数。
(2)getPartition(key: Any): Int:返回给定键的分区编号(0到numPartitions-1)。
(3)equals():Java 判断相等性的标准方法。这个方法的实现非常重要,Spark需要用这个方法来检查你的分区器对象是否和其他分区器实例相同,这样Spark才可以判断两个RDD的分区方式是否相同。
- package com.atguigu.keyvalue;
-
- import org.apache.spark.Partitioner;
-
- public class CustomPartitioner extends Partitioner {
- Integer num = 0;
- public CustomPartitioner( int n ) {
- num = n;
- }
-
- @Override
- public int numPartitions() {
- return num;
- }
-
- @Override
- public int getPartition(Object key) {
- //根据需求,但是必须按照key
- // 如果是int值 按照对num取模
- // 不为int值 发送到0号分区
- if (key instanceof Integer) {
- return (Integer) key % num;
- }
- return 0;
- }
- }
使用该分区器
- package com.atguigu.keyvalue;
-
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaPairRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.util.Utils;
- import scala.Tuple2;
-
- import java.io.File;
- import java.util.Arrays;
-
- public class Test04_CustomPartitioner {
- public static void main(String[] args) {
- //创建spark配置对象
- SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
- //创建sc环境
- JavaSparkContext sc = new JavaSparkContext(conf);
- //编写代码
- JavaPairRDD<Integer, Integer> pairRDD2 = sc.parallelizePairs(Arrays.asList(new Tuple2<>(1, 1), new Tuple2<>(2, 2),new Tuple2<>(4, 3),new Tuple2<>(6, 4)));
-
- JavaPairRDD<Integer, Integer> partitionByRDD1 = pairRDD2.partitionBy(new CustomPartitioner(3));
- Utils.deleteRecursively(new File("output"));
- partitionByRDD1.saveAsTextFile("output");
- //关闭sc
- sc.stop();
- }
- }
groupByKey对每个key进行操作,但只生成一个seq,并不进行聚合。
该操作可以指定分区器或者分区数(默认使用HashPartitioner)
- package com.atguigu.keyvalue;
-
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaPairRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.Function;
- import scala.Tuple2;
-
- import java.util.Arrays;
-
- public class Test05_GroupByKey {
- public static void main(String[] args) throws InterruptedException {
- //创建spark配置对象
- SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
- //创建sc环境
- JavaSparkContext sc = new JavaSparkContext(conf);
- //编写代码
- JavaPairRDD<String, Integer> pairRDD1 = sc.parallelizePairs(Arrays.asList(new Tuple2<>("hello", 1), new Tuple2<>("world", 2),new Tuple2<>("hello", 11), new Tuple2<>("world", 21),new Tuple2<>("hello", 14), new Tuple2<>("world", 22)));
- JavaPairRDD<String, Iterable<Integer>> groupByKeyRDD = pairRDD1.groupByKey();
- groupByKeyRDD. collect().forEach(System.out::println);
-
- JavaPairRDD<String, Integer> mapValuesRDD = groupByKeyRDD.mapValues(new Function<Iterable<Integer>, Integer>() {
- @Override
- public Integer call(Iterable<Integer> v1) throws Exception {
- Integer sum = 0;
- for (Integer integer : v1) {
- sum += integer;
- }
- return sum;
- }
- });
- mapValuesRDD. collect().forEach(System.out::println);
- //关闭sc
- sc.stop();
- }
- }
该操作可以将RDD[K,V]中的元素按照相同的K对V进行聚合。其存在多种重载形式,还可以设置新RDD的分区数。
- package com.atguigu.keyvalue;
-
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaPairRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.Function2;
- import scala.Tuple2;
-
- import java.util.Arrays;
-
- public class Test06_ReduceByKey {
- public static void main(String[] args) {
- //创建spark配置对象
- SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
- //创建sc环境
- JavaSparkContext sc = new JavaSparkContext(conf);
- //编写代码
-
- JavaPairRDD<String, Integer> pairRDD1 = sc.parallelizePairs(Arrays.asList(new Tuple2<>("hello", 1), new Tuple2<>("world", 2),new Tuple2<>("hello", 11), new Tuple2<>("world", 21),new Tuple2<>("hello", 14), new Tuple2<>("world", 22)));
- //reduceByKey = groupByKey + mapValues
- JavaPairRDD<String, Integer> reduceByKeyRDD = pairRDD1.reduceByKey(new Function2<Integer, Integer, Integer>() {
- @Override
- //v1为当前累计的结果,v2为当前的元素值
- //第一个sum的值取的就是第一个elem,elem是从第二个开始循环的
- public Integer call(Integer sum, Integer elem) throws Exception {
- // 第一次计算的时候 sum = 第一个元素 elem = 第二个元素
- return sum += elem;
- }
- });
- reduceByKeyRDD. collect().forEach(System.out::println);
-
- //关闭sc
- sc.stop();
- }
- }
1)reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[K,V]。
2)groupByKey:按照key进行分组,直接进行shuffle。
3)开发指导:在不影响业务逻辑的前提下,优先选用reduceByKey。求和操作不影响业务逻辑,求平均值影响业务逻辑。影响业务逻辑时建议先对数据类型进行转换再合并。
reduceByKey()求平均值
- package com.atguigu.keyvalue;
-
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaPairRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.Function;
- import org.apache.spark.api.java.function.Function2;
- import scala.Tuple2;
-
- import java.util.Arrays;
-
- public class Test07_reduceByKeyAvg {
- public static void main(String[] args) {
- //创建spark配置对象
- SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
- //创建sc环境
- JavaSparkContext sc = new JavaSparkContext(conf);
- //编写代码
- JavaPairRDD<String, Integer> pairRDD1 = sc.parallelizePairs(Arrays.asList(new Tuple2<>("hello", 1), new Tuple2<>("world", 2),new Tuple2<>("hello", 11), new Tuple2<>("world", 21),new Tuple2<>("hello", 14), new Tuple2<>("world", 22)),2);
- //如果需要求平均值 需要两个数据 (sum,count)
- //value -> (value,1) 表示当前的数据和为value 值的个数为1
- JavaPairRDD<String, Tuple2<Integer, Integer>> tuple2RDD = pairRDD1.mapValues(new Function<Integer, Tuple2<Integer, Integer>>() {
- @Override
- public Tuple2<Integer, Integer> call(Integer v1) throws Exception {
- return new Tuple2<>(v1, 1);
- }
- });
- tuple2RDD. collect().forEach(System.out::println);
- System.out.println("---------------------");
- // 之后使用reduceByKey聚合数据
- JavaPairRDD<String, Tuple2<Integer, Integer>> pairRDD = tuple2RDD.reduceByKey(new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
- @Override
- public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> res, Tuple2<Integer, Integer> elem) throws Exception {
- // 将sum值相加 同时将count值相加
- return new Tuple2<>(res._1 + elem._1, res._2 + elem._2);
- }
- });
- pairRDD. collect().forEach(System.out::println);
- System.out.println("---------------");
-
- // 对value值使用sum / count
- JavaPairRDD<String, Double> resultRDD = pairRDD.mapValues(new Function<Tuple2<Integer, Integer>, Double>() {
- @Override
- public Double call(Tuple2<Integer, Integer> v1) throws Exception {
- return v1._1.doubleValue() / v1._2;
- }
- });
- resultRDD. collect().forEach(System.out::println);
-
- //关闭sc
- sc.stop();
- }
- }
- package com.atguigu.keyvalue;
-
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaPairRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.PairFunction;
- import scala.Tuple2;
-
- import java.util.Arrays;
-
- public class Test08_SortByKey {
- public static void main(String[] args) {
- //创建spark配置对象
- SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
- //创建sc环境
- JavaSparkContext sc = new JavaSparkContext(conf);
- //编写代码
- JavaPairRDD<String, Integer> pairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2<>("hello", 1), new Tuple2<>("world", 2),new Tuple2<>("hi", 11), new Tuple2<>("you", 21),new Tuple2<>("i", 14), new Tuple2<>("ok", 22)),2);
- JavaPairRDD<String, Integer> sortByKeyRDD = pairRDD.sortByKey();
- sortByKeyRDD. collect().forEach(System.out::println);
-
- //需求:按照后面数字大小排序
- //把需要排序的标记放在key的位置上
- JavaPairRDD<Integer, Tuple2<String, Integer>> pairRDD1 = pairRDD.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, Tuple2<String, Integer>>() {
- @Override
- public Tuple2<Integer, Tuple2<String, Integer>> call(Tuple2<String, Integer> v1) throws Exception {
- return new Tuple2<>(v1._2, v1);
- }
- });
- System.out.println("-----------");
- pairRDD1. collect().forEach(System.out::println);
-
- JavaPairRDD<Integer, Tuple2<String, Integer>> pairRDD2 = pairRDD1.sortByKey();
-
- JavaPairRDD<String, Integer> resultRDD = pairRDD2.mapToPair(new PairFunction<Tuple2<Integer, Tuple2<String, Integer>>, String, Integer>() {
- @Override
- public Tuple2<String, Integer> call(Tuple2<Integer, Tuple2<String, Integer>> v1) throws Exception {
- return v1._2;
- }
- });
- System.out.println("-------------");
- resultRDD. collect().forEach(System.out::println);
- //关闭sc
- sc.stop();
- }
- }
在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
- package com.atguigu.keyvalue;
-
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaPairRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import scala.Tuple2;
-
- import java.util.Arrays;
-
- public class Test09_Join {
- public static void main(String[] args) {
- //创建spark配置对象
- SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
- //创建sc环境
- JavaSparkContext sc = new JavaSparkContext(conf);
- //编写代码
-
- JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2<>(1, "a"), new Tuple2<>(2, "b"), new Tuple2<>(4, "m")));
-
- JavaPairRDD<Integer, String> pairRDD1 = sc.parallelizePairs(Arrays.asList(new Tuple2<>(1, "p"), new Tuple2<>(2, "o"), new Tuple2<>(4, "i"), new Tuple2<>(6, "i")));
- // 相当于等值连接 rdd1.k = rdd2.k
- JavaPairRDD<Integer, Tuple2<String, String>> joinRDD = pairRDD.join(pairRDD1);
- joinRDD. collect().forEach(System.out::println);
- //关闭sc
- sc.stop();
- }
- }
在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD。
操作两个RDD中的KV元素,每个RDD中相同key中的元素分别聚合成一个集合。
- package com.atguigu.keyvalue;
-
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaPairRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import scala.Tuple2;
-
- import java.util.Arrays;
-
- public class Test10_Cogroup {
- public static void main(String[] args) {
- //创建spark配置对象
- SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
- //创建sc环境
- JavaSparkContext sc = new JavaSparkContext(conf);
- //编写代码
- JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2<>(1, "a"), new Tuple2<>(2, "b"), new Tuple2<>(4, "m"),new Tuple2<>(1, "f")));
- JavaPairRDD<Integer, String> pairRDD1 = sc.parallelizePairs(Arrays.asList(new Tuple2<>(1, "p"), new Tuple2<>(2, "o"), new Tuple2<>(4, "i"), new Tuple2<>(6, "i")));
- JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<String>>> result = pairRDD.cogroup(pairRDD1);
- // rdd1.k = rdd2.k
- // (k,(集合[rdd1的元素,],集合[rdd2的元素]))
- result. collect().forEach(System.out::println);
- //关闭sc
- sc.stop();
- }
- }
在驱动程序中,以数组Array的形式返回数据集的所有元素。所有的数据都会被拉取到Driver端,容易造成内存溢出,慎用。collect是按照分区的编号从0号开始一次拉取到driver
- package com.atguigu.action;
-
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
-
- import java.util.Arrays;
- import java.util.List;
-
- public class Test01_Collect {
- public static void main(String[] args) {
- //创建spark配置对象
- SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
- //创建sc环境
- JavaSparkContext sc = new JavaSparkContext(conf);
- //编写代码
- JavaRDD<Integer> intRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);
- // 按照分区的编号从0号分区开始拉取数据到driver
- // 实际开发不会使用collect 因为会将所有的数据都拉取到driver端 容易内存溢出
- List<Integer> collect = intRDD.collect();
- collect.forEach(a -> System.out.println(a));
-
- //关闭sc
- sc.stop();
- }
- }
- package com.atguigu.action;
-
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
-
- import java.util.Arrays;
- import java.util.List;
-
- public class Test02_Count {
- public static void main(String[] args) {
- //创建spark配置对象
- SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
- //创建sc环境
- JavaSparkContext sc = new JavaSparkContext(conf);
- //编写代码
- JavaRDD<Integer> intRDD = sc.parallelize(Arrays.asList(4, 5, 1, 2, 3), 6);
- long count = intRDD.count();
- System.out.println(count);//结果为5
-
- // first 返回0号分区的第一个元素 如果没有顺位往下
- Integer first = intRDD.first();
- System.out.println(first);//结果为4
-
- // take取前n个元素
- List<Integer> take = intRDD.take(2);
- System.out.println(take);//[4,5]
-
- List<Integer> takeOrderedRDD = intRDD.takeOrdered(2);
- System.out.println(takeOrderedRDD);//[1,2]正序排序后取前两个
- //关闭sc
- sc.stop();
- }
- }
功能说明:将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
功能说明:用于将RDD中的元素序列化成对象,存储到文件中
- package com.atguigu.action;
-
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaPairRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import scala.Tuple2;
-
- import java.util.Arrays;
- import java.util.Map;
-
- public class Test03_CountByKey {
- public static void main(String[] args) {
- //创建spark配置对象
- SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
- //创建sc环境
- JavaSparkContext sc = new JavaSparkContext(conf);
- //编写代码
- JavaPairRDD<String, Integer> pairRDD1 = sc.parallelizePairs(Arrays.asList(new Tuple2<>("hello", 10), new Tuple2<>("world", 20),new Tuple2<>("hello", 10), new Tuple2<>("world", 20),new Tuple2<>("hello", 10), new Tuple2<>("world", 20)),2);
- Map<String, Long> countByKeyRDD = pairRDD1.countByKey();
- System.out.println(countByKeyRDD);//{hello=3, world=3}
- // 保存为文本文件 能够直接读懂
- pairRDD1.saveAsTextFile("output");
- //保存为对象格式,无法读懂
- pairRDD1.saveAsObjectFile("output1");
- //关闭sc
- sc.stop();
- }
- }
- package com.atguigu.action;
-
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.VoidFunction;
-
- import java.util.Arrays;
-
- public class Test04_Foreach {
- public static void main(String[] args) {
- //创建spark配置对象
- SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
- //创建sc环境
- JavaSparkContext sc = new JavaSparkContext(conf);
- //编写代码
- JavaRDD<Integer> parallelize = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 6);
-
- parallelize.foreach(new VoidFunction<Integer>() {
- @Override
- public void call(Integer integer) throws Exception {
- System.out.println(integer);
- }
- });//3,5,4,6,2,1
-
- //关闭sc
- sc.stop();
- }
- }
本地Spark程序调试需要使用Local提交模式,即将本机当做运行环境,Master和Worker都为本机。运行时直接加断点调试即可。
- package com.atguigu;
-
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaPairRDD;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.FlatMapFunction;
- import org.apache.spark.api.java.function.Function2;
- import org.apache.spark.api.java.function.PairFlatMapFunction;
- import org.apache.spark.api.java.function.PairFunction;
- import scala.Tuple2;
-
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.Iterator;
-
- public class WordCount {
- public static void main(String[] args) {
- //创建spark配置对象
- SparkConf conf = new SparkConf().setAppName("core").setMaster("yarn");
- //创建sc环境
- JavaSparkContext sc = new JavaSparkContext(conf);
- //编写代码
- JavaRDD<String> javaRDD = sc.textFile(args[0]);
-
- //将读进来的每一行数据进行切分,然后同过flatMap进行打散
- JavaRDD<String> wordRDD = javaRDD.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public Iterator<String> call(String s) throws Exception {
- String[] s1 = s.split(" ");
- return Arrays.asList(s1).iterator();
- }
- });
-
- //将javaRDD转化为JavaPairRDD
- JavaPairRDD<String, Integer> wordPairRDD = wordRDD.mapToPair(new PairFunction<String, String, Integer>() {
- @Override
- public Tuple2<String, Integer> call(String s) throws Exception {
- return new Tuple2<>(s, 1);
- }
- });
-
- //等同于flatMap + mapToPair
- JavaPairRDD<String, Integer> wordPairRDD1 = wordRDD.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {
- @Override
- public Iterator<Tuple2<String, Integer>> call(String s) throws Exception {
- ArrayList<Tuple2<String, Integer>> tuple2s = new ArrayList<>();
- String[] words = s.split(" ");
- for (String word : words) {
- tuple2s.add(new Tuple2<>(word, 1));
- }
- return tuple2s.iterator();
- }
- });
-
- //使用reduceByKey()聚合
- JavaPairRDD<String, Integer> result = wordPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer v1, Integer v2) throws Exception {
- return v1 + v2;
- }
- });
-
- result.saveAsTextFile(args[1]);
-
- //关闭sc
- sc.stop();
- }
- }
进行打包
在HDFS上创建上传的路径,将需要WordCount的文件上传上去
然后执行(记得启动Hadoop)
[atguigu@hadoop102 spark-yarn]$ bin/spark-submit \
--class com.atguigu.spark.WordCount \
--master yarn \
./WordCount.jar \
/input \
/output
(1)需求:省份广告被点击Top3
(2)数据准备:时间戳,省份,城市,用户,广告,中间字段使用空格分割。
- package com.atguigu;
-
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaPairRDD;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.Function;
- import org.apache.spark.api.java.function.Function2;
- import org.apache.spark.api.java.function.PairFunction;
- import scala.Tuple2;
-
- import java.util.ArrayList;
- import java.util.Map;
- import java.util.TreeMap;
-
- public class Top3 {
- public static void main(String[] args) {
- //创建spark配置对象
- SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
- //创建sc环境
- JavaSparkContext sc = new JavaSparkContext(conf);
- //编写代码
- JavaRDD<String> lineRDD = sc.textFile("input/agent.log");
-
- // 将一整行字符串 1516609143867 山西 7 64 A
- // 转换为 key为(省份,广告) value 为 1
- JavaPairRDD<Tuple2<String, String>, Integer> tuple2RDD = lineRDD.mapToPair(new PairFunction<String, Tuple2<String, String>, Integer>() {
- @Override
- public Tuple2<Tuple2<String, String>, Integer> call(String s) throws Exception {
- String[] s1 = s.split(" ");
- Tuple2<String, String> key = new Tuple2<>(s1[1], s1[4]);
- return new Tuple2<>(key, 1);
- }
- });
-
- //将省份广告id相同的聚合
- JavaPairRDD<Tuple2<String, String>, Integer> reduceByKeyRDD = tuple2RDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer v1, Integer v2) throws Exception {
- return v1 + v2;
- }
- });// 已经完成省份+广告的点击次数统计 ((省份,广告),sum)
-
- // 转换格式 将省份作为key value为 (广告,sum) 然后聚合
- JavaPairRDD<String, Tuple2<String, Integer>> stringTuple2JavaPairRDD = reduceByKeyRDD.mapToPair(new PairFunction<Tuple2<Tuple2<String, String>, Integer>, String, Tuple2<String, Integer>>() {
- @Override
- public Tuple2<String, Tuple2<String, Integer>> call(Tuple2<Tuple2<String, String>, Integer> v1) throws Exception {
- String province = v1._1._1;
- String id = v1._1._2;
- Integer sum = v1._2;
- return new Tuple2<>(province, new Tuple2<>(id, sum));
- }
- });
-
- //将同一个省份的数据聚合
- JavaPairRDD<String, Iterable<Tuple2<String, Integer>>> provinceRDD = stringTuple2JavaPairRDD.groupByKey();
-
- //找出单个省份点击次数的top3
- JavaPairRDD<String, ArrayList<Tuple2<String, Integer>>> result = provinceRDD.mapValues(new Function<Iterable<Tuple2<String, Integer>>, ArrayList<Tuple2<String, Integer>>>() {
- @Override
- public ArrayList<Tuple2<String, Integer>> call(Iterable<Tuple2<String, Integer>> v1) throws Exception {
- //将二元组的数据按照value进行排序
- //创建treeMap,把需要排序的当作key
- TreeMap<Integer, String> treeMap = new TreeMap<>();
- for (Tuple2<String, Integer> tuple2 : v1) {
- Integer key = tuple2._2;
- if (treeMap.containsKey(key)) {
- //当treeMap中存在相同次数的广告id
- //将广告id进行拼接
- treeMap.put(key, treeMap.get(key) + "_" + tuple2._1);
- } else {
- //当前map中不存在相同次数的广告id
- treeMap.put(key, tuple2._1);
- }
- }
- // 放入之后已经进行排序了 只需要找出前三即可
- ArrayList<Tuple2<String, Integer>> result = new ArrayList<>();
- // 如果总数据没有了 或者 结果够3个了 停止循环
- while (treeMap.size() > 0 && result.size() < 3) {
- //treeMap的lastEntry()方法会返回key值最大的键值对
- Map.Entry<Integer, String> entry = treeMap.lastEntry();
- //将次数相同的广告id遍历
- String[] ids = entry.getValue().split("_");
- for (String id : ids) {
- result.add(new Tuple2<>(id, entry.getKey()));
- }
- // 取出一个数据 用完之后删除
- treeMap.remove(entry.getKey());
- }
- return result;
- }
- });
-
- result. collect().forEach(System.out::println);
-
- //关闭sc
- sc.stop();
- }
- }
在实际开发中我们往往需要自己定义一些对于RDD的操作,那么此时需要注意的是,初始化工作是在Driver端进行的,而实际运行程序是在Executor端进行的,这就涉及到了跨进程通信,是需要序列化的。
- package com.atguigu.serializable;
-
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.Function;
-
- import java.util.Arrays;
-
- public class Test01_Ser {
- public static void main(String[] args) {
- //创建spark配置对象
- SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
- //创建sc环境
- JavaSparkContext sc = new JavaSparkContext(conf);
- //编写代码
- //如果User不继承Serializable该代码会报错
- //自定义的rdd初始化是在driver端进行的,
- // 实际运行程序实在executor端进行,就涉及到网络通信,所以需要序列化
- User zhangsan = new User("zhangsan", 19);
- User lisi = new User("lisi", 18);
-
- JavaRDD<User> userRDD = sc.parallelize(Arrays.asList(zhangsan, lisi));
-
- JavaRDD<User> result = userRDD.map(new Function<User, User>() {
- @Override
- public User call(User v1) throws Exception {
- return new User(v1.getName(), v1.getAge() + 1);
- }
- });
- result. collect().forEach(System.out::println);
- //关闭sc
- sc.stop();
- }
- }
Java的序列化能够序列化任何的类。但是比较重,序列化后对象的体积也比较大。
Spark出于性能的考虑,Spark2.0开始支持另外一种Kryo序列化机制。Kryo速度是Serializable的10倍。当RDD在Shuffle数据的时候,简单数据类型、数组和字符串类型已经在Spark内部使用Kryo来序列化。
RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。
WordCount的 .toDebugString 方法调用后的结果
(2) ShuffledRDD[5] at reduceByKey at WordCount1.java:57 []
+-(2) MapPartitionsRDD[3] at mapToPair at WordCount1.java:36 []
| MapPartitionsRDD[2] at flatMap at WordCount1.java:27 []
| input/2.txt MapPartitionsRDD[1] at textFile at WordCount1.java:24 []
| input/2.txt HadoopRDD[0] at textFile at WordCount1.java:24 []
窄依赖表示每一个父RDD的Partition最多被子RDD的一个Partition使用(一对一or多对一),窄依赖我们形象的比喻为独生子女。
宽依赖表示同一个父RDD的Partition被多个子RDD的Partition依赖(只能是一对多),会引起Shuffle,总结:宽依赖我们形象的比喻为超生。
具有宽依赖的transformations包括:sort、reduceByKey、groupByKey、join和调用rePartition函数的任何操作。
宽依赖对Spark去评估一个transformations有更加重要的影响,比如对性能的影响。
在不影响业务要求的情况下,要尽量避免使用有宽依赖的转换算子,因为有宽依赖,就一定会走shuffle,影响性能。
1)DAG有向无环图
DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。例如,DAG记录了RDD的转换过程和任务的阶段。
2)任务运行的整体流程
3)RDD任务切分中间分为:Application、Job、Stage和Task
(1)Application:初始化一个SparkContext即生成一个Application;
(2)Job:一个Action算子就会生成一个Job;
(3)Stage:Stage等于宽依赖的个数加1;
(4)Task:一个Stage阶段中,最后一个RDD的分区个数就是Task的个数。
注意:Application->Job->Stage->Task每一层都是1对n的关系
RDD通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会把数据以序列化的形式缓存在JVM的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的action算子时,该RDD将会被缓存在计算节点的内存中,并供后面重用。
Cache底层调用的就是Persist方法,缓存级别默认用的是MEMORY_ONLY
RDD.cache()
Persist方法可以更改存储级别,使用Persist方法有以下的参数,disk表示存储再磁盘,memory表示存储再内存中,最后的2表示两份数据
存储再内存中的数据,有可能会因为运行算子内存不够的时候,会被清掉,
两个方法都不会影响血缘关系
NONE
DISK_ONLY
DISK_ONLY_2
MEMORY_ONLY
MEMORY_ONLY_2
MEMORY_ONLY_SER
MEMORY_ONLY_SER_2
MEMORY_AND_DISK
MEMORY_AND_DISK_2
MEMORY_AND_DISK_SER
MEMORY_AND_DISK_SER_2
OFF_HEAP
- package com.atguigu.cache;
-
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaPairRDD;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.Function2;
- import org.apache.spark.api.java.function.PairFlatMapFunction;
- import org.apache.spark.storage.StorageLevel;
- import scala.Tuple2;
-
- import java.util.ArrayList;
- import java.util.Iterator;
-
- public class Test01_Cache {
- public static void main(String[] args) throws InterruptedException {
- //创建spark配置对象
- SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
- //创建sc环境
- JavaSparkContext sc = new JavaSparkContext(conf);
- //编写代码
- JavaRDD<String> lineRDD = sc.textFile("input/2.txt");
-
- JavaPairRDD<String, Integer> wordPairRDD = lineRDD.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {
- @Override
- public Iterator<Tuple2<String, Integer>> call(String s) throws Exception {
- ArrayList<Tuple2<String, Integer>> tuple2s = new ArrayList<>();
- String[] words = s.split(" ");
- for (String word : words) {
- tuple2s.add(new Tuple2<>(word, 1));
- }
- return tuple2s.iterator();
- }
- });
- // wordPairRDD.cache();
- wordPairRDD.persist(StorageLevel.MEMORY_ONLY());
-
- wordPairRDD.persist(StorageLevel.MEMORY_ONLY());
-
- JavaPairRDD<String, Integer> resultRDD = wordPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer v1, Integer v2) throws Exception {
- return v1 + v2;
- }
- });
- resultRDD. collect().forEach(System.out::println);
- System.out.println(resultRDD.toDebugString());
- resultRDD. collect().forEach(System.out::println);
- // Thread.sleep(120000);
- //关闭sc
- sc.stop();
- }
- }
1)检查点:是通过将RDD中间结果写入磁盘。
2)为什么要做检查点?
由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。
3)检查点存储路径:Checkpoint的数据通常是存储在HDFS等容错、高可用的文件系统
4)检查点数据存储格式为:二进制的文件
5)检查点切断血缘:在Checkpoint的过程中,该RDD的所有依赖于父RDD中的信息将全部被移除。
6)检查点触发时间:对RDD进行Checkpoint操作并不会马上被执行,必须执行Action操作才能触发。但是检查点为了数据安全,会从血缘关系的最开始执行一遍。
- package com.atguigu.cache;
-
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaPairRDD;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.PairFlatMapFunction;
- import scala.Tuple2;
-
- import java.util.ArrayList;
- import java.util.Iterator;
-
- public class Test02_CheckPoint {
- public static void main(String[] args) throws InterruptedException {
- //创建spark配置对象
- SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");
- //创建sc环境
- JavaSparkContext sc = new JavaSparkContext(conf);
- sc.setCheckpointDir("ck");
- //编写代码
- JavaRDD<String> lineRDD = sc.textFile("input/2.txt");
-
- JavaPairRDD<String, Integer> wordPairRDD = lineRDD.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {
- @Override
- public Iterator<Tuple2<String, Integer>> call(String s) throws Exception {
- ArrayList<Tuple2<String, Integer>> tuple2s = new ArrayList<>();
- String[] words = s.split(" ");
- for (String word : words) {
- tuple2s.add(new Tuple2<>(word, 1));
- }
- return tuple2s.iterator();
- }
- });
- //加上cache()方法发后,就是3个job
- wordPairRDD.cache();
- //如果只使用checkpoint()方法的话,会有4个job
- //因为checkpoint会自己再计算一边
- wordPairRDD.checkpoint();
-
- wordPairRDD. collect().forEach(System.out::println);
- wordPairRDD. collect().forEach(System.out::println);
- wordPairRDD. collect().forEach(System.out::println);
-
- Thread.sleep(120000);
-
- //关闭sc
- sc.stop();
- }
Spark目前支持Hash分区、Range分区和用户自定义分区。Hash分区为当前的默认分区。分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle后进入哪个分区和Reduce的个数。
1)注意:
(1)只有Key-Value类型的RDD才有分区器,非Key-Value类型的RDD分区的值是None
(2)每个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于那个分区的。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。