当前位置:   article > 正文

【Spark的五种Join策略解析】_spark sql中修改join策略

spark sql中修改join策略

join基本流程

Spark将参与Join的两张表抽象为流式遍历表(streamIter)和查找表(buildIter),通常streamIter为大表,buildIter为小表,我们不用担心哪个表为streamIter,哪个表为buildIter,这个spark会根据join语句自动帮我们完成。

img

对于每条来自streamIter的记录,都要去buildIter中查找匹配的记录,所以buildIter一定要是查找性能较优的数据结构。spark提供了三种join实现:sort merge join、broadcast join以及hash join。

五种join策略

  • Shuffle Hash Join
  • Broadcast Hash Join
  • Sort Merge Join
  • Cartesian Join
  • Broadcast Nested Loop Join

大表join小表

Shuffle Hash Join

image-20220929103146591

Join 步骤:把大表和小表按照相同的分区算法和分区数进行分区(Join 的 keys 进行分区),保证了 hash 值一样(相同key)的数据都分发到同一个分区中(分区内不排序),然后在同一个 Executor 中两张表 hash 值一样的分区就可以在本地进行 hash Join 。在进行 Join 之前,还会对小表的分区构建 Hash 桶(这就要求每个分区都不能太大),便于查找。

注意,和broadcast hash join的区别,这里并没有广播小表,在双方shuffle后的分区内,小表转成Hash桶与大表进行hash join。

苛刻的条件:

  • buildIter总体估计大小超过spark.sql.autoBroadcastJoinThreshold设定的值,即不满足broadcast join条件
  • 开启尝试使用hash join的开关,spark.sql.join.preferSortMergeJoin=false
  • 每个分区的平均大小不超过spark.sql.autoBroadcastJoinThreshold设定的值,即shuffle read阶段每个分区来自buildIter的记录要能放到内存中
  • streamIter的大小是buildIter三倍以上

特点:

  • 仅支持等值连接,join key不需要排序
  • 支持除了全外连接(full outer joins)之外的所有join类型
  • 需要对小表构建Hash map,属于内存密集型的操作,如果构建Hash表的一侧数据比较大,可能会造成OOM,不适合严重倾斜的join
  • 对于FullOuter Join,需要建立双向hash表,代价太大。因此FullOuterJoin默认都是基于SortJoin来实现

Broadcast Hash Join

image-20220929103203808

将小表的数据广播到 Spark 所有的 Executor 端,只能用于等值连接。避免了 Shuffle 操作。一般而言,Broadcast Hash Join 会比其他 Join 策略执行的要快。因为他直接在一个map中完成了,也称之为map join

Join 步骤:

  • 利用 collect 算子将小表的数据从 Executor 端拉到 Driver 端
  • 在 Driver 端调用 sparkContext.broadcast 广播到所有 Executor 端
  • 在 Executor 端使用广播的数据与大表进行 Join 操作

使用条件:

  • 必须为等值连接,不要求 Join 的 keys 可排序
  • 小表大小小于 spark.sql.autoBroadcastJoinThreshold(default 10M)设定的值

Broadcast Nested Loop Join

该方式是在没有合适的JOIN机制可供选择时,最终会选择该种join策略。优先级为:Broadcast Hash Join > Sort Merge Join > Shuffle Hash Join > cartesian Join > Broadcast Nested Loop Join.

最小的数据集被广播到另一个数据集的每个分区上,执行一个嵌套循环来执行join, 也就是说数据集1的每条记录都尝试join数据集2的每条记录(最笨的方法),效率比较低。既可以做等值join也可以做非等值join,而且是非等值join的默认策略。

没有排序,就是广播小表到每个分区上,尝试join每条记录,效率低!

大表之间join

Sort Merge Join

image-20220929103221072

先hash到同一个分区且排好序,然后再在分区内顺序查找比对

对表的大小没有条件,不管分区多大,SortMergeJoin 都不用把一侧的数据全部加载到内存中,而是即用即丢;两个序列都有序。从头遍历,碰到 key 相同的就输出,如果不同,左边小就继续取左边,反之取右边,由于排序的特性,每次处理完一条记录后只需要从上一次结束的位置开始查找,SortMergeJoinExec执行时就能够避免大量无用的操作,提高了大数据量下sql join 的稳定性。

Join 步骤:

  • shuffle: 将两张表按照 join key 进行shuffle,保证join key值相同的记录会被分在相应的分区
  • sort: 对每个分区内的数据进行排序
  • merge: 排序后再对相应的分区内的记录进行连接

使用条件:

  • 等值连接
  • 参与 join 的 key 可排序

Cartesian Join

笛卡尔积

如果左表有n个分区,右表有m个分区,那么笛卡尔积后的分区数是K=n * m个;并且这K个分区中,第K(i)个分区获取的左表分区为 kn=i / m,获取的右表分区为 km=i % m,然后kn和km这两个分区做笛卡尔积;由于是以分区为单位,所以不会触发shuffle;

join策略选择

等值连接的情况

有join提示(hints)的情况,按照下面的顺序
  • Broadcast Hint:如果join类型支持,则选择broadcast hash join
  • Sort merge hint:如果join key是排序的,则选择 sort-merge join
  • shuffle hash hint:如果join类型支持, 选择 shuffle hash join
  • shuffle replicate NL hint: 如果是内连接,选择笛卡尔积方式
没有join提示(hints)的情况,则逐个对照下面的规则
image-20220929194248135
  • 如果join类型支持,并且其中一张表能够被广播(spark.sql.autoBroadcastJoinThreshold值,默认是10MB),则选择 broadcast hash join
  • 如果参数spark.sql.join.preferSortMergeJoin设定为false,且一张表足够小(可以构建一个hash map) ,则选择shuffle hash join
  • 如果join keys 是排序的,则选择sort-merge join
  • 如果是内连接,选择 cartesian join
  • 没有可以选择的执行策略,则最终选择broadcast nested loop join,即使可能会发生OOM

非等值连接情况

有join提示(hints),按照下面的顺序
  • broadcast hint:选择broadcast nested loop join.
  • shuffle replicate NL hint: 如果是内连接,则选择cartesian product join
没有join提示(hints),则逐个对照下面的规则
  • 如果一张表足够小(可以被广播),则选择 broadcast nested loop join
  • 如果是内连接,则选择cartesian product join
  • 如果可能会发生OOM或者没有可以选择的执行策略,则最终选择broadcast nested loop join

实验

非等值连接默认是BroadcastNestedLoopJoin

scala> spark.conf.get("spark.sql.join.preferSortMergeJoin")
res1: String = true

scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
res2: String = 10485760

scala> val data1 = Seq(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50)
data1: Seq[Int] = List(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50)

scala> val df1 = data1.toDF("id1")
df1: org.apache.spark.sql.DataFrame = [id1: int]

scala> val data2 = Seq(30, 20, 40, 50)
data2: Seq[Int] = List(30, 20, 40, 50)

scala> val df2 = data2.toDF("id2")
df2: org.apache.spark.sql.DataFrame = [id2: int]

scala> val dfJoined = df1.join(df2, $"id1" >= $"id2")   //非等值连接
dfJoined: org.apache.spark.sql.DataFrame = [id1: int, id2: int]
// 注意查看执行计划是BroadcastNestedLoopJoin
scala> dfJoined.queryExecution.executedPlan
res3: org.apache.spark.sql.execution.SparkPlan =
BroadcastNestedLoopJoin BuildRight, Inner, (id1#3 >= id2#8)
:- LocalTableScan [id1#3]
+- BroadcastExchange IdentityBroadcastMode
   +- LocalTableScan [id2#8]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
image-20220929190714574

shuffle hash join

scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 2)

scala> spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")

scala> val dfhashJoined = df1.join(df2, $"id1" === $"id2")    //等值连接
dfhashJoined: org.apache.spark.sql.DataFrame = [id1: int, id2: int]

scala> dfhashJoined.queryExecution.executedPlan
res7: org.apache.spark.sql.execution.SparkPlan =
ShuffledHashJoin [id1#3], [id2#8], Inner, BuildRight
:- Exchange hashpartitioning(id1#3, 200)
:  +- LocalTableScan [id1#3]
+- Exchange hashpartitioning(id2#8, 200)
   +- LocalTableScan [id2#8]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
image-20220929191340105

sort MergeJoin

scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

scala> spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")

scala> val sortJoined = df1.join(df2, $"id1" === $"id2")
sortJoined: org.apache.spark.sql.DataFrame = [id1: int, id2: int]

scala> sortJoined.queryExecution.executedPlan
res11: org.apache.spark.sql.execution.SparkPlan =
*(3) SortMergeJoin [id1#3], [id2#8], Inner
:- *(1) Sort [id1#3 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id1#3, 200)
:     +- LocalTableScan [id1#3]
+- *(2) Sort [id2#8 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id2#8, 200)
      +- LocalTableScan [id2#8]

scala> sortJoined.show
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
image-20220929193014333

spark3 join策略提示

  1. Broadcast HashJoin
有三种方式
SELECT /*+ BROADCAST(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;

SELECT /*+ BROADCASTJOIN (t1) */ * FROM t1 left JOIN t2 ON t1.key = t2.key;

SELECT /*+ MAPJOIN(t2) */ * FROM t1 right JOIN t2 ON t1.key = t2.key;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  1. shuffle sort merge Join
SELECT /*+ SHUFFLE_MERGE(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;

SELECT /*+ MERGEJOIN(t2) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;

SELECT /*+ MERGE(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
  • 1
  • 2
  • 3
  • 4
  • 5
  1. shuffle Hash Join
SELECT /*+ SHUFFLE_HASH(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
  • 1
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/274487
推荐阅读
相关标签
  

闽ICP备14008679号