当前位置:   article > 正文

Spark2——运行架构、核心编程

spark2

Spark运行架构

运行框架

Spark 框架的核心是一个计算引擎,整体来说,它采用了标准 master-slave 的结构。 如下图所示,它展示了一个 Spark 执行时的基本结构。图形中的 Driver 表示 master,负责管理整个集群中的作业任务调度。图形中的 Executor 则是 slave,负责实际执行任务

在这里插入图片描述

核心组件

  1. Driver

Spark 驱动器节点,用于执行 Spark 任务中的 main 方法,负责实际代码的执行工作。Driver 在 Spark 作业执行时主要负责:

➢ 将用户程序转化为作业(job)
➢ 在 Executor 之间调度任务(task)
➢ 跟踪 Executor 的执行情况
➢ 通过 UI 展示查询运行情况

  1. Executor

Spark Executor 是集群中工作节点(Worker)中的一个 JVM 进程,负责在 Spark 作业中运行具体任务(Task),任务彼此之间相互独立。Spark 应用启动时,Executor 节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。如果有 Executor 节点发生了故障或崩溃,Spark 应用也可以继续执行,会将出错节点上的任务调度到其他 Executor 节点上继续运行。

Executor 有两个核心功能:
➢ 负责运行组成 Spark 应用的任务,并将结果返回给驱动器进程
➢ 它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式存储。RDD 是直接缓存在 Executor 进程内的,因此任务可以在运行时充分利用缓存数据加速运算。

  1. Master & Worker

Spark 集群的独立部署环境中,不需要依赖其他的资源调度框架,自身就实现了资源调度的功能,所以环境中还有其他两个核心组件:Master 和 Worker,这里的 Master 是一个进程,主要负责资源的调度和分配,并进行集群的监控等职责,类似于 Yarn 环境中的 RM, 而Worker 呢,也是进程,一个 Worker 运行在集群中的一台服务器上,由 Master 分配资源对数据进行并行的处理和计算,类似于 Yarn 环境中 NM。

  1. ApplicationMaster

Hadoop 用户向 YARN 集群提交应用程序时,提交程序中应该包含 ApplicationMaster,用于向资源调度器申请执行任务的资源容器 Container,运行用户自己的程序任务 job,监控整个任务的执行,跟踪整个任务的状态,处理任务失败等异常情况。
说的简单点就是,ResourceManager(资源)和 Driver(计算)之间的解耦合靠的就是ApplicationMaster

核心概念

  1. Executor 与 Core

Spark Executor 是集群中运行在工作节点(Worker)中的一个 JVM 进程,是整个集群中的专门用于计算的节点。在提交应用中,可以提供参数指定计算节点的个数,以及对应的资源。这里的资源一般指的是工作节点 Executor 的内存大小和使用的虚拟 CPU 核(Core)数量。
在这里插入图片描述

  1. 并行度(Parallelism)

将整个集群并行执行任务的数量称之为并行度。

提交流程

资源申请+执行步骤
在这里插入图片描述

Spark 应用程序提交到 Yarn 环境中执行的时候,一般会有两种部署执行的方式:Client和 Cluster。两种模式主要区别在于:Driver 程序的运行节点位置。

Spark核心编程

IO基本实现原理

  1. 字节流

读一个,写一个:效率很低
在这里插入图片描述

  1. 缓冲流

将传入的字节放入缓冲区,缓冲区超过阈值,将数据一次性输出,提高处理效率。

在这里插入图片描述

  1. 字符流

实际中读取一行一行的数据,大部分需要使用字符流

三个字节转换为一个字符。

在这里插入图片描述

IO操作体现了装饰者设计模式
在这里插入图片描述

FileInputStream:文件读取
InputStreamReader:字节转换为字符
BufferedReader:进行缓冲

  1. RDD与IO操作的关系

在这里插入图片描述

数据流过,并不保存,保存的只是生成RDD的操作。

RDD

  1. 什么是RDD

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

(1)弹性
⚫ 存储的弹性:内存与磁盘的自动切换
⚫ 容错的弹性:数据丢失可以自动恢复
⚫ 计算的弹性:计算出错重试机制;
⚫ 分片的弹性:可根据需要重新分片

(2)分布式:数据存储在大数据集群不同节点上
(3)数据集:RDD 封装了计算逻辑,并不保存数据
(4)数据抽象:RDD 是一个抽象类,需要子类具体实现
(5)不可变:RDD 封装了计算逻辑,是不可以改变的,想要改变,只能产生新的 RDD,在新的 RDD 里面封装计算逻辑
(6)可分区、并行计算

  1. 核心属性

(1)分区列表
RDD 数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性。
(2)分区计算函数
Spark 在计算时,是使用分区函数对每一个分区进行计算(计算逻辑)
(3)RDD 之间的依赖关系
RDD 是计算模型的封装,当需求中需要将多个计算模型进行组合时,就需要将多个 RDD 建立依赖关系
(4)分区器(可选)
当数据为 KV 类型数据时,可以通过设定分区器自定义数据的分区
(5)首选位置(可选)
计算数据时,可以根据计算节点的状态选择不同的节点位置进行计算(判断计算到那个节点,计算效率最优)

移动数据不如移动计算

  1. 执行原理

从计算的角度来讲,数据处理过程中需要计算资源(内存 & CPU)和计算模型(逻辑)。
执行时,需要将计算资源和计算模型进行协调和整合。
Spark 框架在执行时,先申请资源,然后将应用程序的数据处理逻辑分解成一个一个的
计算任务。然后将任务发到已经分配资源的计算节点上, 按照指定的计算模型进行数据计
算。最后得到计算结果。
RDD 是 Spark 框架中用于数据处理的核心模型,接下来我们看看,在 Yarn 环境中,RDD
的工作原理:
(1) 启动 Yarn 集群
在这里插入图片描述

(2)Spark通过申请资源创建调度节点和计算节点
在这里插入图片描述

(3)Spark 框架根据需求将计算逻辑根据分区划分成不同的任务
在这里插入图片描述

(4)调度节点将任务根据计算节点状态发送到对应的计算节点进行计算

在这里插入图片描述

从以上流程可以看出 RDD 在整个流程中主要用于将逻辑进行封装,并生成 Task 发送给Executor 节点执行计算。

  1. 基础编程

1)RDD创建

(1)从集合(内存)中创建RDD
从集合中创建 RDD,Spark 主要提供了两个方法:parallelize 和 makeRDD

package com.yu.bigdata.spark.core.rddCreate

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object RDD_Memory {
  def main(args: Array[String]): Unit = {
    //1. 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    //local[*] * 表示当前可用的最大核数,local表示单线程模拟单核
    val sc = new SparkContext(sparkConf)

    //2. 创建RDD
    //从内存中创建RDD,将内存中集合的数据作为处理的数据源
    val seq = Seq[Int](1, 2, 3, 4)
    //parallelize方法
    //val rdd: RDD[Int] = sc.parallelize(seq)  

    //使用makeRDD方法
    //makeRDD方法在底层实现时就是调用了rdd对象的parallelize方法,更易理解
    val rdd: RDD[Int] = sc.makeRDD(seq)
    rdd.collect().foreach(println)

    //3. 关闭环境
    sc.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

(2)从外部存储(文件)创建 RDD
由外部存储系统的数据集创建 RDD 包括:本地的文件系统,所有 Hadoop 支持的数据集,比如 HDFS、HBase 等。

    //从文件中创建RDD,将文件中的数据作为数据源    //sc.textFile("/Users/yu/Downloads/practise/com.yu.bigdata/datas/1.txt") //绝对路径
    //(1)path路径默认以当前环境的根路径为基准,绝对路径和相对路径均可
    val rdd: RDD[String] = sc.textFile("datas/1.txt")
    
    //(2)路径可以是文件的具体路径,也可以是目录名称
    val rdd = sc.textFile("datas")  //可以读取文件夹下所有文件
    
    //(3)路径还可以使用通配符
    sc.textFile("datas/1*.txt") //这样以1开头的文件内容都会被读取到

    //(4)path也可以是分布式存储系统路径:HDFS
    sc.textFile("hdfs://linux1:8020/text.txt")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

textFile与wholeTextFiles区分:是否能够看出从哪个文件中读取的内容

    //textFile:以行为单位来读取数据,读取的数据都是字符串
    //wholeTextFiles:以文件为单位来读取数据 能够看出从哪个文件中读取的内容
    //读取的结果表示为元组,第一个元素表示文件路径,第二个元素表示文件内容
    val rdd: RDD[(String, String)] = sc.wholeTextFiles("datas")

    rdd.collect().foreach(println)
    //(file:/Users/yu/Downloads/practise/com.yu.bigdata/datas/2.txt,Hello World Hello Spark)
    //(file:/Users/yu/Downloads/practise/com.yu.bigdata/datas/1.txt,Hello World Hello Spark)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

(3)从其他 RDD创建
主要是通过一个 RDD 运算完后,再产生新的 RDD。
(4)直接创建RDD(new)
使用 new 的方式直接构造 RDD,一般由 Spark 框架自身使用。

2)RDD并行度与分区

默认情况下,Spark 可以将一个作业切分多个任务后,发送给 Executor 节点并行计算,而能 够并行计算的任务数量我们称之为并行度。这个数量可以在构建 RDD 时指定。记住,这里的并行执行的任务数量,并不是指的切分任务的数量,不要混淆了。

A. 集合数据源——分区的分配

package com.yu.bigdata.spark.core.rddCreate

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object RDD_Memory_Par {
  def main(args: Array[String]): Unit = {
    // TODO 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    //sparkConf.set("spark.default.parallelism", "5")   //这样可以指定分区数
    val sc = new SparkContext(sparkConf)

    // TODO 创建RDD 并行度&分区
    //第二个参数表示分区数量 可以不传递,会使用默认并行度 使用totalCores即为当前环境的最大可用核数
    //分区可以在第一步配置环境时指定
    //val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)  //产生两个分区
    val rdd = sc.makeRDD(List(1, 2, 3, 4))       //产生了8个,默认并行度

    //将处理的数据保存成分区文件
    rdd.saveAsTextFile("output")

    // TODO 关闭环境
    sc.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

B. 集合数据源——分区数据的分配:

package com.yu.bigdata.spark.core.rddCreate

import org.apache.spark.{SparkConf, SparkContext}

object RDD_Memory_Par1 {
  def main(args: Array[String]): Unit = {
    // TODO 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)

    // TODO 创建RDD 并行度&分区
    // 【1,2】 【3,4】
    //val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)  //产生两个分区
    // 【1】 【2】 【3, 4】
    //val rdd = sc.makeRDD(List(1, 2, 3, 4), 3)       //产生3个分区
    // 【1】 【2,3】 【4,5】
    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5), 3) //产生3个分区

    //将处理的数据保存成分区文件
    rdd.saveAsTextFile("output")

    // TODO 关闭环境
    sc.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

源码切分函数:
在这里插入图片描述

[1, 2, 3, 4, 5] 3
length: 5 , numSlices: 3
切分数组,分配数据
numSlices : 0 => (0, 1) => [1]
numSlices : 1 => (1, 3) => [2, 3]
numSlices : 2 => (3, 5) => [4, 5]

C. 文件数据源——分区的分配:

package com.yu.bigdata.spark.core.rddCreate

import org.apache.spark.{SparkConf, SparkContext}

object RDD_File_Par {
  def main(args: Array[String]): Unit = {
    // TODO 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)

    // TODO 创建RDD
    //textFile可以将文件作为数据处理的数据源,默认也可以设定分区
    //minPartitions:最小分区数量
    //math.min(defaultParallelism, 2)  min(8,2)
//    val rdd = sc.textFile("datas/1.txt")
//    rdd.saveAsTextFile("output")

    //如果不想使用默认的分区数量,可以通过第二个参数指定分区数
    //val rdd = sc.textFile("datas/1.txt", 3)  //三个分区
    val rdd = sc.textFile("datas/1.txt", 2)  //两个分区
    //分区数量的计算方式   1/ENTER2/ENTER3  =>  1+2+1+2+1=7
    //Spark
    //totalSize : 7   ENTER两个字节 共七个字节
    //goalSize : 7 / 2 = 3 (byte)每个分区存放三个字节
    //7 / 3 = 2……1 剩下1 (1.1)
    // 需要判断 剩下的字节数占每个分区字节数 的百分比大于10% 就要产生新的分区 否则不会
    //所以 2 + 1 ,一共三个分区

    // TODO 关闭环境
    sc.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

D. 文件数据源——分区数据的分配:

RDD转换算子

RDD 根据数据处理方式的不同将算子整体上分为 Value 类型、双 Value 类型和 Key-Value类型。

RDD方法:(RDD算子)
转换:功能的补充和封装,将旧的RDD包装成新的RDD(flatMap,map)
行动:触发任务的调度和作业的执行(collect)

Value类型

(1)map
函数说明:
将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。

函数签名:
def map[U: ClassTag](f: T => U): RDD[U]

	val rdd = sc.makeRDD(List(1, 2, 3, 4))
    val mapRDD: RDD[Int] = rdd.map(_ * 2)
  • 1
  • 2

并行执行:

package com.yu.bigdata.spark.core.operatorTransform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark01_RDD_Operator_Transform_Par {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // TODO 算子 —— map
    val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)

    //1. rdd计算一个分区内的数据是一个一个执行
    //   只有前面的数据全部的逻辑执行完毕后,才会执行下一个数据
    //   分区内数据的执行是有序的
    //2. 不同分区的数据执行是无序的
    val mapRDD = rdd.map (
      num => {
        println(">>>>>>" + num)
        num
      }
    )
    val mapRDD1 = mapRDD.map (
      num => {
        println("######" + num)
        num
      }
    )

    mapRDD1.collect()
    //一个分区时的结果     //两个分区时的结果
    //>>>>>>1           >>>>>>1
    //######1           >>>>>>3
    //>>>>>>2           ######3
    //######2           ######1
    //>>>>>>3           >>>>>>4
    //######3           >>>>>>2
    //>>>>>>4           ######2
    //######4           ######4

    sc.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

(2)mapPartitions
函数说明:
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据。

函数签名:
def mapPartitions[U: ClassTag](
  f: Iterator[T] => Iterator[U],
  preservesPartitioning: Boolean = false): RDD[U]

package com.yu.bigdata.spark.core.operatorTransform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark02_RDD_MapPartitions {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // TODO 算子 —— mapPartitions
    val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)

    //可以以分区为单位进行数据转换操作,但是会将整个分区的数据加载到内存进行引用
    //处理完的数据是不会被释放掉的,因为存在对象的引用
    //在数据量多,内存较小的情况下,容易出现内存溢出  那么使用map更好
    val mapRDD: RDD[Int] = rdd.mapPartitions(
      iter => {
        println(">>>>>>")
        iter.map(_ * 2)
      }
    )
    mapRDD.collect().foreach(println)
    //>>>>>>  有两个分区,迭代器走了两次,是以分区为单位进行的处理
    //>>>>>>
    //2
    //4
    //6
    //8

    sc.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

获取每个数据分区的最大值:

package com.yu.bigdata.spark.core.operatorTransform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark02_RDD_MapPartitions_test {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // TODO 算子 —— mapPartitions
    val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
    // TODO 获取每个分区的最大值
    val mapRDD = rdd.mapPartitions(
      iter => {
        List(iter.max).iterator
      }
    )
    mapRDD.collect().foreach(println)
    //2
    //4

    sc.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

map 和 mapPartitions 的区别:
➢ 数据处理角度
  Map 算子是分区内一个数据一个数据的执行,类似于串行操作。而 mapPartitions 算子是以分区为单位进行批处理操作。

➢ 功能的角度
  Map 算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。MapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据。

➢ 性能的角度
  Map 算子因为类似于串行操作,所以性能比较低,而是 mapPartitions 算子类似于批处理,所以性能较高。但是 mapPartitions 算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。使用 map 操作。

(3)mapPartitionsWithIndex
函数说明:
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引

函数签名:
def mapPartitionsWithIndex[U: ClassTag](
  f: (Int, Iterator[T]) => Iterator[U],
  preservesPartitioning: Boolean = false): RDD[U]

package com.yu.bigdata.spark.core.operatorTransform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark03_RDD_MapPartitionsWithIndex {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // TODO 算子 —— mapPartitionsWithIndex
    val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
    val mapRDD = rdd.mapPartitionsWithIndex(
      (index, iter) => {
        if ( index == 1 ) {
          iter
        } else {
          Nil.iterator  //空集合
        }
      }
    )
    mapRDD.collect().foreach(println)
    //3
    //4
    sc.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

打印出每个分区内的数据

package com.yu.bigdata.spark.core.operatorTransform

import org.apache.spark.{SparkConf, SparkContext}

object Spark03_RDD_MapPartitionsWithIndex1 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // TODO 算子 —— mapPartitionsWithIndex
    val rdd = sc.makeRDD(List(1, 2, 3, 4)) //这样默认会产生8个分区
    //输出每个分区的数据 (分区索引, 分区内元素)
    val mapRDD = rdd.mapPartitionsWithIndex(
      (index, iter) => {
        iter.map(
          num => {
            (index, num)
          }
        )
      }
    )
    mapRDD.collect().foreach(println)
    //(1,1)
    //(3,2)
    //(5,3)
    //(7,4)

    sc.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

(4)flatMap
函数说明:
将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射。

函数签名:
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

package com.yu.bigdata.spark.core.operatorTransform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark04_RDD_flatMap {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("flatMap")
    val sc = new SparkContext(sparkConf)

    // TODO 算子flatMap
    val rdd: RDD[List[Int]] = sc.makeRDD(List(
      List(1, 2), List(3, 4)
    ))

    val flatRDD: RDD[Int] = rdd.flatMap(
      list => {
        list    //返回一个迭代器,flatMap知道怎么拆
      }
    )
    flatRDD.collect().foreach(println)
    //1
    //2
    //3
    //4

    val rdd1: RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val flatRDD1: RDD[String] = rdd1.flatMap(
      s => {
        s.split(" ") //返回一个迭代器,告诉flatMap怎么拆分
      }
    )
    flatRDD1.collect().foreach(println)
    //Hello
    //Scala
    //Hello
    //Spark
	
	//将 List(List(1,2),3,List(4,5))进行扁平化操作
    val rdd2: RDD[Any] = sc.makeRDD(List(List(1, 2), 3, List(3, 4)))
    val flatRDD2 = rdd2.flatMap(
      data => {
        data match {
          case list:List[_] => list  //是list返回自身
          case data => List(data)   //转换为list
        }
      }
    )
    flatRDD2.collect().foreach(println)
    //1
    //2
    //3
    //3
    //4
    sc.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57

(5)glom
函数说明:
将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变
转换前后,分区不变,并行度不变。
在这里插入图片描述

函数签名:
def glom(): RDD[Array[T]]

一个分区的数据 => 一个数组

package com.yu.bigdata.spark.core.operatorTransform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark05_RDD_glom {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("glom")
    val sc = new SparkContext(sparkConf)

    // TODO -glom
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
    val glomRDD: RDD[Array[Int]] = rdd.glom()
    glomRDD.collect().foreach( data => println(data.mkString(",")))
    //1,2
    //3,4

    //    计算所有分区最大值求和(分区内取最大值,分区间最大值求和)
    //上面已经将list转换成两个分区的集合了,可以直接统计每个集合中的最大值
    val maxRDD: RDD[Int] = glomRDD.map(
      array => {
        array.max
      }
    )
    println(maxRDD.collect().sum)
    //6
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

(6)groupBy
函数说明:
将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为 shuffle。极限情况下,数据可能被分在同一个分区中。
一个组的数据在一个分区中,但是并不是说一个分区中只有一个组。

在这里插入图片描述
groupBy会将数据打乱(打散),重新组合,这个操作称之为shuffle。分组与分区没有必然的关系

函数签名:
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]

package com.yu.bigdata.spark.core.operatorTransform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark06_RDD_groupBy {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("groupBy")
    val sc = new SparkContext(sparkConf)

    // TODO groupBy
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)

    //groupBy会将数据源中的每一个数据进行分组判断,根据返回的分组key进行分组
    //相同key值的数据会放置在一个组里
    def groupFunction(num: Int): Int = {
      num % 2
    }

    val groupRDD: RDD[(Int, Iterable[Int])] = rdd.groupBy(groupFunction)
    groupRDD.collect().foreach(println)
    //(0,CompactBuffer(2, 4))
    //(1,CompactBuffer(1, 3))

    // TODO 按照首字母进行分组
    val rdd1: RDD[String] = sc.makeRDD(List("Hello", "Spark", "Hello", "Scala"))
    val groupRDD1: RDD[(Char, Iterable[String])] = rdd1.groupBy(_.charAt(0))
    groupRDD1.collect().foreach(println)
    //(H,CompactBuffer(Hello, Hello))
    //(S,CompactBuffer(Spark, Scala))
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

从服务器日志数据 apache.log 中获取每个时间段访问量:

package com.yu.bigdata.spark.core.operatorTransform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import java.text.SimpleDateFormat
import java.util.Date

object Spark06_RDD_groupBy_test {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("groupBy")
    val sc = new SparkContext(sparkConf)

    // TODO 从服务器日志数据 apache.log 中获取每个时间段访问量
    val rdd = sc.textFile("datas/apache.log")
    val timeRDD: RDD[(String, Iterable[(String, Int)])] = rdd.map(
      line => {
        val datas = line.split(" ") //根据空格切分
        val time = datas(3) //取出时间段
        val hour: String = time.split(":")(1)
        val sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
        val date: Date = sdf.parse(time)  //按指定的源格式把String转换为Date对象
        val sdf1 = new SimpleDateFormat("HH")
        val hour: String = sdf1.format(date) //按指定的目标格式把Date对象转换为String
        //date.getHours
        (hour, 1)
      }
    ).groupBy(_._1)

    val groupRDD: RDD[(String, Int)] = timeRDD.map {
      case (hour, iter) => {
        (hour, iter.size)
      }
    }
    groupRDD.collect().foreach(println)
    //(06,366)
    //(20,486)
    //……
    sc.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

(7)filter

函数说明:
将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。

函数签名:
def filter(f: T => Boolean): RDD[T]

package com.yu.bigdata.spark.core.operatorTransform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark07_RDD_filter {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("filter")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4))
    //取出奇数
    val filterRDD: RDD[Int] = rdd.filter(num => num % 2 != 0)
    filterRDD.collect().foreach(println)
    //1
    //3

    val rdd1: RDD[String] = sc.textFile("datas/apache.log")
    val filterRDD1: RDD[String] = rdd1.filter(
      line => {
        val data = line.split(" ")
        val time = data(3)
        time.startsWith("17/05/2015")
      }
    )
    filterRDD1.collect().foreach(println)
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

(8)sample
函数说明:
根据指定的规则从数据集中抽取数据。

函数签名:
def sample(
  withReplacement: Boolean,
  fraction: Double,
  seed: Long = Utils.random.nextLong): RDD[T]

数据抽取不放回(伯努利算法):
第一个参数:抽取的数据是否放回,false:不放回
第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取;
第三个参数:随机数种子

数据抽取放回(泊松算法):
第一个参数:抽取的数据是否放回,true:放回;false:不放回
第二个参数:重复数据的几率,范围大于等于 0.表示每一个元素被期望抽取到的次数
第三个参数:随机数种子

package com.yu.bigdata.spark.core.operatorTransform

import org.apache.spark.{SparkConf, SparkContext}

object Spark8_RDD_Sample {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("filter")
    val sc = new SparkContext(sparkConf)

    // TODO filter
    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8))

    // 第一个参数:抽取的数据是否放回,false:不放回
    // 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取;
    //           抽取不放回:伯努利分布,数据源中每条数据被抽取的概率  阈值
    //           抽取放回:泊松分布,大于等于0,表示数据源中的每条数据被抽取的可能次数
    // 第三个参数:随机数种子  若不传递此参数,使用的是当前系统时间,所以每次都会不同
//    rdd.sample(
//      false,
//      0.4,
//      3
//    ).collect().foreach(println)
    println(rdd.sample(
      true,
      4,
      3
    ).collect().mkString(",")
    )
    //1,1,2,2,3,3,4,4,4,4,5,5,5,5,5,6,6,6,6,6,7,7,8,8,8,8,8
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

可以使用采样方法测试数据是否有倾斜现象。

(9)distinct
函数说明:
将数据集中重复的数据去重

函数签名:
def distinct()(implicit ord: Ordering[T] = null): RDD[T]
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

package com.yu.bigdata.spark.core.operatorTransform

import org.apache.spark.{SparkConf, SparkContext}

object Spark09_RDD_distinct {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("filter")
    val sc = new SparkContext(sparkConf)
    // TODO distinct
    // case _ => map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
    val rdd = sc.makeRDD(List(1, 2, 3, 1, 2, 5, 4, 8))
    val disRDD = rdd.distinct()
    disRDD.collect().foreach(println)
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

(10)coalesce
函数说明:
根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率
当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少分区的个数,减小任务调度成本

函数签名:
def coalesce(numPartitions: Int, shuffle: Boolean = false,
    partitionCoalescer:
    Option[PartitionCoalescer] = Option.empty)
    (implicit ord: Ordering[T] = null)
  : RDD[T]

在这里插入图片描述

package com.yu.bigdata.spark.core.operatorTransform

import org.apache.spark.{SparkConf, SparkContext}

object Spark10_RDD_coalesce {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("coalesce")
    val sc = new SparkContext(sparkConf)
    // TODO coalesce
    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)
    val newRDD = rdd.coalesce(2)  //1 2在一个分区, 3,4,5,6在一个分区
    val newRDD1 = rdd.coalesce(2, true)  //进行shuffle操作  数据均衡了

    //coalesce方法默认情况下不会将分区的数据打乱重新组合
    //这种情况下的缩减分区可能会导致数据不均衡,出现数据倾斜
    //如果想要让数据均衡,可以进行shuffle操作
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

(11)repartition
函数说明:
该操作内部其实执行的是 coalesce 操作,参数 shuffle 的默认值为 true。无论是将分区数多的RDD 转换为分区数少的 RDD,还是将分区数少的 RDD 转换为分区数多的 RDD,repartition操作都可以完成,因为无论如何都会经 shuffle 过程。

函数签名:
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

package com.yu.bigdata.spark.core.operatorTransform

import org.apache.spark.{SparkConf, SparkContext}

object Spark11_RDD_repartition {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("repartition")
    val sc = new SparkContext(sparkConf)
    // TODO repartition
    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)

    //coalesce算子可以扩大分区,但是如果不进行shuffle操作是没有意义的,不起作用
    //所以如果想要实现扩大分区的效果,需要使用shuffle操作
//    val newRDD = rdd.coalesce(3)  //1:123 2:345 3:null
    val newRDD = rdd.coalesce(3, true)

    //简化操作:缩减分区使用coalesce操作,扩大分区使用repartition
    // repartition底层调用了coalesce,一定调用shuffle操作
    val newRDD1 = rdd.repartition(3)
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

(12)sortBy
函数说明:
该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一致。中间存在 shuffle 的过程。

函数签名:
def sortBy[K](
 f: (T) => K,
 ascending: Boolean = true,
 numPartitions: Int = this.partitions.length)
 (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

package com.yu.bigdata.spark.core.operatorTransform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark_RDD_sortBy {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sortBy")
    val sc = new SparkContext(sparkConf)
    // TODO sortBy
    val rdd = sc.makeRDD(List(6, 2, 4, 3, 1, 5),2)  //底层有shuffle
    val newRDD = rdd.sortBy(num => num)
    newRDD.collect().foreach(println)

    //sortBy方法可以根据指定的规则对数据源中的数据进行排序,默认为升序,第二个参数可以改变排序方式
    //sortBy默认情况下不会改变分区,但是中间存在shuffle操作
    val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("1", 1), ("11", 2), ("2", 3)), 2)
    //val newRDD1 = rdd1.sortBy( t => t._1 )
    val newRDD1 = rdd1.sortBy( t => t._1.toInt , false)
    newRDD1.collect().foreach(println)

    sc.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

双Value类型

(13)intersection
对源 RDD 和参数 RDD 求交集后返回一个新的 RDD

(14)union
对源 RDD 和参数 RDD 求并集后返回一个新的 RDD

(15)substract
以一个 RDD 元素为主,去除两个 RDD 中重复元素,将其他元素保留下来。求差集

(16)zip
将两个 RDD 中的元素,以键值对的形式进行合并。其中,键值对中的 Key 为第 1 个 RDD中的元素,Value 为第 2 个 RDD 中的相同位置的元素。

package com.yu.bigdata.spark.core.operatorTransform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark13_RDD_Collect {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("operator")
    val sc = new SparkContext(sparkConf)
    // TODO 双Value类型
    val rdd1 = sc.makeRDD(List(1, 2, 3, 4))
    val rdd2 = sc.makeRDD(List(3, 4, 5, 6))

    //交集
    val rdd3 = rdd1.intersection(rdd2)
    println(rdd3.collect().mkString(","))
    //3,4

    //并集
    val rdd4 = rdd1.union(rdd2)
    println(rdd4.collect().mkString(","))
    //1,2,3,4,3,4,5,6

    //差集
    val rdd5 = rdd1.subtract(rdd2)
    println(rdd5.collect().mkString(","))
    //1,2

    //拉链
    val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
    println(rdd6.collect().mkString(","))
    //(1,3),(2,4),(3,5),(4,6)
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

PS:
交集、并集、差集要求两个数据源数据类型保持一致;
拉链操作两个数据源的数据类型可以不一致,但数据源的分区数量需要保持一致,且两个数据源要求分区中数据数量保持一致。

Key-Value类型

(17)partitionBy
将数据按照指定 Partitioner 重新进行分区。Spark 默认的分区器是 HashPartitioner。

如果重分区的分区器和当前RDD的分区器名称和数量均相同,那么数据分区不会再变化。

	val rdd = sc.makeRDD(List(1, 2, 3, 4))
    val mapRDD: RDD[(Int, Int)] = rdd.map((_, 1))

    //partitionBy根据指定的分区规则对数据进行重分区
    mapRDD.partitionBy(new HashPartitioner(2))
  • 1
  • 2
  • 3
  • 4
  • 5

(18)reduceByKey
可以将数据按照相同的 Key 对 Value 进行聚合

    val rdd = sc.makeRDD(List(
      ("a", 1), ("b", 2), ("a", 3), ("c", 5)
    ))

    //可以将数据按照相同的 Key 对 Value 进行聚合
    //Scala中一般的聚合操作是两两聚合,spark基于scala开发,所以也是两两聚合
    //reduceByKey中如果key的数据只有一个,是不会参与运算的
    val reduceRDD: RDD[(String, Int)] = rdd.reduceByKey((x: Int, y: Int) => x + y)
    reduceRDD.collect().foreach(println)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

(19)groupByKey
将数据源的数据根据 key 对 value 进行分组

package com.yu.bigdata.spark.core.operatorTransform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark16_RDD_groupByKey {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("operator")
    val sc = new SparkContext(sparkConf)
    // TODO Key-Value类型
    val rdd = sc.makeRDD(List(
      ("a", 1), ("b", 2), ("a", 3), ("c", 5)
    ))
    //将数据源中的数据,相同key的数据分在一个组中,形成一个对偶元组
    //元组中的第一个元素就是key
    //元组中的第二个元素是相同key的value集合
    val groupRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey()
    groupRDD.collect().foreach(println)
    //(a,CompactBuffer(1, 3))
    //(b,CompactBuffer(2))
    //(c,CompactBuffer(5))

    val groupRDD1: RDD[(String, Iterable[(String, Int)])] = rdd.groupBy(_._1)
    groupRDD1.collect().foreach(println)
    //(a,CompactBuffer((a,1), (a,3)))
    //(b,CompactBuffer((b,2)))
    //(c,CompactBuffer((c,5)))
    sc.stop()
  }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

reduceByKey与groupByKey区别:

在这里插入图片描述

groupByKey会导致数据打乱重组,存在shuffle操作。Spark中,shuffle操作必须落盘处理,不能在内存中数据等待,会导致内存溢出,shuffle操作的性能非常低。

在这里插入图片描述

预聚合:
在这里插入图片描述

reuceBykey可以对分区内的数据进行预聚合,可以有效减少shuffle时落盘的数据量。(reduceByKey分区内和分区间计算规则是相同的。)

从 shuffle 的角度:reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高
从功能的角度:reduceByKey 其实包含分组和聚合的功能。GroupByKey 只能分组,不能聚合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用 groupByKey。

(20)aggregateByKey
将数据根据不同的规则进行分区内计算和分区间计算

val rdd = sc.makeRDD(List(
      ("a", 1), ("a", 2), ("a", 3), ("a", 4)
    ), 2)
    //( (a, [1, 2]), (a, [3, 4]) )
    //(a, 2) (a, 4)
    //(a, 6)

    //aggregateByKey存在函数柯里化,有两个参数列表
    //第一个参数列表,需要传递一个参数,表示为初值值
    //      主要用于遇到第一个key时,和value进行分区内计算
    //第二个参数列表需要传递两个参数
    //      第一个参数表示分区内计算规则
    //      第一个参数表示分区间计算规则
    val aggRDD: RDD[(String, Int)] = rdd.aggregateByKey(0)(
      (x, y) => math.max(x, y),   //分区内取最大值
      (x, y) => x + y			//分区间求和
    )
    aggRDD.collect().foreach(println)
    //(a,6)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

aggregateByKey计算流程:
在这里插入图片描述

package com.yu.bigdata.spark.core.operatorTransform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark17_RDD_aggregateByKey_test {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("operator")
    val sc = new SparkContext(sparkConf)
    // TODO Key-Value类型
    val rdd = sc.makeRDD(List(
      ("a", 1), ("a", 2), ("b", 3),
      ("b", 4), ("b", 5), ("a", 6)
    ), 2)

    //aggregateByKey最终的返回数据结果应该和初始值的类型保持一致
    val newRDD: RDD[(String, String)] = rdd.aggregateByKey("")(_ + _, _ + _)

    //获取相同key的数据的平均值 => (a, 3)(b, 4)
    val newRDD1: RDD[(String, (Int, Int))] = rdd.aggregateByKey((0, 0))(  //第一个0代表用于比较的初始值,第二个0代表相同key的出现次数
      (t, v) => {  //t代表元组,v代表相应的值,t._1是加和产生的值,t._2是累加key的出现次数
        (t._1 + v, t._2 + 1)
      },    //分区内,相同key相加和,并统计出现次数
      (t1, t2) => {  //将分区间的统计值相加
        (t1._1 + t2._1, t1._2 + t2._2)
      }
    )
    //只对v采取操作使用mapValues
    val AverRDD: RDD[(String, Int)] = newRDD1.mapValues {
      case (sum, cnt) => {
        sum / cnt
      }
    }
    newRDD1.collect().foreach(println)
    //(b,(12,3))
    //(a,(9,3))

    AverRDD.collect().foreach(println)
    //(b,4)
    //(a,3)

    sc.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

(21)foldByKey
当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey

    val rdd = sc.makeRDD(List(
      ("a", 1), ("a", 2), ("b", 3),
      ("b", 4), ("b", 5), ("a", 6)
    ), 2)

    //如果分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey
    rdd.foldByKey(0)(_+_).collect().foreach(println)
    //(b,12)
    //(a,9)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

(22)combineByKey
最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致。

val newRDD = rdd.combineByKey(
      v => (v, 1), //进行结构转换
      //下面需要加上类型,动态的
      (t: (Int, Int), v) => {
        (t._1 + v, t._2 + 1)
      },
      (t1: (Int, Int), t2: (Int, Int)) => {
        (t1._1 + t2._1, t1._2 + t2._2)
      }
    )
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别:
reduceByKey: 相同 key 的第一个数据不进行任何计算,分区内和分区间计算规则相同
FoldByKey: 相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相同
AggregateByKey:相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则可以不相同
CombineByKey:当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区内和分区间计算规则不相同。

(23)sortByKey
在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口(特质),返回一个按照 key 进行排序的 默认升序

    val rdd = sc.makeRDD(List(
      ("a", 1), ("d", 2), ("b", 3)
    ), 2)

    val sortRDD: RDD[(String, Int)] = rdd.sortByKey()
    sortRDD.collect().foreach(println)
    //(a,1)
    //(b,3)
    //(d,2)
    val sortRDD1 = rdd.sortByKey(false)
    sortRDD1.collect().foreach(println)
    //(d,2)
    //(b,3)
    //(a,1)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

(24)join
在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的

val rdd1 = sc.makeRDD(List(
      ("a", 1), ("b", 2), ("c", 3)
    ), 2)
    val rdd2 = sc.makeRDD(List(
      ("a", 4), ("b", 5), ("c", 6)
    ), 2)

    //join:两个不同数据源的数据,相同的key的value会连接在一起,形成元组
    //      如果两个数据源中key没有匹配上,那么数据不会出现在结果中
    //      如果两个数据源中key有多个相同的,会依次匹配,可能会出现笛卡尔乘积,数据量会指数增长,性能会降低
    val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
    joinRDD.collect().foreach(println)
    //(b,(2,5))
    //(a,(1,4))
    //(c,(3,6))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

(25)leftOuterJoin
类似于 SQL 语句的左外连接

    val rdd1 = sc.makeRDD(List(
      ("a", 1), ("b", 2), ("c", 3)
    ), 2)
    val rdd2 = sc.makeRDD(List(
      ("a", 4), ("b", 5)//, ("c", 6)
    ), 2)

    //rdd1为主表,主表中的都会输出
    val leftJoinRDD: RDD[(String, (Int, Option[Int]))] = rdd1.leftOuterJoin(rdd2)
    leftJoinRDD.collect().foreach(println)
    //(b,(2,Some(5)))
    //(a,(1,Some(4)))
    //(c,(3,None))
    val rightJoinRDD = rdd1.rightOuterJoin(rdd2)  //右连接
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

(26)cogroup
在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDD

val rdd1 = sc.makeRDD(List(
      ("a", 1), ("b", 2)//, ("c", 3)
    ), 2)
    val rdd2 = sc.makeRDD(List(
      ("a", 4), ("b", 5), ("c", 6), ("c", 7)
    ), 2)

    //cogroup: connect + group  分组 连接
    val cgRDD: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)
    cgRDD.collect().foreach(println)
    //(b,(CompactBuffer(2),CompactBuffer(5)))
    //(a,(CompactBuffer(1),CompactBuffer(4)))
    //(c,(CompactBuffer(),CompactBuffer(6, 7)))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

案例实操

统计出每一个省份每个广告被点击数量排行的 Top3。

package com.yu.bigdata.spark.core.operatorTransform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark24_RDD_Req {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("operator")
    val sc = new SparkContext(sparkConf)
    // TODO 案例实操
    //1. 获取原始数据 时间戳,省份,城市,用户,广告
    val dataRdd: RDD[String] = sc.textFile("datas/agent.log")

    //2. 将原始数据进行结构的转换,便于统计
    //取出省份,广告 ((河北,A), 1)
    val mapRDD: RDD[((String, String), Int)] = dataRdd.map(
      line => {
        val datas = line.split((" "))
        ((datas(1), datas(4)), 1)
      }
    )

    //3. 将转换结构后的数据分组聚合
    //  ((河北,A), 1)  => ((河北,A), sum)
    val reduceRDD: RDD[((String, String), Int)] = mapRDD.reduceByKey(_ + _)

    //4. 将聚合的结果进行结构转换  ((河北,A), sum) => (河北,(A, sum) )
    //模式匹配
    val newRDD: RDD[(String, (String, Int))] = reduceRDD.map {
      case ((prv, ad), sum) => {
        (prv, (ad, sum))
      }
    }

    //5. 将转换结构后的数据根据省份进行分组
    val groupRDD: RDD[(String, Iterable[(String, Int)])] = newRDD.groupByKey()

    //6. 将分组后的数据组内排序(降序),取前三
    val resultRDD: RDD[(String, List[(String, Int)])] = groupRDD.mapValues(
      iter => {
        iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(3)
      }
    )

    //7. 采集数据,进行打印
    resultRDD.collect().foreach(println)
    //(4,List((12,25), (2,22), (16,22)))
    //(8,List((2,27), (20,23), (11,22)))
    //……

    sc.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/593240
推荐阅读
相关标签
  

闽ICP备14008679号