当前位置:   article > 正文

Spark join执行机制_spark join 流程

spark join 流程

spark 提供五种执行join操作的机制,分别是:

  • shuffle hash join
  • broadcast hash join
  • sort merge join
  • cartesian join
  • broadcast nested loop join

Hash Join简介

先来看看这样一条SQL语句

select * from order,item where item.id = order.i_id
  • 1

这个Join采用的是hash join算法,整个过程会经历三步:

  1. 确定Build Table以及Probe Table:这个概念比较重要,Build Table使用join key构建Hash Table,而Probe Table使用join key进行探测,探测成功就可以join在一起。通常情况下,小表会作为Build Table,大表作为Probe Table。此事例中item为Build Table,order为Probe Table;
  2. 构建Hash Table:依次读取Build Table(item)的数据,对于每一行数据根据join key(item.id)进行hash,hash到对应的Bucket,生成hash table中的一条记录。数据缓存在内存中,如果内存放不下需要dump到外存;
  3. 探测:再依次扫描Probe Table(order)的数据,使用相同的hash函数映射Hash Table中的记录,映射成功之后再检查join条件(item.id = order.i_id),如果匹配成功就可以将两者join在一起。


基本流程可以参考上图,这里有两个小问题需要关注:
1.hash join性能如何?很显然,hash join基本都只扫描两表一次,可以认为o(a+b),较之最极端的笛卡尔集运算a*b,不知甩了多少条街;
2.为什么Build Table选择小表?道理很简单,因为构建的Hash Table最好能全部加载在内存,效率最高;这也决定了hash join算法只适合至少一个小表的join场景,对于两个大表的join场景并不适用。
上文说过,hash join是传统数据库中的单机join算法,在分布式环境下需要经过一定的分布式改造,说到底就是尽可能利用分布式计算资源进行并行化计算,提高总体效率。hash join分布式改造一般有两种经典方案:

  1. broadcast hash join:将其中一张小表广播分发到另一张大表所在的分区节点上,分别并发地与其上的分区记录进行hash join。broadcast适用于小表很小,可以直接广播的场景;
  2. shuffler hash join:一旦小表数据量较大,此时就不再适合进行广播分发。这种情况下,可以根据join key相同必然分区相同的原理,将两张表分别按照join key进行重新组织分区,这样就可以将join分而治之,划分为很多小join,充分利用集群资源并行化。

Broadcast Hash Join

大家知道,在数据库的常见模型中(比如星型模型或者雪花模型),表一般分为两种:事实表和维度表。维度表一般指固定的、变动较少的表,例如联系人、物品种类等,一般数据有限。而事实表一般记录流水,比如销售清单等,通常随着时间的增长不断膨胀。
因为Join操作是对两个表中key值相同的记录进行连接,在SparkSQL中,对两个表做Join最直接的方式是先根据key分区,再在每个分区中把key值相同的记录拿出来做连接操作。但这样就不可避免地涉及到shuffle,而shuffle在Spark中是比较耗时的操作,我们应该尽可能的设计Spark应用使其避免大量的shuffle。
当维度表和事实表进行Join操作时,为了避免shuffle,我们可以将大小有限的维度表的全部数据分发到每个节点上,供事实表使用。executor存储维度表的全部数据,一定程度上牺牲了空间,换取shuffle操作大量的耗时,这在SparkSQL中称作Broadcast Join,也称之为Map端JOIN
Broadcast Join的条件有以下几个:
被广播的表需要小于spark.sql.autoBroadcastJoinThreshold所配置的值,默认是10M (或者加了broadcast join的hint)
基表不能被广播,比如left outer join时,只能广播右表
缺点: 这个方案只能用于广播较小的表,否则数据的冗余传输就远大于shuffle的开销;另外,广播时需要将被广播的表现collect到driver端,当频繁有广播出现时,driver的内存压力也是蛮大的。
broadcast hash join可以分为两步:

  1. broadcast阶段:将小表广播分发到大表所在的所有主机。广播算法可以有很多,最简单的是先发给driver,driver再统一分发给所有executor;要不就是基于bittorrete的p2p思路;
  2. hash join阶段:在每个executor上执行单机版hash join,小表映射,大表试探;实现分布式join操作。

SparkSQL规定broadcast hash join执行的基本条件为被广播小表必须小于参数spark.sql.autoBroadcastJoinThreshold,默认为10M。
代码实现

// 不限定小表的大小
spark.conf.set("spark.sql.autoBroadcastJoinThreshold",-1)
  • 1
  • 2
import org.apache.spark.sql.SparkSession

object BigRDDJoinSmallRDD {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().master("local[*]").appName("BigRDD Join SmallRDD").getOrCreate()
    val sc = sparkSession.sparkContext
    val list1 = List(("jame",23), ("wade",3), ("kobe",24))
    val list2 = List(("jame", 13), ("wade",6), ("kobe",16))
    val bigRDD = sc.makeRDD(list1)
    val smallRDD = sc.makeRDD(list2)

    println(bigRDD.getNumPartitions)
    println(smallRDD.getNumPartitions)

    // driver端rdd不broadcast广播smallRDD到各executor,RDD不能被broadcast,需要转换成数组array
    val  smallRDDB= sc.broadcast(smallRDD.collect())

    val joinedRDD = bigRDD.mapPartitions(partition => {
      val smallRDDBV = smallRDDB.value  // 各个executor端的task读取广播value
      partition.map(element => {
        //println(joinUtil(element, smallRDDBV))
        joinUtil(element, smallRDDBV)
      })
    })
    joinedRDD.foreach(x => println(x))
  }

  /**
  * join操作:对两个rdd中的相同key的value1和value2进行聚合,即(key,value1).join(key,value2)得到(key,(value1, vlaue2))
  * 如果bigRDDEle的key和smallRDD的某个key一致,那么返回(key,(value1, vlaue2))
  * 该方法会在各executor的task上执行
  * */
  def joinUtil(bigRDDEle:(String,Int), smallRDD: Array[(String, Int)]): (String, (Int,Int)) = {
    var joinEle:(String, (Int, Int)) = null
    // 遍历数组smallRDD
    smallRDD.foreach(smallRDDEle => {
      if(smallRDDEle._1.equals(bigRDDEle._1)){
        // 如果bigRDD中某个元素的key和数组smallRDD的key一致,返回join结果
        joinEle = (bigRDDEle._1, (bigRDDEle._2, smallRDDEle._2))
      }
    })
    joinEle
  }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

Shuffle Hash Join

当一侧的表比较小时,我们选择将其广播出去以避免shuffle,提高性能。但因为被广播的表首先被collect到driver段,然后被冗余分发到每个executor上,所以当表比较大时,采用broadcast join会对driver端和executor端造成较大的压力。
由于Spark是一个分布式的计算引擎,可以通过分区的形式将大批量的数据划分成n份较小的数据集进行并行计算。
这种思想应用到Join上便是Shuffle Hash Join了。利用key相同必然分区相同的这个原理,两个表中,key相同的行都会被shuffle到同一个分区中,SparkSQL将较大表的join分而治之,先将表划分成n个分区,再对两个表中相对应分区的数据分别进行Hash Join,这样即在一定程度上减少了driver广播一侧表的压力,也减少了executor端取整张被广播表的内存消耗,提升了计算时的稳定性
必需条件:

  • 只用于等值Join
  • 不能用于Full Outer Join
  • spark.sql.join.prefersortmergeJoin 参数默认值为true,设置为false

Sort Merge Join

当两个表都非常大时,显然无论使用上述哪种都会对计算内存造成很大压力。这是因为join时两者采取的都是hash join,是将一侧的数据完全加载到内存中,使用hash code取join keys值相等的记录进行连接。
当两个表都非常大时,SparkSQL采用了一种全新的方案来对表进行Join,即Sort Merge Join。这种实现方式不用将一侧数据全部加载后再进行hash join,但需要在join前将数据排序,如下图所示:

可以看到,首先将两张表按照join keys进行了重新shuffle,保证join keys值相同的记录会被分在相应的分区。分区后对每个分区内的数据进行排序,排序后再对相应的分区内的记录进行连接。如下图

因为两个序列都是有序的,从头遍历,碰到key相同的就输出;如果不同,左边小就继续取左边,反之取右边。
可以看出,无论分区有多大,Sort Merge Join都不用把某一侧的数据全部加载到内存中,而是即用即取即丢,从而大大提升了大数据量下sql join的稳定性。
SparkSQL对两张大表join采用了全新的算法-sort-merge join,如下图所示,整个过程分为三个步骤:

shuffle阶段:将两张大表根据join key进行重新分区,两张表数据会分布到整个集群,以便分布式并行处理;
sort阶段:对单个分区节点的两表数据,分别进行排序;
merge阶段:对排好序的两张分区表数据执行join操作。join操作很简单,分别遍历两个有序序列,碰到相同join key就merge输出,否则取更小一边,见下图示意

代码实现

spark.conf.set("spark.sql.join.preferSortMergeJoin", true)
  • 1

先hash到同一个分区且排好序,然后再在分区内顺序查找比对
对表的大小没有条件,不管分区多大,SortMergeJoin 都不用把一侧的数据全部加载到内存中,而是即用即丢;两个序列都有序。从头遍历,碰到 key 相同的就输出,如果不同,左边小就继续取左边,反之取右边,由于排序的特性,每次处理完一条记录后只需要从上一次结束的位置开始查找,SortMergeJoinExec执行时就能够避免大量无用的操作,提高了大数据量下sql join 的稳定性。
使用条件

  • 等值连接
  • 参与join的key可排序
  • spark.sql.join.prefersortmergeJoin 参数默认值为true,设置为true

Cartesian Join

笛卡尔积
用来实现cross join
如果左表有n个分区,右表有m个分区,那么笛卡尔积后的分区数是K=n * m个;并且这K个分区中,第K(i)个分区获取的左表分区为 kn=i / m,获取的右表分区为 km=i % m,然后kn和km这两个分区做笛卡尔积;由于是以分区为单位,所以不会触发shuffle。
条件:

  • 仅支持内连接
  • 支持等值和不等值连接
  • 开启参数spark.sql.crossJoin.enabled=true

Broadcast Nested Loop Join

Broadcast Nested Join将一个输入数据集广播到每个executor上,然后在各个executor上,另一个数据集的分区会和第一个数据集使用嵌套循环的方式进行Join输出结果。
Broadcast Nested Join
该方式是在没有合适的JOIN机制可供选择时,最终会选择该种join策略。优先级为:Broadcast Hash Join > Sort Merge Join > Shuffle Hash Join > cartesian Join > Broadcast Nested Loop Join.

Join策略选择

  1. 等值连接的情况

有join提示(hints)的情况,按照小面的顺序

  • Broadcast Hint:如果join类型支持,则选择broadcast hash join
  • Sort merge hint:如果join key是排序的,则选择 sort-merge join
  • shuffle hash hint:如果join类型支持, 选择 shuffle hash join
  • shuffle replicate NL hint: 如果是内连接,选择笛卡尔积方式
没有join提示(hints)的情况,则逐个对照下面的规则

  • 如果join类型支持,并且其中一张表能够被广播(spark.sql.autoBroadcastJoinThreshold值,默认是10MB),则选择 broadcast hash join
  • 如果参数spark.sql.join.preferSortMergeJoin设定为false,且一张表足够小(可以构建一个hash map) ,则选择shuffle hash join
  • 如果join keys 是排序的,则选择sort-merge join
  • 如果是内连接,选择 cartesian join
  • 没有可以选择的执行策略,则最终选择broadcast nested loop join,即使可能会发生OOM

非等值连接情况
有join提示(hints),按照下面的顺序
broadcast hint:选择broadcast nested loop join.
shuffle replicate NL hint: 如果是内连接,则选择cartesian product join
没有join提示(hints),则逐个对照下面的规则
如果一张表足够小(可以被广播),则选择 broadcast nested loop join
如果是内连接,则选择cartesian product join
如果可能会发生OOM或者没有可以选择的执行策略,则最终选择broadcast nested loop join

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/829924
推荐阅读
相关标签
  

闽ICP备14008679号