当前位置:   article > 正文

Spark基础

Spark基础

Spark 基础

Spark 简单介绍

Why Spark
一、MapReduce编程模型的局限性

1、繁杂:只有Map和Reduce两个操作,复杂的逻辑需要大量的样板代码

2、处理效率低:
2.1、Map中间结果写磁盘,Reduce写HDFS,多个Map通过HDFS交换数据
2.2、任务调度与启动开销大

3、不适合迭代处理、交互式处理和流式处理

二、Spark是类Hadoop MapReduce的通用【并行】框架

1、Job中间输出结果可以保存在内存,不再需要读写HDFS

2、比MapReduce平均快10倍以上

三、版本

2014 1.0

2016 2.x

2020 3.x

四、优势

1、速度快
基于内存数据处理,比MR快100个数量级以上(逻辑回归算法测试)
基于硬盘数据处理,比MR快10个数量级以上

2、易用性
支持Java、【Scala】、【Python:pyspark】、R语言
交互式shell方便开发测试

3、通用性
一栈式解决方案:
批处理
交互式查询 spark sql
实时流处理(微批处理) spark streaming
图计算 spark graphic
机器学习 spark mlib

4、多种运行模式
YARN,生产环境(ApplicationMaster)
Standalone,生产环境(Master/Worker)
Local[*] ,本地模式【初学】√

Spark 技术栈

(图片侵权可删)

1、Spark Core:核心组件,分布式计算引擎 RDD

2、Spark SQL:高性能的基于Hadoop的SQL解决方案

3、Spark Streaming:可以实现高吞吐量、具备容错机制的准实时流处理系统

4、Spark GraphX:分布式图处理框架

5、Spark MLlib:构建在Spark上的分布式机器学习库

spark-shell:Spark自带的交互式工具

local:spark-shell --master local[*]
alone:spark-shell --master spark://MASTERHOST:7077
yarn :spark-shell --master yarn

Spark服务(服务上跑的是进程)

Master : Cluster Manager
Worker : Worker Node

Spark架构核心组件

Application :
建立在Spark上的用户程序,包括Driver代码和运行在集群各节点Executor中的代码

Driver Program :
驱动程序,运行Application中的main函数并创建SparkContext

SparkContext :
Spark应用程序的入口,负责调度各个运算资源,协调各个Worker Node上的Executor

Cluster Manager :
在集群(Standalone、Mesos、YARN)上获取资源的外部服务

Worker Node :
集群中任何可以运行Application代码的节点,运行一个或多个Executor进程

Executor :
Application运行在Worker Node上的一个进程,该进程负责运行Task,并且负责将数据存在内存或者磁盘上

Task :
运行在Executor上的工作单元

Job :
多个Task组成的并行计算,由Action触发生成,一个Application中含多个Job

Stage :
每个Job会被拆分成多组Task,作为一个TaskSet,其名称为Stage

在这里插入图片描述

1、在驱动程序中,通过SparkContext主导应用的执行

2、SparkContext可以连接不同类型的 CM(Standalone、YARN),连接后,获得节点上的 Executor

3、一个节点默认一个Executor,可通过 SPARK_WORKER_INSTANCES 调整

4、每个应用获取自己的Executor

5、每个Task处理一个RDD分区

SparkContext

/*
连接Driver与Spark Cluster(Workers)
Spark的主入口
每个JVM仅能有一个活跃的SparkContext 

【配置】
	master:
    	local[*] : CPU核数为当前环境的最大值
    	local[2] : CPU核数为2
    	local : CPU核数为1
    	yarn
*/
val conf:SparkConf = new SparkConf()
      .setAppName(name:String)
	  .set(key:String,value:String) // 多项设置
      .setMaster(master:String)
val sc: SparkContext = SparkContext.getOrCreate(conf)

/**
	封装:工具类
*/
class SparkCom(appName:String,master:String,logLevel:String="INFO") {
  private val conf:SparkConf = new SparkConf().setAppName(appName).setMaster(master)
  private var _sc:SparkContext = _
  private var _spark:SparkSession = _
  def sc() = {
    if (Objects.isNull(_sc)) {
      _sc = new SparkContext(conf)
      _sc.setLogLevel(logLevel)
    }
    _sc
  }
  def spark() = {
    if (Objects.isNull(_spark)) {
      _spark = SparkSession.builder().config(conf).getOrCreate()
    }
    _spark
  }
  def stop() = {
    if (Objects.nonNull(_sc)) {
      _sc.stop()
    }
    if (Objects.nonNull(_spark)) {
      _spark.stop()
    }
  }
}
object SparkCom{
  def apply(appName:String): SparkCom = new SparkCom(appName,"local[*]")
  def apply(appName:String, master:String): SparkCom = new SparkCom(appName,master)
  def apply(appName:String, master:String, logLevel:String): SparkCom = new SparkCom(appName,master,logLevel)
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52

RDD

RDD的简单介绍

RDD:Spark核心,主要数据抽象

将数据项拆分为多个分区的集合,存储在集群的工作节点上的内存和【磁盘】

RDD是用于数据转换的接口

RDD指向了
或存储在(HIVE)HDFS、Cassandra、HBase等
或缓存(内存、内存+磁盘、仅磁盘等)
或在故障或缓存收回时重新计算其他RDD分区中的数据

RDD:弹性分布式数据集(Resilient Distributed Datasets)

抽象的:RDD并不存储真正的数据,只是【对数据和操作】的描述

分布式数据集:
RDD是只读的、分区记录的集合,每个分区分布在集群的不同节点上,每个任务处理一个分区,每个分区上都有compute函数,用来计算该分区中的数据

弹性:
RDD默认存放在内存中,当内存不足,Spark自动将RDD写入磁盘

容错性:
根据【数据血统】,可以自动从节点失败中恢复分区

RDD分区:Partition -> Partitioner -> Hash | Range …

分区是RDD被拆分并发送到节点的不同块之一

我们拥有的分区越多,得到的并行性就越强(RDD 是可以并行操作的元素的容错集合)

每个分区都是被分发到不同Worker Node的候选者

每个分区对应一个Task

获取RDD的方式
/*
1.小型化数据集
2.读本地文件
3.读远程文件(HDFS)
*/
// 集合创建:小数据集,可通过 numSlices 指定并行度(分区数)
val rdd: RDD[T] = sc.parallelize(seq:Seq[T], numSlices:Int) // ✔
val rdd: RDD[T] = sc.makeRDD(seq:Seq[T], numSlices:Int) // 调用了 parallelize

// 外部数据源创建: 可通过 minPartitions 指定分区数,CPU核占用数
// 文件系统:local(file:///...)或hadoop(hdfs://)
val rdd: RDD[String] = sc.textFile(path:String, minPartitions:Int)【path】
val rdd: RDD[String] = sc.wholeTextFiles(dir:String, minPartitions:Int)【dir】

// 其他 RDD 创建
val rdd2: RDD[Map[String, Int]] = rdd
      .mapPartitions(_
        .map(
          _
            .split("[^a-zA-Z]+")
            .map((_, 1))
            .groupBy(_._1)
            .map(t2 => (t2._1, t2._2.length))
        )
      )
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
RDD操作类型

分为lazy与non-lazy两种

Transformation(lazy):也称转换操作、转换算子,返回一个新的RDD

Actions(non-lazy):立即执行,也称动作操作、动作算子,行动(为)算子,无返回值或者返回其他的

RDD中的所有转换都是惰性求值/延迟执行的,也就是说不会直接进行计算,只有当发生一个要求返回结果给 Driver 的 Action(行动算子) 动作时,这些转换才会真正进行。

之所以使用惰性求值/延迟执行,是因为这样可以在 Action 时对 RDD 操作形成 DAG 有向无环图进行 Stage 的划分和并行优化,这种设计让 Spark 更加有效率的进行。

RDD与DAG: Spark提供的核心抽象

DAG【有向无环图:如下图】反映了RDD之间的依赖关系

转换(transform)算子(窄依赖)==》stage1里面的相互平行

shuffle算子(宽依赖)==》stage2里面的相互交叉

action算子(行为算子)==》最后一个算子为行为算子,不到它不进行计算

在这里插入图片描述(图片侵权可删)

数据准备
// 数据类
case class Customer(
    cus_id            :Int,
    cus_fname         :String,
    cus_lname         :String,
    cus_email         :String,
    cus_password      :String,
    cus_street        :String,
    cus_city          :String,
    cus_state         :String,
    cus_zipcode       :String
)
case class OrderItem(
    ori_id            :Int,
    ori_order_id      :Int,
    ori_product_id    :Int,
    ori_quantity      :Int,
    ori_subtotal      :Float,
    ori_product_price :Float
)
case class Order(
    or_id             :Int,
    or_date           :String,
    or_customer_id    :Int,
    or_status         :String
)
case class Product(
    pro_id            :Int,
    pro_category_id   :Int,
    pro_name          :String,
    pro_description   :String,
    pro_price         :Float,
    pro_image         :String
)

// 工具
implicit class EntityUtil(line:String){
    val r_customer = "\"(.*?)\",\"(.*?)\",\"(.*?)\",\"(.*?)\",\"(.*?)\",\"(.*?)\",\"(.*?)\",\"(.*?)\",\"(.*?)\"".r
    val r_product = "\"(.*?)\",\"(.*?)\",\"(.*?)\",\"(.*?)\",\"(.*?)\",\"(.*?)\"".r
    val r_order = "\"(.*?)\",\"(.*?)\",\"(.*?)\",\"(.*?)\"".r
    val r_order_item = "\"(.*?)\",\"(.*?)\",\"(.*?)\",\"(.*?)\",\"(.*?)\",\"(.*?)\"".r

    def toCustomer = line match {
        case r_customer(cust_id,fName,lName,email,password,street,city,state,zipcode)
        	=>Customer(cust_id.toInt,fName,lName,email,password,street,city,state,zipcode)
    }
    def toProduct = line match {
        case r_product(prod_id,category_id,name,description,price,image)
 			=>Product(prod_id.toInt,category_id.toInt,name,description,price.toFloat,image)
    }
    def toOrder = line match {
        case r_order(or_id,date,customer_id,status)
        	=>Order(or_id.toInt,date,customer_id.toInt,status)
    }
    def toOrderItem = line match {
        case r_order_item(ori_id,order_id,product_id,quantity,subtotal,product_price)
        	=>OrderItem(ori_id.toInt,order_id.toInt,product_id.toInt,quantity.toInt,subtotal.toFloat,product_price.toFloat)
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
转换算子:RDD transform(窄依赖)
/*
	默认的并行度:200
	控制并行度:
		带(并行度) 的算子(执行完计算后对数据进行再分配)
		  并行度的体现方式:
			分区数:numParitions|numSlices:Int  5[, fieldName]
				partitionIndex = fieldName.hashCode() % numParitions
				扩展随机字段:0~numPartitions
			分区器:partioner:Partioner (key)
				默认的分区器:HashPartitioner
		再分区算子(对数据进行重新分配)
			coalesce(numPartitions:Int, shuffle:Boolean)
			✔ repartition(numPartitions:Int) => coalesce(numPartitions, true)

	简单类型 RDD[T]
	val rddLine: RDD[String] = sc.textFile("/spark/practice_01/customers", 4)
*/


// 【逐条处理】
val rdd2: RDD[U] = rdd.map(f:T=>U)
val rddAggByState: RDD[(String, Int)] = rddGroupByState.map(t => (t._1, t._2.size))

// 【扁平化处理】:TraversableOnce : Trait用于遍历和处理集合类型元素,类似于java:Iterable
val rdd2: RDD[U] = rdd.flatMap(f:T=>TraversableOnce[U]) //经典案例为:WordCount

/* 【✔ 分区内逐行处理】:以分区为单位(分区不变)逐行处理数据
	map VS mapPartitions
	1、数量:前者一进一出IO数量一致,后者多进多出IO数量不一定一致
	2、性能:前者多分区逐条处理,后者各分区并行逐条处理更佳,常时间占用内存易导致OOM,内存小不推荐
	3、类型:两者入口和出口类型都必须一致,后者进出都必须是迭代器
	
	// 推荐
	mapParitions( 
		≈ 子查询
		it.filter(...) <=> 谓词下缀
	)
	
	// 不推荐
	mapParitions(...)
	fielter(...)   <=> where
*/
val rdd2: RDD[U] = rdd.mapPartitions(f:Iterator[T]=>Iterator[U][,preservePar:Boolean])
// 【分区内逐行处理】:以分区为单位(分区不变)逐行处理数据,并追加分区编号
val rdd2: RDD[U] = rdd.mapPartitionsWithIndex(f:(Int,Iterator[T])=>Iterator[U][,preservePar:Boolean])
val rddCust: RDD[Customer] = rddLine.mapPartitions(_.map(_.toCustomer))

// 【转内存数组】:同分区的数据转为同类型的内存数组,分区不变 rdd:RDD[T]
val rdd2: RDD[Array[T]] = rdd.glom();
val rddCustGlom: RDD[Array[Customer]] = rddCust.glom()

// 【× 数据过滤】:过滤规则 f:T=>Boolean,保留满足条件数据,分区不变,【数据可能倾斜skew】
val rdd2: RDD[T] = rdd.filter(f:T=>Boolean)
val rddFilter: RDD[Customer] = rddCust.filter(_.cus_state.matches("TX|CO|PR"))

// 【数据分组】:同键同组同区,同区可多组;打乱shuffle,按f:T=>K规则,分区不变,【数据可能倾斜skew】
val rdd2: RDD[(K,Iterable[T])] = rdd.groupBy(f:T=>K)
val rdd2: RDD[(K,Iterable[T])] = rdd.groupBy(f:T=>K, partioner:Partitioner)
val rdd2: RDD[(K,Iterable[T])] = rdd.groupBy(f:T=>K, numPartitions:Int)
val rddGroupByCity: RDD[(String, Iterable[Customer])] = rddFilter.groupBy(_.cus_state)

/* 【✔ 数据抽样】
	withReplacement:Boolean		是否有放回抽样
	fraction:Double				抽样率
	seed:Long					随机种子,默认为当前时间戳(一般缺省)
      若数据为100条
        false, 0.4 => 抽样40%的数据,40条左右
        true,  0.4 => 每条数据被抽取的概率为40%
*/
val rdd2: RDD[T] = rdd.sample(withReplacement:Boolean,fraction:Double,seed:Long)
val rddSample: RDD[Customer] = rddCust.sample(true, 0.1, 1)

// 【× 数据去重】:numPartitions:Int 设定去重后的分区数
// 采用该方法去重,数据规模比较大的情况下,数据压力比较大,因为数据需要在不同的分区间比较
// 一般采用分组的方式,将去重字段作为分组字段,在不同的分区内并行去重
val rdd2: RDD[T] = rdd.distinct()
val rdd2: RDD[T] = rdd.distinct(numPartitions:Int)(implicit order:Ording[T] = null)


/* 【数据排序】
	处理数据f:T=>K,升降序asc:Boolean,分区数numPartitions:Int
	默认排序前后分区一致,【有shuffle】,除非重新设定 numPartitions
	全局排序,多分区间交换数据,效率较低。优化见 PairRDD
	若:K为基本类型,则无需提供第二参数列表中的隐式参数 ord: Ordering[K]
	若:K为自定义类型,则必须提供第二参数
*/
val rdd2: RDD[T] = rdd.sortBy(f:T=>K,asc:Boolean,numPartitions:Int)(implicit ord: Ordering[K], ctag: ClassTag[K])
val rddSortBy: RDD[Customer] = rddCust.sortBy(c => c.cus_state, asc=true, np=3)

/*
	多个类型 RDD[T]:纵向
		交并差操作:数据类型一致,根据元素 equals 认定是否相同
			【自定义类型】:必须重写 equals 方法,因为默认等值判断 == 判断地址
		拉链操作:要求分区数和分区内的数据量一致
*/
// 【求交集】:重载可重设分区数numPartitions:Int,或定义分区规则par:Partitioner[T]
val rdd2: RDD[T] = rdd.intersection(rdd3:RDD[T])
val rdd2: RDD[T] = rdd.intersection(rdd3:RDD[T], numPartitions:Int)
val rdd2: RDD[T] = rdd.intersection(rdd3:RDD[T], par:Partitioner[T])
// 【求并集】:不去重
val rdd2: RDD[T] = rdd.union(rdd3:RDD[T])
// 【求差集】:重载可重设分区数numPartitions:Int,或定义分区规则par:Partitioner[T]
val rdd2: RDD[T] = rdd.subtract(rdd3:RDD[T])
val rdd2: RDD[T] = rdd.subtract(rdd3:RDD[T], numPartitions:Int)
val rdd2: RDD[T] = rdd.subtract(rdd3:RDD[T], par:Partitioner[T])

// 【拉链操作】
val rdd2: RDD[(T,U)] = rdd.zip(rdd3:RDD[U])
val rdd2: RDD[(T,Long)] = rdd.zipWithIndex()
val rdd2: RDD[(T,Long)] = rdd.zipWithUniqueId()
// 有三个重载:1+1,1+2,1+3
val rdd2: RDD[V]=rdd.zipPartitions(rddA:RDD[A])(f:(Iterator[T],Iterator[A])=>Iterator[V])
val rdd2: RDD[V]=rdd.zipPartitions(rddA:RDD[A],preserveParitioning:Boolean)(f:(Iterator[T],Iterator[A])=>Iterator[V])
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
键值对算子:PairRDD(K,V)
// 若再分区器和现有分区器相同,则不执行分区操作
val rddPair: RDD[(String, Int)] = rddLine.mapPartitions(it => {
    it.map(line => {
        val c: Customer = line.toCustomer
        (c.cus_state, 1)
    })
})

// 按指定分区器执行再分区操作,根据 K 的自定义 Partitioner 进行,默认为 HashPartitioner
val pairRdd2: RDD[(K,V)] = pairRdd.partitionBy(p:Partitioner)

// 【按键排序】:
/*
	排序优化:
		1、自定义分区规则,实现分区内排序
		org.apache.spark.Partitioner
            HashPartitioner
            RangePargitioner
		2、自定义类型作为 K ,需要提供隐式 Ordering,才可以看到 sortByKey
		3、使用 repartitionAndSortWithinPartitions(par:Partitioner)方法			
*/
val rddCI: RDD[(Customer, Int)] = rddCust
      .mapPartitions(_.map(c=>(c, 1)))
	  .cache()

// 【方法一】
val rddCIOrder: RDD[(Customer, Int)] = rddCI
      .partitionBy(new CustPartitioner)
      .sortBy(_.cust_id)

// 【方法二】
val pairRdd2: RDD[(K,V)] = pairRdd.sortByKey(ascending:Boolean=true, numPartitions:Int)

// 【方法二.一】
implicit val orderCust:Ordering[Customer] = Ordering.by(_.cus_id)
val rddCIOrder: RDD[(Customer, Int)] = rddCI.sortByKey(true)

// 【方法二.二】
case class Customer2(
    cus_id            :Int,
    cus_fname         :String,
    cus_lname         :String,
    cus_email         :String,
    cus_password      :String,
    cus_street        :String,
    cus_city          :String,
    cus_state         :String,
    cus_zipcode       :String
)
extends Ordered[Customer2]
with Serializable {
    override def compare(that: Customer2): Int = 
    if(cus_state.compareTo(that.cus_state)==0) cus_id-that.cus_id
    else cus_state.compareTo(that.cus_state)
}

sc.textFile("D:\\projects\\0.2_phrase_hive&spark\\spark_first\\data\\retail_db\\customers.csv", 4)
	.mapPartitions(_.map(line=>(line.toCustomer2,1)))
	.sortByKey(false)
	.foreach(println)

// 【方法三】:重新分区,并在当前分区内排序
val pairRdd2: RDD[(K,V)] = pairRdd.repartitionAndSortWithinPartitions(par:Partitioner)

class CustPartitioner extends Partitioner {
    // 分区数:Executor 数量
    override def numPartitions: Int = 3

    // 根据键获取分区号
    override def getPartition(key: Any): Int = key match {
        case Customer(_,_,_,_,_,_,_,state,_) => state match {
            case "Canada" => 0
            case "China" => 1
            case "United States" => 2
        }
    }
}
implicit val orderCust:Ordering[Customer] = Ordering.by(_.cus_id)
val rddCIOrder: RDD[(Customer, Int)] = rddCI
      .repartitionAndSortWithinPartitions(new CustPartitioner)

// reduceByKey + foldByKey + aggregateByKey 都调用 combineByKeyClassTag

// 【✔ 按键聚合值】: combiner和reduce的值类型相同,计算规则相同
// group by + combiner + reduce
val pairRdd2:RDD[(K,V)] = pairRdd.reduceByKey(f:(V,V)=>V)
val pairRdd2:RDD[(K,V)] = pairRdd.reduceByKey(f:(V,V)=>V, numPartitions:Int)
val pairRdd2:RDD[(K,V)] = pairRdd.reduceByKey(partitioner:Partitioner, f:(V,V)=>V)
val rddPair2: RDD[(String, Int)] = rddPair.reduceByKey(_+_)

// 【按键聚合值】: combiner和reduce的值类型相同,计算规则相同,带初值
val pairRdd2:RDD[(K,V)] = pairRdd.foldByKey(initV:V)(inParOp:(V,V)=>V)
val pairRdd2:RDD[(K,V)] = pairRdd.foldByKey(initV:V,numPartitions:Int)(inParOp:(V,V)=>V)
val pairRdd2:RDD[(K,V)] = pairRdd.foldByKey(initV:V,partitioner:Partitioner)(inParOp:(V,V)=>V)

// 【✔ 按键分别执行分区内和分区间计算】: combiner和reduce的值类型可不同,计算规则可不同
val pairRdd2:RDD[(K,U)] = pairRdd.aggregateByKey(initV:U)(inParOp:(U,V)=>U,betParOp:(U,U)=>U)
val pairRdd2:RDD[(K,U)] = pairRdd.aggregateByKey(initV:U,numPartitions:Int)(inParOp:(U,V)=>U,betParOp:(U,U)=>U)
val pairRdd2:RDD[(K,U)] = pairRdd.aggregateByKey(initV:U,partitioner:Partitioner)(inParOp:(U,V)=>U,betParOp:(U,U)=>U)
val rddPair2: RDD[(String, Float)] = rddPair.aggregateByKey(0.0f)(_+_,_+_)

// 【按键分别执行分区内和分区间计算】: combiner和reduce的值类型可不同,计算规则可不同
val pairRdd2:RDD[(K,U)] = pairRdd.combineByKey(initV:V=>U,inParOp:(U,V)=>U,betParOp:(U,U)=>U)
val pairRdd2:RDD[(K,U)] = pairRdd.combineByKey(initV:V=>U,inParOp:(U,V)=>U,betParOp:(U,U)=>U,numPartitions:Int)
val pairRdd2:RDD[(K,U)] = pairRdd.combineByKey(initV:V=>U,inParOp:(U,V)=>U,betParOp:(U,U)=>U,partitioner:Partitioner,mapSideCombine:Boolean,serializer:Serializer)

// 【✔ 按键分组】
val pairRdd2: RDD[(K, Iterable[V])] = pairRdd.groupByKey()
val pairRdd2: RDD[(K, Iterable[V])] = pairRdd.groupByKey(numPartitions:Int)
val pairRdd2: RDD[(K, Iterable[V])] = pairRdd.groupByKey(partitioner:Partitioner)

// 【多数据集分组】:1VN 同键同组,不同RDD值进入TupleN的不同Iterable
-------------------------------------------------------------------------------
val pairRdd2: RDD[(K, (Iterable[V],Iterable[V1])] = pairRdd.groupWith(otherA: RDD[(K,V1)])
val pairRdd2: RDD[(K, (Iterable[V],Iterable[V1],Iterable[V2])] = pairRdd.groupWith(otherA: RDD[(K,V1)],otherB: RDD[(K,V2)])
val pairRdd2: RDD[(K, (Iterable[V],Iterable[V1],Iterable[V2],Iterable[V3])] = pairRdd.groupWith(otherA: RDD[(K,V1)],otherB: RDD[(K,V2)],otherC: RDD[(K,V3)])
-------------------------------------------------------------------------------
// 重载 1+1 1+2 1+3,追加再分区操作
val pairRdd2: RDD[(K, (Iterable[V],Iterable[V1])] = pairRdd.cogroup(otherA: RDD[(K,V1)])
val pairRdd2: RDD[(K, (Iterable[V],Iterable[V1])] = pairRdd.cogroup(otherA: RDD[(K,V1)],numPartitions:Int)
val pairRdd2: RDD[(K, (Iterable[V],Iterable[V1])] = pairRdd.cogroup(otherA: RDD[(K,V1)],partitioner:Partitioner)

// 每国家有多少人?
val rddAvgByState: RDD[(String, Float)] = rddLine.mapPartitions(it => {
    it.map(line => {
        val c: Customer = line.toCustomer
        (c.cus_state, 1)
    })
})
.reduceByKey(_ + _)

// 每个国家平均每个城市有多少人?
val rddAvgByState: RDD[(String, Float)] = rddLine.mapPartitions(it => {
    it.map(line => {
        val c: Customer = line.toCustomer
        (s"${c.cus_state}_${c.cus_city}", 1)
    })
})
.reduceByKey(_ + _)
.mapPartitions(it => {
    it.map(t => {
        val ps: Array[String] = t._1.split("_")
        (ps(0), t._2)
    })
})
.groupByKey()
.mapPartitions(_.map(t=>(t._1,t._2.sum.toFloat/t._2.size)))

/*
	【关联操作】:1V1			Shuffle ?
		横向,根据键做关联
		重载:numPartitions:Int 或 partitioner:Partitioner
*/
val pairRdd: RDD[(K, (V, V1))] = pairRdd1.join(pairRdd3:RDD[(K,V1)])
val pairRdd: RDD[(K, (V, Option[V1]))] = pairRdd1.leftOuterJoin(pairRdd3:RDD[(K,V1)])
val pairRdd: RDD[(K, (Option[V]), V1)] = pairRdd1.rightOuterJoin(pairRdd3:RDD[(K,V1)])
val pairRdd: RDD[(K, (Option[V]), Option[V1])] = pairRdd1.fullOuterJoin( pairRdd3:RDD[(K,V1)])

    val rddCus: RDD[Customer] =
      sc.textFile("D:\\projects\\0.2_phrase_hive&spark\\spark_first\\data\\retail_db\\customers.csv", 4)
        .mapPartitions(_.map(_.toCustomer)).cache()

    val rddOrder: RDD[(Int,Order)] =
      sc.textFile("D:\\projects\\0.2_phrase_hive&spark\\spark_first\\data\\retail_db\\orders.csv", 4)
        .mapPartitions(_.map(line=>{
          val order: Order = line.toOrder
          (order.or_customer_id,order)
        }))
        .cache()

    // 用户的订单数量
    val rddCusOrd: RDD[(Int, Int)] = rddCus.mapPartitions(
      _.map(c => (c.cus_id, c))
    ).leftOuterJoin(rddOrder)
      .mapPartitions(_.map(t => (t._1, if (t._2._2.isEmpty) 0 else 1)))
      .reduceByKey(_ + _)

	// 用户订单金额
	val rddOI: RDD[(Int,Float)] =
      sc.textFile("D:\\projects\\0.2_phrase_hive&spark\\spark_first\\data\\retail_db\\order_items", 4)
        .mapPartitions(_.map(line=>{
          val oi: OrderItem = line.toOrderItem
          (oi.ori_order_id,oi.ori_subtotal)
        }))
        .cache()
	
	val rddCusSum: RDD[(Int, Float)] = rddCus.mapPartitions(
      _.map(c => (c.cus_id, c))
    ).leftOuterJoin(rddOrder)
      .mapPartitions(_.map(t => (if (t._2._2.isEmpty) 0 else t._2._2.get.or_id, t._1)))
      .leftOuterJoin(rddOI)
      .mapPartitions(_.map(t => (t._2._1, if (t._2._2.isEmpty) 0 else t._2._2.get)))
      .reduceByKey(_ + _)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
行动算子:action
/* 【返回】所有元素分别在分区间和分区内执行【聚集】操作的结果
	reduce & fold 分区内和分区间执行相同操作,且类型与元素类型一致
	aggregate 分区内和分区间执行不同操作,且类型与元素类型不一致
*/
val rst:T = rdd.reduce(f:(T,T)=>T)
val rst:T = rdd.fold(init:T)(f:(T,T)=>T)
val rst:U = rdd.aggregate(init:U)(f:(U,T)=>U,f:(U,T)=>U)
// 返回包含数据集中所有元素的数组
val array:Array[T] = rdd.collect()
// 返回数据集中元素数量
val rst:Long = rdd.count()
val rst:Map[K,Long] = pairRdd.countByKey()
// 返回数据集中最大值
val rst:T = rdd.max()
// 返回数据集中最小值
val rst:T = rdd.min()
// 返回数据集中的第一个元素
val rst:T = rdd.first()
// 返回数据集中的前 num 个元素
val array:Array[T] = rdd.take(num:Int)
// 返回排序后数据集中的前 num 个元素
val array:Array[T] = rdd.takeOrdered(num:Int)(implicit ord:Ordering[T])
/* 持久化至文本文件,重载追加压缩功能
	import org.apache.hadoop.io.compress.{BZip2Codec, SnappyCodec}
	import io.airlift.compress.lzo.LzopCodec
	rdd.saveAsTextFile("out_path",classOf[BZip2Codec])
*/
rdd.saveAsTextFile(path:String)
rdd.saveAsTextFile(path:String,codec: Class[_ <: CompressionCodec])
rdd.saveAsObjectFile(path:String)
// 遍历迭代
rdd.foreach(f:T=>Unit)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
经典案例(wordCount《==》单词计数功能)

在这里插入图片描述

// 工具类的创建(SparkUtil,实现一劳永逸)
package spark_util

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

class SparkUtil {
  /**
   *  spark configuration:配置工具类
   *  appName,master,logLevel,sparkHome
   */
  private var config:SparkConf = null
  /**
   *  spark rdd:弹性分布式数据集(rdd算子)
   */
  private var sc:SparkContext = null
  /**
   *  spark sql:结构化查询(sql算子,sql语句)
   */
  private var spark:SparkSession = null

  private def check(info:String,value:Any,regex:String=".*")={
    if(null==value){
      throw new RuntimeException(s"$info 空指针异常")
    }
    if(value.isInstanceOf[String] && value.toString.size==0){
      throw new RuntimeException(s"$info 空字符串异常")
    }
    if (null!=regex && !value.toString.matches(regex)) {
      throw new RuntimeException(s"$info 不符合正则规则 $regex 异常: ${value.toString}")
    }
  }

  def build()={
    config = new SparkConf()
    this
  }
  def appName(name:String) ={
    check("SparkConfig",config)
    check("appName",name,"[a-z]\\w+")
    config.setAppName(name)
    this
  }

  /**
   * @param master
   * 1、local:单线程
   * 2、local[*]:所有可用线程
   * 3、local[N]:N条线程
   * 4、spark://host:7077: standalone
   * 5、yarn-client:yarn模式,本地化运行
   * 6、yarn-cluster:yarn模式,ApplicationMaster
   */
  def master(master:String)={
    check("SparkConfig",config)
    check("appName",master,
      "local\\[(\\*|\\d+)]|spark://[a-z][a-z0-9]*:\\d+|yarn-(client|cluster)")
    config.setMaster(master)
    this
  }

  def set(item:String,value:String)={
    check("SparkConfig",config)
    check("ConfigItemName",item)
    check(item,value)
    config.set(item,value)
  }

  def sparkContext(level:String="INFO")={
    if(null==sc){
      check("SparkConfig",config)
      check("LogLevel",level.toUpperCase, "INFO|DEBUG|WARN|ERROR|FATAL")
      sc = new SparkContext(config)
      sc.setLogLevel(level)
    }
    sc
  }

  def sparkSession(hiveSupport:Boolean=false,level:String="INFO")={
    if(null==spark){
      check("SparkConfig",config)
      check("LogLevel",level.toUpperCase, "INFO|DEBUG|WARN|ERROR|FATAL")
      // 在创建 SparkSession 之前先创建 SparkContext
      spark ={
        if(hiveSupport)
          SparkSession
          .builder()
          .config(config)
          .enableHiveSupport()
          .getOrCreate()
        else
          SparkSession
          .builder()
          .config(config)
          .getOrCreate()
      }
      sc = spark.sparkContext
    }
    spark
  }

  def close={
    if (null != spark) {
      // SparkSession 和 SparkContext 并存
      // 关闭 SparkSession 同时自动关闭 SparkContext
      spark.stop()
    }else if(null != sc){
      sc.stop()
    }
  }
}

object SparkUtil{
  def apply(): SparkUtil = new SparkUtil()
}

// wordCount 具体实现代码( SparkRDD )
// 读取一个文本文件,统计每个单词出现的次数,并将结果按单词出现次数降序排序
package spark_rdd

import org.apache.spark.{Partitioner, SparkContext}
import spark_util.SparkUtil
import spark_util.Data._

object SparkRDD {
  def main(args: Array[String]): Unit = {
  	 val sparkUtil: SparkUtil = SparkUtil()
		.build()
		.appName("spark_rdd_01")
		.master("local[4]")
		
    val sc: SparkContext = sparkUtil.sparkContext(level = "ERROR")
    
	// 从指定路径读取文本文件story.txt,并设置并行度为2。这意味着Spark将尝试使用两个分区来处理文件
	sc.textFile("D:\\projects\\0.2_phrase_hive&spark\\spark_first\\data\\story.txt",2)
		// .mapPartitions(it=> { ... }):
		// 一个转换操作,mapPartitions允许你处理每个分区的数据。参数it是一个Iterator[String],表示每个分区中的行
		.mapPartitions(it=>{
        	// it.flatMap(_.trim.replaceAll("[^a-zA-Z ]+","").split("\\s+").map((_,1)):
			/**
				flatMap:将每个元素转换为多个元素
				trim:去除每行的前后空白字符
				replaceAll("[^a-zA-Z ]+",""):删除行中除了字母和空格之外的所有字符
				split("\\s+"):按空白字符分割字符串,生成单词数组
				map((_,1)):将每个单词与常量1配对,形成一个二元组
			*/
          	it.flatMap(         
	            _.trim            
					.replaceAll("[^a-zA-Z ]+","")
					.split("\\s+")
					.map((_,1))       
			).toArray				// 将 Iterator 转换为数组
          // 分区内聚合 : Combiner
			.groupBy(_._1)			// 按二元组的第一个元素进行分组
			.mapValues(_.size)		// 计算每个单词出现的次数
			.toIterator				// 奖结果转换回 Iterator
		}).groupBy(_._1,4)			// 再次按单词进行分组,这次指定了4个分区。这有助于在后续步骤中减少数据的shuffle
		// 再次使用mapPartitions来处理每个分区的数据
        .mapPartitions(it=>{
			it.map(tp2=>{
				val word: String = tp2._1
				// Reducer : 不同分区同一个单词出现的次数,将不同分区同一个单词的次数求和
				val count: Int = tp2._2   // Iterator[(String,Int)]
					.map(_._2)              // Iterator[Int]
					.sum
				(word,count)
			}).toArray
				.sortWith(_._2>_._2)  // (a:(String,Int),b:(String,Int))=>{...}	按单词出现的次数进行降序排序
				.toIterator
        }).sortBy(_._2,false)
        .saveAsTextFile("file:///D:\\projects\\0.2_phrase_hive&spark\\spark_first\\data\\result\\wc")
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
RDD专项练习
/*
	现有客户信息文件 customers.csv,请找出:
        客户中的前5个最大家族
        客户中的前10个最流行的名字
*/

/*
	现有客户信息文件 scores.txt,请找出:
	班级 ID 姓名 年龄 性别 科目 成绩
	需求如下:
    1. 一共有多少人参加考试?
        1.1 一共有多少个小于 20 岁的人参加考试?
        1.2 一共有多少个等于 20 岁的人参加考试?
        1.3 一共有多少个大于 20 岁的人参加考试?
    2. 一共有多个男生参加考试?
        2.1 一共有多少个女生参加考试?
    3. 12 班有多少人参加考试?
        3.1 13 班有多少人参加考试?
    4. 语文科目的平均成绩是多少?
        4.1 数学科目的平均成绩是多少?
        4.2 英语科目的平均成绩是多少?
    5. 单个人平均成绩是多少?
    6. 12 班平均成绩是多少?
        6.1 12 班男生平均总成绩是多少?
        6.2 12 班女生平均总成绩是多少?
        6.3 同理求 13 班相关成绩
    7. 全校语文成绩最高分是多少?
        7.1 12 班语文成绩最低分是多少?
        7.2 13 班数学最高成绩是多少?
    8. 总成绩大于 150 分的 12 班的女生有几个?
*/

package spark_rdd2

import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}

object SparkRDD {
  // 样例类参与 RDD 运算不能写在 main 中,否则报错:序列化异常
  case class Score(classId:Int,name:String,age:Int,gender:String,subject:String,score:Int,_type:String)
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
		.setAppName("spark")
		.setMaster("local[4]")

    val sc: SparkContext = SparkContext.getOrCreate(conf)
    // 设置检查点路径
    sc.setCheckpointDir("hdfs://single01:9000/spark/checkpoint")

    val map = Map(
		"GT20"->"20岁以上",
		"EQ20"-> "20岁",
		"LT20"-> "20岁以下"
    )

//    val mapBroad: Broadcast[Map[String, String]] = sc.broadcast(map)
    val maleCount: LongAccumulator = sc.longAccumulator("maleCount")

    val path = "file:///D:\\projects\\0.2_phrase_hive&spark\\spark_first\\data\\scores.txt"
    val scores: RDD[Score] = sc
		.textFile(path, 3)
		.mapPartitions(
			_.drop(1)
				.map(line=>{
					val a: Array[String] = line.split("\\s+")
					val age: Int = a(2).toInt
					val _type = age match {
						case age if (age > 20) => map.get("GT20").get
						case age if (age == 20) => map.get("EQ20").get
						case _ => map.get("LT20").get
					}
					if(a(3).equals("男")){
						maleCount.add(1)
					}
            		Score(a(0).toInt,a(1),age,a(3),a(4),a(5).toInt,_type)
          		})
		)
      .cache()

    scores.foreach(println)

    println(maleCount.value)

    /**
     *   mapPartitions接收一个函数,该函数将每个分区中的迭代器转换为另一个迭代器(出入相同,都为Iterator)
     *   groupByKey方法的作用是将具有相同键的所有值聚集在一起
     *   reduceByKey方法用于合并具有相同键的所有值
     */

    // 一共有多少人参加考试(小于20,等于20,大于20)
    val personCount: Long = scores
		.mapPartitions(_.map(_.name))
		.distinct()
		.count()
    println(s"$personCount 人考试")
    scores
		.mapPartitions(_.map(s=>(s._type,1)))
		.reduceByKey(_+_)
		.foreach(println)

    // 一共有多少男生(女生)参加考试
    val genderCount: RDD[(String, Int)] = scores
		.mapPartitions(_.map(s => (s.gender, 1)))
		.reduceByKey(_ + _)
    genderCount.checkpoint()
    genderCount.foreach(println)

    // 12班(13班)一共有多少人参加考试
    scores
		.mapPartitions(_.map(s=>((s.classId,s.name),1)))
		.groupByKey()
		.mapPartitions(_.map(t=>(t._1._1,1)))
		.reduceByKey(_+_)
		.foreach(println)

    scores
		.mapPartitions(_.map(t=>(t.classId,t.name)).toSet[(Int,String)].toIterator.map(t=>(t._1,1)))
		.reduceByKey(_+_)
		.foreach(println)

    // 语数外各科目的平均成绩
    scores
		.mapPartitions(_.map({
		case s if s.subject matches("chinese|math|english") => (s.subject,s.score)
		}))
		.groupByKey()
		.mapPartitions(_.map(t=>(t._1,t._2.sum*1.0f/t._2.size)))
		.reduceByKey(_+_)
		.foreach(println)

    // 单个人的平均成绩
    scores
		.mapPartitions(_.map(t=>(t.name,t.score)))
		.groupByKey()
		.mapPartitions(_.map(t=>(t._1,t._2.sum*1.0f/t._1.size)))
		.reduceByKey(_+_)
		.foreach(println)

    // 12班(13)平均总成绩
    scores
		.mapPartitions(_.map(t=>(t.classId,t.score)))
		.groupByKey()
		.mapPartitions(_.map(t=>(t._1,t._2.sum*1.0f/t._2.size)))
		.reduceByKey(_+_)
		.foreach(println)

    // 12班(13班)男生(女生)平均总成绩
    scores
		.mapPartitions(_.map(t=>((t.classId,t.gender),t.score)))
		.groupByKey()
		.mapPartitions(_.map(t=>(t._1,t._2.sum*1.0f/t._2.size)))
		.reduceByKey(_+_)
		.foreach(println)

    // 全校语文成绩最高分
    val maxScore: Int = scores
		.mapPartitions(_.filter(_.subject.equals("chinese")).map(_.score))
		.max()
    println(s"全校语文成绩最高分为 $maxScore")

    // 12 班语文成绩最高分
    val chineseScore: RDD[(Int, Int)] = scores
		.mapPartitions(_.filter(_.subject == "chinese").map(t => (t.classId, t.score)))
    val chineseMaxScore: Int = chineseScore.mapPartitions(_.filter(_._1 == 12).map(_._2)).max()
    println(s"12 班语文成绩最高分为 $chineseMaxScore")

    // 13 班数学成绩最高分
    val mathScore: RDD[(Int, Int)] = scores
		.mapPartitions(_.filter(_.subject == "math").map(t => (t.classId, t.score)))
    val mathMaxScore: Int = mathScore.mapPartitions(_.filter(_._1 == 13).map(_._2)).max()
    println(s"13 班数学成绩最高分为 $mathMaxScore")

    // 总成绩大于 150 分 的 12 班的女生有几个
    // 方法一
    val BigScores: RDD[((Int, String), Iterable[Int])] = scores.mapPartitions(_.map(t => ((t.classId, t.gender), t.score)))
		.groupByKey()
		.mapPartitions(_.map({
			case t if t._2.sum > 150 => (t._1, t._2)
		}))
    val BigGenderScore: RDD[(Int, Iterable[Int])] = BigScores.mapPartitions(_.filter(_._1._2 == "女")).map(t => (t._1._1, t._2))

    val resultScore: Long = BigGenderScore.mapPartitions(_.filter(_._1 == 12)).map(_._2).count()
    println(s"总成绩大于 150 分的 12 班的女生有 $resultScore 个")

	val numSumScore12Gt150: Long = scores
		.filter(score => score.classId == 12 && score.gender.equals("女"))
		.mapPartitions(_.map(score => (score.name, score.score)))
		.reduceByKey(_+_)
		.filter(_._2 > 150)
		.count()

    sc.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195

优化:optimize

org.apache.spark.util.Utils
	
/*
	shuffle性能较差:因为shuffle必须落盘,内存中等数据会OOM
	groupByKey只分组(存在Shuffle) + reduce只聚合
		<=结果同,性能不同=>
	reduceByKey先分组、预聚合、再聚合(存在Shuffle) ✔
*/

/*
【设置日志管理】
	日志级别:INFO|DEGUG|WARN|ERROR|FATAL
*/
sc.setLogLevel(logLevel:String)

/*
【设置检查点:容错,恢复】
*/
sc.setCheckpointDir(path:String)

/*
【RDD重用:检查点、缓存与持久化】
	cache      临时存储于【内存】重用,job结束后自动删除 ✔
		cache  相当于 persist(StorageLevel.MEMORY_ONLY)
	persisit   临时存储于【磁盘】重用,job结束后自动删除,涉及IO性能较差
		StorageLevel.MEMORY_ONLY
        StorageLevel.DISK_ONLY
        StorageLevel.OFF_HEAP
        StorageLevel.MEMORY_AND_DISK
        StorageLevel.MEMORY_AND_DISK_SER
        StorageLevel.MEMORY_AND_DISK_SER_2
	checkpoint 长久存储于【磁盘】重用,job结束后不会删除,涉及IO性能较差,安全且一般和cache组合使用
*/
val rddCache: RDD[T] = rdd.cache()
val rddCache: RDD[T] = rdd.persist(level:StorageLevel)
rdd.checkpoint()

/*
【广播变量】(分布式只读变量):broadcast
作用:
广播变量用来把变量在所有节点的内存之间进行共享,在每个机器上缓存一个只读的变量,而不是为机器上的每个任务都生成一个副本

将数据集或配置广播到每个Executor以readonly方式存在,不会在Task之间传输

优势:
如果Executor端需要访问Driver端的某个变量,spark会向Executor端每个task都发送一个此变量的副本,
如果此变量很大,就会占用大量的Executor节点的内存,
利用广播变量,spark只会给一个Executor节点发送一个变量
*/
val bc:BroadCast[T] = sc.broadcast(value:T)
rdd.mapPartitions(itPar=>{
    val v:T = bc.value
    ...
})

/*
累加器:accumulate:只能 add 操作,常用于计数(累加器在Driver定义初始化,在Excutor端更新)
	1、定义在Driver端的一个变量,Cluster中每一个Task都会有一份Copy
	2、所有的Task都计算完成后,将所有Task中的Copy合并到驱动程序中的变量中
非累加器:在所有Task中的都会是独立Copy,不会有合并
自定义累加器:写一个类继承 AccumulatorV2[IN, OUT]
abstract class AccumulatorV2[IN, OUT] extends Serializable {
  // Returns if this accumulator is zero value or not
  def isZero: Boolean

  //  Creates a new copy of this accumulator, which is zero value
  def copyAndReset(): AccumulatorV2[IN, OUT] = {...}

  // Creates a new copy of this accumulator.
  def copy(): AccumulatorV2[IN, OUT]

  // Resets this accumulator, which is zero value.
  def reset(): Unit

  // 添加:Takes the inputs and accumulates.
  def add(v: IN): Unit

  // 合并:Merges another same-type accumulator and update its state.
  def merge(other: AccumulatorV2[IN, OUT]): Unit

  // 值列表:Defines the current value of this accumulator
  def value: OUT
}
*/
val accLong: LongAccumulator = sc.longAccumulator("longAcc")
val accDouble: DoubleAccumulator = sc.doubleAccumulator("doubleAcc")
rdd.mapPartitions(itPar=>{
    ...
    accLong.add(v:Long)
    accDouble.add(v:Double)
    ...
})
accXxx.reset()
val isZero:Boolean = accXxx.isZero
val num:Long|Double = accXxx.value|sum|count|avg

/*
【分区控制】
	【缩减分区节省资源】 或 【扩大分区提高并行度】
	 coalesce(numPartitions:Int, shuffle:Boolean):
		缩小分区
    		存在过多的小任务的时候收缩合并分区,减少分区的个数,减少任务调度成本
    		默认情况下,不会对数据重组,比如:3个合成2个,采用 {1+2},{3},容易导致数据倾斜
    		若需数据均衡,则将 shuffle 参数设置为 true 即可
    	扩大分区
    		若需要扩大分区,shuffle 参数必须设置为 true
    		若将2个分区拆分成3个,必须打乱重新分区,否则数据还是在两个分区,{1},{2},{空}
    		repartition(numPartitions:Int) <=> coalesce(numPartitions,true) 
*/
val rdd: RDD[String] = rdd.coalesce(numPartitions:Int, shuffle:Boolean)
val rdd: RDD[String] = rdd.repartition(numPartitions:Int) // ✔
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
阶段划分 DAG
/*
【为什么要划分阶段】
	1、基于数据的分区,本着传递计算的性能远高于传递数据,所以数据本地化是提升性能的重要途径之一
	2、一组串行的算子,无需 Shuffle,基于数据本地化可以作为一个独立的阶段连续执行
	3、经过一组串行算子计算,遇到 Shuffle 操作,默认情况下 Shuffle 不会改变分区数量,但会因为numPartitions:Int, partitioner:Partitioner 等参数重新分配,过程数据会【写盘供子RDD拉取(类MapReduce)】
*/

/*
Driver程序提交后
1、Spark调度器将所有的RDD看成是一个Stage
2、然后对此Stage进行逆向回溯,遇到Shuffle就断开,形成一个新的Stage
3、遇到窄依赖,则归并到同一个Stage(TaskSet)
4、等到所有的步骤回溯完成,便生成一个DAG图

RDD依赖关系
	Lineage:血统、遗传
        RDD最重要的特性之一,保存了RDD的依赖关系
        RDD实现了基于Lineage的容错机制
	依赖关系 org.apache.spark.Dependency
        窄依赖 NarrowDependency
        	1V1 OneToOneDependency
        	1VN RangeDependency
        宽依赖 ShuffleDependency
    当RDD分区丢失时
        Spark会对数据进行重新计算,对于窄依赖只需重新计算一次子RDD的父RDD分区
        若配合持久化更佳:cache,persist,checkpoint
*/
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
算子宽窄依赖划分
// 窄依赖 
rdd.dependencies
	map
	flatMap
	mapPartitions ✔
	mapPartitionsWithIndex
	glom
	filter
	distinct
	intersection  ✔
	sample ✔
	union ✔
	subtract ✔
	zip...
	cogroup
// 宽依赖
ShuffledRDD extends RDD
	sortBy
	sortByKey
	partitionBy
	repartition

// 不一定
/*
reduceByKey(【partitioner: Partitioner】, func: (V, V) => V)
	若使用的是带 partitioner 的重载且 Partitioner 和父RDD的 Partitioner一致
	则为窄依赖RDD,否则为宽依赖ShuffledRDD
*/
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/黑客灵魂/article/detail/971753
推荐阅读
相关标签
  

闽ICP备14008679号