赞
踩
目录
■ join、leftOuterJoin、rightOuterJoin、fullOuterJoin
Spark计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。三大数据结构分别是:
接下来我们一起看看这三大数据结构是如何在数据处理中使用的。
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据处理模型。源代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
RDD数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性。
Spark在计算时,是使用分区函数对每一个分区进行计算
RDD是计算模型的封装,当需求中需要将多个计算模型进行组合时,就需要将多个RDD建立依赖关系
当数据为KV类型数据时,可以通过设定分区器自定义数据的分区
从计算的角度来讲,数据处理过程中需要计算资源(内存 & CPU)和计算模型(逻辑)。执行时,需要将计算资源和计算模型进行协调和整合。
Spark框架在执行时,先申请资源,然后将应用程序的数据处理逻辑分解成一个一个的计算任务。然后将任务发到已经分配资源的计算节点上, 按照指定的计算模型进行数据计算。最后得到计算结果。
RDD是Spark框架中用于数据处理的核心模型,接下来我们看看,在Yarn环境中,RDD的工作原理:
1、启动Yarn集群环境
2、Spark通过申请资源创建调度节点和计算节点
3、Spark框架根据需求将计算逻辑根据分区划分成不同的任务
3、调度节点将任务根据计算节点状态发送到对应的计算节点进行计算
从以上流程可以看出RDD在整个流程中主要用于将逻辑进行封装,并生成Task发送给Executor节点执行计算,接下来我们就一起看看Spark框架中RDD是具体是如何进行数据处理的。
在Spark中创建RDD的创建方式可以分为四种:
一、从集合(内存)中创建RDD
从集合中创建RDD,Spark主要提供了两个方法:parallelize和makeRDD
- val sparkConf =new SparkConf().setMaster("local[*]").setAppName("spark")
- val sparkContext = new SparkContext(sparkConf)
-
- val rdd1 = sparkContext.parallelize(List(1,2,3,4))
- val rdd2 = sparkContext.makeRDD(List(1,2,3,4))
-
- rdd1.collect().foreach(println)
- rdd2.collect().foreach(println)
-
- sparkContext.stop()
从底层代码实现来讲,makeRDD方法其实就是parallelize方法
- def makeRDD[T: ClassTag](
- seq: Seq[T],
- numSlices: Int = defaultParallelism): RDD[T] = withScope {
- parallelize(seq, numSlices)
- }
二、从外部存储(文件)创建RDD
由外部存储系统的数据集创建RDD包括:本地的文件系统,所有Hadoop支持的数据集,比如HDFS、HBase等。
- val sparkConf =new SparkConf().setMaster("local[*]").setAppName("spark")
-
- val sparkContext = new SparkContext(sparkConf)
- val fileRDD: RDD[String] = sparkContext.textFile("input")
-
- fileRDD.collect().foreach(println)
- sparkContext.stop()
三、从其他RDD创建
主要是通过一个RDD运算完后,再产生新的RDD。详情请参考后续章节
四、直接创建RDD(new)
使用new的方式直接构造RDD,一般由Spark框架自身使用。
默认情况下,Spark可以将一个作业切分多个任务后,发送给Executor节点并行计算,而能够并行计算的任务数量我们称之为并行度。这个数量可以在构建RDD时指定。记住,这里的并行执行的任务数量,并不是指的切分任务的数量,不要混淆了。
- val sparkConf =new SparkConf().setMaster("local[*]").setAppName("spark")
-
- val sparkContext = new SparkContext(sparkConf)
- val dataRDD: RDD[Int] =sparkContext.makeRDD(List(1,2,3,4),4) //默认本机总核数
-
- val fileRDD: RDD[String] =sparkContext.textFile("input",2) //默认为2
-
- fileRDD.collect().foreach(println)
- sparkContext.stop()
1、读取内存数据时,数据可以按照并行度的设定进行数据的分区操作,数据分区规则的Spark核心源码如下:
- def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
- (0 until numSlices).iterator.map { i =>
-
- val start = ((i * length) / numSlices).toInt
- val end = (((i + 1) * length) / numSlices).toInt
- (start, end)
- }
- }
2、读取文件数据时,数据是按照Hadoop文件读取的规则进行切片分区,而切片规则和数据读取的规则有些差异,具体Spark核心源码如下
- public InputSplit[] getSplits(JobConf job, int numSplits)
- throws IOException {
- long totalSize = 0; // compute total size
- for (FileStatus file: files) { // check we have valid files
- if (file.isDirectory()) {
- throw new IOException("Not a file: "+ file.getPath());
- }
- totalSize += file.getLen();
- }
-
- long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
- long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
- FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
- ...
- for (FileStatus file: files) {
- ...
- if (isSplitable(fs, path)) {
- long blockSize = file.getBlockSize();
- long splitSize = computeSplitSize(goalSize, minSize, blockSize);
- ...
- }
- protected long computeSplitSize(long goalSize, long minSize,
- long blockSize) {
- return Math.max(minSize, Math.min(goalSize, blockSize));
- }
RDD根据数据处理方式的不同将算子整体上分为Value类型、双Value类型和Key-Value类型
- --算子:map(形参):def map[U: ClassTag](f: T => U): RDD[U]
- 1. 作用:将处理的数据逐条进行映射处理,"类比scala中的map,对数据进行结构转换"
- 2. 形参:def map[U: ClassTag](f: T => U): RDD[U]
- 3. 基本使用如下:
-
- 举例:
- def main(args: Array[String]): Unit = {
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
-
- //3.创建RDD并将每一个元素扩大两倍
- val value: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 2)
-
- value.map((x: Int) => x * 2).collect().foreach(println)
- value.map((x: Int) => x + "").collect().foreach(println)
-
- //4.从服务器日志数据apache.log中获取用户请求URL资源路径
- val lineRDD: RDD[String] = sc.textFile("./input/apache.log")
-
- val urlRDD: RDD[(String, String)] = lineRDD.map(line => {
- val arr: Array[String] = line.split(" ")
- (arr(3), arr(6))
- })
-
- //5.打印
- urlRDD.collect().foreach(println)
-
- //关闭连接
- sc.stop()
- }
- -- 关于map算子的两个问题
-
- --问题1:分区的问题:RDD有分区列表,每个RDD都有相同的分区计算函数,那么新的RDD与旧的RDD的分区关系是什么?
- 默认分区的数量保持不变,数据会转换后输出。
-
- --问题2:Map中数据处理的顺序是怎么样的?
- 通过如下验证发现:
- a、分区内数据按照顺序依次执行,且第一条数据的所有逻辑执行完成以后再执行第二条数据,依次类推
- b、分区间的数据执行是没有顺序,而且无需等待,即分区间执行逻辑互不影响,各自执行各自的逻辑。
-
-
- 验证如下:
-
- //测试:新旧RDD分区的关系
- val rdd1: RDD[Int] = sc.makeRDD(list,2)
- val rddmap1: RDD[Int] = rdd1.map( num => num * 2})
- //将数据输出到本地文件中,查看分区数量及分区内的数据
- rdd1.saveAsTextFile("output1")
- rddmap1.saveAsTextFile("output")
-
- //测试:分区间的执行顺序
- val rddmap2: RDD[Int] = rdd1.map( num => {println("mapA ->" + num );num * 2})
- val rddmap3: RDD[Unit] = rddmap1.map(num => println("mapB ->" + num))
- //collect方法不会转换RDD,会触发作业的执行,所以将collect这样的方法称之为行动(action)算子
- rddmap3.collect()
- --map()算子问题:
- 在分区内每次只能获取一个数据,而且只有当前一个数据的所有逻辑执行完成以后才会执行下一个数据,这样一来,效率就相对比较慢。
-
- --引出了另外一个算子:mapPartitions(形参):def mapPartitions[U: ClassTag](
- f: Iterator[T] => Iterator[U],
- preservesPartitioning: Boolean = false): RDD[U]
-
- --1. 形参:(f: Iterator[T] => Iterator[U],preservesPartitioning: Boolean = false): RDD[U]
- 形参1:f: Iterator[T] => Iterator[U],是一个函数
- 函数的形参:一个迭代器,内容为一个分区中所有的数据;
- 函数的返回:分区内数据经过转换以后数据形成的"迭代器"。
- 参数2:暂时不管。
- --2. 返回结果:返回一个新的RDD
- --3. 算子的作用:
- 将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行"任意的处理",哪怕是过滤数据
- --4. 与map()算子的不同点:
- map 算子是一个全量数据处理,不能丢失数据;
- mapPartitions 算子一次获取分区中所有的数据,那么可以执行迭代器所有的操作,如可以进行数据的过滤。
- --5. mapPartitions算子存在的问题
- 如果一个分区的数据没有处理完,那么该分区内所有的数据都不会释放,即使是前面已经处理完的数据也不会释放,
- 容易出现内存溢出。
- --6. map和mapPartitions()算子的选择:
- 如果内存空间足够大,为了提高效率时,推荐使用mapPartitions()算子
- 举例:
- def main(args: Array[String]): Unit = {
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
-
- //3.创建RDD
- val value: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 2)
-
- //4.将每一个元素扩大两倍
- // value.mapPartitions(iter =>
- // iter.map(_ * 2)
- // ).collect().foreach(println)
-
- //5.获取每个数据分区的最大值
- //注意事项:
- //a.在计算过程中使用第三方存储框架时,使用mapPartitions代替map,减少连接的创建和释放
- //b.入参封装为一个迭代器,当我们需要多次遍历某个分区数据的时候,需要将迭代器转换为List
- value.mapPartitions(iter => {
- Iterator(iter.max)
- }).collect().foreach(println)
-
- //6.获取每个数据分区的最大值(2)
- value.mapPartitions(iter =>
- List(iter.max).iterator
- ).collect().foreach(println)
-
- //关闭连接
- sc.stop()
- }
- 函数签名
- --1. 算子:mapPartitionsWithIndex (形参):def mapPartitionsWithIndex[U: ClassTag](
- f: (Int, Iterator[T]) => Iterator[U],
- preservesPartitioning: Boolean = false): RDD[U]
-
- --2. 形参:(f: (Int, Iterator[T]) => Iterator[U],preservesPartitioning: Boolean = false)
- 形参1:f: (Int, Iterator[T]) => Iterator[U],是一个函数
- 函数的形参:
- 参数1:为分区号
- 参数2:为一个迭代器,内容为一个分区中所有的数据;
- 函数的返回:分区内每个数据经过转换以后数据形成的"迭代器"。
- 形参2:暂时不管。
-
- --3.算子的作用:将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,
- "在处理时同时可以获取当前分区索引"
-
-
- 举例:
- def main(args: Array[String]): Unit = {
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
-
- //3.创建RDD
- val value: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 3)
-
- //4.将每一个元素转换为元组,key是分区号
- value.mapPartitionsWithIndex((index, iter) => {
- iter.map(x => (index, x))
- }).collect().foreach(println)
-
- //5.获取第二个数据分区的数据
- value.mapPartitionsWithIndex(
- (index, iter) => {
- if (index == 1)
- iter
- else
- Nil.iterator
- }
- ).collect().foreach(println)
-
- //6.获取每个分区最大值以及分区号
- value.mapPartitionsWithIndex(
- (index,iter) => {
- List(index,iter.max).iterator
- }).collect().foreach(println)
-
- //关闭连接
- sc.stop()
- }
- --1. 算子:flatMap(形参):def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
- --2. 形参:(f: T => TraversableOnce[U]):是一个函数
- 函数的参数:分区内的一个一个的元素
- 返回值:经过映射以后将数据进行扁平化,返回一个可迭代的集合
- --3. 作用:和scala中的作用完全一致,映射扁平
-
- 举例:
- def main(args: Array[String]): Unit = {
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
-
- //3.创建RDD
- val listRDD: RDD[List[Int]] = sc.makeRDD(List(List(1, 2), List(3, 4, 5)), 1)
- val anyRDD: RDD[Any] = sc.makeRDD(List(List(1, 2), 3, List(4, 5)), 1)
-
- //4.扁平操作 ==> RDD[Int](1,2,3,4,5)
- listRDD.flatMap(list => list).collect().foreach(println)
-
- anyRDD.flatMap(data => {
- data match {
- case list: List[_] => list
- case value: Int => List(value)
- }
- })
-
- //简写
- anyRDD.flatMap {
- case list: List[_] => list
- case value: Int => List(value)
- }.collect().foreach(println)
-
- //关闭连接
- sc.stop()
- }
- --1. 算子:glom(形参):def glom(): RDD[Array[T]]
- --2. 形参:空,无形参
- --3. 返回值:RDD[Array[T]],返回一个一个的数组,数组的数据来自同一个分区
- --4. 作用:将同一个分区内的数据转换成数组。
-
- 举例:
- def main(args: Array[String]): Unit = {
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
-
- //3.创建RDD
- val value: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 2)
-
- //4.将每个分区的所有元素放入一个数组中进行返回
- // val arrRDD: RDD[Array[Int]] = value.glom()
- // //5.打印
- // arrRDD.collect().foreach(x => {
- // x.foreach(print)
- // println
- // })
-
- //6.计算所有分区最大值(一)
- value.glom().map(_.max)
- .collect()
- .foreach(println)
-
- //6.计算所有分区最大值(二)
- value.glom().flatMap(arr => List(arr.max).iterator).collect().foreach(println)
-
- //关闭连接
- sc.stop()
- }
- --1. 算子:groupBy(形参)
- --2. 形参:def groupBy[K](f: T => K,p:Partitioner)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
- 形参1:f: T => K:是一个函数
- 函数的形参为:数据集中的一个一个的元素
- 返回值为:返回分组的K
- 形参2:p:Partitioner,指设定下游的分区数量,如果不设置,则默认为旧RDD的分区数量
- --3. 算子的返回值:返回一个元组
- 元组的第一个元素:表示分组的Key
- 元组的第二个元素:表示相同的key形成可迭代的集合
- --4. 作用:将数据根据指定的规则进行分组。
- --5. 特点:
- a、分区默认不变
- b、不同分区的数据会被重新打乱进入到不同的分区中;
- c、我们将上游的分区数据打乱重新组合到下游的分区中,这个操作称之为shuffle
- d、极限情况下,所有的数据会被分到一个分区
- e、一个组的数据在一个分区,但是并不是说一个分区中只有一个组,
- "如当分组数量大于分区数量时,那么一个分区就可能有多个组"。
- --6. 存在的问题:
- groupby方法会导致数据重新组合以后不均匀
- --7. 解决方案:
- 通过传递参数,改变下游分区的数量。
-
- 举例:
- def main(args: Array[String]): Unit = {
-
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
-
- //3.创建RDD
- val value: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 5, 6, 5, 6, 8, 9, 1), 2)
-
- //4.按照奇数偶数进行分组
- val intToIter: RDD[(Int, Iterable[Int])] = value.groupBy(x => x % 2)
-
- //5.打印数据
- intToIter.collect().foreach(println)
-
- //6.计算WordCount
- value.groupBy(x => x).map { case (value, iter) =>
- (value, iter.size)
- }.collect().foreach(println)
-
- //关闭连接
- sc.stop()
- }
代码:
- /*
- 1.一个组的数据在一个分区,但是并不是说一个分区中只有一个组
- 奇偶分组,将数据分成两个组,结果文件中只有一个分区文件,分区文件中有两个分组。
- */
-
- val list = List(1,2,3,4,5,6,7,8)
- val rdd: RDD[Int] = sc.makeRDD(list,1)
- val rdd1: RDD[(Int, Iterable[Int])] = rdd.groupBy(_ % 2)
- rdd1.saveAsTextFile("output")
-
- /*
- 2.当前有4个分区,奇偶分组只会有两个分组,所以结果文件中有4个分区文件,但是有两个分区分件中没有数据
- */
-
- val rdd: RDD[Int] = sc.makeRDD(list,4)
- val rdd1: RDD[(Int, Iterable[Int])] = rdd.groupBy(_ % 2)
- rdd1.saveAsTextFile("output")
-
-
- /*
- 3.通过设置下游的分区数量解决分区无数据的情况,此时生成的结果文件只有两个分区
- */
- val rdd: RDD[Int] = sc.makeRDD(list,4)
- val rdd1: RDD[(Int, Iterable[Int])] = rdd.groupBy(((num :Int) => num % 2),2)
- rdd1.saveAsTextFile("output")
练习:
- // 小功能:将List("Hello", "hive", "hbase", "Hadoop")根据单词首写字母进行分组。
- val list = List("Hello", "hive", "hbase", "Hadoop")
- val rdd: RDD[String] = sc.makeRDD(list)
- val rdd1: RDD[(String, Iterable[String])] = rdd.groupBy(word => word.substring(0,1))
- println(rdd1.collect().mkString(","))
-
- // 小功能:从服务器日志数据apache.log中获取每个时间段访问量。
- val rdd: RDD[String] = sc.textFile("input/apache.log")
- val rdd1: RDD[String] = rdd.flatMap(str => {
- val datas: ArrayOps.ofRef[String] = str.split(" ")
- List(datas(3).substring(11, 13))
- })
- val rdd2: RDD[(String, Iterable[String])] = rdd1.groupBy(time=>time)
- println(rdd2.flatMap(data => List((data._1, data._2.size))).sortBy(_._1).collect().mkString(","))
-
- // 小功能:WordCount。
- val rdd: RDD[String] = sc.textFile("input/word1")
- val wordcount: String = rdd.flatMap(_.split(" ")).groupBy(word => word)
- .map(tuple => (tuple._1, tuple._2.size)).collect().mkString(",")
- println(wordcount)
- --1. 算子:Filter(形参):def filter(f: T => Boolean): RDD[T]
- --2. 形参:(f: T => Boolean):是一个函数,用法和scala中的fliter类似
- 函数的形参:RDD中数据集的一个一个的数据
- 返回值:ture或者false
- true:表示数据被保留下来
- false:表示数据被过滤掉
- --3. 作用:将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。
- --4. 特点:
- a、分区不变
- b、分区内的数据可能不均衡,生产环境下,可能会导致数据倾斜
-
- 举例:
- def main(args: Array[String]): Unit = {
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
-
- //3.创建RDD
- val value: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 2)
-
- //4.保留偶数
- value.filter(x => x % 2 == 0)
- .collect()
- .foreach(println)
-
- //关闭连接
- sc.stop()
- }
- --1. 算子:Sample(形参):def sample(
- withReplacement: Boolean,
- fraction: Double,
- seed: Long = Utils.random.nextLong): RDD[T]
-
- --2. 形参: 有三个参数:
- 参数1:withReplacement: Boolean,表示数据从原数据集中抽取以后是否还放回
- 参数2:fraction: Double:和参数1配合一起使用,参数1的值不同,参数2表示含义不相同。
- 情况1:参数1为ture,表示抽取以后放回,此时参数2表示重复抽取的次数
- 情况2:参数1为false,表示抽取后不放回,此时参数2表示数据被抽取的几率。
- 说明:几率 != 返回数据集数量 / 原数据集的数量
- 参数3:seed: Long = Utils.random.nextLong:表示随机数的种子,可以确定数据抽取,可以理解
- 为数据的伪随机。所谓的随机是通过某种算法计算得来的,一旦设置了这个参数,每次获取的随机
- 数都是固定的。这个参数可选,如果没有设置,那么就是真的随机数,每次返回的结果集都可能是不一样的
- --3. 作用:
- 根据指定的规则从数据中抽取数据。
- --4. 使用场景:
- 在实际开发中,往往会出现数据倾斜的情况,那么可以从数据倾斜的分区中抽取部分数据,
- 通过抽取的数据,分析造成数据倾斜的原因。
- 函数签名
- def sample(
- withReplacement: Boolean,
- fraction: Double,
- seed: Long = Utils.random.nextLong): RDD[T]
-
- 函数说明
- 根据指定的规则从数据集中抽取数据
-
- 举例:
- def main(args: Array[String]): Unit = {
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
-
- //3.创建RDD
- val value: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 2)
-
- //4.对数据进行抽样
- value.sample(false, 0.3)
- .collect()
- .foreach(println)
-
- value.sample(true, 0.3)
- .collect()
- .foreach(println)
-
- //关闭连接
- sc.stop()
- }
-
- // 抽取数据不放回(伯努利算法)
- // 伯努利算法:又叫0、1分布。例如扔硬币,要么正面,要么反面。
- // 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不要
- // 第一个参数:抽取的数据是否放回,false:不放回
- // 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取;
- // 第三个参数:随机数种子
- val dataRDD1 = dataRDD.sample(false, 0.5)
-
- // 抽取数据放回(泊松算法)
- // 第一个参数:抽取的数据是否放回,true:放回;false:不放回
- // 第二个参数:重复数据的几率,范围大于等于0.表示每一个元素被期望抽取到的次数
- // 第三个参数:随机数种子
- val dataRDD2 = dataRDD.sample(true, 2)
- --1. 算子:distinct(形参):
- --2. 形参:有两个重载的方法:
- 方法1:无参
- 方法2:有一个参数:numPartitions: Int:用来改变去重以后的分区数量。
- --3. 作用:对数据进行去重操作
- 函数签名
- def distinct()(implicit ord: Ordering[T] = null): RDD[T]
- def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
-
- 函数说明
- 将数据集中重复的数据去重
-
- 举例:
- def main(args: Array[String]): Unit = {
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
-
- //3.创建RDD
- val value: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 1, 8, 3, 8), 2)
-
- //4.元素去重
- value.distinct()
- .collect()
- .foreach(println)
-
- //关闭连接
- sc.stop()
- }
-
- --1. 算子:coalesce(形参)
- --2. 形参:有三个参数
- 参数1:numPartitions: Int:重置的分区数量
- 参数2:shuffle: Boolean = false:是否需要打乱数据,shuffle
- 参数3:partitionCoalescer: Option[PartitionCoalescer] = Option.empty
-
- --3. 作用:缩减分区数量
- --4. 说明:
- a、该算子重点在减少分区,我们在重置分区的个数的时候,参数值不要比原有分区数量多,因为"该算子默认是不会
- 打乱数据重新,没有shuffle",所以分区设置多了,多余的分区不会有数据。
- b、我们在使用这个算子的时候,只需要传递重置的分区数量即可,其他的参数使用默认值;
- c、如果想扩大分区,有新的算子可以实现,不过底层还是调用coalesce,只是将参数2设置为true
- --5. 应用场景:
- a、数据经过过滤以后,发现数据不均匀,使用这个算子来减少分区的数量
- b、数据分区设置的不合理,也可以使用这个方法。
- 函数签名
- def coalesce(numPartitions: Int, shuffle: Boolean = false,
- partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
- (implicit ord: Ordering[T] = null)
- : RDD[T]
-
- 函数说明
- 根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率
- 当spark程序中,存在过多的小任务的时候,可以通过coalesce方法,收缩合并分区,减少分区的个数,减小任务调度成本
-
- 举例:
- def main(args: Array[String]): Unit = {
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
-
- //3.创建RDD
- val valueRDD1: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8), 4)
-
- //4.打印当前RDD的分区数
- println(s"valueRDD:${valueRDD1.getNumPartitions}")
-
- //5.缩减分区
- val valueRDD2: RDD[Int] = valueRDD1.coalesce(2, true)
- println(valueRDD2.getNumPartitions)
-
- //6.两个RDD的分区数据
- valueRDD1.mapPartitionsWithIndex((index, iter) => {
- iter.map(x => (index, x))
- }).collect().foreach(println)
-
- valueRDD2.mapPartitionsWithIndex((index, iter) => {
- iter.map(x => (index, x))
- }).collect().foreach(println)
-
- //关闭连接
- sc.stop()
- }
缩减分区不带Shuffle图解:
缩减分区带Shuffle图解:
- --1. 算子:rePartition(形参)
- --2. 底层逻辑:调用了coalesce算子,只是将shuffle的值改为了true,执行shuffle过程。
- def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
- coalesce(numPartitions, shuffle = true)
- }
- --3. 参数:numPartitions: Int:重新分区的数量
- --4. 作用:扩大分区,重分区
- --5. 说明:
- 这个参数即可以扩大分区,也可以缩小分区的数量,但是我们一般用来扩大分区。
- 缩小分区可以使用coalesce算子
- --6. coalesce 和 rePartition算子的使用选择
- a、如果是减少分区,那么就使用coalesce
- b、如果是扩大分区,那么就使用rePartition
- 函数签名
- def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
-
- 函数说明
- 该操作内部其实执行的是coalesce操作,参数shuffle的默认值为true。无论是将分区数多的RDD转换为分区数少的RDD,还是将分区数少的RDD转换为分区数多的RDD,repartition操作都可以完成,因为无论如何都会经shuffle过程。
-
- shuffle:指的是父RDD的某个分区中的数据被子RDD多个分区继承
-
- 举例:
- def main(args: Array[String]): Unit = {
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
-
- //3.创建RDD
- val valueRDD1: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8), 4)
-
- //4.打印当前RDD的分区数
- println(s"valueRDD:${valueRDD1.getNumPartitions}")
-
- //5.扩大分区
- val valueRDD2: RDD[Int] = valueRDD1.repartition(8)
- println(valueRDD2.getNumPartitions)
-
- //6.两个RDD的分区数据
- valueRDD1.mapPartitionsWithIndex((index, iter) => {
- iter.map(x => (index, x))
- }).collect().foreach(println)
-
- valueRDD2.mapPartitionsWithIndex((index, iter) => {
- iter.map(x => (index, x))
- }).collect().foreach(println)
-
- //关闭连接
- sc.stop()
- }
- 我们常认为coalesce不产生shuffle会比repartition 产生shuffle效率高,而实际情况往往要根据具体问题具体分析,coalesce效率不一定高,有时还有大坑,大家要慎用。 coalesce 与 repartition 他们两个都是RDD的分区进行重新划分,repartition只是coalesce接口中shuffle为true的实现(假设源RDD有N个分区,需要重新划分成M个分区)
-
- 1)如果N<M。一般情况下N个分区有数据分布不均匀的状况,利用HashPartitioner函数将数据重新分区为M个,这时需要将shuffle设置为true(repartition实现,coalesce也实现不了)。
-
- 2)如果N>M并且N和M相差不多,(假如N是1000,M是100)那么就可以将N个分区中的若干个分区合并成一个新的分区,最终合并为M个分区,这时可以将shuff设置为false(coalesce实现),如果M>N时,coalesce是无效的,不进行shuffle过程,父RDD和子RDD之间是窄依赖关系,无法使文件数(partiton)变多。 总之如果shuffle为false时,如果传入的参数大于现有的分区数目,RDD的分区数不变,也就是说不经过shuffle,是无法将RDD的分区数变多的
-
- 3)如果N>M并且两者相差悬殊,这时你要看executor数与要生成的partition关系,如果executor数 <= 要生成partition数,coalesce效率高,反之如果用coalesce会导致(executor数-要生成partiton数)个excutor空跑从而降低效率。如果在M为1的时候,为了使coalesce之前的操作有更好的并行度,可以将shuffle设置为true。
- val list = List(1, 2, 3, 5, 4, 12, 3, 1,6)
- val rdd: RDD[Int] = sc.makeRDD(list,3)
- val rdd1: RDD[Int] = rdd.distinct()
- rdd1.saveAsTextFile("output1")
- /*
- output1:
- 分区0:6 3 12
- 分区1:4 1
- 分区2:5 2
- */
-
- val rdd2: RDD[Int] = rdd1.coalesce(2)
- rdd2.saveAsTextFile("output2")
- /*
- output2:同一分区的数据还在一起
- 分区0:6 3 12 4 1
- 分区1:5 2
- */
-
- val rdd3: RDD[Int] = rdd1.repartition(2)
- rdd3.saveAsTextFile("output3")
- /*
- output3:数据从原来的分区打乱重组
- 分区0:6 12 1 5
- 分区1:3 4 2
- */
- --1. 算子:sortBy(形参)
- --2. 形参:参数有3个:
- 形参1:f: (T) => K:
- T:数据集中的每一个元素
- K:排序的K
- 形参2:ascending: Boolean = true
- 排序的方式,默认值为ture,为升序,
- 如果改为false,则是降序
- 形参3: numPartitions: Int = this.partitions.length
- 排序后的分区数量,默认值为前一个RDD的分区数量。
-
- --3. 作用:按照指定的规则进行排序
- 函数签名
- def sortBy[K](
- f: (T) => K,
- ascending: Boolean = true,
- numPartitions: Int = this.partitions.length)
- (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
-
- 函数说明
- 该操作用于排序数据。在排序之前,可以将数据通过f函数进行处理,之后按照f函数处理的结果进行排序,默认为正序排列。排序后新产生的RDD的分区数与原RDD的分区数一致。中间存在shuffle的过程
-
- 举例:
- def main(args: Array[String]): Unit = {
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
-
- //3.创建RDD
- val valueRDD: RDD[Int] = sc.makeRDD(Array(1, 5, 3, 4, 2, 6, 9, 8), 4)
-
- //4.按照数据大小进行排序
- valueRDD.sortBy(x => x)
- .collect()
- .foreach(println)
-
- //关闭连接
- sc.stop()
- }
双Value:表示是两个RDD之间进行操作,类似sacla中集合的并集(union)、交集(intersect)、差集(diff)、拉链(zip)
- --1. 算子:intersection
- --2. 作用:交集
- --3. 说明
- a、数据打乱重组,有shuffle过程;
- b、两个RDD的数据类型必须保持一致,否者编译时报错
- c、返回的RDD的分区数量保留两个RDD最大的分区数量
-
- 函数签名
- def intersection(other: RDD[T]): RDD[T]
-
- 函数说明
- 对源RDD和参数RDD求交集后返回一个新的RDD
-
- val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
- val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
- val dataRDD = dataRDD1.intersection(dataRDD2)
- --1. 算子:union
- --2. 作用:并集
- --3. 说明
- a、分区:分区合并
- b、数据:数据合并
- c、两个RDD的数据类型必须保持一致,否者编译不通过
-
- 函数签名
- def union(other: RDD[T]): RDD[T]
-
- 函数说明
- 对源RDD和参数RDD求并集后返回一个新的RDD
-
- val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
- val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
- val dataRDD = dataRDD1.union(dataRDD2)
- --1. 算子:subtract
- --2. 作用:差集
- --3. 说明
- a、分区:返回的RDD的分区数量等于调用这个方法的RDD的分区数量
- b、有数据打乱重组过程,有shuffle过程
- c、数据:返回当前RDD除去和参数RDD共同的数据集
- d、两个RDD的数据类型必须保持一致,否者编译时报错
-
- 函数签名
- def subtract(other: RDD[T]): RDD[T]
-
- 函数说明
- 以一个RDD元素为主,去除两个RDD中重复元素,将其他元素保留下来。求差集
-
- val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
- val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
- val dataRDD = dataRDD1.subtract(dataRDD2)
举例:
- def main(args: Array[String]): Unit = {
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
-
- //3.创建RDD
- val valueRDD1: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4), 2)
- val valueRDD2: RDD[Int] = sc.makeRDD(Array(3, 4, 5, 6), 2)
-
- //4.并集
- valueRDD1.union(valueRDD2).collect().foreach(println)
-
- //5.交集
- println("***********")
- valueRDD1.intersection(valueRDD2).collect().foreach(println)
-
- //6.差集
- println("***********")
- valueRDD1.subtract(valueRDD2).collect().foreach(println)
-
- //关闭连接
- sc.stop()
- }
- --1. 算子:zip
- --2. 作用:拉链
- --3. 说明:
- a、分区数量相同,每个分区的数据量不相等,
- 报错:Can only zip RDDs with same number of elements in each partition
- 只有两个RDD的每个分区数据量相同才能拉链
- b、分区数量不相同,每个分区的数量量相同,
- 报错:Can't zip RDDs with unequal numbers of partitions
- RDD的分区数量不同不能拉链
- 综上:只要两个RDD的分区数量和每个分区数据量相等,才不会报错。
- c、返回的RDD的数据是元组
- 函数签名
- def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
-
- 函数说明
- 将两个RDD中的元素,以键值对的形式进行合并。其中,键值对中的Key为第1个RDD中的元素,Value为第2个RDD中的相同位置的元素。
- 注意:两个RDD数据类型可以不一致,但是两个RDD数据分区与两个RDD分区数据数量必须一致,否则报错
-
- 举例:
- def main(args: Array[String]): Unit = {
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
-
- //3.创建RDD
- val valueRDD1: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4), 2)
- val valueRDD2: RDD[String] = sc.makeRDD(Array("3", "4", "5", "6"), 2)
-
- //4.拉链操作
- val zipRDD: RDD[(Int, String)] = valueRDD1.zip(valueRDD2)
-
- //5.打印
- zipRDD.collect()
- .foreach(println)
-
- //关闭连接
- sc.stop()
- }
- --1.Spark中有很多方法都是基于Key进行操作,所以数据格式应该为键值对(对偶元素)才能使用这些方法
- --2.如果数据类型是kv类型,那么Spark会将RDD自动转换补充很多新的功能-->功能的扩展
- --3.那么是如果实现的?
- a、通过隐式转换
- b、如果数据类型为kv类型,在RDD的伴生对象中会将当前的RDD会转换为PairRDDFunctions对象
- c、如下的partitionBy就是来自PairRDDFunctions类中的方法
- --1. 算子:partitionBy(形参) def partitionBy(partitioner: Partitioner): RDD[(K, V)]
- --2. 形参:partitioner: Partitioner:是一个分区器对象。
- --3. 作用:根据指定的规则对数据进行分区,指定数据进入到哪一个分区。
-
- --4.当前能改变分区的算子有:
- groupBy、coalesce、rePartition -->改变分区的数据,但是并不能指定数据去到指定的分区
- 而partitionBy就是来处理将数据指定去到哪个分区。
-
- --5. 什么是分区器?
- a、Partitioner是一个抽象类,有两个抽象方法:
- 方法1:def numPartitions: Int --用来获取当前的分区数量
- 方法2:def getPartition(key: Any): Int --根据数据的key,返回数据所在的分区号
- b、Partitioner有三个实现类:
- 1.HashPartitioner
- 2.RangePartitioner
- 3.PythonPartitioner
- c、HashPartitioner:
- 1.是Spark默认的分区器
- 2.分区规则:将当前数据的Key的哈希值 % 分区数量
- 3.形参需要传递分区的数量。
- d、RangePartitioner:范围分区器,指定每个分区的key范围,在这个范围的key就进入这个分区
- 这需要key能比较大小。
-
- --6.注意事项:如果重分区的分区器和当前RDD的分区器相同,那么数据不会重新分区。
-
- --7.自定义分区器:
- 步骤:
- a、创建一个类,有一个分区数量的属性,然后extends Partitioner
- b、重写Partitioner中的两个抽象方法
- 举例:
- def main(args: Array[String]): Unit = {
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
-
- //3.创建RDD
- val kvRDD: RDD[(String, Int)] = sc.makeRDD(Array(("a", 1), ("b", 1), ("c", 1), ("d", 1)), 2)
-
- //4.打印当前分区分配
- kvRDD.mapPartitionsWithIndex((index, iter) => {
- iter.map(x => (index, x))
- }).collect()
- .foreach(println)
-
- //5.重新分区
- kvRDD.partitionBy(new HashPartitioner(2))
- .mapPartitionsWithIndex((index, iter) => {
- iter.map(x => (index, x))
- }).collect()
- .foreach(println)
-
- //关闭连接
- sc.stop()
- }
- val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
- val sc = new SparkContext(sparkConf)
-
- val list = List((1, "a"), (2, "b"), (2, "c"))
- val rdd: RDD[(Int, String)] = sc.makeRDD(list, 3)
- // rdd.saveAsTextFile("output1")
- val rdd1: RDD[(Int, String)] = rdd.partitionBy(new HashPartitioner(3))
- // rdd1.saveAsTextFile("output2")
-
- val list1 = List(
- ("cba", "消息1"),
- ("nba", "消息5"),
- ("wnba", "消息10"),
- ("cba", "消息2"),
- ("nba", "消息2"),
- ("wnba", "消息6"),
- ("cba", "消息1"),
- )
-
- val rddInfo: RDD[(String, String)] = sc.makeRDD(list1,2)
- val partitionRDD: RDD[(String, String)] = rddInfo.partitionBy(new MyPartitioner(2))
- partitionRDD.saveAsTextFile("output")
-
- sc.stop()
- }
- //自定义分区器
- class MyPartitioner(num:Int) extends Partitioner {
- override def numPartitions: Int = num
-
- override def getPartition(key: Any): Int = {
- key match {
- case "nba" => 0
- case _ => 1
- }
- }
- }
- --1. 算子:reduceByKey(形参)
- def reduceByKey(func: (V, V) => V): RDD[(K, V)]
- def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
- --2. 形参:有三个重载的方法,我们这里介绍两种。
- 方法1:
- 形参:func: (V, V) => V ,是一个函数
- 函数的形参为:表示相同Key的value
- 函数返回值:经过聚合以后的结果,返回值数据类型和原数据value类型一致
-
- 方法2:
- 形参:func: (V, V) => V, numPartitions: Int
- 形参1:与方法1一致
- 形参2:定义聚合以后分区的数量
-
- --3. 作用:根据数据的Key进行分组,对相同Key的value进行数据处理
- 举例:
- def main(args: Array[String]): Unit = {
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
-
- //3.创建RDD
- val kvRDD1: RDD[(String, Int)] = sc.makeRDD(Array(("a", 1), ("b", 1), ("a", 1), ("d", 1)), 2)
- val kvRDD2: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (1, "c"), (3, "d")), 2)
-
- //4.统计每个字母出现的次数
- println(kvRDD1.reduceByKey(_ + _).partitioner.get)
- kvRDD2.reduceByKey(_ + _).collect().foreach(println)
-
- //关闭连接
- sc.stop()
- }
- val list = List(("a",2),("b",1),("a",3),("b",5))
- val rdd: RDD[(String, Int)] = sc.makeRDD(list,2)
- rdd.saveAsTextFile("output")
-
- val rdd1: RDD[(String, Int)] = rdd.reduceByKey(_ + _)
- rdd1.saveAsTextFile("output1")
- --1. 算子:groupByKey(形参)
- def groupByKey(): RDD[(K, Iterable[V])]
- def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
- def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
- --2. 形参:有三个重载的方法:分别是(无参)、(partitioner: Partitioner)、(numPartitions: Int)
- --3. 返回值:RDD[(K, Iterable[V])],是一个元组
- 第一元素:表示用于分组的key
- 第二元素:表示分组后相同key的value的集合
-
- --4. 作用:根据数据的key进行分组
- --5. 对比groupBy:按照指定的规则进行分组
-
- --6. 补充知识点:
- 1. shuffle过程必须落盘。
- 2. 一个分区就是一个task,如果处理过程有shuffle过程,那么将会把task一分为二
- 有shuffle过程,会生产新的分区,生产新的task。
- 3. 判断一个算子的效率,取决于shuffle的效率,落盘的数据越少,读取的数据越少,则效率越高
- --7. 关于reduceByKey和groupByKey的区别。
- 1. 算子的作用:
- reduceByKey:根据key进行分组,对相同的key的value进行操作
- groupByKey:对key进行分组
- 2. groupByKey
- a、对一个分区的数据分区后不能继续执行后续的操作,需要等到其他分区的数据全部到达后,才能执行后续的操作
- b、groupByKey是面向整个数据集,而不是面向一个分区
- c、但是如果在内存等待,那么可能由于内存不够,导致执行失败,所以这个等待的过程依靠落盘.
-
- 3.reduceByKey:
- a、在shuffle之前进行分区内的聚合操作,称之为预聚合,这样shuffle时,落盘的数据就会减少,提高了shuff的
- 效率
- b、分区内和分区间的规则相同。
- 举例:
- def main(args: Array[String]): Unit = {
-
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
-
- //3.创建RDD
- val kvRDD1: RDD[(String, Int)] = sc.makeRDD(Array(("a", 1), ("b", 1), ("a", 1), ("d", 1)), 2)
-
- //4.按照key进行分组
- val value: RDD[(String, Iterable[Int])] = kvRDD1.groupByKey()
-
- //5.统计每个字母出现的次数
- value.map { case (value, iter) =>
- (value, iter.size)
- }.collect()
- .foreach(println)
-
- //关闭连接
- sc.stop()
- }
- val list = List(("a",2),("b",1),("a",3),("b",5))
- val rdd: RDD[(String, Int)] = sc.makeRDD(list,2)
- val rdd1: RDD[(String, Iterable[Int])] = rdd.groupByKey(1)
- rdd1.saveAsTextFile("output")
-
- println(rdd1.collect().mkString(","))
- //(b,CompactBuffer(1, 5)),(a,CompactBuffer(2, 3))
- --1. 算子:aggregateByKey(形参)
- --2. 形参:两个形参列表
- a、形参列表1:(zeroValue: U)
- 参数:表示计算的初始值
- b、形参列表2:(seqOp: (U, V) => U,combOp: (U, U) => U)
- 参数1:seqOp: (U, V) => U:是一个函数,表示分区内相同key的value的计算规则
- 函数的形参:第一个参数按照计算规则和第一个value计算的结果,类型和初始值类型相同,第二个参数为数据的V
- 函数的返回值:返回和第一个参数一样的数据类型
-
- 参数2:combOp: (U, U) => U:是一个函数,表示分区间,相同key的value的计算规则
- 函数的形参:两个参数为每个分区的计算结果,类型和初始值类型相同
- 函数的返回值:返回和第一个参数一样的数据类型
- --3. 作用:根据key进行聚合,分区内和分区间的执行逻辑均是针对于value的操作
- --4. 使用场景:
- a、当出现分区内和分区间对数据处理的规则不一样时,使用这个算子。
- --5. 当分区内计算规则和分区间的计算规则相同时,可以使用foldByKey进行替代。
- def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
- --6. 说明:
- a、初始值只参与分区内相同key的第一次运算,而且初始值为value值
- b、可以初始值的方式改变数据结构
- 函数签名
- def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
- combOp: (U, U) => U): RDD[(K, U)]
-
- zeroValue:指的是给每个分区中每种Key分配一次初始值
- seqOp:指的是分区内对于相同的Key对应的Value进行迭代计算(预聚合)
- combOp:指的是分区间对于相同的Key对应的Value进行迭代计算
-
- aggregateByKey算子是函数柯里化,存在两个参数列表
- 第一个参数列表中的参数表示初始值
- 第二个参数列表中含有两个参数
- a、第一个参数表示分区内的计算规则
- b、第二个参数表示分区间的计算规则
-
- 举例:
- 取出每个分区内相同key的最大值然后分区间相加
-
- def main(args: Array[String]): Unit = {
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
- //3.创建RDD
- val kvRDD: RDD[(String, Int)] = sc.makeRDD(List(
- ("a", 3), ("a", 2), ("c", 4),
- ("b", 3), ("c", 6), ("c", 8)
- ), 2)
-
- //4.取出每个分区内相同key的最大值然后分区间相加
- // kvRDD.aggregateByKey(0)((x, y) => math.max(x, y), (a, b) => a + b)
- // .collect()
- // .foreach(println)
-
- //5.WordCount
- kvRDD.aggregateByKey(1000)(_ + _, _ + _)
- .collect()
- .foreach(println)
-
- //关闭连接
- sc.stop()
- }
aggregateByKey案例解析:
- 函数签名
- def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
-
- 函数说明
- 当分区内计算规则和分区间计算规则相同时,aggregateByKey就可以简化为foldByKey
-
- 举例:
-
- def main(args: Array[String]): Unit = {
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
-
- //3.创建RDD
- val kvRDD: RDD[(String, Int)] = sc.makeRDD(List(
- ("a", 3), ("a", 2), ("c", 4),
- ("b", 3), ("c", 6), ("c", 8)
- ), 2)
-
- //4.计算WordCount
- kvRDD.foldByKey(0)(_ + _)
- .collect()
- .foreach(println)
-
- //关闭连接
- sc.stop()
- }
- --1. 算子:combineByKey(形参)
- --2. 形参: 相同key内进行操作
- 参数1:createCombiner: V => C,表示将计算的第一个值进行结构转化
- 形参:相同key组内的第一个value元素
- 返回:value经过转换后的数据
- 参数2:mergeValue: (C, V) => C,表示分区内的计算规则
- 形参:参数1为经过处理后value,参数2为组内的一个一个的value
- 返回:value经过处理后的数据
- 参数3:mergeCombiners: (C, C) => C):表示分区间的计算规则
- 形参:相同key,两个经过分区内处理过的v
- 返回:返回两个v的处理结果
-
- --3. 作用:均是对key为组进行合并,对value进行数据处理
- --4. 使用场景:当计算时发现key的value不符合计算规则的格式时,那么选择conbineByKey
- 函数签名
- def combineByKey[C](
- createCombiner: V => C,
- mergeValue: (C, V) => C,
- mergeCombiners: (C, C) => C): RDD[(K, C)]
-
- 函数说明
- 最通用的对key-value型rdd进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致。
-
- 举例:
- def main(args: Array[String]): Unit = {
-
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
-
- //3.创建RDD
- val kvRDD: RDD[(String, Int)] = sc.makeRDD(List(
- ("a", 3), ("a", 2), ("c", 4),
- ("b", 3), ("c", 6), ("c", 8)
- ), 2)
-
- //4.计算WordCount
- kvRDD.combineByKey((x: Int) => x, (x: Int, y: Int) => x + y, (a: Int, b: Int) => a + b)
- .collect()
- .foreach(println)
-
- //关闭连接
- sc.stop()
- }
举例:将数据List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))求每个key的平均值
combineByKey()案例分析
代码实现:
- def main(args: Array[String]): Unit = {
-
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
-
- //3.创建RDD
- val kvRDD: RDD[(String, Int)] = sc.makeRDD(List(
- ("a", 88), ("b", 95), ("a", 91),
- ("b", 93), ("a", 95), ("b", 98))
- , 2)
-
- //4.求每个key对应value的和以及出现的次数
- kvRDD.combineByKey(
- (x: Int) => (x, 1),
- (x: (Int, Int), y: Int) => (x._1 + y, x._2 + 1),
- (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2))
- .map { case (word, (sum, count)) =>
- (word, sum / count.toDouble)
- }
- .collect()
- .foreach(println)
-
- //关闭连接
- sc.stop()
-
- }
aggregateByKey与combineByKey的应用对比
举例:取出每个分区内相同key的最大值然后分区间相加
combineByKey解法:
- def main(args: Array[String]): Unit = {
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
-
- //3.创建RDD
- val kvRDD: RDD[(String, Int)] = sc.makeRDD(List(
- ("a", -3), ("a", -2), ("c", -4),
- ("b", -3), ("c", -6), ("c", -8)
- ), 2)
-
- //4.取出每个分区内相同key的最大值然后分区间相加
- kvRDD.combineByKey((x: Int) => x,
- (x: Int, y: Int) => math.max(x, y),
- (a: Int, b: Int) => a + b)
- .collect()
- .foreach(println)
-
- //关闭连接
- sc.stop()
-
- }
aggregateByKey解法:
- def main(args: Array[String]): Unit = {
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
- //3.创建RDD
- val kvRDD: RDD[(String, Int)] = sc.makeRDD(List(
- ("a", 3), ("a", 2), ("c", 4),
- ("b", 3), ("c", 6), ("c", 8)
- ), 2)
-
- //4.取出每个分区内相同key的最大值然后分区间相加
- kvRDD.aggregateByKey(0)((x, y) => math.max(x, y), (a, b) => a + b)
- .collect()
- .foreach(println)
-
- //关闭连接
- sc.stop()
- }
举例:求每个key对应value的和以及出现的次数
combineByKey解法
- def main(args: Array[String]): Unit = {
-
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
-
- //3.创建RDD
- val kvRDD: RDD[(String, Int)] = sc.makeRDD(List(
- ("a", 88), ("b", 95), ("a", 91),
- ("b", 93), ("a", 95), ("b", 98))
- , 2)
-
- //4.求每个key对应value的和以及出现的次数
- kvRDD.combineByKey(
- (x: Int) => (x, 1),
- (x: (Int, Int), y: Int) => (x._1 + y, x._2 + 1),
- (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2))
- .map { case (word, (sum, count)) =>
- (word, sum / count.toDouble)
- }
- .collect()
- .foreach(println)
-
- //关闭连接
- sc.stop()
-
- }
aggregateByKey解法:
- def main(args: Array[String]): Unit = {
-
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
-
- //3.创建RDD
- val kvRDD: RDD[(String, Int)] = sc.makeRDD(List(
- ("a", 88), ("b", 95), ("a", 91),
- ("b", 93), ("a", 95), ("b", 98))
- , 2)
-
- //4.求每个key对应value的和以及出现的次数
- kvRDD.aggregateByKey((0, 0))(
- (x, y) => (x._1 + y, x._2 + 1),
- (a, b) => (a._1 + b._1, a._2 + b._2))
- .collect()
- .foreach(println)
-
- //关闭连接
- sc.stop()
-
- }
- -- reduceByKey、foldByKey、aggregateByKey、combineByKey的区别(groupByKey不做聚合)
-
- 1.从源码的角度发现,如上4个算子底层逻辑是相同,唯一不同的区别是参数不同。
- 参数1: createCombiner,分区内每个key的第一个v的转换逻辑
- 参数2: mergeValue,分区内部的计算逻辑
- 参数3: mergeCombiners,分区间的计算逻辑
- def combineByKeyWithClassTag[C](
- createCombiner: V => C,
- mergeValue: (C, V) => C,
- mergeCombiners: (C, C) => C,
- partitioner: Partitioner,
- mapSideCombine: Boolean = true,
- serializer: Serializer = null)
- 算子的灵活性: ① => ② => ③ => ④
- --2.reduceByKey:①
- 源码如下:
- --参数1:没有任何的转换,对key的第一个value没有转换
- --参数2和参数3相同,即分区内和分区间的计算逻辑保持一致。
- combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
-
- --3.aggregateByKey:③
- 源码如下:
- --参数1:传递的初始值会和每一个key的第一个value按照分区内计算逻辑进行计算
- --参数2:分区内计算逻辑
- --参数3:分区间的计算逻辑
- combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),cleanedSeqOp, combOp, partitioner)
-
- --4.foldByKey:②
- 源码如下:
- --参数1:传递的初始值会和每一个key的第一个value按照分区内计算逻辑进行计算
- --参数2和参数3一致:分区内和分区间的计算逻辑保持一致
- combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v), cleanedFunc, cleanedFunc, partitioner)
- --5.combineByKey:④
- 源码如下:
- --参数1:分区内每个key的第一个v的转换逻辑,所以去无需传递初始值
- --参数2:分区内计算逻辑
- --参数3:分区间的计算逻辑
- combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)
-
-
- 比较算子
- 不做聚合:
- groupByKey
-
- 做聚合(预聚合):从上往下,算子使用方式越来越灵活
- reduceByKey(_+_):初始化不需要自定义,分区内以及分区间的计算逻辑相同
-
- foldByKey(0)(_+_):初始化需要自定义(计算逻辑依赖于区内计算逻辑),
- 分区内以及分区间的计算逻辑相同
-
- aggregateByKey(0)(_+_,_+_):初始化需要自定义(计算逻辑依赖于区内计算逻辑),
- 分区内和分区间的计算逻辑可以不同
-
- combineByKey(createCom,_+_,_+_):初始化需要自定义(计算逻辑与区内计算逻辑可以不同),
- 分区内和分区间的计算逻辑可以不同
- --1. 算子:sortByKey(形参)
- --2. 形参:有两个形参,均有默认值:
- 形参1:ascending: Boolean = true,
- 排序的顺序,默认是升序,如果需要降序,则输入false
- 形参2:numPartitions: Int = self.partitions.length:
- 排序以后分区的数量,默认等于上一个rdd的分区的数量。
- --3. 作用:根据key进行排序,默认是升序
- --4. 说明:
- 还可以自定义分区的规则。步骤:
- 1.继承与ordered,并混入serializable
- 2.重写compare方法,指定排序比较的规则
- 举例:
- def main(args: Array[String]): Unit = {
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
-
- //3.创建RDD
- val kvRDD: RDD[(String, Int)] = sc.makeRDD(List(
- ("a", -3), ("e", -2), ("c", -4),
- ("b", -3), ("d", -6), ("c", -8)
- ), 2)
-
- //4.按照Key进行排序
- kvRDD.sortByKey()
- .collect()
- .foreach(println)
-
- //关闭连接
- sc.stop()
- }
- val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("a", 3), ("b", 4)), 2)
- println(rdd.sortByKey().collect().mkString(","))
-
-
- val user1 = User("scala", 20)
- val user2: User = User("scala", 21)
- val user3: User = User("hadoop", 25)
-
- val rdd2: RDD[(User, Int)] = sc.makeRDD(List((user1, 1), (user2, 2),(user3,3)),3)
- println(rdd2.sortByKey().collect().mkString(","))
-
- }
-
- //自定义排序方式
- case class User(name:String,age:Int) extends Ordered[User] with Serializable{
- override def compare(that: User): Int = {
- if (this.name > that.name){
- 1
- }else if (this.name == that.name){
- this.age - that.age
- }else{
- -1
- }
-
-
- }
- }
- =====>join
- --1. 算子:Join(形参)
- def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
- --2. 形参:(other: RDD[(K, W)]): 另外一个RDD
- --3. 算子的返回值:RDD[(K, (V, W))]
- 返回一个元组:
- 元组的第一个元素:两个rdd连接的Key
- 元组的第二个元素:相同key的value,一个来自当前RDD,一个来自另外一个RDD
- --4. 作用:将两个RDD中,key相同的value一一进行连接,类似mysql中的join,会出现笛卡尔积错误
-
- --5. 说明:
- 情况1:如果当前RDD中key在连接的RDD中没有,那么这条数据就不会被关联,数据则没有
- 情况2:如果当前RDD中相同的Key有多条数据,且另外一个RDD与子相同的key也有多条数据,那么就出现了笛卡尔积错误
-
-
- =====> Left/rightOuterJoin
- --1. 算子:LeftOuterJoin/RightOuterJoin
- --2. 形参:另外一个RDD
- --3. 类似mysql中的左外连接和右外连接,同样会出现笛卡尔积错误
- --4. 注意返回值:
- 如果两个RDD有相同的key,则为:(a,(1,Some(21)))
- 如果主RDD中的key,在从RDD没有对应的key,则为:(d,(2,None))
-
-
- 举例:
- def main(args: Array[String]): Unit = {
-
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
-
- //3.创建RDD
- val kvRDD1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 1), ("d", 1)), 2)
- val kvRDD2: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 1), ("c", 1)), 2)
-
- //4.JOIN
- val joinRDD: RDD[(String, (Int, Int))] = kvRDD1.join(kvRDD2)
- val value1: RDD[(String, (Int, Option[Int]))] = kvRDD1.leftOuterJoin(kvRDD2)
- val value2: RDD[(String, (Option[Int], Int))] = kvRDD1.rightOuterJoin(kvRDD2)
- val value3: RDD[(String, (Option[Int], Option[Int]))] = kvRDD1.fullOuterJoin(kvRDD2)
-
- //5.打印
- joinRDD.collect().foreach(println)
- println("**********")
- value1.collect().foreach(println)
- println("**********")
- value2.collect().foreach(println)
- println("**********")
- value3.collect().foreach(println)
-
- //关闭连接
- sc.stop()
- }
-
-
- 运行结果:
- (b,(1,1))
- (a,(1,1))
- (a,(2,1))
- **********
- (d,(1,None))
- (b,(1,Some(1)))
- (a,(1,Some(1)))
- (a,(2,Some(1)))
- **********
- (b,(Some(1),1))
- (a,(Some(1),1))
- (a,(Some(2),1))
- (c,(None,1))
- **********
- (d,(Some(1),None))
- (b,(Some(1),Some(1)))
- (a,(Some(1),Some(1)))
- (a,(Some(2),Some(1)))
- (c,(None,Some(1)))
- --1. 算子:cogroup(形参) -->co 是connect的简写
- --2. 形参:other: RDD[(K, W)]:另外一个RDD
- --3. 返回值:(K,(Iterable<V>,Iterable<W>)),是一个元组
- 元组的第一个元素:RDD的key
- 元组的第二个元素:还是一个元组
- 元组的第一个元素:当前相同key的所有value的集合,是一个迭代器
- 元组的第二个元素:另外一个RDD的key的所有value的集合,是一个迭代器
-
- --4. 作用:将两个RDD中,key相同的value组合在一起。
- 函数签名
- def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
-
- 函数说明
- 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD
-
- 举例:
- def main(args: Array[String]): Unit = {
-
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
-
- //3.创建RDD
- val kvRDD1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 1), ("d", 1)), 2)
- val kvRDD2: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 1), ("c", 1)), 2)
-
- //4.CoGroup
- val value: RDD[(String, (Iterable[Int], Iterable[Int]))] = kvRDD1.cogroup(kvRDD2)
-
- //5.打印
- value.collect().foreach(println)
-
- //关闭连接
- sc.stop()
- }
-
-
- 函数签名
- def mapValues[U](f: V => U): RDD[(K, U)] = self.withScope {}
-
- --1. 算子:mapValues(形参)
- --2. 形参:(f: V => U),是一个函数,仅对value进行处理,key不变
- --3. 返回值:kv类型的RDD。
- --4. 作用:对value的数据结构进行转换
-
- 举例:
- def main(args: Array[String]): Unit = {
-
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
-
- //3.创建RDD
- val kvRDD: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 1), ("b", 1), ("d", 1)), 2)
-
- //4.对Value扩大两倍
- kvRDD.mapValues(_ * 2)
- .collect()
- .foreach(println)
-
- //关闭连接
- sc.stop()
- }
- RDD转换算子(懒执行,只会创建一个新的RDD,不会触发任务)====> 返回值为RDD
- RDD行动算子(真正的触发任务的计算)====> 返回值为非RDD(eg:Array、Unit)
- --什么是行动算子:
- a、所谓行动算子,就是不会产生一个新的RDD,而是触发作业的执行
- b、而之前的转换算子,只是功能的扩展和包装,不会触发作业的执行
- c、行动算子执行以后,会获取当前作业的执行结果
- d、Spark的行动算子执行时,会产生job对象,然后提交job对象
- 函数签名
- def reduce(f: (T, T) => T): T
-
- 函数说明
- 聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据
-
- 举例:
- def main(args: Array[String]): Unit = {
-
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
-
- //3.创建RDD
- val valueRDD: RDD[String] = sc.makeRDD(Array("1", "2", "3", "4"), 2)
-
- //4.累加求和
- println(valueRDD.reduce(_ + _))
- println(valueRDD.reduce(_ + _)) //为演示其先聚合分区内,在聚合分区间
- println(valueRDD.reduce(_ + _))
- //关闭连接
- sc.stop()
- }
-
- 运行结果:
- 1234
- 3412
- 1234
- 函数签名
- def collect(): Array[T]
- 函数说明
- 在驱动程序中,以数组Array的形式返回数据集的所有元素
- 采集数据,纯粹的将所有分区的计算结果拉取到当前的节点,可能会出现内存溢出的情况
-
- 举例:
- val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
-
- // 收集数据到Driver
- rdd.collect().foreach(println)
注意:collect源码中表明,会将所有分区的数据收集到Driver的内存中,在生产环境中数据过大时,会内存溢出
-
- 函数签名
- def count(): Long
-
- 函数说明
- 返回RDD中元素的个数
-
- 举例:
- def main(args: Array[String]): Unit = {
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
-
- //3.创建RDD
- val valueRDD: RDD[String] = sc.makeRDD(Array("1", "2", "3", "4"), 2)
-
- //4.求当前RDD中的元素个数
- println(valueRDD.count())
- //关闭连接
- sc.stop()
-
- }
-
- 函数签名
- def first(): T
-
- 函数说明
- 返回RDD中的第一个元素
-
-
- 举例:
- def main(args: Array[String]): Unit = {
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
-
- //3.创建RDD
- val valueRDD: RDD[String] = sc.makeRDD(Array("1", "2", "3", "4"), 2)
-
- //4.取出当前RDD中的第一个元素
- println(valueRDD.first())
-
- //关闭连接
- sc.stop()
- }
- 函数签名
- def take(num: Int): Array[T]
-
- 函数说明
- 返回一个由RDD的前n个元素组成的数组
-
- 举例:
-
- def main(args: Array[String]): Unit = {
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
-
- //3.创建RDD
- val valueRDD: RDD[String] = sc.makeRDD(Array("1", "2", "3", "4"), 2)
-
- //4.取出当前RDD中的前几个元素
- println(valueRDD.take(2))
-
- //关闭连接
- sc.stop()
- }
- --1. 算子:sum
- --2. 作用:对RDD集合数据求和,但是返回值类型为Double类型
- 函数签名
- def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
-
- 函数说明
- 返回该RDD排序后的前n个元素组成的数组
-
- 举例:
- def main(args: Array[String]): Unit = {
-
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
-
- //3.创建RDD
- val valueRDD: RDD[String] = sc.makeRDD(Array("1", "2", "3", "4"), 2)
-
- //4.取出当前RDD中Top2
- valueRDD.sortBy(x => x, false).take(2) //降序
- println(valueRDD.takeOrdered(2)) //升序
-
- //关闭连接
- sc.stop()
-
- }
- --1. 算子:aggregate ,与aggregateByKey的用法类似
- def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
- --2. 形参:有两个形参列表:
- 形参列表1: (zeroValue: U) 初始值
- 形参列表2:(seqOp: (U, T) => U, combOp: (U, U) => U)
- 参数1:分区内的计算逻辑
- 参数2:分区间的计算逻辑
- --3. 与aggregateByKey的区别:
- aggregate的初始值参与分区间和分区内的计算
- aggregateByKey的初始值只参与分区内的计算
-
- --4. 重点:初始值可以和集合数据的类型不一致,分区内和分区间最后的结果和初始值的类型一致,说明是可以起到转换结构的作用。
-
- 举例:
- val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 4)
-
- // 将该RDD所有元素相加得到结果
- val result: Int = rdd.aggregate(0)(_ + _, _ + _) //10
- val result: Int = rdd.aggregate(10)(_ + _, _ + _) //15
-
-
- def main(args: Array[String]): Unit = {
-
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
-
- //3.创建RDD
- val valueRDD: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4), 4)
-
- //4.累加求和
- println(valueRDD.aggregate(1)(_ + _, _ + _))
-
- //关闭连接
- sc.stop()
-
- }
- --1. 算子:fold
- (zeroValue: T)(op: (T, T) => T): T
- --2. 作用:当aggregate的分区间和分区内的计算逻辑相同时,可以使用fold进行简化
- --3. 要求:初始值数据类型和集合中的数据类型保持一致
- 举例:
- def main(args: Array[String]): Unit = {
-
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
-
- //3.创建RDD
- val valueRDD: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4), 4)
-
- //4.累加求和
- println(valueRDD.fold(1)(_ + _)) //15
-
- //关闭连接
- sc.stop()
-
- }
- --1. 算子:countBykey
- --2. 形参:无参
- --3. 返回值:Map[K, Long],返回一个元组
- 元组的第一个元素:RDD中的Key
- 元组的第二个元素:RDD中Key出现的次数
- --4. 作用:计算key出现的次数
-
- --5. 底层源码:
- a、调用mapValues算子,将V转换为1
- b、然后再调用reduceByKey,将相同的key的value值进行相加
- c、最后转换成map结构
- self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap(慎用)
- 举例:
- def main(args: Array[String]): Unit = {
-
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
-
- //3.创建RDD
- val kvRDD: RDD[(String, Int)] = sc.makeRDD(Array(("a", 1), ("a", 2), ("b", 1),
- ("a", 3), ("b", 4), ("b", 5)), 2)
-
- //4.求每个key出现的次数
- val result: collection.Map[String, Long] = kvRDD.countByKey()
-
- //5.打印结果
- result.foreach(println)
-
- //关闭连接
- sc.stop()
-
- }
- --1. 算子:countByValues
- --2. 形参:无
- --3. 返回值:Map[T, Long] ,返回一个元组
- 元组的第一个元素:原数据的kv
- 元组的第二个元素:原数据kv出现的次数
-
- --4. 作用:计算value出现的次数
- --5. 源码:
- a、将原数据进行处理,(k,v) =>(v,null)
- b、调用countByKey算子,计算v出现的次数
- map(value => (value, null)).countByKey()
- 举例:
- val rdd: RDD[Int] = sc.makeRDD(List(1, 4, 5, 2), 2)
- println(rdd1.countByValue())
- //Map((a,1) -> 1, (b,1) -> 1, (a,2) -> 1)
- --1. 算子:
- saveAsTextFile
- savaAsObjectFile
- savaAsSequenceFile
- --2. 形参:无
- --3. 作用:将RDD的结果以不同的形式保存到文件中,其中savaAsSequenceFile要求数据结构的kv键值对
- 举例:
- def main(args: Array[String]): Unit = {
-
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
-
- //3.创建RDD
- val kvRDD: RDD[(String, Int)] = sc.makeRDD(Array(("a", 1), ("a", 2), ("b", 1),
- ("a", 3), ("b", 4), ("b", 5)), 2)
-
- //4.保存至文件中
- kvRDD.saveAsTextFile("./outText")
- kvRDD.saveAsObjectFile("./outObj")
- kvRDD.saveAsSequenceFile("./outSeq")
-
- //关闭连接
- sc.stop()
-
- }
- --1. 算子:foreach
- --2. 形参:无
- --3. 作用:遍历RDD中的数据
- --4. 算子与方法的区别:
- a、rdd.collect().foreach(println) -->foreach:方法 (打印数据在Driver)
- b、rdd.foreach(println) -->foreach:算子 (区内操作)(打印数据在Excutor)
-
- 1.只要看到rdd的算子,一定要想到两个块,Driver和Executor
- 2.算子逻辑代码是在分布式计算节点executor中执行的,算子以外代码是在Driver端执行
- 3.foreach是算子时,那么将在不同的executor中同时执行,互不影响。
- 4.foreach是方法时,那么是在当前的节点的内存中完成数据的循环。
- 5.结果就是:两种方法的结果的顺序会不同。
- 举例:
-
- val rdd: RDD[Int] = sc.makeRDD(List(1, 4, 5, 2), 2)
-
- rdd.collect().foreach(println)//1 4 5 2
- println("=====================")
- rdd.foreach(println) //5 2 1 4
一、闭包检查
从计算的角度, 算子以外的代码都是在Driver端执行, 算子里面的代码都是在Executor端执行。那么在scala的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给Executor端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测。Scala2.12版本后闭包编译方式发生了改变
二、序列化方法和属性
从计算的角度, 算子以外的代码都是在Driver端执行, 算子里面的代码都是在Executor端执行,看如下代码:
- object serializable02_function {
- def main(args: Array[String]): Unit = {
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
-
- //3.创建一个RDD
- val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark", "hive", "atguigu"))
-
- //3.1创建一个Search对象
- val search = new Search("hello")
-
- //3.2 函数传递,打印:ERROR Task not serializable
- search.getMatch1(rdd).collect().foreach(println)
-
- //3.3 属性传递,打印:ERROR Task not serializable
- search.getMatch2(rdd).collect().foreach(println)
-
- //4.关闭连接
- sc.stop()
- }
- }
- class Search(query:String) extends Serializable {
- def isMatch(s: String): Boolean = {
- s.contains(query)
- }
- // 函数序列化案例
- def getMatch1 (rdd: RDD[String]): RDD[String] = {
- //rdd.filter(this.isMatch)
- rdd.filter(isMatch)
- }
- // 属性序列化案例
- def getMatch2(rdd: RDD[String]): RDD[String] = {
- //rdd.filter(x => x.contains(this.query))
- rdd.filter(x => x.contains(query))
- //val q = query
- //rdd.filter(x => x.contains(q))
- }
- }
三、Kryo序列化框架
参考地址: https://github.com/EsotericSoftware/kryo
Java的序列化能够序列化任何的类。但是比较重(字节多),序列化后,对象的提交也比较大。Spark出于性能的考虑,Spark2.0开始支持另外一种Kryo序列化机制。Kryo速度是Serializable的10倍。当RDD在Shuffle数据的时候,简单数据类型、数组和字符串类型已经在Spark内部使用Kryo来序列化。
注意:即使使用Kryo序列化,也要继承Serializable接口。
- 举例:
- object serializable_Kryo {
- def main(args: Array[String]): Unit = {
- val conf: SparkConf = new SparkConf()
- .setAppName("SerDemo")
- .setMaster("local[*]")
- // 替换默认的序列化机制
- .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- // 注册需要使用 kryo 序列化的自定义类
- .registerKryoClasses(Array(classOf[Searcher]))
- val sc = new SparkContext(conf)
- val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello atguigu", "atguigu", "hahah"), 2)
-
- val searcher = new Searcher("hello")
- val result: RDD[String] = searcher.getMatchedRDD1(rdd)
-
- result.collect.foreach(println)
- }
- }
- case class Searcher(val query: String) {
-
- def isMatch(s: String) = {
- s.contains(query)
- }
-
- def getMatchedRDD1(rdd: RDD[String]) = {
- rdd.filter(isMatch)
- }
-
- def getMatchedRDD2(rdd: RDD[String]) = {
- val q = query
- rdd.filter(_.contains(q))
- }
- }
1、RDD 血缘关系
RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。
- //3.1 读取数据
- val str: RDD[String] = sc.textFile("input")
- println(str.toDebugString)
-
- //3.2 扁平化数据
- val words: RDD[String] = str.flatMap(_.split(" "))
- println(words.toDebugString)
-
- //3.3 分组
- val wordtocount: RDD[(String, Iterable[String])] = words.groupBy(word => word)
- println(wordtocount.toDebugString)
-
- //3.4 结构化处理=
- val wordcount: RDD[(String, Int)] = wordtocount.map {
- case (word, iter) => (word, iter.size)
- }
- println(wordcount.toDebugString)
-
-
- 打印结果:
- (4) input MapPartitionsRDD[1] at textFile at Spark_WordCount.scala:25 []
- | input HadoopRDD[0] at textFile at Spark_WordCount.scala:25 []
- (4) MapPartitionsRDD[2] at flatMap at Spark_WordCount.scala:29 []
- | input MapPartitionsRDD[1] at textFile at Spark_WordCount.scala:25 []
- | input HadoopRDD[0] at textFile at Spark_WordCount.scala:25 []
- (4) ShuffledRDD[4] at groupBy at Spark_WordCount.scala:33 []
- +-(4) MapPartitionsRDD[3] at groupBy at Spark_WordCount.scala:33 []
- | MapPartitionsRDD[2] at flatMap at Spark_WordCount.scala:29 []
- | input MapPartitionsRDD[1] at textFile at Spark_WordCount.scala:25 []
- | input HadoopRDD[0] at textFile at Spark_WordCount.scala:25 []
- (4) MapPartitionsRDD[5] at map at Spark_WordCount.scala:38 []
- | ShuffledRDD[4] at groupBy at Spark_WordCount.scala:33 []
- +-(4) MapPartitionsRDD[3] at groupBy at Spark_WordCount.scala:33 []
- | MapPartitionsRDD[2] at flatMap at Spark_WordCount.scala:29 []
- | input MapPartitionsRDD[1] at textFile at Spark_WordCount.scala:25 []
- | input HadoopRDD[0] at textFile at Spark_WordCount.scala:25 []
- 总结:
- 1. RDD的依赖关系:RDD与直接上级RDD之间的关系
- 2. RDD的血缘关系:包含直接依赖和间接依赖
- 3. 如果Spark的计算过程中某个节点计算失败,那么框架会重新计算
- 4. Spark想要重新对失败的task重新计算,那么需要知道数据来源以及需要知道数据需要经过哪些计算
- 5. RDD不保存数据,仅保持计算的逻辑
- 6. 依赖关系主要是用来解决容错计算
二、RDD 狭义的依赖关系
这里所谓的依赖关系,其实就是两个相邻RDD之间的关系
- //3.2 扁平化数据
- val words: RDD[String] = str.flatMap(_.split(" "))
- println(words.dependencies)
-
- //3.3 分组
- val wordtocount: RDD[(String, Iterable[String])] = words.groupBy(word => word)
- println(wordtocount.dependencies)
-
- //3.4 结构化处理
- val wordcount: RDD[(String, Int)] = wordtocount.map {
- case (word, iter) => (word, iter.size)
- }
- println(wordcount.dependencies)
-
- 打印结果:
- List(org.apache.spark.OneToOneDependency@4ca8dbfa)
- List(org.apache.spark.ShuffleDependency@4d654825)
- List(org.apache.spark.OneToOneDependency@62db0521)
三、RDD 窄依赖
窄依赖表示每一个父RDD的Partition最多被子RDD的一个Partition使用,窄依赖我们形象的比喻为独生子女。
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd)
四、RDD 宽依赖
宽依赖表示同一个父RDD的Partition被多个子RDD的Partition依赖,会引起Shuffle,总结:宽依赖我们形象的比喻为多生。(Shuffle依赖)
- class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
- @transient private val _rdd: RDD[_ <: Product2[K, V]],
- val partitioner: Partitioner,
- val serializer: Serializer = SparkEnv.get.serializer,
- val keyOrdering: Option[Ordering[K]] = None,
- val aggregator: Option[Aggregator[K, V, C]] = None,
- val mapSideCombine: Boolean = false)
- extends Dependency[Product2[K, V]]
总结:
- --第一种:OneToOneDependency
- 指上游RDD的一个分区最多只能被下游RDD一个分区使用,称之为窄依赖,类比独生子女
-
- --第二种:ShuffleDependency
- 指上游RDD的一个分区被下游RDD的多个分区使用,称之为宽依赖,形成1对n的关系
五、RDD 阶段划分
DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。例如,DAG记录了RDD的转换过程和任务的阶段。
六、RDD 阶段划分源码
- //*************Stage划分*************
- //1.行动算子
- collect()
-
- //2.运行任务
- runJob()
-
- //3.交给dagScheduler切分任务(SparkContext的核心)
- dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
-
- //4.提交任务
- val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
-
- //5.发送提交任务消息
- eventProcessLoop.post(JobSubmitted())
-
- //6.接收并处理消息
- doOnReceive(
- dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
- )
-
- =====>先构建了最后一个(RDD)阶段 var finalStage : ResultStage = null ====>根据RDD往前找依赖
-
- //7.创建ResultStage
- finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite){
- val parents = getOrCreateParentStages(rdd, jobId){ ======>寻找所有的Shuule依赖,从后面往前进行划分
- getShuffleDependencies(rdd).map { shuffleDep => ======> 没有一个Shuffle依赖就多一个stage
- getOrCreateShuffleMapStage(shuffleDep, firstJobId)
- }}
- val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
- }
总结:
七、RDD任务划分
- RDD任务切分中间分为:Application、Job、Stage和Task
- Application:初始化一个SparkContext即生成一个Application;
- Job:一个Action算子就会生成一个Job;
- Stage:Stage等于宽依赖(ShuffleDependency)的个数加1;
- Task:一个Stage阶段中,最后一个RDD的分区个数就是Task的个数。
- 注意:Application->Job->Stage->Task每一层都是1对n的关系。
- 四个步骤
- 1.构建DAG(有向无环图)(调用RDD上的方法)(生成一个finalStage,根据依赖关系往外前进行划分)
- 2.DAGScheduler将DAG切分Stage(切分的依据是Shuffle),将Stage中生成的Task以TaskSet的形式给TaskScheduler
- 3.TaskScheduler调度Task(根据资源情况将Task调度到相应的Executor中)
- 4.Executor接收Task,然后将Task丢入到线程池中执行
八、RDD 任务划分源码
- val tasks: Seq[Task[_]] = try {
- stage match {
- case stage: ShuffleMapStage =>
- partitionsToCompute.map { id =>
- val locs = taskIdToLocations(id)
- val part = stage.rdd.partitions(id)
- new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
- taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),
- Option(sc.applicationId), sc.applicationAttemptId)
- }
-
- case stage: ResultStage =>
- partitionsToCompute.map { id =>
- val p: Int = stage.partitions(id)
- val part = stage.rdd.partitions(p)
- val locs = taskIdToLocations(id)
- new ResultTask(stage.id, stage.latestInfo.attemptId,
- taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
- Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
- }
- }
-
- ……
-
- val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
-
- ……
-
- override def findMissingPartitions(): Seq[Int] = {
- mapOutputTrackerMaster
- .findMissingPartitions(shuffleDep.shuffleId)
- .getOrElse(0 until numPartitions)
- }
Spark运行时,首先启动一个客户端(Driver),也可以时spark-shell客户端
spark-submit --master spark://hadoop-master:7077 --executor-memory 4g --total-executor-cores 12
1,客户端和Master建立链接并且申请资源,每个executor需要4g内存,总共需要12核
2,master进行资源调度(节点向master注册的时候,会将自己的资源情况一并提交给master)
3,master和worker进行RPC通信,启动executor
4,启动各个worker节点上的executor
5,executor和Driver端进行通信
6,RDD触发Action后,会根据最后这个RDD往前推断依赖关系(宽依赖或者窄依赖),遇到Shuffle就切分Stage,会递归切分,递归的出口是RDD没有父RDD
7,DAGScheduler切分完Stage后,会进行提交Stage,先提交前面的Stage,前面的Stage执行完之后再提交后面的Stage,每一个stage都会产生很多业务逻辑相同的Task,然后以TaskSet的形式将task传递给TaskScheduler,然后TaskScheduler将Task进行序列化,根据资源情况,将task发送给Executor
8,将Driver端产生的task发送给executor
9,executor在接收到task之后,先将task进行反序列化,然后将task用一个实现了runnable接口的实现类包装起来,然后将该包装类丢入线程池,包装类实现的run方法就会被执行,进而调用task的计算逻辑。
1、问题
- -- 问题
- RDD中是不保存数据的,所以如果多个RDD需要共享其中一个RDD的数据,那么必须重头执行,效率非常低,所以需要将一些重复性比较高,
- 比较耗时的操作的结果缓存起来,提高效率,这样,不需要重头执行。
-
2、 cache
- -- 上述问题的解决方法:将计算结果进行缓存,重复使用,提高效率
-
- -- cache解析:
- 1. 缓存cache底层其实调用的persist方法
- 2. persist方法在持久化数据时会采用不同的存储级别对数据进行持久化操作
- 3. cache缓存的默认操作就是将数据保存到内存
- 4. cache存储的数据在内存中,如果内存不够用,executor可以将内存的数据进行整理,然后可以丢弃数据。
- 5. 如果由于executor端整理内存导致缓存的数据丢失,那么数据操作依然要重头执行。
- 6. 如果cache后的数据重头执行数据操作的话,那么必须要遵循血缘关系,所以cache操作不能删除血缘关系。
- 7. cache操作在行动算子执行后,会在血缘关系中增加和缓存相关的依赖
- 8. cache操作不会切断血缘,一旦发生错误,可以重新执行。
cache源码:
验证缓存的现象 :
- val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4),2)
-
- val rdd1 = rdd.map(num => {
- println("map...." + num)
- num
- })
- //将RDD的计算结果缓存到内存中
- rdd1.cache()
-
- rdd1.map(_ * 2).collect().foreach(print)
- println("\n*************")
-
- rdd1.collect().foreach(print)
- // 未加rdd1.cache()代码,打印结果为:
- map....3
- map....1
- map....4
- map....2
- 2468
- *************
- map....1
- map....2
- map....3
- map....4
- 1234
- // 加rdd1.cache()代码以后,打印结果为:
- map....3
- map....1
- map....4
- map....2
- 2468
- *************
- 1234
验证缓存的依赖关系
- val rdd1 = rdd.map(num => num)
- rdd1.cache()
-
- println(rdd1.toDebugString)
- rdd1.collect()
- println(rdd1.toDebugString)
- //打印结果:
- (2) MapPartitionsRDD[1] at map at Spark3_persist.scala:20 [Memory Deserialized 1x Replicated]
- | ParallelCollectionRDD[0] at makeRDD at Spark3_persist.scala:18 [Memory Deserialized 1x Replicated]
-
- (2) MapPartitionsRDD[1] at map at Spark3_persist.scala:20 [Memory Deserialized 1x Replicated]
- | CachedPartitions: 2; MemorySize: 48.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
- | ParallelCollectionRDD[0] at makeRDD at Spark3_persist.scala:18 [Memory Deserialized 1x Replicated]
3、 persist
- -- 问题:默认的缓存是存储在Executor端的内存中,数据量大的时候,该如何处理?
- 可以使用persist,将数据保存到当前RDD的磁盘中,但是依然有数据丢失风险。所以我们一般也不这么使用。
- rdd1.persist()
persist的数据保存级别 :
- val NONE = new StorageLevel(false, false, false, false)
- val DISK_ONLY = new StorageLevel(true, false, false, false)
- val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
- val MEMORY_ONLY = new StorageLevel(false, true, false, true)
- val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
- val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
- val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
- val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
- val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
- val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
- val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
- val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
- RDD通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会把数据以缓存在JVM的堆内存中。但是并不是这两个方法被
- 调用时立即缓存,而是触发后面的action算子时,该RDD将会被缓存在计算节点的内存中,并供后面重用。
- 缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。
- 通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算
- 全部Partition。
- Spark会自动对一些Shuffle操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点Shuffle失败了
- 避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用persist或cache。
4 、checkPoint
- 所谓的检查点其实就是通过将RDD中间结果写入磁盘
- 由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,
- 减少了开销。
- 对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发。
- 1. 将比较耗时,比较重要的数据一般会保存到分布式文件系统中。
- 2. 使用checkpoint方法将数据保存到文件中
- SparkException: Checkpoint directory has not been set in the SparkContext
- 3. 执行checkpoint方法前应该设定检查点的保存目录
- 4. 检查点的操作中为了保证数据的准确性,执行时,会启动新的job(重新执行任务,并切断血缘关系)
- 4.1当对一个RDD执行checkpoint的时候,(行动算子执行的时候)会从当前位置再次提交一次任务,也就是执行一个行动算子,
- 第一次提交两次任务,后面子在执行就是一个任务
- 5. 为了提高性能,检查点操作一般会和cache联合使用,先将数据缓存到内存中,这样再进行checkpoint,这样在执行checkpoint的
- 的时候,就不需重头执行。
- //设定检查点的保存目录
- sc.setCheckpointDir("output")
-
- val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4),2)
-
- val rdd1 = rdd.map(num =>{
- println("map...")
- num
- }
- )
- //缓存和检查点联合使用
- rdd1.cache().checkpoint()
-
- println(rdd1.map(_ * 2).collect().mkString(","))
-
- println(rdd1.collect().mkString(","))
5、 缓存和检查点区别
- -- 缓存和检查点区别
- 1. Cache缓存只是将数据保存起来,不切断血缘依赖。Checkpoint检查点切断血缘依赖。
- 2. Cache缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint的数据通常存储在HDFS等容错、高可用的文件系统,可靠性高。
- 3. 建议对checkpoint()的RDD使用Cache缓存,这样checkpoint的job只需从Cache缓存中读取数据即可,否则需要再从头计算一次RDD
- // 设置检查点路径
- sc.setCheckpointDir("./checkpoint1")
-
- // 创建一个RDD,读取指定位置文件:hello atguigu atguigu
- val lineRdd: RDD[String] = sc.textFile("input/1.txt")
-
- // 业务逻辑
- val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))
-
- val wordToOneRdd: RDD[(String, Long)] = wordRdd.map {
- word => {
- (word, System.currentTimeMillis())
- }
- }
-
- // 增加缓存,避免再重新跑一个job做checkpoint
- wordToOneRdd.cache()
- // 数据检查点:针对wordToOneRdd做检查点计算
- wordToOneRdd.checkpoint()
-
- // 触发执行逻辑
- wordToOneRdd.collect().foreach(println)
- -- 1. 分区器类别:
- a、Hash分区,"默认分区器"
- b、Range分区
- c、用户自定义分区
- -- 2. 分区器的作用:
- 直接决定了RDD分区的个数、RDD中每条数据经过Shuffle后进入哪个分区,进而决定了Reduce的个数
-
- --3. 说明
- a、只有Key-Value类型的RDD才有分区器,非Key-Value类型的RDD分区的值是None
- b、每个RDD的分区ID范围:0 ~ (numPartitions - 1),决定这个值是属于那个分区的。
-
- -- 4 分区规则:
- Hash分区:对于给定的key,计算其hashCode,并除以分区个数取余
- Range分区:将一定范围内的数据映射到一个分区中,尽量保证每个分区数据均匀,而且分区间有序
- HashPartitioner分区
-
- class HashPartitioner(partitions: Int) extends Partitioner {
- require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
-
- def numPartitions: Int = partitions
-
- def getPartition(key: Any): Int = key match {
- case null => 0
- case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
- }
-
- override def equals(other: Any): Boolean = other match {
- case h: HashPartitioner =>
- h.numPartitions == numPartitions
- case _ =>
- false
- }
-
- override def hashCode: Int = numPartitions
Spark的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统。
- 文件格式分为:text文件、csv文件、sequence文件以及Object文件;
- 文件系统分为:本地文件系统、HDFS、HBASE以及数据库。
- // 读取输入文件
- val inputRDD: RDD[String] = sc.textFile("input/1.txt")
-
- // 保存数据
- inputRDD.saveAsTextFile("output")
- SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)。在SparkContext中,可以调用sequenceFile[keyClass, valueClass](path)。
-
- // 保存数据为SequenceFile
- dataRDD.saveAsSequenceFile("output")
-
- // 读取SequenceFile文件
- sc.sequenceFile[Int,Int]("output").collect().foreach(println)
对象文件是将对象序列化后保存的文件,采用Java的序列化机制。可以通过objectFile[T: ClassTag](path)函数接收一个路径,读取对象文件,返回对应的RDD,也可以通过调用saveAsObjectFile()实现对对象文件的输出。因为是序列化所以要指定类型。
- // 保存数据
- dataRDD.saveAsObjectFile("output")
-
- // 读取数据
- sc.objectFile[Int]("output").collect().foreach(println)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。