赞
踩
Spark 框架的核心是一个计算引擎,整体来说,它采用了标准 master-slave 的结构。 如下图所示,它展示了一个 Spark 执行时的基本结构。图形中的 Driver 表示 master,负责管理整个集群中的作业任务调度。图形中的 Executor 则是 slave,负责实际执行任务。
Spark 驱动器节点,用于执行 Spark 任务中的 main 方法,负责实际代码的执行工作。Driver 在 Spark 作业执行时主要负责:
➢ 将用户程序转化为作业(job)
➢ 在 Executor 之间调度任务(task)
➢ 跟踪 Executor 的执行情况
➢ 通过 UI 展示查询运行情况
Spark Executor 是集群中工作节点(Worker)中的一个 JVM 进程,负责在 Spark 作业中运行具体任务(Task),任务彼此之间相互独立。Spark 应用启动时,Executor 节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。如果有 Executor 节点发生了故障或崩溃,Spark 应用也可以继续执行,会将出错节点上的任务调度到其他 Executor 节点上继续运行。
Executor 有两个核心功能:
➢ 负责运行组成 Spark 应用的任务,并将结果返回给驱动器进程
➢ 它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式存储。RDD 是直接缓存在 Executor 进程内的,因此任务可以在运行时充分利用缓存数据加速运算。
Spark 集群的独立部署环境中,不需要依赖其他的资源调度框架,自身就实现了资源调度的功能,所以环境中还有其他两个核心组件:Master 和 Worker,这里的 Master 是一个进程,主要负责资源的调度和分配,并进行集群的监控等职责,类似于 Yarn 环境中的 RM, 而Worker 呢,也是进程,一个 Worker 运行在集群中的一台服务器上,由 Master 分配资源对数据进行并行的处理和计算,类似于 Yarn 环境中 NM。
Hadoop 用户向 YARN 集群提交应用程序时,提交程序中应该包含 ApplicationMaster,用于向资源调度器申请执行任务的资源容器 Container,运行用户自己的程序任务 job,监控整个任务的执行,跟踪整个任务的状态,处理任务失败等异常情况。
说的简单点就是,ResourceManager(资源)和 Driver(计算)之间的解耦合靠的就是ApplicationMaster。
Spark Executor 是集群中运行在工作节点(Worker)中的一个 JVM 进程,是整个集群中的专门用于计算的节点。在提交应用中,可以提供参数指定计算节点的个数,以及对应的资源。这里的资源一般指的是工作节点 Executor 的内存大小和使用的虚拟 CPU 核(Core)数量。
将整个集群并行执行任务的数量称之为并行度。
资源申请+执行步骤
Spark 应用程序提交到 Yarn 环境中执行的时候,一般会有两种部署执行的方式:Client和 Cluster。两种模式主要区别在于:Driver 程序的运行节点位置。
读一个,写一个:效率很低
将传入的字节放入缓冲区,缓冲区超过阈值,将数据一次性输出,提高处理效率。
实际中读取一行一行的数据,大部分需要使用字符流。
三个字节转换为一个字符。
IO操作体现了装饰者设计模式
FileInputStream:文件读取
InputStreamReader:字节转换为字符
BufferedReader:进行缓冲
数据流过,并不保存,保存的只是生成RDD的操作。
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
(1)弹性
⚫ 存储的弹性:内存与磁盘的自动切换;
⚫ 容错的弹性:数据丢失可以自动恢复;
⚫ 计算的弹性:计算出错重试机制;
⚫ 分片的弹性:可根据需要重新分片。
(2)分布式:数据存储在大数据集群不同节点上
(3)数据集:RDD 封装了计算逻辑,并不保存数据
(4)数据抽象:RDD 是一个抽象类,需要子类具体实现
(5)不可变:RDD 封装了计算逻辑,是不可以改变的,想要改变,只能产生新的 RDD,在新的 RDD 里面封装计算逻辑
(6)可分区、并行计算
(1)分区列表
RDD 数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性。
(2)分区计算函数
Spark 在计算时,是使用分区函数对每一个分区进行计算(计算逻辑)
(3)RDD 之间的依赖关系
RDD 是计算模型的封装,当需求中需要将多个计算模型进行组合时,就需要将多个 RDD 建立依赖关系
(4)分区器(可选)
当数据为 KV 类型数据时,可以通过设定分区器自定义数据的分区
(5)首选位置(可选)
计算数据时,可以根据计算节点的状态选择不同的节点位置进行计算(判断计算到那个节点,计算效率最优)
移动数据不如移动计算
从计算的角度来讲,数据处理过程中需要计算资源(内存 & CPU)和计算模型(逻辑)。
执行时,需要将计算资源和计算模型进行协调和整合。
Spark 框架在执行时,先申请资源,然后将应用程序的数据处理逻辑分解成一个一个的
计算任务。然后将任务发到已经分配资源的计算节点上, 按照指定的计算模型进行数据计
算。最后得到计算结果。
RDD 是 Spark 框架中用于数据处理的核心模型,接下来我们看看,在 Yarn 环境中,RDD
的工作原理:
(1) 启动 Yarn 集群
(2)Spark通过申请资源创建调度节点和计算节点
(3)Spark 框架根据需求将计算逻辑根据分区划分成不同的任务
(4)调度节点将任务根据计算节点状态发送到对应的计算节点进行计算
从以上流程可以看出 RDD 在整个流程中主要用于将逻辑进行封装,并生成 Task 发送给Executor 节点执行计算。
1)RDD创建
(1)从集合(内存)中创建RDD
从集合中创建 RDD,Spark 主要提供了两个方法:parallelize 和 makeRDD
package com.yu.bigdata.spark.core.rddCreate import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object RDD_Memory { def main(args: Array[String]): Unit = { //1. 准备环境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD") //local[*] * 表示当前可用的最大核数,local表示单线程模拟单核 val sc = new SparkContext(sparkConf) //2. 创建RDD //从内存中创建RDD,将内存中集合的数据作为处理的数据源 val seq = Seq[Int](1, 2, 3, 4) //parallelize方法 //val rdd: RDD[Int] = sc.parallelize(seq) //使用makeRDD方法 //makeRDD方法在底层实现时就是调用了rdd对象的parallelize方法,更易理解 val rdd: RDD[Int] = sc.makeRDD(seq) rdd.collect().foreach(println) //3. 关闭环境 sc.stop() } }
(2)从外部存储(文件)创建 RDD
由外部存储系统的数据集创建 RDD 包括:本地的文件系统,所有 Hadoop 支持的数据集,比如 HDFS、HBase 等。
//从文件中创建RDD,将文件中的数据作为数据源 //sc.textFile("/Users/yu/Downloads/practise/com.yu.bigdata/datas/1.txt") //绝对路径
//(1)path路径默认以当前环境的根路径为基准,绝对路径和相对路径均可
val rdd: RDD[String] = sc.textFile("datas/1.txt")
//(2)路径可以是文件的具体路径,也可以是目录名称
val rdd = sc.textFile("datas") //可以读取文件夹下所有文件
//(3)路径还可以使用通配符
sc.textFile("datas/1*.txt") //这样以1开头的文件内容都会被读取到
//(4)path也可以是分布式存储系统路径:HDFS
sc.textFile("hdfs://linux1:8020/text.txt")
textFile与wholeTextFiles区分:是否能够看出从哪个文件中读取的内容
//textFile:以行为单位来读取数据,读取的数据都是字符串
//wholeTextFiles:以文件为单位来读取数据 能够看出从哪个文件中读取的内容
//读取的结果表示为元组,第一个元素表示文件路径,第二个元素表示文件内容
val rdd: RDD[(String, String)] = sc.wholeTextFiles("datas")
rdd.collect().foreach(println)
//(file:/Users/yu/Downloads/practise/com.yu.bigdata/datas/2.txt,Hello World Hello Spark)
//(file:/Users/yu/Downloads/practise/com.yu.bigdata/datas/1.txt,Hello World Hello Spark)
(3)从其他 RDD创建
主要是通过一个 RDD 运算完后,再产生新的 RDD。
(4)直接创建RDD(new)
使用 new 的方式直接构造 RDD,一般由 Spark 框架自身使用。
2)RDD并行度与分区
默认情况下,Spark 可以将一个作业切分多个任务后,发送给 Executor 节点并行计算,而能 够并行计算的任务数量我们称之为并行度。这个数量可以在构建 RDD 时指定。记住,这里的并行执行的任务数量,并不是指的切分任务的数量,不要混淆了。
A. 集合数据源——分区的分配
package com.yu.bigdata.spark.core.rddCreate import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object RDD_Memory_Par { def main(args: Array[String]): Unit = { // TODO 准备环境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD") //sparkConf.set("spark.default.parallelism", "5") //这样可以指定分区数 val sc = new SparkContext(sparkConf) // TODO 创建RDD 并行度&分区 //第二个参数表示分区数量 可以不传递,会使用默认并行度 使用totalCores即为当前环境的最大可用核数 //分区可以在第一步配置环境时指定 //val rdd = sc.makeRDD(List(1, 2, 3, 4), 2) //产生两个分区 val rdd = sc.makeRDD(List(1, 2, 3, 4)) //产生了8个,默认并行度 //将处理的数据保存成分区文件 rdd.saveAsTextFile("output") // TODO 关闭环境 sc.stop() } }
B. 集合数据源——分区数据的分配:
package com.yu.bigdata.spark.core.rddCreate import org.apache.spark.{SparkConf, SparkContext} object RDD_Memory_Par1 { def main(args: Array[String]): Unit = { // TODO 准备环境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD") val sc = new SparkContext(sparkConf) // TODO 创建RDD 并行度&分区 // 【1,2】 【3,4】 //val rdd = sc.makeRDD(List(1, 2, 3, 4), 2) //产生两个分区 // 【1】 【2】 【3, 4】 //val rdd = sc.makeRDD(List(1, 2, 3, 4), 3) //产生3个分区 // 【1】 【2,3】 【4,5】 val rdd = sc.makeRDD(List(1, 2, 3, 4, 5), 3) //产生3个分区 //将处理的数据保存成分区文件 rdd.saveAsTextFile("output") // TODO 关闭环境 sc.stop() } }
源码切分函数:
[1, 2, 3, 4, 5] 3
length: 5 , numSlices: 3
切分数组,分配数据
numSlices : 0 => (0, 1) => [1]
numSlices : 1 => (1, 3) => [2, 3]
numSlices : 2 => (3, 5) => [4, 5]
C. 文件数据源——分区的分配:
package com.yu.bigdata.spark.core.rddCreate import org.apache.spark.{SparkConf, SparkContext} object RDD_File_Par { def main(args: Array[String]): Unit = { // TODO 准备环境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD") val sc = new SparkContext(sparkConf) // TODO 创建RDD //textFile可以将文件作为数据处理的数据源,默认也可以设定分区 //minPartitions:最小分区数量 //math.min(defaultParallelism, 2) min(8,2) // val rdd = sc.textFile("datas/1.txt") // rdd.saveAsTextFile("output") //如果不想使用默认的分区数量,可以通过第二个参数指定分区数 //val rdd = sc.textFile("datas/1.txt", 3) //三个分区 val rdd = sc.textFile("datas/1.txt", 2) //两个分区 //分区数量的计算方式 1/ENTER2/ENTER3 => 1+2+1+2+1=7 //Spark //totalSize : 7 ENTER两个字节 共七个字节 //goalSize : 7 / 2 = 3 (byte)每个分区存放三个字节 //7 / 3 = 2……1 剩下1 (1.1) // 需要判断 剩下的字节数占每个分区字节数 的百分比大于10% 就要产生新的分区 否则不会 //所以 2 + 1 ,一共三个分区 // TODO 关闭环境 sc.stop() } }
D. 文件数据源——分区数据的分配:
RDD 根据数据处理方式的不同将算子整体上分为 Value 类型、双 Value 类型和 Key-Value类型。
RDD方法:(RDD算子)
转换:功能的补充和封装,将旧的RDD包装成新的RDD(flatMap,map)
行动:触发任务的调度和作业的执行(collect)
(1)map
函数说明:
将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。
函数签名:
def map[U: ClassTag](f: T => U): RDD[U]
val rdd = sc.makeRDD(List(1, 2, 3, 4))
val mapRDD: RDD[Int] = rdd.map(_ * 2)
并行执行:
package com.yu.bigdata.spark.core.operatorTransform import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark01_RDD_Operator_Transform_Par { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子 —— map val rdd = sc.makeRDD(List(1, 2, 3, 4), 2) //1. rdd计算一个分区内的数据是一个一个执行 // 只有前面的数据全部的逻辑执行完毕后,才会执行下一个数据 // 分区内数据的执行是有序的 //2. 不同分区的数据执行是无序的 val mapRDD = rdd.map ( num => { println(">>>>>>" + num) num } ) val mapRDD1 = mapRDD.map ( num => { println("######" + num) num } ) mapRDD1.collect() //一个分区时的结果 //两个分区时的结果 //>>>>>>1 >>>>>>1 //######1 >>>>>>3 //>>>>>>2 ######3 //######2 ######1 //>>>>>>3 >>>>>>4 //######3 >>>>>>2 //>>>>>>4 ######2 //######4 ######4 sc.stop() } }
(2)mapPartitions
函数说明:
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据。
函数签名:
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
package com.yu.bigdata.spark.core.operatorTransform import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark02_RDD_MapPartitions { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子 —— mapPartitions val rdd = sc.makeRDD(List(1, 2, 3, 4), 2) //可以以分区为单位进行数据转换操作,但是会将整个分区的数据加载到内存进行引用 //处理完的数据是不会被释放掉的,因为存在对象的引用 //在数据量多,内存较小的情况下,容易出现内存溢出 那么使用map更好 val mapRDD: RDD[Int] = rdd.mapPartitions( iter => { println(">>>>>>") iter.map(_ * 2) } ) mapRDD.collect().foreach(println) //>>>>>> 有两个分区,迭代器走了两次,是以分区为单位进行的处理 //>>>>>> //2 //4 //6 //8 sc.stop() } }
获取每个数据分区的最大值:
package com.yu.bigdata.spark.core.operatorTransform import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark02_RDD_MapPartitions_test { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子 —— mapPartitions val rdd = sc.makeRDD(List(1, 2, 3, 4), 2) // TODO 获取每个分区的最大值 val mapRDD = rdd.mapPartitions( iter => { List(iter.max).iterator } ) mapRDD.collect().foreach(println) //2 //4 sc.stop() } }
map 和 mapPartitions 的区别:
➢ 数据处理角度
Map 算子是分区内一个数据一个数据的执行,类似于串行操作。而 mapPartitions 算子是以分区为单位进行批处理操作。
➢ 功能的角度
Map 算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。MapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据。
➢ 性能的角度
Map 算子因为类似于串行操作,所以性能比较低,而是 mapPartitions 算子类似于批处理,所以性能较高。但是 mapPartitions 算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。使用 map 操作。
(3)mapPartitionsWithIndex
函数说明:
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。
函数签名:
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
package com.yu.bigdata.spark.core.operatorTransform import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark03_RDD_MapPartitionsWithIndex { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子 —— mapPartitionsWithIndex val rdd = sc.makeRDD(List(1, 2, 3, 4), 2) val mapRDD = rdd.mapPartitionsWithIndex( (index, iter) => { if ( index == 1 ) { iter } else { Nil.iterator //空集合 } } ) mapRDD.collect().foreach(println) //3 //4 sc.stop() } }
打印出每个分区内的数据
package com.yu.bigdata.spark.core.operatorTransform import org.apache.spark.{SparkConf, SparkContext} object Spark03_RDD_MapPartitionsWithIndex1 { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子 —— mapPartitionsWithIndex val rdd = sc.makeRDD(List(1, 2, 3, 4)) //这样默认会产生8个分区 //输出每个分区的数据 (分区索引, 分区内元素) val mapRDD = rdd.mapPartitionsWithIndex( (index, iter) => { iter.map( num => { (index, num) } ) } ) mapRDD.collect().foreach(println) //(1,1) //(3,2) //(5,3) //(7,4) sc.stop() } }
(4)flatMap
函数说明:
将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射。
函数签名:
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
package com.yu.bigdata.spark.core.operatorTransform import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark04_RDD_flatMap { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("flatMap") val sc = new SparkContext(sparkConf) // TODO 算子flatMap val rdd: RDD[List[Int]] = sc.makeRDD(List( List(1, 2), List(3, 4) )) val flatRDD: RDD[Int] = rdd.flatMap( list => { list //返回一个迭代器,flatMap知道怎么拆 } ) flatRDD.collect().foreach(println) //1 //2 //3 //4 val rdd1: RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark")) val flatRDD1: RDD[String] = rdd1.flatMap( s => { s.split(" ") //返回一个迭代器,告诉flatMap怎么拆分 } ) flatRDD1.collect().foreach(println) //Hello //Scala //Hello //Spark //将 List(List(1,2),3,List(4,5))进行扁平化操作 val rdd2: RDD[Any] = sc.makeRDD(List(List(1, 2), 3, List(3, 4))) val flatRDD2 = rdd2.flatMap( data => { data match { case list:List[_] => list //是list返回自身 case data => List(data) //转换为list } } ) flatRDD2.collect().foreach(println) //1 //2 //3 //3 //4 sc.stop() } }
(5)glom
函数说明:
将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变。
转换前后,分区不变,并行度不变。
函数签名:
def glom(): RDD[Array[T]]
一个分区的数据 => 一个数组
package com.yu.bigdata.spark.core.operatorTransform import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark05_RDD_glom { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("glom") val sc = new SparkContext(sparkConf) // TODO -glom val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2) val glomRDD: RDD[Array[Int]] = rdd.glom() glomRDD.collect().foreach( data => println(data.mkString(","))) //1,2 //3,4 // 计算所有分区最大值求和(分区内取最大值,分区间最大值求和) //上面已经将list转换成两个分区的集合了,可以直接统计每个集合中的最大值 val maxRDD: RDD[Int] = glomRDD.map( array => { array.max } ) println(maxRDD.collect().sum) //6 } }
(6)groupBy
函数说明:
将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为 shuffle。极限情况下,数据可能被分在同一个分区中。
一个组的数据在一个分区中,但是并不是说一个分区中只有一个组。
groupBy会将数据打乱(打散),重新组合,这个操作称之为shuffle。分组与分区没有必然的关系
函数签名:
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
package com.yu.bigdata.spark.core.operatorTransform import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark06_RDD_groupBy { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("groupBy") val sc = new SparkContext(sparkConf) // TODO groupBy val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2) //groupBy会将数据源中的每一个数据进行分组判断,根据返回的分组key进行分组 //相同key值的数据会放置在一个组里 def groupFunction(num: Int): Int = { num % 2 } val groupRDD: RDD[(Int, Iterable[Int])] = rdd.groupBy(groupFunction) groupRDD.collect().foreach(println) //(0,CompactBuffer(2, 4)) //(1,CompactBuffer(1, 3)) // TODO 按照首字母进行分组 val rdd1: RDD[String] = sc.makeRDD(List("Hello", "Spark", "Hello", "Scala")) val groupRDD1: RDD[(Char, Iterable[String])] = rdd1.groupBy(_.charAt(0)) groupRDD1.collect().foreach(println) //(H,CompactBuffer(Hello, Hello)) //(S,CompactBuffer(Spark, Scala)) } }
从服务器日志数据 apache.log 中获取每个时间段访问量:
package com.yu.bigdata.spark.core.operatorTransform import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} import java.text.SimpleDateFormat import java.util.Date object Spark06_RDD_groupBy_test { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("groupBy") val sc = new SparkContext(sparkConf) // TODO 从服务器日志数据 apache.log 中获取每个时间段访问量 val rdd = sc.textFile("datas/apache.log") val timeRDD: RDD[(String, Iterable[(String, Int)])] = rdd.map( line => { val datas = line.split(" ") //根据空格切分 val time = datas(3) //取出时间段 val hour: String = time.split(":")(1) val sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss") val date: Date = sdf.parse(time) //按指定的源格式把String转换为Date对象 val sdf1 = new SimpleDateFormat("HH") val hour: String = sdf1.format(date) //按指定的目标格式把Date对象转换为String //date.getHours (hour, 1) } ).groupBy(_._1) val groupRDD: RDD[(String, Int)] = timeRDD.map { case (hour, iter) => { (hour, iter.size) } } groupRDD.collect().foreach(println) //(06,366) //(20,486) //…… sc.stop() } }
(7)filter
函数说明:
将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。
函数签名:
def filter(f: T => Boolean): RDD[T]
package com.yu.bigdata.spark.core.operatorTransform import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark07_RDD_filter { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("filter") val sc = new SparkContext(sparkConf) val rdd = sc.makeRDD(List(1, 2, 3, 4)) //取出奇数 val filterRDD: RDD[Int] = rdd.filter(num => num % 2 != 0) filterRDD.collect().foreach(println) //1 //3 val rdd1: RDD[String] = sc.textFile("datas/apache.log") val filterRDD1: RDD[String] = rdd1.filter( line => { val data = line.split(" ") val time = data(3) time.startsWith("17/05/2015") } ) filterRDD1.collect().foreach(println) } }
(8)sample
函数说明:
根据指定的规则从数据集中抽取数据。
函数签名:
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T]
数据抽取不放回(伯努利算法):
第一个参数:抽取的数据是否放回,false:不放回
第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取;
第三个参数:随机数种子
数据抽取放回(泊松算法):
第一个参数:抽取的数据是否放回,true:放回;false:不放回
第二个参数:重复数据的几率,范围大于等于 0.表示每一个元素被期望抽取到的次数
第三个参数:随机数种子
package com.yu.bigdata.spark.core.operatorTransform import org.apache.spark.{SparkConf, SparkContext} object Spark8_RDD_Sample { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("filter") val sc = new SparkContext(sparkConf) // TODO filter val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8)) // 第一个参数:抽取的数据是否放回,false:不放回 // 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取; // 抽取不放回:伯努利分布,数据源中每条数据被抽取的概率 阈值 // 抽取放回:泊松分布,大于等于0,表示数据源中的每条数据被抽取的可能次数 // 第三个参数:随机数种子 若不传递此参数,使用的是当前系统时间,所以每次都会不同 // rdd.sample( // false, // 0.4, // 3 // ).collect().foreach(println) println(rdd.sample( true, 4, 3 ).collect().mkString(",") ) //1,1,2,2,3,3,4,4,4,4,5,5,5,5,5,6,6,6,6,6,7,7,8,8,8,8,8 } }
可以使用采样方法测试数据是否有倾斜现象。
(9)distinct
函数说明:
将数据集中重复的数据去重
函数签名:
def distinct()(implicit ord: Ordering[T] = null): RDD[T]
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
package com.yu.bigdata.spark.core.operatorTransform
import org.apache.spark.{SparkConf, SparkContext}
object Spark09_RDD_distinct {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("filter")
val sc = new SparkContext(sparkConf)
// TODO distinct
// case _ => map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
val rdd = sc.makeRDD(List(1, 2, 3, 1, 2, 5, 4, 8))
val disRDD = rdd.distinct()
disRDD.collect().foreach(println)
}
}
(10)coalesce
函数说明:
根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率
当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少分区的个数,减小任务调度成本。
函数签名:
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer:
Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T]
package com.yu.bigdata.spark.core.operatorTransform import org.apache.spark.{SparkConf, SparkContext} object Spark10_RDD_coalesce { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("coalesce") val sc = new SparkContext(sparkConf) // TODO coalesce val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3) val newRDD = rdd.coalesce(2) //1 2在一个分区, 3,4,5,6在一个分区 val newRDD1 = rdd.coalesce(2, true) //进行shuffle操作 数据均衡了 //coalesce方法默认情况下不会将分区的数据打乱重新组合 //这种情况下的缩减分区可能会导致数据不均衡,出现数据倾斜 //如果想要让数据均衡,可以进行shuffle操作 } }
(11)repartition
函数说明:
该操作内部其实执行的是 coalesce 操作,参数 shuffle 的默认值为 true。无论是将分区数多的RDD 转换为分区数少的 RDD,还是将分区数少的 RDD 转换为分区数多的 RDD,repartition操作都可以完成,因为无论如何都会经 shuffle 过程。
函数签名:
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
package com.yu.bigdata.spark.core.operatorTransform import org.apache.spark.{SparkConf, SparkContext} object Spark11_RDD_repartition { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("repartition") val sc = new SparkContext(sparkConf) // TODO repartition val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2) //coalesce算子可以扩大分区,但是如果不进行shuffle操作是没有意义的,不起作用 //所以如果想要实现扩大分区的效果,需要使用shuffle操作 // val newRDD = rdd.coalesce(3) //1:123 2:345 3:null val newRDD = rdd.coalesce(3, true) //简化操作:缩减分区使用coalesce操作,扩大分区使用repartition // repartition底层调用了coalesce,一定调用shuffle操作 val newRDD1 = rdd.repartition(3) } }
(12)sortBy
函数说明:
该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一致。中间存在 shuffle 的过程。
函数签名:
def sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
package com.yu.bigdata.spark.core.operatorTransform import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark_RDD_sortBy { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sortBy") val sc = new SparkContext(sparkConf) // TODO sortBy val rdd = sc.makeRDD(List(6, 2, 4, 3, 1, 5),2) //底层有shuffle val newRDD = rdd.sortBy(num => num) newRDD.collect().foreach(println) //sortBy方法可以根据指定的规则对数据源中的数据进行排序,默认为升序,第二个参数可以改变排序方式 //sortBy默认情况下不会改变分区,但是中间存在shuffle操作 val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("1", 1), ("11", 2), ("2", 3)), 2) //val newRDD1 = rdd1.sortBy( t => t._1 ) val newRDD1 = rdd1.sortBy( t => t._1.toInt , false) newRDD1.collect().foreach(println) sc.stop() } }
(13)intersection
对源 RDD 和参数 RDD 求交集后返回一个新的 RDD
(14)union
对源 RDD 和参数 RDD 求并集后返回一个新的 RDD
(15)substract
以一个 RDD 元素为主,去除两个 RDD 中重复元素,将其他元素保留下来。求差集
(16)zip
将两个 RDD 中的元素,以键值对的形式进行合并。其中,键值对中的 Key 为第 1 个 RDD中的元素,Value 为第 2 个 RDD 中的相同位置的元素。
package com.yu.bigdata.spark.core.operatorTransform import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark13_RDD_Collect { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("operator") val sc = new SparkContext(sparkConf) // TODO 双Value类型 val rdd1 = sc.makeRDD(List(1, 2, 3, 4)) val rdd2 = sc.makeRDD(List(3, 4, 5, 6)) //交集 val rdd3 = rdd1.intersection(rdd2) println(rdd3.collect().mkString(",")) //3,4 //并集 val rdd4 = rdd1.union(rdd2) println(rdd4.collect().mkString(",")) //1,2,3,4,3,4,5,6 //差集 val rdd5 = rdd1.subtract(rdd2) println(rdd5.collect().mkString(",")) //1,2 //拉链 val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2) println(rdd6.collect().mkString(",")) //(1,3),(2,4),(3,5),(4,6) } }
PS:
交集、并集、差集要求两个数据源数据类型保持一致;
拉链操作两个数据源的数据类型可以不一致,但数据源的分区数量需要保持一致,且两个数据源要求分区中数据数量保持一致。
(17)partitionBy
将数据按照指定 Partitioner 重新进行分区。Spark 默认的分区器是 HashPartitioner。
如果重分区的分区器和当前RDD的分区器名称和数量均相同,那么数据分区不会再变化。
val rdd = sc.makeRDD(List(1, 2, 3, 4))
val mapRDD: RDD[(Int, Int)] = rdd.map((_, 1))
//partitionBy根据指定的分区规则对数据进行重分区
mapRDD.partitionBy(new HashPartitioner(2))
(18)reduceByKey
可以将数据按照相同的 Key 对 Value 进行聚合
val rdd = sc.makeRDD(List(
("a", 1), ("b", 2), ("a", 3), ("c", 5)
))
//可以将数据按照相同的 Key 对 Value 进行聚合
//Scala中一般的聚合操作是两两聚合,spark基于scala开发,所以也是两两聚合
//reduceByKey中如果key的数据只有一个,是不会参与运算的
val reduceRDD: RDD[(String, Int)] = rdd.reduceByKey((x: Int, y: Int) => x + y)
reduceRDD.collect().foreach(println)
(19)groupByKey
将数据源的数据根据 key 对 value 进行分组
package com.yu.bigdata.spark.core.operatorTransform import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark16_RDD_groupByKey { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("operator") val sc = new SparkContext(sparkConf) // TODO Key-Value类型 val rdd = sc.makeRDD(List( ("a", 1), ("b", 2), ("a", 3), ("c", 5) )) //将数据源中的数据,相同key的数据分在一个组中,形成一个对偶元组 //元组中的第一个元素就是key //元组中的第二个元素是相同key的value集合 val groupRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey() groupRDD.collect().foreach(println) //(a,CompactBuffer(1, 3)) //(b,CompactBuffer(2)) //(c,CompactBuffer(5)) val groupRDD1: RDD[(String, Iterable[(String, Int)])] = rdd.groupBy(_._1) groupRDD1.collect().foreach(println) //(a,CompactBuffer((a,1), (a,3))) //(b,CompactBuffer((b,2))) //(c,CompactBuffer((c,5))) sc.stop() } }
reduceByKey与groupByKey区别:
groupByKey会导致数据打乱重组,存在shuffle操作。Spark中,shuffle操作必须落盘处理,不能在内存中数据等待,会导致内存溢出,shuffle操作的性能非常低。
预聚合:
reuceBykey可以对分区内的数据进行预聚合,可以有效减少shuffle时落盘的数据量。(reduceByKey分区内和分区间计算规则是相同的。)
从 shuffle 的角度:reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高。
从功能的角度:reduceByKey 其实包含分组和聚合的功能。GroupByKey 只能分组,不能聚合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用 groupByKey。
(20)aggregateByKey
将数据根据不同的规则进行分区内计算和分区间计算
val rdd = sc.makeRDD(List( ("a", 1), ("a", 2), ("a", 3), ("a", 4) ), 2) //( (a, [1, 2]), (a, [3, 4]) ) //(a, 2) (a, 4) //(a, 6) //aggregateByKey存在函数柯里化,有两个参数列表 //第一个参数列表,需要传递一个参数,表示为初值值 // 主要用于遇到第一个key时,和value进行分区内计算 //第二个参数列表需要传递两个参数 // 第一个参数表示分区内计算规则 // 第一个参数表示分区间计算规则 val aggRDD: RDD[(String, Int)] = rdd.aggregateByKey(0)( (x, y) => math.max(x, y), //分区内取最大值 (x, y) => x + y //分区间求和 ) aggRDD.collect().foreach(println) //(a,6)
aggregateByKey计算流程:
package com.yu.bigdata.spark.core.operatorTransform import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark17_RDD_aggregateByKey_test { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("operator") val sc = new SparkContext(sparkConf) // TODO Key-Value类型 val rdd = sc.makeRDD(List( ("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6) ), 2) //aggregateByKey最终的返回数据结果应该和初始值的类型保持一致 val newRDD: RDD[(String, String)] = rdd.aggregateByKey("")(_ + _, _ + _) //获取相同key的数据的平均值 => (a, 3)(b, 4) val newRDD1: RDD[(String, (Int, Int))] = rdd.aggregateByKey((0, 0))( //第一个0代表用于比较的初始值,第二个0代表相同key的出现次数 (t, v) => { //t代表元组,v代表相应的值,t._1是加和产生的值,t._2是累加key的出现次数 (t._1 + v, t._2 + 1) }, //分区内,相同key相加和,并统计出现次数 (t1, t2) => { //将分区间的统计值相加 (t1._1 + t2._1, t1._2 + t2._2) } ) //只对v采取操作使用mapValues val AverRDD: RDD[(String, Int)] = newRDD1.mapValues { case (sum, cnt) => { sum / cnt } } newRDD1.collect().foreach(println) //(b,(12,3)) //(a,(9,3)) AverRDD.collect().foreach(println) //(b,4) //(a,3) sc.stop() } }
(21)foldByKey
当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey
val rdd = sc.makeRDD(List(
("a", 1), ("a", 2), ("b", 3),
("b", 4), ("b", 5), ("a", 6)
), 2)
//如果分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey
rdd.foldByKey(0)(_+_).collect().foreach(println)
//(b,12)
//(a,9)
(22)combineByKey
最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致。
val newRDD = rdd.combineByKey(
v => (v, 1), //进行结构转换
//下面需要加上类型,动态的
(t: (Int, Int), v) => {
(t._1 + v, t._2 + 1)
},
(t1: (Int, Int), t2: (Int, Int)) => {
(t1._1 + t2._1, t1._2 + t2._2)
}
)
reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别:
reduceByKey: 相同 key 的第一个数据不进行任何计算,分区内和分区间计算规则相同
FoldByKey: 相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相同
AggregateByKey:相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则可以不相同
CombineByKey:当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区内和分区间计算规则不相同。
(23)sortByKey
在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口(特质),返回一个按照 key 进行排序的 默认升序
val rdd = sc.makeRDD(List(
("a", 1), ("d", 2), ("b", 3)
), 2)
val sortRDD: RDD[(String, Int)] = rdd.sortByKey()
sortRDD.collect().foreach(println)
//(a,1)
//(b,3)
//(d,2)
val sortRDD1 = rdd.sortByKey(false)
sortRDD1.collect().foreach(println)
//(d,2)
//(b,3)
//(a,1)
(24)join
在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的
val rdd1 = sc.makeRDD(List(
("a", 1), ("b", 2), ("c", 3)
), 2)
val rdd2 = sc.makeRDD(List(
("a", 4), ("b", 5), ("c", 6)
), 2)
//join:两个不同数据源的数据,相同的key的value会连接在一起,形成元组
// 如果两个数据源中key没有匹配上,那么数据不会出现在结果中
// 如果两个数据源中key有多个相同的,会依次匹配,可能会出现笛卡尔乘积,数据量会指数增长,性能会降低
val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
joinRDD.collect().foreach(println)
//(b,(2,5))
//(a,(1,4))
//(c,(3,6))
(25)leftOuterJoin
类似于 SQL 语句的左外连接
val rdd1 = sc.makeRDD(List(
("a", 1), ("b", 2), ("c", 3)
), 2)
val rdd2 = sc.makeRDD(List(
("a", 4), ("b", 5)//, ("c", 6)
), 2)
//rdd1为主表,主表中的都会输出
val leftJoinRDD: RDD[(String, (Int, Option[Int]))] = rdd1.leftOuterJoin(rdd2)
leftJoinRDD.collect().foreach(println)
//(b,(2,Some(5)))
//(a,(1,Some(4)))
//(c,(3,None))
val rightJoinRDD = rdd1.rightOuterJoin(rdd2) //右连接
(26)cogroup
在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDD
val rdd1 = sc.makeRDD(List(
("a", 1), ("b", 2)//, ("c", 3)
), 2)
val rdd2 = sc.makeRDD(List(
("a", 4), ("b", 5), ("c", 6), ("c", 7)
), 2)
//cogroup: connect + group 分组 连接
val cgRDD: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)
cgRDD.collect().foreach(println)
//(b,(CompactBuffer(2),CompactBuffer(5)))
//(a,(CompactBuffer(1),CompactBuffer(4)))
//(c,(CompactBuffer(),CompactBuffer(6, 7)))
统计出每一个省份每个广告被点击数量排行的 Top3。
package com.yu.bigdata.spark.core.operatorTransform import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark24_RDD_Req { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("operator") val sc = new SparkContext(sparkConf) // TODO 案例实操 //1. 获取原始数据 时间戳,省份,城市,用户,广告 val dataRdd: RDD[String] = sc.textFile("datas/agent.log") //2. 将原始数据进行结构的转换,便于统计 //取出省份,广告 ((河北,A), 1) val mapRDD: RDD[((String, String), Int)] = dataRdd.map( line => { val datas = line.split((" ")) ((datas(1), datas(4)), 1) } ) //3. 将转换结构后的数据分组聚合 // ((河北,A), 1) => ((河北,A), sum) val reduceRDD: RDD[((String, String), Int)] = mapRDD.reduceByKey(_ + _) //4. 将聚合的结果进行结构转换 ((河北,A), sum) => (河北,(A, sum) ) //模式匹配 val newRDD: RDD[(String, (String, Int))] = reduceRDD.map { case ((prv, ad), sum) => { (prv, (ad, sum)) } } //5. 将转换结构后的数据根据省份进行分组 val groupRDD: RDD[(String, Iterable[(String, Int)])] = newRDD.groupByKey() //6. 将分组后的数据组内排序(降序),取前三 val resultRDD: RDD[(String, List[(String, Int)])] = groupRDD.mapValues( iter => { iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(3) } ) //7. 采集数据,进行打印 resultRDD.collect().foreach(println) //(4,List((12,25), (2,22), (16,22))) //(8,List((2,27), (20,23), (11,22))) //…… sc.stop() } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。