赞
踩
视频地址:尚硅谷大数据Spark教程从入门到精通_哔哩哔哩_bilibili
- 尚硅谷大数据技术Spark教程-笔记01【Spark(概述、快速上手、运行环境、运行架构)】
- 尚硅谷大数据技术Spark教程-笔记02【SparkCore(核心编程,map、mapPartitions、mapPartitionsWithIndex、flatMap、glom、groupBy、filter、sample、distinct、coalesce、repartition、sortBy、intersection、union、subtract、zip)】
- 尚硅谷大数据技术Spark教程-笔记03【SparkCore(核心编程,partitionBy、reduceByKey、groupByKey、aggregateByKey、foldByKey、combineByKey、sortByKey、join、leftOuterJoin、cogroup)】
目录
P022【022.尚硅谷_SparkCore - 分布式计算模拟 - 搭建基础的架子】12:48
P023【023.尚硅谷_SparkCore - 分布式计算模拟 - 客户端向服务器发送计算任务】10:50
P024【024.尚硅谷_SparkCore - 分布式计算模拟 - 数据结构和分布式计算】11:39
P025【025.尚硅谷_SparkCore - 核心编程 - RDD - 概念介绍】05:31
P026【026.尚硅谷_SparkCore - 核心编程 - RDD - IO基本实现原理 - 1】10:11
P027【027.尚硅谷_SparkCore - 核心编程 - RDD - IO基本实现原理 - 2】08:49
P028【028.尚硅谷_SparkCore - 核心编程 - RDD - RDD和IO之间的关系】12:24
P029【029.尚硅谷_SparkCore - 核心编程 - RDD - 特点】13:34
P030【030.尚硅谷_SparkCore - 核心编程 - RDD - 五大主要配置】11:19
P031【031.尚硅谷_SparkCore - 核心编程 - RDD - 执行原理】03:05
P032【032.尚硅谷_SparkCore - 核心编程 - RDD - 创建 - 内存】11:02
P033【033.尚硅谷_SparkCore - 核心编程 - RDD - 创建 - 文件】06:28
P034【034.尚硅谷_SparkCore - 核心编程 - RDD - 创建 - 文件1】04:42
P035【035.尚硅谷_SparkCore - 核心编程 - RDD - 集合数据源 - 分区的设定】11:41
P036【036.尚硅谷_SparkCore - 核心编程 - RDD - 集合数据源 - 分区数据的分配】13:54
P037【037.尚硅谷_SparkCore - 核心编程 - RDD - 文件数据源 - 分区的设定】11:33
P038【038.尚硅谷_SparkCore - 核心编程 - RDD - 文件数据源 - 分区数据的分配】08:21
P039【039.尚硅谷_SparkCore - 核心编程 - RDD - 文件数据源 - 分区数据的分配 - 案例分析】06:13
P040【040.尚硅谷_SparkCore - 核心编程 - RDD - 算子介绍】07:49
P041【041.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - map】07:46
P042【042.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - map - 小功能】05:12
P043【043.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - map - 并行计算效果演示】08:54
P044【044.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - mapPartitions】06:12
P045【045.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - mapPartitions - 小练习】03:49
P046【046.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - mapPartitions & map的区别 - 完成比完美更重要】02:21
P047【047.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - mapPartitionsWithIndex】06:30
P048【048.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - flatMap】05:07
P049【049.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - flatMap - 小练习】02:41
P050【050.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - glom】06:33
P051【051.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - 理解分区不变的含义】06:48
P052【052.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - groupBy】05:25
P053【053.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - groupBy - shuffle来袭】06:01
P054【054.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - groupBy - 小练习】07:51
P055【055.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - filter - 数据倾斜】07:11
P056【056.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - sample - 抽奖喽】16:11
P057【057.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - distinct】06:13
P058【058.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - coalesce】11:11
P059【059.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - repartition】07:28
P060【060.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - sortBy】06:31
P061【061.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - 交集&并集&差集&拉链】08:19
P062【062.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - 交集&并集&差集&拉链 - 注意事项】08:10
第5章 Spark核心编程
Spark 计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。三大数据结构分别是:
➢ RDD : 弹性分布式数据集
➢ 累加器:分布式共享只写变量
➢ 广播变量:分布式共享只读变量
5.1 RDD
5.1.1 什么是RDD
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
- package com.atguigu.bigdata.spark.core.test
-
- import java.io.OutputStream
- import java.net.Socket
-
- object Driver {
- def main(args: Array[String]): Unit = {
- //连接服务器
- val client = new Socket("localhost", 9999)
- val out: OutputStream = client.getOutputStream
- out.write(2) //发送数据
- out.flush()
- out.close()
-
- client.close()
- }
- }
- package com.atguigu.bigdata.spark.core.test
-
- import java.io.InputStream
- import java.net.{ServerSocket, Socket}
-
- object Executor {
- def main(args: Array[String]): Unit = {
- //启动服务器,接收数据
- val server = new ServerSocket(9999)
- println("服务器启动,等待接收数据...")
-
- //等待客户端的连接
- val client: Socket = server.accept()
- val in: InputStream = client.getInputStream
-
- val i: Int = in.read()
- println("接收到客户端发送的数据:" + i)
- in.close()
- client.close()
- server.close()
- }
- }
- package com.atguigu.bigdata.spark.core.test
-
- class Task extends Serializable { //最基本的计算任务
- val datas = List(1, 2, 3, 4)
-
- //val logic = (num: Int) => { num * 2 }
- val logic: (Int) => Int = _ * 2
-
- //计算
- def compute() = {
- datas.map(logic)
- }
- }
- package com.atguigu.bigdata.spark.core.test
-
- import java.io.{ObjectOutputStream, OutputStream}
- import java.net.Socket
-
- object Driver {
- def main(args: Array[String]): Unit = {
- //连接服务器
- val client = new Socket("localhost", 9999)
-
- val out: OutputStream = client.getOutputStream
- val objOut = new ObjectOutputStream(out)
-
- val task = new Task()
- objOut.writeObject(task)
- objOut.flush()
- objOut.close()
- client.close()
- println("客户端数据发送完毕。")
- }
- }
- package com.atguigu.bigdata.spark.core.test
-
- import java.io.{InputStream, ObjectInputStream}
- import java.net.{ServerSocket, Socket}
-
- object Executor {
- def main(args: Array[String]): Unit = {
- //启动服务器,接收数据
- val server = new ServerSocket(9999)
- println("服务器启动,等待接收数据...")
-
- //等待客户端的连接
- val client: Socket = server.accept()
- val in: InputStream = client.getInputStream
-
- val objIn = new ObjectInputStream(in)
- val task: Task = objIn.readObject().asInstanceOf[Task]
- val ints: List[Int] = task.compute()
-
- println("节点计算任务的节点为:" + ints)//计算节点计算的结果为
- objIn.close()
- client.close()
- server.close()
- }
- }
Task与SubTask需要具有相同的逻辑。
- package com.atguigu.bigdata.spark.core.test
-
- class Task extends Serializable { //最基本的计算任务
- val datas = List(1, 2, 3, 4)
-
- //val logic = (num: Int) => { num * 2 }
- val logic: (Int) => Int = _ * 2
-
- //计算
- def compute() = {
- datas.map(logic)
- }
- }
- package com.atguigu.bigdata.spark.core.test
-
- class SubTask extends Serializable {
- var datas: List[Int] = _
- var logic: (Int) => Int = _
-
- //计算
- def compute() = {
- datas.map(logic)
- }
- }
- package com.atguigu.bigdata.spark.core.test
-
- import java.io.{ObjectOutputStream, OutputStream}
- import java.net.Socket
-
- object Driver {
- def main(args: Array[String]): Unit = {
- //连接服务器
- val client1 = new Socket("localhost", 9999)
- val client2 = new Socket("localhost", 8888)
-
- val task = new Task()
-
- val out1: OutputStream = client1.getOutputStream
- val objOut1 = new ObjectOutputStream(out1)
-
- val subTask = new SubTask()
- subTask.logic = task.logic
- subTask.datas = task.datas.take(2)
-
- objOut1.writeObject(subTask)
- objOut1.flush()
- objOut1.close()
- client1.close()
-
- val out2: OutputStream = client2.getOutputStream
- val objOut2 = new ObjectOutputStream(out2)
-
- val subTask1 = new SubTask()
- subTask1.logic = task.logic
- subTask1.datas = task.datas.takeRight(2)
- objOut2.writeObject(subTask1)
- objOut2.flush()
- objOut2.close()
- client2.close()
- println("客户端数据发送完毕...")
- }
- }
- package com.atguigu.bigdata.spark.core.test
-
- import java.io.{InputStream, ObjectInputStream}
- import java.net.{ServerSocket, Socket}
-
- object Executor {
- def main(args: Array[String]): Unit = {
- //启动服务器,接收数据
- val server = new ServerSocket(9999)
- println("服务器启动,等待接收数据...")
-
- //等待客户端的连接
- val client: Socket = server.accept()
- val in: InputStream = client.getInputStream
- val objIn = new ObjectInputStream(in)
- val task: SubTask = objIn.readObject().asInstanceOf[SubTask]
- val ints: List[Int] = task.compute()
- println("计算节点[9999]计算的结果为:" + ints)
- objIn.close()
- client.close()
- server.close()
- }
- }
- package com.atguigu.bigdata.spark.core.test
-
- import java.io.{InputStream, ObjectInputStream}
- import java.net.{ServerSocket, Socket}
-
- object Executor2 {
- def main(args: Array[String]): Unit = {
- //启动服务器,接收数据
- val server = new ServerSocket(8888)
- println("服务器启动,等待接收数据...")
-
- //等待客户端的连接
- val client: Socket = server.accept()
- val in: InputStream = client.getInputStream
- val objIn = new ObjectInputStream(in)
- val task: SubTask = objIn.readObject().asInstanceOf[SubTask]
- val ints: List[Int] = task.compute()
- println("计算节点[8888]计算的结果为:" + ints)
- objIn.close()
- client.close()
- server.close()
- }
- }
5.1 RDD
5.1.1 什么是RDD
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
画图工具:Balsamiq Mockups 3
5.1.1 什么是RDD
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
- ➢ 弹性
- ⚫ 存储的弹性:内存与磁盘的自动切换;
- ⚫ 容错的弹性:数据丢失可以自动恢复;
- ⚫ 计算的弹性:计算出错重试机制;
- ⚫ 分片的弹性:可根据需要重新分片。
- ➢ 分布式:数据存储在大数据集群不同节点上
- ➢ 数据集:RDD封装了计算逻辑,并不保存数据
- ➢ 数据抽象:RDD是一个抽象类,需要子类具体实现
- ➢ 不可变:RDD封装了计算逻辑,是不可以改变的,想要改变,只能产生新的RDD,在新的RDD里面封装计算逻辑
- ➢ 可分区、并行计算
5.1.2 核心属性
RDD数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性。
5.1.3 执行原理
从计算的角度来讲,数据处理过程中需要计算资源(内存 & CPU)和计算模型(逻辑)。执行时,需要将计算资源和计算模型进行协调和整合。
Spark框架在执行时,先申请资源,然后将应用程序的数据处理逻辑分解成一个一个的计算任务;然后将任务发到已经分配资源的计算节点上,按照指定的计算模型进行数据计算;最后得到计算结果。
5.1.4 基础编程
5.1.4.1 RDD创建
在Spark中创建RDD的创建方式可以分为四种:
- 1) 从集合(内存)中创建 RDD
- 2) 从外部存储(文件)创建 RDD
- 3) 从其他 RDD 创建
- 4) 直接创建 RDD(new)
ctrl+p:快捷键,提示函数参数列表。
- package com.atguigu.bigdata.spark.core.rdd.builder
-
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
-
- object Spark01_RDD_Memory {
- def main(args: Array[String]): Unit = {
- //TODO 准备环境
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD") //*表示当前系统的最大可用核数
- val sc = new SparkContext(sparkConf)
-
- //TODO 创建RDD
- //从内存中创建RDD,将内存中集合的数据作为处理的数据源
- val seq = Seq[Int](1, 2, 3, 4)
-
- //parallelize:并行
- //val rdd: RDD[Int] = sc.parallelize(seq)
-
- //makeRDD方法在底层实现时其实就是调用了rdd对象的parallelize方法。
- val rdd: RDD[Int] = sc.makeRDD(seq)
-
- rdd.collect().foreach(println)
-
- //TODO 关闭环境
- sc.stop()
- }
- }
- package com.atguigu.bigdata.spark.core.rdd.builder
-
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
-
- object Spark02_RDD_File {
- def main(args: Array[String]): Unit = {
- //TODO 准备环境
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
- val sc = new SparkContext(sparkConf)
-
- //TODO 创建RDD
- //从文件中创建RDD,将文件中的数据作为处理的数据源
- //path路径默认以当前环境的根路径为基准,可以写绝对路径,也可以写相对路径
- //sc.textFile("D:\\allCode\\JetBrains\\IdeaProjects\\atguigu-classes\\datas\\1.txt")
- //val rdd: RDD[String] = sc.textFile("datas/1.txt")
-
- //path路径可以是文件的具体路径,也可以目录名称
- //val rdd = sc.textFile("datas")
-
- //path路径还可以使用通配符 *
- val rdd = sc.textFile("datas/1*.txt")
-
- //path还可以是分布式存储系统路径:HDFS
- //val rdd = sc.textFile("hdfs://node1:8020/test.txt")
-
- rdd.collect().foreach(println)
-
- //TODO 关闭环境
- sc.stop()
- }
- }
- package com.atguigu.bigdata.spark.core.rdd.builder
-
- import org.apache.spark.{SparkConf, SparkContext}
-
- object Spark02_RDD_File1 {
- def main(args: Array[String]): Unit = {
- //TODO 准备环境
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
- val sc = new SparkContext(sparkConf)
-
- //TODO 创建RDD
- //从文件中创建RDD,将文件中的数据作为处理的数据源
-
- //textFile:以行为单位来读取数据,读取的数据都是字符串
- //wholeTextFiles:以文件为单位读取数据
- //读取的结果表示为元组,第一个元素表示文件路径,第二个元素表示文件内容
- val rdd = sc.wholeTextFiles("datas")
-
- rdd.collect().foreach(println)
-
- //TODO 关闭环境
- sc.stop()
- }
- }
5.1.4.2 RDD 并行度与分区
- package com.atguigu.bigdata.spark.core.rdd.builder
-
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
-
- object Spark01_RDD_Memory_Par {
- def main(args: Array[String]): Unit = {
- //TODO 准备环境
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
- sparkConf.set("spark.default.parallelism", "5")//5个分区
- val sc = new SparkContext(sparkConf)
-
- //TODO 创建RDD
- //RDD的并行度 & 分区
- //makeRDD方法可以传递第二个参数,这个参数表示分区的数量
- //第二个参数可以不传递的,那么makeRDD方法会使用默认值:defaultParallelism(默认并行度)
- // scheduler.conf.getInt("spark.default.parallelism", totalCores)
- // spark在默认情况下,从配置对象中获取配置参数:spark.default.parallelism
- // 如果获取不到,那么使用totalCores属性,这个属性取值为当前运行环境的最大可用核数
-
- //val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
- val rdd = sc.makeRDD(List(1, 2, 3, 4))
-
- //将处理的数据保存成分区文件
- rdd.saveAsTextFile("output")
-
- //TODO 关闭环境
- sc.stop()
- }
- }
- package com.atguigu.bigdata.spark.core.rdd.builder
-
- import org.apache.spark.{SparkConf, SparkContext}
-
- object Spark01_RDD_Memory_Par1 {
- def main(args: Array[String]): Unit = {
- //TODO 准备环境
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
- val sc = new SparkContext(sparkConf)
-
- //TODO 创建RDD
-
- //【1,2】,【3,4】
- //val rdd = sc.makeRDD(List(1,2,3,4), 2)
-
- //【1】,【2】,【3,4】
- //val rdd = sc.makeRDD(List(1,2,3,4), 3)
-
- //【1】,【2,3】,【4,5】
- val rdd = sc.makeRDD(List(1, 2, 3, 4, 5), 3)
-
- //将处理的数据保存成分区文件
- rdd.saveAsTextFile("output")
-
- //TODO 关闭环境
- sc.stop()
- }
- }
- package com.atguigu.bigdata.spark.core.rdd.builder
-
- import org.apache.spark.{SparkConf, SparkContext}
-
- object Spark02_RDD_File_Par {
- def main(args: Array[String]): Unit = {
- // TODO 准备环境
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
- val sc = new SparkContext(sparkConf)
-
- // TODO 创建RDD
- // textFile可以将文件作为数据处理的数据源,默认也可以设定分区。
- // minPartitions : 最小分区数量
- // math.min(defaultParallelism, 2)
-
- //val rdd = sc.textFile("datas/1.txt")
- //如果不想使用默认的分区数量,可以通过第二个参数指定分区数
- //Spark读取文件,底层其实使用的就是Hadoop的读取方式
- //分区数量的计算方式:
- // totalSize = 7
- // goalSize = 7 / 2 = 3(byte)
-
- //7 / 3 = 2...1 (1.1) + 1 = 3(分区)
-
- val rdd = sc.textFile("datas/1.txt", 2)
-
- rdd.saveAsTextFile("output")
-
- // TODO 关闭环境
- sc.stop()
- }
- }
- package com.atguigu.bigdata.spark.core.rdd.builder
-
- import org.apache.spark.{SparkConf, SparkContext}
-
- object Spark02_RDD_File_Par1 {
- def main(args: Array[String]): Unit = {
- //TODO 准备环境
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
- val sc = new SparkContext(sparkConf)
-
- //TODO 创建RDD
- //TODO 数据分区的分配
- //1. 数据以行为单位进行读取
- // spark读取文件,采用的是hadoop的方式读取,所以一行一行读取,和字节数没有关系
- //2. 数据读取时以偏移量为单位,偏移量不会被重复读取
- /*
- 偏移量
- 1@@ => 012
- 2@@ => 345
- 3 => 6
- */
- //3. 数据分区的偏移量范围的计算
- // 0 => [0, 3] => 12
- // 1 => [3, 6] => 3
- // 2 => [6, 7] =>
-
- //【1,2】,【3】,【】
- val rdd = sc.textFile("datas/1.txt", 2)
-
- rdd.saveAsTextFile("output")
-
- //TODO 关闭环境
- sc.stop()
- }
- }
- package com.atguigu.bigdata.spark.core.rdd.builder
-
- import org.apache.spark.{SparkConf, SparkContext}
-
- object Spark03_RDD_File_Par2 {
- def main(args: Array[String]): Unit = {
- // TODO 准备环境
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
- val sc = new SparkContext(sparkConf)
-
- // TODO 创建RDD
-
- // 14byte / 2 = 7byte
- // 14 / 7 = 2(分区)
-
- /*
- 1234567@@ => 012345678
- 89@@ => 9101112
- 0 => 13
- [0, 7] => 1234567
- [7, 14] => 890
- */
-
- // 如果数据源为多个文件,那么计算分区时以文件为单位进行分区
- val rdd = sc.textFile("datas/word.txt", 2)
-
- rdd.saveAsTextFile("output003")
-
- // TODO 关闭环境
- sc.stop()
- }
- }
5.1.4.3 RDD转换算子
1) map
RDD根据数据处理方式的不同将算子整体上分为Value类型、双Value类型和Key-Value类型。
- package com.atguigu.bigdata.spark.core.rdd.operator.transform
-
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
-
- object Spark01_RDD_Operator_Transform {
- def main(args: Array[String]): Unit = {
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
- val sc = new SparkContext(sparkConf)
-
- // TODO 算子 - map
-
- val rdd = sc.makeRDD(List(1, 2, 3, 4))
- // 1,2,3,4
- // 2,4,6,8
-
- //转换函数
- def mapFunction(num: Int): Int = {
- num * 2
- }
-
- //val mapRDD: RDD[Int] = rdd.map(mapFunction)
- //val mapRDD: RDD[Int] = rdd.map((num: Int) => { num * 2 })
- //val mapRDD: RDD[Int] = rdd.map((num: Int) => num * 2)
- //val mapRDD: RDD[Int] = rdd.map((num) => num * 2)
- //val mapRDD: RDD[Int] = rdd.map(num => num * 2)
- val mapRDD: RDD[Int] = rdd.map(_ * 2)
-
- mapRDD.collect().foreach(println)
-
- sc.stop()
- }
- }
小功能:从服务器日志数据apache.log中获取用户请求URL资源路径。
- package com.atguigu.bigdata.spark.core.rdd.operator.transform
-
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
-
- object Spark01_RDD_Operator_Transform_Test {
- def main(args: Array[String]): Unit = {
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
- val sc = new SparkContext(sparkConf)
-
- // TODO 算子 - map
- val rdd = sc.textFile("datas/apache.log")
-
- // 长的字符串
- // 短的字符串
- val mapRDD: RDD[String] = rdd.map(
- line => {
- val datas = line.split(" ")
- datas(6)
- }
- )
- mapRDD.collect().foreach(println)
-
- sc.stop()
- }
- }
- package com.atguigu.bigdata.spark.core.rdd.operator.transform
-
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
-
- object Spark01_RDD_Operator_Transform_Par {
- def main(args: Array[String]): Unit = {
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
- val sc = new SparkContext(sparkConf)
-
- // TODO 算子 - map
-
- // 1. rdd的计算一个分区内的数据是一个一个地执行逻辑
- // 只有前面一个数据全部的逻辑执行完毕后,才会执行下一个数据。
- // 分区内数据的执行是有序的。
- // 2. 不同分区数据计算是无序的。
- val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)//2个分区
-
- val mapRDD = rdd.map(
- num => {
- println(">>>>>>>> " + num)
- num
- }
- )
- val mapRDD1 = mapRDD.map(
- num => {
- println("######" + num)
- num
- }
- )
-
- mapRDD1.collect()
-
- sc.stop()
- }
- }
2) mapPartitions
- package com.atguigu.bigdata.spark.core.rdd.operator.transform
-
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
-
- object Spark02_RDD_Operator_Transform {
- def main(args: Array[String]): Unit = {
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
- val sc = new SparkContext(sparkConf)
-
- // TODO 算子 - mapPartitions
- val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
-
- // mapPartitions:可以以分区为单位进行数据转换操作
- // 但是会将整个分区的数据加载到内存进行引用
- // 如果处理完的数据是不会被释放掉,存在对象的引用。
- // 在内存较小,数据量较大的场合下,容易出现内存溢出。
- val mpRDD: RDD[Int] = rdd.mapPartitions(
- iter => {
- println(">>>>>>>>>>")
- iter.map(_ * 2)
- }
- )
- mpRDD.collect().foreach(println)
-
- sc.stop()
- }
- }
- package com.atguigu.bigdata.spark.core.rdd.operator.transform
-
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
-
- object Spark02_RDD_Operator_Transform_Test {
- def main(args: Array[String]): Unit = {
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
- val sc = new SparkContext(sparkConf)
-
- // TODO 算子 - mapPartitions
- val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
-
- // 【1,2】,【3,4】
- // 【2】,【4】
- val mpRDD = rdd.mapPartitions(
- iter => {
- List(iter.max).iterator
- }
- )
- mpRDD.collect().foreach(println)
-
- sc.stop()
- }
- }
思考一个问题:map和mapPartitions的区别?
- 数据处理角度
- Map 算子是分区内一个数据一个数据的执行,类似于串行操作。
- 而 mapPartitions 算子是以分区为单位进行批处理操作。
- 功能的角度
- Map 算子主要目的将数据源中的数据进行转换和改变,但是不会减少或增多数据。
- MapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变, 所以可以增加或减少数据。
- 性能的角度
- Map 算子因为类似于串行操作,所以性能比较低,而是 mapPartitions 算子类似于批处理,所以性能较高。
- 但是mapPartitions 算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。
完成比完美更重要。
3) mapPartitionsWithIndex
- package com.atguigu.bigdata.spark.core.rdd.operator.transform
-
- import org.apache.spark.{SparkConf, SparkContext}
-
- object Spark03_RDD_Operator_Transform1 {
- def main(args: Array[String]): Unit = {
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
- val sc = new SparkContext(sparkConf)
-
- // TODO 算子 - mapPartitions
- val rdd = sc.makeRDD(List(1, 2, 3, 4))
-
- val mpiRDD = rdd.mapPartitionsWithIndex(
- (index, iter) => {
- // 1, 2, 3, 4
- //(0,1)(2,2),(4,3),(6,4)
- iter.map(
- num => {
- (index, num)
- }
- )
- }
- )
-
- mpiRDD.collect().foreach(println)
-
- sc.stop()
- }
- }
4) flatMap
- package com.atguigu.bigdata.spark.core.rdd.operator.transform
-
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
-
- object Spark04_RDD_Operator_Transform {
- def main(args: Array[String]): Unit = {
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
- val sc = new SparkContext(sparkConf)
-
- // TODO 算子 - flatMap
- val rdd: RDD[List[Int]] = sc.makeRDD(List(
- List(1, 2), List(3, 4)
- ))
- val flatRDD: RDD[Int] = rdd.flatMap(
- list => {
- list
- }
- )
- flatRDD.collect().foreach(println)
-
- sc.stop()
- }
- }
- package com.atguigu.bigdata.spark.core.rdd.operator.transform
-
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
-
- object Spark04_RDD_Operator_Transform1 {
- def main(args: Array[String]): Unit = {
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
- val sc = new SparkContext(sparkConf)
-
- // TODO 算子 - flatMap
- val rdd: RDD[String] = sc.makeRDD(List(
- "Hello Scala", "Hello Spark"
- ))
-
- val flatRDD: RDD[String] = rdd.flatMap(
- s => {
- s.split(" ")
- }
- )
- flatRDD.collect().foreach(println)
-
- sc.stop()
- }
- }
小功能:将 List(List(1,2),3,List(4,5))进行扁平化操作。
- package com.atguigu.bigdata.spark.core.rdd.operator.transform
-
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
-
- object Spark04_RDD_Operator_Transform2 {
- def main(args: Array[String]): Unit = {
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
- val sc = new SparkContext(sparkConf)
-
- // TODO 算子 - flatMap
- val rdd = sc.makeRDD(List(List(1, 2), 3, List(4, 5)))
-
- val flatRDD = rdd.flatMap(
- data => {
- data match {//模式匹配
- case list: List[_] => list
- case dat => List(dat)
- }
- }
- )
-
- flatRDD.collect().foreach(println)
-
- sc.stop()
- }
- }
5) glom
将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变。
- package com.atguigu.bigdata.spark.core.rdd.operator.transform
-
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
-
- object Spark05_RDD_Operator_Transform_Test {
- def main(args: Array[String]): Unit = {
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
- val sc = new SparkContext(sparkConf)
-
- // TODO 算子 - glom
- val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4), 2)
-
- // 【1,2】,【3,4】
- // 【2】,【4】
- // 【6】
- val glomRDD: RDD[Array[Int]] = rdd.glom()
-
- val maxRDD: RDD[Int] = glomRDD.map(
- array => {
- array.max
- }
- )
- println(maxRDD.collect().sum)
-
- sc.stop()
- }
- }
- package com.atguigu.bigdata.spark.core.rdd.operator.transform
-
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
-
- object Spark01_RDD_Operator_Transform_Part {
- def main(args: Array[String]): Unit = {
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
- val sc = new SparkContext(sparkConf)
-
- // TODO 算子 - map
- val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
- // 【1,2】,【3,4】
- rdd.saveAsTextFile("output1")
- val mapRDD = rdd.map(_ * 2)
- // 【2,4】,【6,8】
- mapRDD.saveAsTextFile("output2")
-
- sc.stop()
- }
- }
6) groupBy
小功能:将 List("Hello", "hive", "hbase", "Hadoop")根据单词首写字母进行分组。
- package com.atguigu.bigdata.spark.core.rdd.operator.transform
-
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
-
- object Spark06_RDD_Operator_Transform {
- def main(args: Array[String]): Unit = {
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
- val sc = new SparkContext(sparkConf)
-
- // TODO 算子 - groupBy
- val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
-
- // groupBy会将数据源中的每一个数据进行分组判断,根据返回的分组key进行分组
- // 相同的key值的数据会放置在一个组中
- def groupFunction(num: Int) = {
- num % 2
- }
-
- val groupRDD: RDD[(Int, Iterable[Int])] = rdd.groupBy(groupFunction)
-
- groupRDD.collect().foreach(println)
- sc.stop()
- }
- }
- package com.atguigu.bigdata.spark.core.rdd.operator.transform
-
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
-
- object Spark06_RDD_Operator_Transform1 {
- def main(args: Array[String]): Unit = {
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
- val sc = new SparkContext(sparkConf)
-
- // TODO 算子 - groupBy
- val rdd = sc.makeRDD(List("Hello", "Spark", "Scala", "Hadoop"), 2)
-
- // 分组和分区没有必然的关系
- val groupRDD = rdd.groupBy(_.charAt(0))
-
- groupRDD.collect().foreach(println)
- sc.stop()
- }
- }
6) groupBy
小功能:从服务器日志数据apache.log中获取每个时间段访问量。
- package com.atguigu.bigdata.spark.core.rdd.operator.transform
-
- import java.text.SimpleDateFormat
- import java.util.Date
-
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
-
- object Spark06_RDD_Operator_Transform_Test {
- def main(args: Array[String]): Unit = {
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
- val sc = new SparkContext(sparkConf)
-
- // TODO 算子 - groupBy
- val rdd = sc.textFile("datas/apache.log")
-
- val timeRDD: RDD[(String, Iterable[(String, Int)])] = rdd.map(
- line => {
- val datas = line.split(" ")
- val time = datas(3)
- //time.substring(0, )
- val sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
- val date: Date = sdf.parse(time)
- val sdf1 = new SimpleDateFormat("HH")
- val hour: String = sdf1.format(date)
- (hour, 1)
- }
- ).groupBy(_._1)
-
- timeRDD.map {
- case (hour, iter) => {
- (hour, iter.size)
- }
- }.collect.foreach(println)
-
- sc.stop()
- }
- }
7) filter
小功能:从服务器日志数据 apache.log 中获取 2015 年 5 月 17 日的请求路径。
- package com.atguigu.bigdata.spark.core.rdd.operator.transform
-
- import java.text.SimpleDateFormat
- import java.util.Date
-
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
-
- object Spark07_RDD_Operator_Transform {
- def main(args: Array[String]): Unit = {
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
- val sc = new SparkContext(sparkConf)
-
- // TODO 算子 - filter
- val rdd = sc.makeRDD(List(1, 2, 3, 4))
-
- val filterRDD: RDD[Int] = rdd.filter(num => num % 2 != 0)
-
- filterRDD.collect().foreach(println)
-
- sc.stop()
- }
- }
- package com.atguigu.bigdata.spark.core.rdd.operator.transform
-
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
-
- object Spark07_RDD_Operator_Transform_Test {
- def main(args: Array[String]): Unit = {
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
- val sc = new SparkContext(sparkConf)
-
- // TODO 算子 - filter
- val rdd = sc.textFile("datas/apache.log")
-
- rdd.filter(
- line => {
- val datas = line.split(" ")
- val time = datas(3)
- time.startsWith("17/05/2015")
- }
- ).collect().foreach(println)
-
- sc.stop()
- }
- }
8) sample
思考一个问题:有啥用,抽奖吗?使用场景:数据倾斜,分区:均衡、shuffle。
- package com.atguigu.bigdata.spark.core.rdd.operator.transform
-
- import org.apache.spark.{SparkConf, SparkContext}
-
- object Spark08_RDD_Operator_Transform {
- def main(args: Array[String]): Unit = {
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
- val sc = new SparkContext(sparkConf)
-
- // TODO 算子 - sample
- val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
-
- // sample算子需要传递三个参数
- // 1. 第一个参数表示,抽取数据后是否将数据返回,true(放回)、false(丢弃)
- // 2. 第二个参数表示,
- // 如果抽取不放回的场合:数据源中每条数据被抽取的概率,基准值的概念
- // 如果抽取放回的场合:表示数据源中的每条数据被抽取的可能次数
- // 3. 第三个参数表示,抽取数据时随机算法的种子
- // 如果不传递第三个参数,那么使用的是当前系统时间
-
- // println(rdd.sample(
- // false,
- // 0.4,
- // 1
- // ).collect().mkString(",")
- // )
-
- println(rdd.sample(
- true,
- 2,
- 1
- ).collect().mkString(","))
-
- sc.stop()
- }
- }
9) distinct
- package com.atguigu.bigdata.spark.core.rdd.operator.transform
-
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
-
- object Spark09_RDD_Operator_Transform {
- def main(args: Array[String]): Unit = {
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
- val sc = new SparkContext(sparkConf)
-
- // TODO 算子 - distinct
- val rdd = sc.makeRDD(List(1, 2, 3, 4, 1, 2, 3, 4))
-
- // map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
-
- // (1, null),(2, null),(3, null),(4, null),(1, null),(2, null),(3, null),(4, null)
- // (1, null)(1, null)(1, null)
- // (null, null) => null
- // (1, null) => 1
- val rdd1: RDD[Int] = rdd.distinct()
-
- rdd1.collect().foreach(println)
-
- sc.stop()
- }
- }
10) coalesce
- package com.atguigu.bigdata.spark.core.rdd.operator.transform
-
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
-
- object Spark10_RDD_Operator_Transform {
- def main(args: Array[String]): Unit = {
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
- val sc = new SparkContext(sparkConf)
-
- // TODO 算子 - coalesce
- val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)
-
- // coalesce方法默认情况下不会将分区的数据打乱重新组合
- // 这种情况下的缩减分区可能会导致数据不均衡,出现数据倾斜
- // 如果想要让数据均衡,可以进行shuffle处理
- // val newRDD: RDD[Int] = rdd.coalesce(2)
- val newRDD: RDD[Int] = rdd.coalesce(2, true)
-
- newRDD.saveAsTextFile("output004")
-
- sc.stop()
- }
- }
11) repartition
- package com.atguigu.bigdata.spark.core.rdd.operator.transform
-
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
-
- object Spark11_RDD_Operator_Transform {
- def main(args: Array[String]): Unit = {
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
- val sc = new SparkContext(sparkConf)
-
- // TODO 算子 - repartition
- val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
-
- // coalesce算子可以扩大分区的,但是如果不进行shuffle操作,是没有意义,不起作用。
- // 所以如果想要实现扩大分区的效果,需要使用shuffle操作
- // spark提供了一个简化的操作
- // 缩减分区:coalesce,如果想要数据均衡,可以采用shuffle
- // 扩大分区:repartition, 底层代码调用的就是coalesce,而且肯定采用shuffle
-
- // val newRDD: RDD[Int] = rdd.coalesce(3, true)
- val newRDD: RDD[Int] = rdd.repartition(3)
-
- newRDD.saveAsTextFile("output005")
-
- sc.stop()
- }
- }
12) sortBy
- package com.atguigu.bigdata.spark.core.rdd.operator.transform
-
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
-
- object Spark12_RDD_Operator_Transform1 {
- def main(args: Array[String]): Unit = {
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
- val sc = new SparkContext(sparkConf)
-
- // TODO 算子 - sortBy
- val rdd = sc.makeRDD(List(("1", 1), ("11", 2), ("2", 3)), 2)
-
- // sortBy方法可以根据指定的规则对数据源中的数据进行排序,默认为升序,第二个参数可以改变排序的方式
- // sortBy默认情况下,不会改变分区,但是中间存在shuffle操作。
- val newRDD = rdd.sortBy(t => t._1.toInt, false)
-
- newRDD.collect().foreach(println)
-
- sc.stop()
- }
- }
13) intersection
14) union
15) subtract
16) zip
- package com.atguigu.bigdata.spark.core.rdd.operator.transform
-
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
-
- object Spark13_RDD_Operator_Transform {
- def main(args: Array[String]): Unit = {
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
- val sc = new SparkContext(sparkConf)
-
- // TODO 算子 - 双Value类型
-
- // 交集,并集和差集要求两个数据源数据类型保持一致
- // 拉链操作两个数据源的类型可以不一致
-
- val rdd1 = sc.makeRDD(List(1, 2, 3, 4))
- val rdd2 = sc.makeRDD(List(3, 4, 5, 6))
-
- // 交集 : 【3,4】
- val rdd3: RDD[Int] = rdd1.intersection(rdd2)
- println(rdd3.collect().mkString(","))
-
- // 并集 : 【1,2,3,4,3,4,5,6】
- val rdd4: RDD[Int] = rdd1.union(rdd2)
- println(rdd4.collect().mkString(","))
-
- // 差集 : 【1,2】
- val rdd5: RDD[Int] = rdd1.subtract(rdd2)
- println(rdd5.collect().mkString(","))
-
- // 拉链 : 【1-3,2-4,3-5,4-6】
- val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
- println(rdd6.collect().mkString(","))
-
- sc.stop()
- }
- }
13) intersection
14) union
15) subtract
16) zip
- package com.atguigu.bigdata.spark.core.rdd.operator.transform
-
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
-
- object Spark13_RDD_Operator_Transform {
- def main(args: Array[String]): Unit = {
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
- val sc = new SparkContext(sparkConf)
-
- // TODO 算子 - 双Value类型
-
- // 交集,并集和差集要求两个数据源数据类型保持一致
- // 拉链操作两个数据源的类型可以不一致
-
- val rdd1 = sc.makeRDD(List(1, 2, 3, 4))
- val rdd2 = sc.makeRDD(List(3, 4, 5, 6))
- val rdd7 = sc.makeRDD(List("3", "4", "5", "6"))
-
- // 交集 : 【3,4】
- val rdd3: RDD[Int] = rdd1.intersection(rdd2)
- //val rdd8 = rdd1.intersection(rdd7)
- println(rdd3.collect().mkString(","))
-
- // 并集 : 【1,2,3,4,3,4,5,6】
- val rdd4: RDD[Int] = rdd1.union(rdd2)
- println(rdd4.collect().mkString(","))
-
- // 差集 : 【1,2】
- val rdd5: RDD[Int] = rdd1.subtract(rdd2)
- println(rdd5.collect().mkString(","))
-
- // 拉链 : 【1-3,2-4,3-5,4-6】
- val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
- val rdd8 = rdd1.zip(rdd7)
- println(rdd6.collect().mkString(","))
-
- sc.stop()
- }
- }
- package com.atguigu.bigdata.spark.core.rdd.operator.transform
-
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
-
- object Spark13_RDD_Operator_Transform1 {
- def main(args: Array[String]): Unit = {
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
- val sc = new SparkContext(sparkConf)
-
- // TODO 算子 - 双Value类型
-
- // Can't zip RDDs with unequal numbers of partitions: List(2, 4)
- // 两个数据源要求分区数量要保持一致
- // Can only zip RDDs with same number of elements in each partition
- // 两个数据源要求分区中数据数量保持一致
- val rdd1 = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
- val rdd2 = sc.makeRDD(List(3, 4, 5, 6), 2)
-
- val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
- println(rdd6.collect().mkString(","))
-
- sc.stop()
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。