Spark abstracts the two tables taking part in a join as a streamed table (streamIter) and a lookup/build table (buildIter). Usually streamIter is the large table and buildIter the small one. We do not have to decide which table plays which role; Spark works this out automatically from the join statement.
For every record coming from streamIter, matching records have to be looked up in buildIter, so buildIter must be a data structure with good lookup performance. Spark provides three join implementations: sort merge join, broadcast join, and hash join.
Shuffle Hash Join
Join steps: partition the large table and the small table with the same partitioning function and the same number of partitions (partitioning on the join keys). This guarantees that rows with the same hash value (i.e. the same key) are sent to the same partition (rows are not sorted within a partition). Then, in the same Executor, the matching partitions of the two tables can be hash-joined locally. Before the join, a hash table is built from each partition of the small table (which is why no single partition may be too large) to speed up lookups.
Note the difference from broadcast hash join: the small table is not broadcast here. After both sides have been shuffled, within each partition the small side is turned into a hash table and hash-joined against the large side.
Strict conditions: the join must be an equi-join; spark.sql.join.preferSortMergeJoin must be set to false; and the build side must be small enough that each of its shuffled partitions fits in memory as a hash table (its estimated size must be below spark.sql.autoBroadcastJoinThreshold * spark.sql.shuffle.partitions, and at least about three times smaller than the stream side).
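To make the per-partition mechanics concrete, here is a minimal plain-Scala sketch (illustrative only, not Spark's actual implementation; the function name is made up): the build side of one shuffled partition is loaded into an in-memory hash table, and the stream side probes it.

// Sketch of an inner hash join inside one shuffled partition.
// buildIter is the small side's partition, streamIter the large side's.
def hashJoinPartition[K, V, W](
    buildIter: Iterator[(K, V)],
    streamIter: Iterator[(K, W)]): Iterator[(K, (W, V))] = {
  // Build phase: the whole build partition becomes a hash table,
  // which is why no single build partition may be too large.
  val hashed: Map[K, Seq[V]] =
    buildIter.toSeq.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
  // Probe phase: each stream record looks up its matches by key.
  for {
    (k, w) <- streamIter
    v <- hashed.getOrElse(k, Seq.empty)
  } yield (k, (w, v))
}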
Broadcast Hash Join
Characteristics: the data of the small table is broadcast to every Executor in the Spark application, and the strategy can only be used for equi-joins. It avoids the shuffle operation entirely. Generally speaking, Broadcast Hash Join executes faster than the other join strategies, because the join completes directly inside a map task; for that reason it is also called a map join.
Join steps: the driver collects the small table and broadcasts it to every Executor; each Executor builds a hash table from its broadcast copy; the large table is then streamed partition by partition through that hash table, entirely locally and without a shuffle.
Usage conditions: the table to be broadcast must be smaller than the value set by spark.sql.autoBroadcastJoinThreshold (default 10 MB), and the join must be an equi-join.
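In the DataFrame API, broadcasting can also be requested explicitly with the broadcast function from org.apache.spark.sql.functions, bypassing the threshold-based size estimate. A small sketch (run in spark-shell, where the spark session and its implicits are available):

import org.apache.spark.sql.functions.broadcast
import spark.implicits._
val largeDF = Seq((1, "a"), (2, "b"), (2, "c")).toDF("key", "v1")
val smallDF = Seq((1, "x"), (2, "y")).toDF("key", "v2")
// broadcast() marks smallDF for broadcasting regardless of
// the spark.sql.autoBroadcastJoinThreshold estimate
val joined = largeDF.join(broadcast(smallDF), Seq("key"))
// joined.queryExecution.executedPlan should show a BroadcastHashJoin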
Broadcast Nested Loop Join
This is the strategy Spark falls back to when no more suitable join mechanism is available. The priority order is: Broadcast Hash Join > Sort Merge Join > Shuffle Hash Join > Cartesian Join > Broadcast Nested Loop Join.
The smaller dataset is broadcast to every partition of the other dataset, and the join is executed as a nested loop: every record of dataset 1 is tried against every record of dataset 2 (the most naive method), so it is rather inefficient. It can handle both equi-joins and non-equi-joins, and it is the default strategy for non-equi-joins.
There is no sorting: the small table is simply broadcast to every partition and every record is tried against every other, which is slow.
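As a plain-Scala illustration (made-up names, not Spark's code), the per-partition work is just a doubly nested loop, which is also why the join predicate can be arbitrary:

// One stream partition is tested against the full broadcast side;
// every pair is evaluated, so the cost is O(n * m).
def nestedLoopJoin[A, B](streamPart: Iterator[A], broadcastSide: Seq[B])
                        (pred: (A, B) => Boolean): Iterator[(A, B)] =
  for {
    a <- streamPart
    b <- broadcastSide.iterator
    if pred(a, b)
  } yield (a, b)

// e.g. the non-equi condition id1 >= id2 from the spark-shell example below:
// nestedLoopJoin(leftRows, rightRows)((id1: Int, id2: Int) => id1 >= id2)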
Sort Merge Join
Both sides are first shuffled by hash into matching partitions and sorted, then matched by a sequential scan within each partition.
There is no size requirement on the tables: however large a partition is, SortMergeJoin never has to load one whole side into memory; records are consumed and then discarded. Because both sequences are sorted, they are traversed from the start: when the keys match, a row is output; when they differ, the side with the smaller key advances. Thanks to the sort order, after a record is processed the scan only has to resume from where it last stopped, so SortMergeJoinExec avoids a large amount of useless work and keeps SQL joins over large data volumes stable.
Join steps: shuffle both tables on the join keys so that matching keys land in the same partition; sort every partition by the join keys; then merge the two sorted streams with the sequential scan described above.
Usage conditions: the join must be an equi-join, and the join keys must be sortable.
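The merge phase just described can be sketched in plain Scala (illustrative; it assumes both input lists are already sorted by key, and handles duplicate keys by crossing the runs of equal keys):

// Merge two key-sorted lists: when the keys differ, advance the side
// with the smaller key; when they match, output the cross product of
// the two runs of equal keys and continue after them.
def mergeJoin[K, V, W](left: List[(K, V)], right: List[(K, W)])
                      (implicit ord: Ordering[K]): List[(K, (V, W))] =
  (left, right) match {
    case (Nil, _) | (_, Nil) => Nil
    case ((lk, _) :: _, (rk, _) :: _) if ord.lt(lk, rk) =>
      mergeJoin(left.dropWhile(p => ord.equiv(p._1, lk)), right)
    case ((lk, _) :: _, (rk, _) :: _) if ord.gt(lk, rk) =>
      mergeJoin(left, right.dropWhile(p => ord.equiv(p._1, rk)))
    case ((lk, _) :: _, _) =>
      val (lRun, lRest) = left.span(p => ord.equiv(p._1, lk))
      val (rRun, rRest) = right.span(p => ord.equiv(p._1, lk))
      val matched = for ((_, v) <- lRun; (_, w) <- rRun) yield (lk, (v, w))
      matched ++ mergeJoin(lRest, rRest)
  }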
Cartesian Join (Cartesian product)
If the left table has n partitions and the right table has m partitions, the Cartesian product has K = n * m partitions. Output partition i reads left-table partition kn = i / m and right-table partition km = i % m, and computes the Cartesian product of those two partitions. Because the work is organized per partition pair, no shuffle is triggered.
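A quick plain-Scala check of the index arithmetic: with n = 2 left partitions and m = 3 right partitions, i / m and i % m enumerate every (left, right) partition pair exactly once.

val n = 2 // left table partitions
val m = 3 // right table partitions
for (i <- 0 until n * m) {
  val kn = i / m // left partition read by output partition i
  val km = i % m // right partition read by output partition i
  println(s"output partition $i <- (left $kn, right $km)")
}
// prints the 6 pairs (0,0) (0,1) (0,2) (1,0) (1,1) (1,2)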
Example: a non-equi join is planned as BroadcastNestedLoopJoin (spark-shell):

scala> spark.conf.get("spark.sql.join.preferSortMergeJoin")
res1: String = true

scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
res2: String = 10485760

scala> val data1 = Seq(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50)
data1: Seq[Int] = List(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50)

scala> val df1 = data1.toDF("id1")
df1: org.apache.spark.sql.DataFrame = [id1: int]

scala> val data2 = Seq(30, 20, 40, 50)
data2: Seq[Int] = List(30, 20, 40, 50)

scala> val df2 = data2.toDF("id2")
df2: org.apache.spark.sql.DataFrame = [id2: int]

scala> val dfJoined = df1.join(df2, $"id1" >= $"id2") // non-equi join
dfJoined: org.apache.spark.sql.DataFrame = [id1: int, id2: int]

// note that the executed plan is BroadcastNestedLoopJoin
scala> dfJoined.queryExecution.executedPlan
res3: org.apache.spark.sql.execution.SparkPlan =
BroadcastNestedLoopJoin BuildRight, Inner, (id1#3 >= id2#8)
:- LocalTableScan [id1#3]
+- BroadcastExchange IdentityBroadcastMode
   +- LocalTableScan [id2#8]
Example: forcing ShuffledHashJoin by disabling preferSortMergeJoin and lowering the broadcast threshold:

scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 2)
scala> spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
scala> val dfhashJoined = df1.join(df2, $"id1" === $"id2") // equi-join
dfhashJoined: org.apache.spark.sql.DataFrame = [id1: int, id2: int]
scala> dfhashJoined.queryExecution.executedPlan
res7: org.apache.spark.sql.execution.SparkPlan =
ShuffledHashJoin [id1#3], [id2#8], Inner, BuildRight
:- Exchange hashpartitioning(id1#3, 200)
: +- LocalTableScan [id1#3]
+- Exchange hashpartitioning(id2#8, 200)
+- LocalTableScan [id2#8]
Example: with the defaults restored, the same equi-join is planned as SortMergeJoin:

scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

scala> spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")

scala> val sortJoined = df1.join(df2, $"id1" === $"id2")
sortJoined: org.apache.spark.sql.DataFrame = [id1: int, id2: int]

scala> sortJoined.queryExecution.executedPlan
res11: org.apache.spark.sql.execution.SparkPlan =
*(3) SortMergeJoin [id1#3], [id2#8], Inner
:- *(1) Sort [id1#3 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id1#3, 200)
:     +- LocalTableScan [id1#3]
+- *(2) Sort [id2#8 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id2#8, 200)
      +- LocalTableScan [id2#8]

scala> sortJoined.show
A join strategy can also be forced with a SQL hint, and each hint has several equivalent spellings: BROADCAST / BROADCASTJOIN / MAPJOIN request a broadcast join, SHUFFLE_MERGE / MERGEJOIN / MERGE request a sort merge join, and SHUFFLE_HASH requests a shuffle hash join:
SELECT /*+ BROADCAST(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
SELECT /*+ BROADCASTJOIN (t1) */ * FROM t1 LEFT JOIN t2 ON t1.key = t2.key;
SELECT /*+ MAPJOIN(t2) */ * FROM t1 RIGHT JOIN t2 ON t1.key = t2.key;
SELECT /*+ SHUFFLE_MERGE(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
SELECT /*+ MERGEJOIN(t2) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
SELECT /*+ MERGE(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
SELECT /*+ SHUFFLE_HASH(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
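The same strategies can also be requested from the DataFrame API through Dataset.hint. A sketch reusing df1 and df2 from the spark-shell examples above (note: "broadcast" is recognized since Spark 2.2, while "merge" and "shuffle_hash" require Spark 3.0+):

// request each strategy by hinting the right-hand side
val bhj = df1.join(df2.hint("broadcast"), $"id1" === $"id2")
val smj = df1.join(df2.hint("merge"), $"id1" === $"id2") // Spark 3.0+
val shj = df1.join(df2.hint("shuffle_hash"), $"id1" === $"id2") // Spark 3.0+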