赞
踩
spark 提供五种执行join操作的机制,分别是:
先来看看这样一条SQL语句:
select * from order,item where item.id = order.i_id
这个Join采用的是hash join算法,整个过程会经历三步:
基本流程可以参考上图,这里有两个小问题需要关注:
1.hash join性能如何?很显然,hash join基本都只扫描两表一次,可以认为o(a+b),较之最极端的笛卡尔集运算a*b,不知甩了多少条街;
2.为什么Build Table选择小表?道理很简单,因为构建的Hash Table最好能全部加载在内存,效率最高;这也决定了hash join算法只适合至少一个小表的join场景,对于两个大表的join场景并不适用。
上文说过,hash join是传统数据库中的单机join算法,在分布式环境下需要经过一定的分布式改造,说到底就是尽可能利用分布式计算资源进行并行化计算,提高总体效率。hash join分布式改造一般有两种经典方案:
大家知道,在数据库的常见模型中(比如星型模型或者雪花模型),表一般分为两种:事实表和维度表。维度表一般指固定的、变动较少的表,例如联系人、物品种类等,一般数据有限。而事实表一般记录流水,比如销售清单等,通常随着时间的增长不断膨胀。
因为Join操作是对两个表中key值相同的记录进行连接,在SparkSQL中,对两个表做Join最直接的方式是先根据key分区,再在每个分区中把key值相同的记录拿出来做连接操作。但这样就不可避免地涉及到shuffle,而shuffle在Spark中是比较耗时的操作,我们应该尽可能的设计Spark应用使其避免大量的shuffle。
当维度表和事实表进行Join操作时,为了避免shuffle,我们可以将大小有限的维度表的全部数据分发到每个节点上,供事实表使用。executor存储维度表的全部数据,一定程度上牺牲了空间,换取shuffle操作大量的耗时,这在SparkSQL中称作Broadcast Join,也称之为Map端JOIN
Broadcast Join的条件有以下几个:
被广播的表需要小于spark.sql.autoBroadcastJoinThreshold所配置的值,默认是10M (或者加了broadcast join的hint)
基表不能被广播,比如left outer join时,只能广播右表
缺点: 这个方案只能用于广播较小的表,否则数据的冗余传输就远大于shuffle的开销;另外,广播时需要将被广播的表现collect到driver端,当频繁有广播出现时,driver的内存压力也是蛮大的。
broadcast hash join可以分为两步:
SparkSQL规定broadcast hash join执行的基本条件为被广播小表必须小于参数spark.sql.autoBroadcastJoinThreshold,默认为10M。
代码实现
// 不限定小表的大小
spark.conf.set("spark.sql.autoBroadcastJoinThreshold",-1)
import org.apache.spark.sql.SparkSession object BigRDDJoinSmallRDD { def main(args: Array[String]): Unit = { val sparkSession = SparkSession.builder().master("local[*]").appName("BigRDD Join SmallRDD").getOrCreate() val sc = sparkSession.sparkContext val list1 = List(("jame",23), ("wade",3), ("kobe",24)) val list2 = List(("jame", 13), ("wade",6), ("kobe",16)) val bigRDD = sc.makeRDD(list1) val smallRDD = sc.makeRDD(list2) println(bigRDD.getNumPartitions) println(smallRDD.getNumPartitions) // driver端rdd不broadcast广播smallRDD到各executor,RDD不能被broadcast,需要转换成数组array val smallRDDB= sc.broadcast(smallRDD.collect()) val joinedRDD = bigRDD.mapPartitions(partition => { val smallRDDBV = smallRDDB.value // 各个executor端的task读取广播value partition.map(element => { //println(joinUtil(element, smallRDDBV)) joinUtil(element, smallRDDBV) }) }) joinedRDD.foreach(x => println(x)) } /** * join操作:对两个rdd中的相同key的value1和value2进行聚合,即(key,value1).join(key,value2)得到(key,(value1, vlaue2)) * 如果bigRDDEle的key和smallRDD的某个key一致,那么返回(key,(value1, vlaue2)) * 该方法会在各executor的task上执行 * */ def joinUtil(bigRDDEle:(String,Int), smallRDD: Array[(String, Int)]): (String, (Int,Int)) = { var joinEle:(String, (Int, Int)) = null // 遍历数组smallRDD smallRDD.foreach(smallRDDEle => { if(smallRDDEle._1.equals(bigRDDEle._1)){ // 如果bigRDD中某个元素的key和数组smallRDD的key一致,返回join结果 joinEle = (bigRDDEle._1, (bigRDDEle._2, smallRDDEle._2)) } }) joinEle } }
当一侧的表比较小时,我们选择将其广播出去以避免shuffle,提高性能。但因为被广播的表首先被collect到driver段,然后被冗余分发到每个executor上,所以当表比较大时,采用broadcast join会对driver端和executor端造成较大的压力。
由于Spark是一个分布式的计算引擎,可以通过分区的形式将大批量的数据划分成n份较小的数据集进行并行计算。
这种思想应用到Join上便是Shuffle Hash Join了。利用key相同必然分区相同的这个原理,两个表中,key相同的行都会被shuffle到同一个分区中,SparkSQL将较大表的join分而治之,先将表划分成n个分区,再对两个表中相对应分区的数据分别进行Hash Join,这样即在一定程度上减少了driver广播一侧表的压力,也减少了executor端取整张被广播表的内存消耗,提升了计算时的稳定性
必需条件:
当两个表都非常大时,显然无论使用上述哪种都会对计算内存造成很大压力。这是因为join时两者采取的都是hash join,是将一侧的数据完全加载到内存中,使用hash code取join keys值相等的记录进行连接。
当两个表都非常大时,SparkSQL采用了一种全新的方案来对表进行Join,即Sort Merge Join。这种实现方式不用将一侧数据全部加载后再进行hash join,但需要在join前将数据排序,如下图所示:
可以看到,首先将两张表按照join keys进行了重新shuffle,保证join keys值相同的记录会被分在相应的分区。分区后对每个分区内的数据进行排序,排序后再对相应的分区内的记录进行连接。如下图
因为两个序列都是有序的,从头遍历,碰到key相同的就输出;如果不同,左边小就继续取左边,反之取右边。
可以看出,无论分区有多大,Sort Merge Join都不用把某一侧的数据全部加载到内存中,而是即用即取即丢,从而大大提升了大数据量下sql join的稳定性。
SparkSQL对两张大表join采用了全新的算法-sort-merge join,如下图所示,整个过程分为三个步骤:
shuffle阶段:将两张大表根据join key进行重新分区,两张表数据会分布到整个集群,以便分布式并行处理;
sort阶段:对单个分区节点的两表数据,分别进行排序;
merge阶段:对排好序的两张分区表数据执行join操作。join操作很简单,分别遍历两个有序序列,碰到相同join key就merge输出,否则取更小一边,见下图示意
代码实现
spark.conf.set("spark.sql.join.preferSortMergeJoin", true)
先hash到同一个分区且排好序,然后再在分区内顺序查找比对
对表的大小没有条件,不管分区多大,SortMergeJoin 都不用把一侧的数据全部加载到内存中,而是即用即丢;两个序列都有序。从头遍历,碰到 key 相同的就输出,如果不同,左边小就继续取左边,反之取右边,由于排序的特性,每次处理完一条记录后只需要从上一次结束的位置开始查找,SortMergeJoinExec执行时就能够避免大量无用的操作,提高了大数据量下sql join 的稳定性。
使用条件
笛卡尔积
用来实现cross join
如果左表有n个分区,右表有m个分区,那么笛卡尔积后的分区数是K=n * m个;并且这K个分区中,第K(i)个分区获取的左表分区为 kn=i / m,获取的右表分区为 km=i % m,然后kn和km这两个分区做笛卡尔积;由于是以分区为单位,所以不会触发shuffle。
条件:
Broadcast Nested Join将一个输入数据集广播到每个executor上,然后在各个executor上,另一个数据集的分区会和第一个数据集使用嵌套循环的方式进行Join输出结果。
Broadcast Nested Join
该方式是在没有合适的JOIN机制可供选择时,最终会选择该种join策略。优先级为:Broadcast Hash Join > Sort Merge Join > Shuffle Hash Join > cartesian Join > Broadcast Nested Loop Join.
有join提示(hints)的情况,按照小面的顺序
非等值连接情况
有join提示(hints),按照下面的顺序
broadcast hint:选择broadcast nested loop join.
shuffle replicate NL hint: 如果是内连接,则选择cartesian product join
没有join提示(hints),则逐个对照下面的规则
如果一张表足够小(可以被广播),则选择 broadcast nested loop join
如果是内连接,则选择cartesian product join
如果可能会发生OOM或者没有可以选择的执行策略,则最终选择broadcast nested loop join
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。