当前位置:   article > 正文

尚硅谷大数据技术Spark教程-笔记02【SparkCore(核心编程,map、mapPartitions、mapPartitionsWithIndex、flatMap、glom、groupBy)】_尚硅谷 spark word 版本

尚硅谷 spark word 版本

视频地址:尚硅谷大数据Spark教程从入门到精通_哔哩哔哩_bilibili

  1. 尚硅谷大数据技术Spark教程-笔记01【Spark(概述、快速上手、运行环境、运行架构)】
  2. 尚硅谷大数据技术Spark教程-笔记02【SparkCore(核心编程,map、mapPartitions、mapPartitionsWithIndex、flatMap、glom、groupBy、filter、sample、distinct、coalesce、repartition、sortBy、intersection、union、subtract、zip)】
  3. 尚硅谷大数据技术Spark教程-笔记03【SparkCore(核心编程,partitionBy、reduceByKey、groupByKey、aggregateByKey、foldByKey、combineByKey、sortByKey、join、leftOuterJoin、cogroup)】

目录

01_尚硅谷大数据技术之SparkCore

第05章-Spark核心编程

P022【022.尚硅谷_SparkCore - 分布式计算模拟 - 搭建基础的架子】12:48

P023【023.尚硅谷_SparkCore - 分布式计算模拟 - 客户端向服务器发送计算任务】10:50

P024【024.尚硅谷_SparkCore - 分布式计算模拟 - 数据结构和分布式计算】11:39

P025【025.尚硅谷_SparkCore - 核心编程 - RDD - 概念介绍】05:31

P026【026.尚硅谷_SparkCore - 核心编程 - RDD - IO基本实现原理 - 1】10:11

P027【027.尚硅谷_SparkCore - 核心编程 - RDD - IO基本实现原理 - 2】08:49

P028【028.尚硅谷_SparkCore - 核心编程 - RDD - RDD和IO之间的关系】12:24

P029【029.尚硅谷_SparkCore - 核心编程 - RDD - 特点】13:34

P030【030.尚硅谷_SparkCore - 核心编程 - RDD - 五大主要配置】11:19

P031【031.尚硅谷_SparkCore - 核心编程 - RDD - 执行原理】03:05

P032【032.尚硅谷_SparkCore - 核心编程 - RDD - 创建 - 内存】11:02

P033【033.尚硅谷_SparkCore - 核心编程 - RDD - 创建 - 文件】06:28

P034【034.尚硅谷_SparkCore - 核心编程 - RDD - 创建 - 文件1】04:42

P035【035.尚硅谷_SparkCore - 核心编程 - RDD - 集合数据源 - 分区的设定】11:41

P036【036.尚硅谷_SparkCore - 核心编程 - RDD - 集合数据源 - 分区数据的分配】13:54

P037【037.尚硅谷_SparkCore - 核心编程 - RDD - 文件数据源 - 分区的设定】11:33

P038【038.尚硅谷_SparkCore - 核心编程 - RDD - 文件数据源 - 分区数据的分配】08:21

P039【039.尚硅谷_SparkCore - 核心编程 - RDD - 文件数据源 - 分区数据的分配 - 案例分析】06:13

P040【040.尚硅谷_SparkCore - 核心编程 - RDD - 算子介绍】07:49

P041【041.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - map】07:46

P042【042.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - map - 小功能】05:12

P043【043.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - map - 并行计算效果演示】08:54

P044【044.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - mapPartitions】06:12

P045【045.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - mapPartitions - 小练习】03:49

P046【046.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - mapPartitions & map的区别 - 完成比完美更重要】02:21

P047【047.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - mapPartitionsWithIndex】06:30

P048【048.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - flatMap】05:07

P049【049.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - flatMap - 小练习】02:41

P050【050.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - glom】06:33

P051【051.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - 理解分区不变的含义】06:48

P052【052.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - groupBy】05:25

P053【053.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - groupBy - shuffle来袭】06:01

P054【054.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - groupBy - 小练习】07:51

P055【055.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - filter - 数据倾斜】07:11

P056【056.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - sample - 抽奖喽】16:11

P057【057.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - distinct】06:13

P058【058.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - coalesce】11:11

P059【059.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - repartition】07:28

P060【060.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - sortBy】06:31

P061【061.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - 交集&并集&差集&拉链】08:19

P062【062.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - 交集&并集&差集&拉链 - 注意事项】08:10


01_尚硅谷大数据技术之SparkCore

第05章-Spark核心编程

P022【022.尚硅谷_SparkCore - 分布式计算模拟 - 搭建基础的架子】12:48

第5章 Spark核心编程

Spark 计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。三大数据结构分别是:

➢ RDD : 弹性分布式数据集

➢ 累加器:分布式共享只写变量

➢ 广播变量:分布式共享只读变量

5.1 RDD

5.1.1 什么是RDD

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

 

  1. package com.atguigu.bigdata.spark.core.test
  2. import java.io.OutputStream
  3. import java.net.Socket
  4. object Driver {
  5. def main(args: Array[String]): Unit = {
  6. //连接服务器
  7. val client = new Socket("localhost", 9999)
  8. val out: OutputStream = client.getOutputStream
  9. out.write(2) //发送数据
  10. out.flush()
  11. out.close()
  12. client.close()
  13. }
  14. }
  1. package com.atguigu.bigdata.spark.core.test
  2. import java.io.InputStream
  3. import java.net.{ServerSocket, Socket}
  4. object Executor {
  5. def main(args: Array[String]): Unit = {
  6. //启动服务器,接收数据
  7. val server = new ServerSocket(9999)
  8. println("服务器启动,等待接收数据...")
  9. //等待客户端的连接
  10. val client: Socket = server.accept()
  11. val in: InputStream = client.getInputStream
  12. val i: Int = in.read()
  13. println("接收到客户端发送的数据:" + i)
  14. in.close()
  15. client.close()
  16. server.close()
  17. }
  18. }

P023【023.尚硅谷_SparkCore - 分布式计算模拟 - 客户端向服务器发送计算任务】10:50

 

  1. package com.atguigu.bigdata.spark.core.test
  2. class Task extends Serializable { //最基本的计算任务
  3. val datas = List(1, 2, 3, 4)
  4. //val logic = (num: Int) => { num * 2 }
  5. val logic: (Int) => Int = _ * 2
  6. //计算
  7. def compute() = {
  8. datas.map(logic)
  9. }
  10. }
  1. package com.atguigu.bigdata.spark.core.test
  2. import java.io.{ObjectOutputStream, OutputStream}
  3. import java.net.Socket
  4. object Driver {
  5. def main(args: Array[String]): Unit = {
  6. //连接服务器
  7. val client = new Socket("localhost", 9999)
  8. val out: OutputStream = client.getOutputStream
  9. val objOut = new ObjectOutputStream(out)
  10. val task = new Task()
  11. objOut.writeObject(task)
  12. objOut.flush()
  13. objOut.close()
  14. client.close()
  15. println("客户端数据发送完毕。")
  16. }
  17. }
  1. package com.atguigu.bigdata.spark.core.test
  2. import java.io.{InputStream, ObjectInputStream}
  3. import java.net.{ServerSocket, Socket}
  4. object Executor {
  5. def main(args: Array[String]): Unit = {
  6. //启动服务器,接收数据
  7. val server = new ServerSocket(9999)
  8. println("服务器启动,等待接收数据...")
  9. //等待客户端的连接
  10. val client: Socket = server.accept()
  11. val in: InputStream = client.getInputStream
  12. val objIn = new ObjectInputStream(in)
  13. val task: Task = objIn.readObject().asInstanceOf[Task]
  14. val ints: List[Int] = task.compute()
  15. println("节点计算任务的节点为:" + ints)//计算节点计算的结果为
  16. objIn.close()
  17. client.close()
  18. server.close()
  19. }
  20. }

P024【024.尚硅谷_SparkCore - 分布式计算模拟 - 数据结构和分布式计算】11:39

Task与SubTask需要具有相同的逻辑。

   

 

  1. package com.atguigu.bigdata.spark.core.test
  2. class Task extends Serializable { //最基本的计算任务
  3. val datas = List(1, 2, 3, 4)
  4. //val logic = (num: Int) => { num * 2 }
  5. val logic: (Int) => Int = _ * 2
  6. //计算
  7. def compute() = {
  8. datas.map(logic)
  9. }
  10. }
  1. package com.atguigu.bigdata.spark.core.test
  2. class SubTask extends Serializable {
  3. var datas: List[Int] = _
  4. var logic: (Int) => Int = _
  5. //计算
  6. def compute() = {
  7. datas.map(logic)
  8. }
  9. }
  1. package com.atguigu.bigdata.spark.core.test
  2. import java.io.{ObjectOutputStream, OutputStream}
  3. import java.net.Socket
  4. object Driver {
  5. def main(args: Array[String]): Unit = {
  6. //连接服务器
  7. val client1 = new Socket("localhost", 9999)
  8. val client2 = new Socket("localhost", 8888)
  9. val task = new Task()
  10. val out1: OutputStream = client1.getOutputStream
  11. val objOut1 = new ObjectOutputStream(out1)
  12. val subTask = new SubTask()
  13. subTask.logic = task.logic
  14. subTask.datas = task.datas.take(2)
  15. objOut1.writeObject(subTask)
  16. objOut1.flush()
  17. objOut1.close()
  18. client1.close()
  19. val out2: OutputStream = client2.getOutputStream
  20. val objOut2 = new ObjectOutputStream(out2)
  21. val subTask1 = new SubTask()
  22. subTask1.logic = task.logic
  23. subTask1.datas = task.datas.takeRight(2)
  24. objOut2.writeObject(subTask1)
  25. objOut2.flush()
  26. objOut2.close()
  27. client2.close()
  28. println("客户端数据发送完毕...")
  29. }
  30. }
  1. package com.atguigu.bigdata.spark.core.test
  2. import java.io.{InputStream, ObjectInputStream}
  3. import java.net.{ServerSocket, Socket}
  4. object Executor {
  5. def main(args: Array[String]): Unit = {
  6. //启动服务器,接收数据
  7. val server = new ServerSocket(9999)
  8. println("服务器启动,等待接收数据...")
  9. //等待客户端的连接
  10. val client: Socket = server.accept()
  11. val in: InputStream = client.getInputStream
  12. val objIn = new ObjectInputStream(in)
  13. val task: SubTask = objIn.readObject().asInstanceOf[SubTask]
  14. val ints: List[Int] = task.compute()
  15. println("计算节点[9999]计算的结果为:" + ints)
  16. objIn.close()
  17. client.close()
  18. server.close()
  19. }
  20. }
  1. package com.atguigu.bigdata.spark.core.test
  2. import java.io.{InputStream, ObjectInputStream}
  3. import java.net.{ServerSocket, Socket}
  4. object Executor2 {
  5. def main(args: Array[String]): Unit = {
  6. //启动服务器,接收数据
  7. val server = new ServerSocket(8888)
  8. println("服务器启动,等待接收数据...")
  9. //等待客户端的连接
  10. val client: Socket = server.accept()
  11. val in: InputStream = client.getInputStream
  12. val objIn = new ObjectInputStream(in)
  13. val task: SubTask = objIn.readObject().asInstanceOf[SubTask]
  14. val ints: List[Int] = task.compute()
  15. println("计算节点[8888]计算的结果为:" + ints)
  16. objIn.close()
  17. client.close()
  18. server.close()
  19. }
  20. }

P025【025.尚硅谷_SparkCore - 核心编程 - RDD - 概念介绍】05:31

5.1 RDD

5.1.1 什么是RDD

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

画图工具:Balsamiq Mockups 3

P026【026.尚硅谷_SparkCore - 核心编程 - RDD - IO基本实现原理 - 1】10:11

 

P027【027.尚硅谷_SparkCore - 核心编程 - RDD - IO基本实现原理 - 2】08:49

P028【028.尚硅谷_SparkCore - 核心编程 - RDD - RDD和IO之间的关系】12:24

 

P029【029.尚硅谷_SparkCore - 核心编程 - RDD - 特点】13:34

5.1.1 什么是RDD

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

  • ➢ 弹性
    • ⚫ 存储的弹性:内存与磁盘的自动切换;
    • ⚫ 容错的弹性:数据丢失可以自动恢复;
    • ⚫ 计算的弹性:计算出错重试机制;
    • ⚫ 分片的弹性:可根据需要重新分片。
  • ➢ 分布式:数据存储在大数据集群不同节点上
  • ➢ 数据集:RDD封装了计算逻辑,并不保存数据
  • ➢ 数据抽象:RDD是一个抽象类,需要子类具体实现
  • ➢ 不可变:RDD封装了计算逻辑,是不可以改变的,想要改变,只能产生新的RDD,在新的RDD里面封装计算逻辑
  • ➢ 可分区、并行计算

P030【030.尚硅谷_SparkCore - 核心编程 - RDD - 五大主要配置】11:19

5.1.2 核心属性

RDD数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性。

P031【031.尚硅谷_SparkCore - 核心编程 - RDD - 执行原理】03:05

5.1.3 执行原理

从计算的角度来讲,数据处理过程中需要计算资源(内存 & CPU)计算模型(逻辑)。执行时,需要将计算资源和计算模型进行协调和整合。

Spark框架在执行时,先申请资源,然后将应用程序的数据处理逻辑分解成一个一个的计算任务;然后将任务发到已经分配资源的计算节点上,按照指定的计算模型进行数据计算;最后得到计算结果。

P032【032.尚硅谷_SparkCore - 核心编程 - RDD - 创建 - 内存】11:02

5.1.4 基础编程

5.1.4.1 RDD创建

在Spark中创建RDD的创建方式可以分为四种:

  • 1) 从集合(内存)中创建 RDD
  • 2) 从外部存储(文件)创建 RDD
  • 3) 从其他 RDD 创建
  • 4) 直接创建 RDD(new)

ctrl+p:快捷键,提示函数参数列表。

 

  1. package com.atguigu.bigdata.spark.core.rdd.builder
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object Spark01_RDD_Memory {
  5. def main(args: Array[String]): Unit = {
  6. //TODO 准备环境
  7. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD") //*表示当前系统的最大可用核数
  8. val sc = new SparkContext(sparkConf)
  9. //TODO 创建RDD
  10. //从内存中创建RDD,将内存中集合的数据作为处理的数据源
  11. val seq = Seq[Int](1, 2, 3, 4)
  12. //parallelize:并行
  13. //val rdd: RDD[Int] = sc.parallelize(seq)
  14. //makeRDD方法在底层实现时其实就是调用了rdd对象的parallelize方法。
  15. val rdd: RDD[Int] = sc.makeRDD(seq)
  16. rdd.collect().foreach(println)
  17. //TODO 关闭环境
  18. sc.stop()
  19. }
  20. }

P033【033.尚硅谷_SparkCore - 核心编程 - RDD - 创建 - 文件】06:28

 

  1. package com.atguigu.bigdata.spark.core.rdd.builder
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object Spark02_RDD_File {
  5. def main(args: Array[String]): Unit = {
  6. //TODO 准备环境
  7. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
  8. val sc = new SparkContext(sparkConf)
  9. //TODO 创建RDD
  10. //从文件中创建RDD,将文件中的数据作为处理的数据源
  11. //path路径默认以当前环境的根路径为基准,可以写绝对路径,也可以写相对路径
  12. //sc.textFile("D:\\allCode\\JetBrains\\IdeaProjects\\atguigu-classes\\datas\\1.txt")
  13. //val rdd: RDD[String] = sc.textFile("datas/1.txt")
  14. //path路径可以是文件的具体路径,也可以目录名称
  15. //val rdd = sc.textFile("datas")
  16. //path路径还可以使用通配符 *
  17. val rdd = sc.textFile("datas/1*.txt")
  18. //path还可以是分布式存储系统路径:HDFS
  19. //val rdd = sc.textFile("hdfs://node1:8020/test.txt")
  20. rdd.collect().foreach(println)
  21. //TODO 关闭环境
  22. sc.stop()
  23. }
  24. }

P034【034.尚硅谷_SparkCore - 核心编程 - RDD - 创建 - 文件1】04:42

  1. package com.atguigu.bigdata.spark.core.rdd.builder
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object Spark02_RDD_File1 {
  4. def main(args: Array[String]): Unit = {
  5. //TODO 准备环境
  6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
  7. val sc = new SparkContext(sparkConf)
  8. //TODO 创建RDD
  9. //从文件中创建RDD,将文件中的数据作为处理的数据源
  10. //textFile:以行为单位来读取数据,读取的数据都是字符串
  11. //wholeTextFiles:以文件为单位读取数据
  12. //读取的结果表示为元组,第一个元素表示文件路径,第二个元素表示文件内容
  13. val rdd = sc.wholeTextFiles("datas")
  14. rdd.collect().foreach(println)
  15. //TODO 关闭环境
  16. sc.stop()
  17. }
  18. }

P035【035.尚硅谷_SparkCore - 核心编程 - RDD - 集合数据源 - 分区的设定】11:41

5.1.4.2 RDD 并行度与分区

  1. package com.atguigu.bigdata.spark.core.rdd.builder
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object Spark01_RDD_Memory_Par {
  5. def main(args: Array[String]): Unit = {
  6. //TODO 准备环境
  7. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
  8. sparkConf.set("spark.default.parallelism", "5")//5个分区
  9. val sc = new SparkContext(sparkConf)
  10. //TODO 创建RDD
  11. //RDD的并行度 & 分区
  12. //makeRDD方法可以传递第二个参数,这个参数表示分区的数量
  13. //第二个参数可以不传递的,那么makeRDD方法会使用默认值:defaultParallelism(默认并行度)
  14. // scheduler.conf.getInt("spark.default.parallelism", totalCores)
  15. // spark在默认情况下,从配置对象中获取配置参数:spark.default.parallelism
  16. // 如果获取不到,那么使用totalCores属性,这个属性取值为当前运行环境的最大可用核数
  17. //val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
  18. val rdd = sc.makeRDD(List(1, 2, 3, 4))
  19. //将处理的数据保存成分区文件
  20. rdd.saveAsTextFile("output")
  21. //TODO 关闭环境
  22. sc.stop()
  23. }
  24. }

P036【036.尚硅谷_SparkCore - 核心编程 - RDD - 集合数据源 - 分区数据的分配】13:54

  1. package com.atguigu.bigdata.spark.core.rdd.builder
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object Spark01_RDD_Memory_Par1 {
  4. def main(args: Array[String]): Unit = {
  5. //TODO 准备环境
  6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
  7. val sc = new SparkContext(sparkConf)
  8. //TODO 创建RDD
  9. //【1,2】,【3,4】
  10. //val rdd = sc.makeRDD(List(1,2,3,4), 2)
  11. //【1】,【2】,【3,4】
  12. //val rdd = sc.makeRDD(List(1,2,3,4), 3)
  13. //【1】,【2,3】,【4,5】
  14. val rdd = sc.makeRDD(List(1, 2, 3, 4, 5), 3)
  15. //将处理的数据保存成分区文件
  16. rdd.saveAsTextFile("output")
  17. //TODO 关闭环境
  18. sc.stop()
  19. }
  20. }

P037【037.尚硅谷_SparkCore - 核心编程 - RDD - 文件数据源 - 分区的设定】11:33

  1. package com.atguigu.bigdata.spark.core.rdd.builder
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object Spark02_RDD_File_Par {
  4. def main(args: Array[String]): Unit = {
  5. // TODO 准备环境
  6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
  7. val sc = new SparkContext(sparkConf)
  8. // TODO 创建RDD
  9. // textFile可以将文件作为数据处理的数据源,默认也可以设定分区。
  10. // minPartitions : 最小分区数量
  11. // math.min(defaultParallelism, 2)
  12. //val rdd = sc.textFile("datas/1.txt")
  13. //如果不想使用默认的分区数量,可以通过第二个参数指定分区数
  14. //Spark读取文件,底层其实使用的就是Hadoop的读取方式
  15. //分区数量的计算方式:
  16. // totalSize = 7
  17. // goalSize = 7 / 2 = 3(byte)
  18. //7 / 3 = 2...1 (1.1) + 1 = 3(分区)
  19. val rdd = sc.textFile("datas/1.txt", 2)
  20. rdd.saveAsTextFile("output")
  21. // TODO 关闭环境
  22. sc.stop()
  23. }
  24. }

P038【038.尚硅谷_SparkCore - 核心编程 - RDD - 文件数据源 - 分区数据的分配】08:21

  1. package com.atguigu.bigdata.spark.core.rdd.builder
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object Spark02_RDD_File_Par1 {
  4. def main(args: Array[String]): Unit = {
  5. //TODO 准备环境
  6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
  7. val sc = new SparkContext(sparkConf)
  8. //TODO 创建RDD
  9. //TODO 数据分区的分配
  10. //1. 数据以行为单位进行读取
  11. // spark读取文件,采用的是hadoop的方式读取,所以一行一行读取,和字节数没有关系
  12. //2. 数据读取时以偏移量为单位,偏移量不会被重复读取
  13. /*
  14. 偏移量
  15. 1@@ => 012
  16. 2@@ => 345
  17. 3 => 6
  18. */
  19. //3. 数据分区的偏移量范围的计算
  20. // 0 => [0, 3] => 12
  21. // 1 => [3, 6] => 3
  22. // 2 => [6, 7] =>
  23. //【1,2】,【3】,【】
  24. val rdd = sc.textFile("datas/1.txt", 2)
  25. rdd.saveAsTextFile("output")
  26. //TODO 关闭环境
  27. sc.stop()
  28. }
  29. }

P039【039.尚硅谷_SparkCore - 核心编程 - RDD - 文件数据源 - 分区数据的分配 - 案例分析】06:13

  1. package com.atguigu.bigdata.spark.core.rdd.builder
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object Spark03_RDD_File_Par2 {
  4. def main(args: Array[String]): Unit = {
  5. // TODO 准备环境
  6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
  7. val sc = new SparkContext(sparkConf)
  8. // TODO 创建RDD
  9. // 14byte / 2 = 7byte
  10. // 14 / 7 = 2(分区)
  11. /*
  12. 1234567@@ => 012345678
  13. 89@@ => 9101112
  14. 0 => 13
  15. [0, 7] => 1234567
  16. [7, 14] => 890
  17. */
  18. // 如果数据源为多个文件,那么计算分区时以文件为单位进行分区
  19. val rdd = sc.textFile("datas/word.txt", 2)
  20. rdd.saveAsTextFile("output003")
  21. // TODO 关闭环境
  22. sc.stop()
  23. }
  24. }

P040【040.尚硅谷_SparkCore - 核心编程 - RDD - 算子介绍】07:49

 

P041【041.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - map】07:46

5.1.4.3 RDD转换算子

1) map

RDD根据数据处理方式的不同算子整体上分为Value类型、双Value类型Key-Value类型

  1. package com.atguigu.bigdata.spark.core.rdd.operator.transform
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object Spark01_RDD_Operator_Transform {
  5. def main(args: Array[String]): Unit = {
  6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  7. val sc = new SparkContext(sparkConf)
  8. // TODO 算子 - map
  9. val rdd = sc.makeRDD(List(1, 2, 3, 4))
  10. // 1,2,3,4
  11. // 2,4,6,8
  12. //转换函数
  13. def mapFunction(num: Int): Int = {
  14. num * 2
  15. }
  16. //val mapRDD: RDD[Int] = rdd.map(mapFunction)
  17. //val mapRDD: RDD[Int] = rdd.map((num: Int) => { num * 2 })
  18. //val mapRDD: RDD[Int] = rdd.map((num: Int) => num * 2)
  19. //val mapRDD: RDD[Int] = rdd.map((num) => num * 2)
  20. //val mapRDD: RDD[Int] = rdd.map(num => num * 2)
  21. val mapRDD: RDD[Int] = rdd.map(_ * 2)
  22. mapRDD.collect().foreach(println)
  23. sc.stop()
  24. }
  25. }

P042【042.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - map - 小功能】05:12

小功能:从服务器日志数据apache.log中获取用户请求URL资源路径。

  1. package com.atguigu.bigdata.spark.core.rdd.operator.transform
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object Spark01_RDD_Operator_Transform_Test {
  5. def main(args: Array[String]): Unit = {
  6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  7. val sc = new SparkContext(sparkConf)
  8. // TODO 算子 - map
  9. val rdd = sc.textFile("datas/apache.log")
  10. // 长的字符串
  11. // 短的字符串
  12. val mapRDD: RDD[String] = rdd.map(
  13. line => {
  14. val datas = line.split(" ")
  15. datas(6)
  16. }
  17. )
  18. mapRDD.collect().foreach(println)
  19. sc.stop()
  20. }
  21. }

P043【043.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - map - 并行计算效果演示】08:54

  1. package com.atguigu.bigdata.spark.core.rdd.operator.transform
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object Spark01_RDD_Operator_Transform_Par {
  5. def main(args: Array[String]): Unit = {
  6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  7. val sc = new SparkContext(sparkConf)
  8. // TODO 算子 - map
  9. // 1. rdd的计算一个分区内的数据是一个一个地执行逻辑
  10. // 只有前面一个数据全部的逻辑执行完毕后,才会执行下一个数据。
  11. // 分区内数据的执行是有序的。
  12. // 2. 不同分区数据计算是无序的。
  13. val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)//2个分区
  14. val mapRDD = rdd.map(
  15. num => {
  16. println(">>>>>>>> " + num)
  17. num
  18. }
  19. )
  20. val mapRDD1 = mapRDD.map(
  21. num => {
  22. println("######" + num)
  23. num
  24. }
  25. )
  26. mapRDD1.collect()
  27. sc.stop()
  28. }
  29. }

P044【044.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - mapPartitions】06:12

2) mapPartitions

  1. package com.atguigu.bigdata.spark.core.rdd.operator.transform
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object Spark02_RDD_Operator_Transform {
  5. def main(args: Array[String]): Unit = {
  6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  7. val sc = new SparkContext(sparkConf)
  8. // TODO 算子 - mapPartitions
  9. val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
  10. // mapPartitions:可以以分区为单位进行数据转换操作
  11. // 但是会将整个分区的数据加载到内存进行引用
  12. // 如果处理完的数据是不会被释放掉,存在对象的引用。
  13. // 在内存较小,数据量较大的场合下,容易出现内存溢出。
  14. val mpRDD: RDD[Int] = rdd.mapPartitions(
  15. iter => {
  16. println(">>>>>>>>>>")
  17. iter.map(_ * 2)
  18. }
  19. )
  20. mpRDD.collect().foreach(println)
  21. sc.stop()
  22. }
  23. }

P045【045.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - mapPartitions - 小练习】03:49

  1. package com.atguigu.bigdata.spark.core.rdd.operator.transform
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object Spark02_RDD_Operator_Transform_Test {
  5. def main(args: Array[String]): Unit = {
  6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  7. val sc = new SparkContext(sparkConf)
  8. // TODO 算子 - mapPartitions
  9. val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
  10. // 【1,2】,【3,4】
  11. // 【2】,【4】
  12. val mpRDD = rdd.mapPartitions(
  13. iter => {
  14. List(iter.max).iterator
  15. }
  16. )
  17. mpRDD.collect().foreach(println)
  18. sc.stop()
  19. }
  20. }

P046【046.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - mapPartitions & map的区别 - 完成比完美更重要】02:21

思考一个问题:mapmapPartitions的区别

  • 数据处理角度
    • Map 算子是分区内一个数据一个数据的执行,类似于串行操作。
    • 而 mapPartitions 算子是以分区为单位进行批处理操作。
  • 功能的角度
    • Map 算子主要目的将数据源中的数据进行转换和改变,但是不会减少或增多数据。
    • MapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变, 所以可以增加或减少数据。
  • 性能的角度
    • Map 算子因为类似于串行操作,所以性能比较低,而是 mapPartitions 算子类似于批处理,所以性能较高。
    • 但是mapPartitions 算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。

完成比完美更重要。

P047【047.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - mapPartitionsWithIndex】06:30

3) mapPartitionsWithIndex

  1. package com.atguigu.bigdata.spark.core.rdd.operator.transform
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object Spark03_RDD_Operator_Transform1 {
  4. def main(args: Array[String]): Unit = {
  5. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  6. val sc = new SparkContext(sparkConf)
  7. // TODO 算子 - mapPartitions
  8. val rdd = sc.makeRDD(List(1, 2, 3, 4))
  9. val mpiRDD = rdd.mapPartitionsWithIndex(
  10. (index, iter) => {
  11. // 1, 2, 3, 4
  12. //(0,1)(2,2),(4,3),(6,4)
  13. iter.map(
  14. num => {
  15. (index, num)
  16. }
  17. )
  18. }
  19. )
  20. mpiRDD.collect().foreach(println)
  21. sc.stop()
  22. }
  23. }

P048【048.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - flatMap】05:07

4) flatMap

 

  1. package com.atguigu.bigdata.spark.core.rdd.operator.transform
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object Spark04_RDD_Operator_Transform {
  5. def main(args: Array[String]): Unit = {
  6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  7. val sc = new SparkContext(sparkConf)
  8. // TODO 算子 - flatMap
  9. val rdd: RDD[List[Int]] = sc.makeRDD(List(
  10. List(1, 2), List(3, 4)
  11. ))
  12. val flatRDD: RDD[Int] = rdd.flatMap(
  13. list => {
  14. list
  15. }
  16. )
  17. flatRDD.collect().foreach(println)
  18. sc.stop()
  19. }
  20. }
  1. package com.atguigu.bigdata.spark.core.rdd.operator.transform
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object Spark04_RDD_Operator_Transform1 {
  5. def main(args: Array[String]): Unit = {
  6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  7. val sc = new SparkContext(sparkConf)
  8. // TODO 算子 - flatMap
  9. val rdd: RDD[String] = sc.makeRDD(List(
  10. "Hello Scala", "Hello Spark"
  11. ))
  12. val flatRDD: RDD[String] = rdd.flatMap(
  13. s => {
  14. s.split(" ")
  15. }
  16. )
  17. flatRDD.collect().foreach(println)
  18. sc.stop()
  19. }
  20. }

P049【049.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - flatMap - 小练习】02:41

小功能:将 List(List(1,2),3,List(4,5))进行扁平化操作。

  1. package com.atguigu.bigdata.spark.core.rdd.operator.transform
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object Spark04_RDD_Operator_Transform2 {
  5. def main(args: Array[String]): Unit = {
  6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  7. val sc = new SparkContext(sparkConf)
  8. // TODO 算子 - flatMap
  9. val rdd = sc.makeRDD(List(List(1, 2), 3, List(4, 5)))
  10. val flatRDD = rdd.flatMap(
  11. data => {
  12. data match {//模式匹配
  13. case list: List[_] => list
  14. case dat => List(dat)
  15. }
  16. }
  17. )
  18. flatRDD.collect().foreach(println)
  19. sc.stop()
  20. }
  21. }

P050【050.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - glom】06:33

5) glom

将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变。

  1. package com.atguigu.bigdata.spark.core.rdd.operator.transform
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object Spark05_RDD_Operator_Transform_Test {
  5. def main(args: Array[String]): Unit = {
  6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  7. val sc = new SparkContext(sparkConf)
  8. // TODO 算子 - glom
  9. val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4), 2)
  10. // 【1,2】,【3,4】
  11. // 【2】,【4】
  12. // 【6】
  13. val glomRDD: RDD[Array[Int]] = rdd.glom()
  14. val maxRDD: RDD[Int] = glomRDD.map(
  15. array => {
  16. array.max
  17. }
  18. )
  19. println(maxRDD.collect().sum)
  20. sc.stop()
  21. }
  22. }

P051【051.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - 理解分区不变的含义】06:48

  1. package com.atguigu.bigdata.spark.core.rdd.operator.transform
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object Spark01_RDD_Operator_Transform_Part {
  5. def main(args: Array[String]): Unit = {
  6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  7. val sc = new SparkContext(sparkConf)
  8. // TODO 算子 - map
  9. val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
  10. // 【1,2】,【3,4】
  11. rdd.saveAsTextFile("output1")
  12. val mapRDD = rdd.map(_ * 2)
  13. // 【2,4】,【6,8】
  14. mapRDD.saveAsTextFile("output2")
  15. sc.stop()
  16. }
  17. }

P052【052.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - groupBy】05:25

6) groupBy

小功能:将 List("Hello", "hive", "hbase", "Hadoop")根据单词首写字母进行分组。

  1. package com.atguigu.bigdata.spark.core.rdd.operator.transform
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object Spark06_RDD_Operator_Transform {
  5. def main(args: Array[String]): Unit = {
  6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  7. val sc = new SparkContext(sparkConf)
  8. // TODO 算子 - groupBy
  9. val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
  10. // groupBy会将数据源中的每一个数据进行分组判断,根据返回的分组key进行分组
  11. // 相同的key值的数据会放置在一个组中
  12. def groupFunction(num: Int) = {
  13. num % 2
  14. }
  15. val groupRDD: RDD[(Int, Iterable[Int])] = rdd.groupBy(groupFunction)
  16. groupRDD.collect().foreach(println)
  17. sc.stop()
  18. }
  19. }
  1. package com.atguigu.bigdata.spark.core.rdd.operator.transform
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object Spark06_RDD_Operator_Transform1 {
  5. def main(args: Array[String]): Unit = {
  6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  7. val sc = new SparkContext(sparkConf)
  8. // TODO 算子 - groupBy
  9. val rdd = sc.makeRDD(List("Hello", "Spark", "Scala", "Hadoop"), 2)
  10. // 分组和分区没有必然的关系
  11. val groupRDD = rdd.groupBy(_.charAt(0))
  12. groupRDD.collect().foreach(println)
  13. sc.stop()
  14. }
  15. }

P053【053.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - groupBy - shuffle来袭】06:01

 

P054【054.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - groupBy - 小练习】07:51

6) groupBy

小功能:从服务器日志数据apache.log中获取每个时间段访问量。

  1. package com.atguigu.bigdata.spark.core.rdd.operator.transform
  2. import java.text.SimpleDateFormat
  3. import java.util.Date
  4. import org.apache.spark.rdd.RDD
  5. import org.apache.spark.{SparkConf, SparkContext}
  6. object Spark06_RDD_Operator_Transform_Test {
  7. def main(args: Array[String]): Unit = {
  8. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  9. val sc = new SparkContext(sparkConf)
  10. // TODO 算子 - groupBy
  11. val rdd = sc.textFile("datas/apache.log")
  12. val timeRDD: RDD[(String, Iterable[(String, Int)])] = rdd.map(
  13. line => {
  14. val datas = line.split(" ")
  15. val time = datas(3)
  16. //time.substring(0, )
  17. val sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
  18. val date: Date = sdf.parse(time)
  19. val sdf1 = new SimpleDateFormat("HH")
  20. val hour: String = sdf1.format(date)
  21. (hour, 1)
  22. }
  23. ).groupBy(_._1)
  24. timeRDD.map {
  25. case (hour, iter) => {
  26. (hour, iter.size)
  27. }
  28. }.collect.foreach(println)
  29. sc.stop()
  30. }
  31. }

P055【055.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - filter - 数据倾斜】07:11

7) filter

小功能:从服务器日志数据 apache.log 中获取 2015 年 5 月 17 日的请求路径。

 

  1. package com.atguigu.bigdata.spark.core.rdd.operator.transform
  2. import java.text.SimpleDateFormat
  3. import java.util.Date
  4. import org.apache.spark.rdd.RDD
  5. import org.apache.spark.{SparkConf, SparkContext}
  6. object Spark07_RDD_Operator_Transform {
  7. def main(args: Array[String]): Unit = {
  8. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  9. val sc = new SparkContext(sparkConf)
  10. // TODO 算子 - filter
  11. val rdd = sc.makeRDD(List(1, 2, 3, 4))
  12. val filterRDD: RDD[Int] = rdd.filter(num => num % 2 != 0)
  13. filterRDD.collect().foreach(println)
  14. sc.stop()
  15. }
  16. }
  1. package com.atguigu.bigdata.spark.core.rdd.operator.transform
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object Spark07_RDD_Operator_Transform_Test {
  5. def main(args: Array[String]): Unit = {
  6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  7. val sc = new SparkContext(sparkConf)
  8. // TODO 算子 - filter
  9. val rdd = sc.textFile("datas/apache.log")
  10. rdd.filter(
  11. line => {
  12. val datas = line.split(" ")
  13. val time = datas(3)
  14. time.startsWith("17/05/2015")
  15. }
  16. ).collect().foreach(println)
  17. sc.stop()
  18. }
  19. }

P056【056.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - sample - 抽奖喽】16:11

8) sample

思考一个问题:有啥用,抽奖吗?使用场景:数据倾斜,分区:均衡、shuffle。

  1. package com.atguigu.bigdata.spark.core.rdd.operator.transform
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object Spark08_RDD_Operator_Transform {
  4. def main(args: Array[String]): Unit = {
  5. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  6. val sc = new SparkContext(sparkConf)
  7. // TODO 算子 - sample
  8. val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
  9. // sample算子需要传递三个参数
  10. // 1. 第一个参数表示,抽取数据后是否将数据返回,true(放回)、false(丢弃)
  11. // 2. 第二个参数表示,
  12. // 如果抽取不放回的场合:数据源中每条数据被抽取的概率,基准值的概念
  13. // 如果抽取放回的场合:表示数据源中的每条数据被抽取的可能次数
  14. // 3. 第三个参数表示,抽取数据时随机算法的种子
  15. // 如果不传递第三个参数,那么使用的是当前系统时间
  16. // println(rdd.sample(
  17. // false,
  18. // 0.4,
  19. // 1
  20. // ).collect().mkString(",")
  21. // )
  22. println(rdd.sample(
  23. true,
  24. 2,
  25. 1
  26. ).collect().mkString(","))
  27. sc.stop()
  28. }
  29. }

P057【057.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - distinct】06:13

9) distinct

  1. package com.atguigu.bigdata.spark.core.rdd.operator.transform
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object Spark09_RDD_Operator_Transform {
  5. def main(args: Array[String]): Unit = {
  6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  7. val sc = new SparkContext(sparkConf)
  8. // TODO 算子 - distinct
  9. val rdd = sc.makeRDD(List(1, 2, 3, 4, 1, 2, 3, 4))
  10. // map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
  11. // (1, null),(2, null),(3, null),(4, null),(1, null),(2, null),(3, null),(4, null)
  12. // (1, null)(1, null)(1, null)
  13. // (null, null) => null
  14. // (1, null) => 1
  15. val rdd1: RDD[Int] = rdd.distinct()
  16. rdd1.collect().foreach(println)
  17. sc.stop()
  18. }
  19. }

P058【058.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - coalesce】11:11

10) coalesce

 

 

  1. package com.atguigu.bigdata.spark.core.rdd.operator.transform
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object Spark10_RDD_Operator_Transform {
  5. def main(args: Array[String]): Unit = {
  6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  7. val sc = new SparkContext(sparkConf)
  8. // TODO 算子 - coalesce
  9. val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)
  10. // coalesce方法默认情况下不会将分区的数据打乱重新组合
  11. // 这种情况下的缩减分区可能会导致数据不均衡,出现数据倾斜
  12. // 如果想要让数据均衡,可以进行shuffle处理
  13. // val newRDD: RDD[Int] = rdd.coalesce(2)
  14. val newRDD: RDD[Int] = rdd.coalesce(2, true)
  15. newRDD.saveAsTextFile("output004")
  16. sc.stop()
  17. }
  18. }

P059【059.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - repartition】07:28

11) repartition

  1. package com.atguigu.bigdata.spark.core.rdd.operator.transform
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object Spark11_RDD_Operator_Transform {
  5. def main(args: Array[String]): Unit = {
  6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  7. val sc = new SparkContext(sparkConf)
  8. // TODO 算子 - repartition
  9. val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
  10. // coalesce算子可以扩大分区的,但是如果不进行shuffle操作,是没有意义,不起作用。
  11. // 所以如果想要实现扩大分区的效果,需要使用shuffle操作
  12. // spark提供了一个简化的操作
  13. // 缩减分区:coalesce,如果想要数据均衡,可以采用shuffle
  14. // 扩大分区:repartition, 底层代码调用的就是coalesce,而且肯定采用shuffle
  15. // val newRDD: RDD[Int] = rdd.coalesce(3, true)
  16. val newRDD: RDD[Int] = rdd.repartition(3)
  17. newRDD.saveAsTextFile("output005")
  18. sc.stop()
  19. }
  20. }

P060【060.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - sortBy】06:31

12) sortBy

  1. package com.atguigu.bigdata.spark.core.rdd.operator.transform
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object Spark12_RDD_Operator_Transform1 {
  5. def main(args: Array[String]): Unit = {
  6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  7. val sc = new SparkContext(sparkConf)
  8. // TODO 算子 - sortBy
  9. val rdd = sc.makeRDD(List(("1", 1), ("11", 2), ("2", 3)), 2)
  10. // sortBy方法可以根据指定的规则对数据源中的数据进行排序,默认为升序,第二个参数可以改变排序的方式
  11. // sortBy默认情况下,不会改变分区,但是中间存在shuffle操作。
  12. val newRDD = rdd.sortBy(t => t._1.toInt, false)
  13. newRDD.collect().foreach(println)
  14. sc.stop()
  15. }
  16. }

P061【061.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - 交集&并集&差集&拉链】08:19

13) intersection

14) union

15) subtract

16) zip

  1. package com.atguigu.bigdata.spark.core.rdd.operator.transform
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object Spark13_RDD_Operator_Transform {
  5. def main(args: Array[String]): Unit = {
  6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  7. val sc = new SparkContext(sparkConf)
  8. // TODO 算子 - 双Value类型
  9. // 交集,并集和差集要求两个数据源数据类型保持一致
  10. // 拉链操作两个数据源的类型可以不一致
  11. val rdd1 = sc.makeRDD(List(1, 2, 3, 4))
  12. val rdd2 = sc.makeRDD(List(3, 4, 5, 6))
  13. // 交集 : 【3,4】
  14. val rdd3: RDD[Int] = rdd1.intersection(rdd2)
  15. println(rdd3.collect().mkString(","))
  16. // 并集 : 【1,2,3,4,3,4,5,6】
  17. val rdd4: RDD[Int] = rdd1.union(rdd2)
  18. println(rdd4.collect().mkString(","))
  19. // 差集 : 【1,2】
  20. val rdd5: RDD[Int] = rdd1.subtract(rdd2)
  21. println(rdd5.collect().mkString(","))
  22. // 拉链 : 【1-3,2-4,3-5,4-6】
  23. val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
  24. println(rdd6.collect().mkString(","))
  25. sc.stop()
  26. }
  27. }

P062【062.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - 交集&并集&差集&拉链 - 注意事项】08:10

13) intersection

14) union

15) subtract

16) zip

  1. package com.atguigu.bigdata.spark.core.rdd.operator.transform
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object Spark13_RDD_Operator_Transform {
  5. def main(args: Array[String]): Unit = {
  6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  7. val sc = new SparkContext(sparkConf)
  8. // TODO 算子 - 双Value类型
  9. // 交集,并集和差集要求两个数据源数据类型保持一致
  10. // 拉链操作两个数据源的类型可以不一致
  11. val rdd1 = sc.makeRDD(List(1, 2, 3, 4))
  12. val rdd2 = sc.makeRDD(List(3, 4, 5, 6))
  13. val rdd7 = sc.makeRDD(List("3", "4", "5", "6"))
  14. // 交集 : 【3,4】
  15. val rdd3: RDD[Int] = rdd1.intersection(rdd2)
  16. //val rdd8 = rdd1.intersection(rdd7)
  17. println(rdd3.collect().mkString(","))
  18. // 并集 : 【1,2,3,4,3,4,5,6】
  19. val rdd4: RDD[Int] = rdd1.union(rdd2)
  20. println(rdd4.collect().mkString(","))
  21. // 差集 : 【1,2】
  22. val rdd5: RDD[Int] = rdd1.subtract(rdd2)
  23. println(rdd5.collect().mkString(","))
  24. // 拉链 : 【1-3,2-4,3-5,4-6】
  25. val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
  26. val rdd8 = rdd1.zip(rdd7)
  27. println(rdd6.collect().mkString(","))
  28. sc.stop()
  29. }
  30. }
  1. package com.atguigu.bigdata.spark.core.rdd.operator.transform
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object Spark13_RDD_Operator_Transform1 {
  5. def main(args: Array[String]): Unit = {
  6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  7. val sc = new SparkContext(sparkConf)
  8. // TODO 算子 - 双Value类型
  9. // Can't zip RDDs with unequal numbers of partitions: List(2, 4)
  10. // 两个数据源要求分区数量要保持一致
  11. // Can only zip RDDs with same number of elements in each partition
  12. // 两个数据源要求分区中数据数量保持一致
  13. val rdd1 = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
  14. val rdd2 = sc.makeRDD(List(3, 4, 5, 6), 2)
  15. val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
  16. println(rdd6.collect().mkString(","))
  17. sc.stop()
  18. }
  19. }
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/470652
推荐阅读
相关标签
  

闽ICP备14008679号