当前位置:   article > 正文

Spark的五种JOIN策略解析_spark crossjoin函数返回结构

spark crossjoin函数返回结构

JOIN操作是非常常见的数据处理操作,Spark作为一个统一的大数据处理引擎,提供了非常丰富的JOIN场景。本文分享将介绍Spark所提供的5种JOIN策略,希望对你有所帮助。本文主要包括以下内容:

  • 影响JOIN操作的因素
  • Spark中JOIN执行的5种策略
  • Spark是如何选择JOIN策略的

影响JOIN操作的因素

数据集的大小

参与JOIN的数据集的大小会直接影响Join操作的执行效率。同样,也会影响JOIN机制的选择和JOIN的执行效率。

JOIN的条件

JOIN的条件会涉及字段之间的逻辑比较。根据JOIN的条件,JOIN可分为两大类:等值连接非等值连接。等值连接会涉及一个或多个需要同时满足的相等条件。在两个输入数据集的属性之间应用每个等值条件。当使用其他运算符(运算连接符不为**=**)时,称之为非等值连接。

JOIN的类型

在输入数据集的记录之间应用连接条件之后,JOIN类型会影响JOIN操作的结果。主要有以下几种JOIN类型:

  • 内连接(Inner Join):仅从输入数据集中输出匹配连接条件的记录。
  • 外连接(Outer Join):又分为左外连接、右外链接和全外连接。
  • 半连接(Semi Join):右表只用于过滤左表的数据而不出现在结果集中。
  • 交叉连接(Cross Join):交叉联接返回左表中的所有行,左表中的每一行与右表中的所有行组合。交叉联接也称作笛卡尔积。

Spark中JOIN执行的5种策略

Spark提供了5种JOIN机制来执行具体的JOIN操作。该5种JOIN机制如下所示:

  • Shuffle Hash Join
  • Broadcast Hash Join
  • Sort Merge Join
  • Cartesian Join
  • Broadcast Nested Loop Join

Shuffle Hash Join

简介

当要JOIN的表数据量比较大时,可以选择Shuffle Hash Join。这样可以将大表进行按照JOIN的key进行重分区,保证每个相同的JOIN key都发送到同一个分区中。如下图示:

如上图所示:Shuffle Hash Join的基本步骤主要有以下两点:

  • 首先,对于两张参与JOIN的表,分别按照join key进行重分区,该过程会涉及Shuffle,其目的是将相同join key的数据发送到同一个分区,方便分区内进行join。
  • 其次,对于每个Shuffle之后的分区,会将小表的分区数据构建成一个Hash table,然后根据join key与大表的分区数据记录进行匹配。
条件与特点
  • 仅支持等值连接,join key不需要排序
  • 支持除了全外连接(full outer joins)之外的所有join类型
  • 需要对小表构建Hash map,属于内存密集型的操作,如果构建Hash表的一侧数据比较大,可能会造成OOM
  • 将参数*spark.sql.join.prefersortmergeJoin (default true)*置为false

Broadcast Hash Join

简介

也称之为Map端JOIN。当有一张表较小时,我们通常选择Broadcast Hash Join,这样可以避免Shuffle带来的开销,从而提高性能。比如事实表与维表进行JOIN时,由于维表的数据通常会很小,所以可以使用Broadcast Hash Join将维表进行Broadcast。这样可以避免数据的Shuffle(在Spark中Shuffle操作是很耗时的),从而提高JOIN的效率。在进行 Broadcast Join 之前,Spark 需要把处于 Executor 端的数据先发送到 Driver 端,然后 Driver 端再把数据广播到 Executor 端。如果我们需要广播的数据比较多,会造成 Driver 端出现 OOM。具体如下图示:

Broadcast Hash Join主要包括两个阶段:

  • Broadcast阶段 :小表被缓存在executor中
  • Hash Join阶段:在每个 executor中执行Hash Join
条件与特点
  • 仅支持等值连接,join key不需要排序
  • 支持除了全外连接(full outer joins)之外的所有join类型
  • Broadcast Hash Join相比其他的JOIN机制而言,效率更高。但是,Broadcast Hash Join属于网络密集型的操作(数据冗余传输),除此之外,需要在Driver端缓存数据,所以当小表的数据量较大时,会出现OOM的情况
  • 被广播的小表的数据量要小于spark.sql.autoBroadcastJoinThreshold值,默认是10MB(10485760)
  • 被广播表的大小阈值不能超过8GB,spark2.4源码如下:BroadcastExchangeExec.scala
longMetric("dataSize") += dataSize
          if (dataSize >= (8L << 30)) {
   
            throw new SparkException(
              s"Cannot broadcast the table that is larger than 8GB: ${dataSize >> 30} GB")
          }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 基表不能被broadcast,比如左连接时,只能将右表进行广播。形如:fact_table.join(broadcast(dimension_table),可以不使用broadcast提示,当满足条件时会自动转为该JOIN方式。

Sort Merge Join

简介

该JOIN机制是Spark默认的,可以通过参数spark.sql.join.preferSortMergeJoin进行配置,默认是true,即优先使用Sort Merge Join。一般在两张大表进行JOIN时,使用该方式。Sort Merge Join可以减少集群中的数据传输,该方式不会先加载所有数据的到内存,然后进行hashjoin,但是在JOIN之前需要对join key进行排序。具体图示:

Sort Merge Join主要包括三个阶段:

  • Shuffle Phase : 两张大表根据Join key进行Shuffle重分区
  • Sort Phase: 每个分区内的数据进行排序
  • Merge Phase: 对来自不同表的排序好的分区数据进行JOIN,通过遍历元素,连接具有相同Join key值的行来合并数据集
条件与特点
  • 仅支持等值连接
  • 支持所有join类型
  • Join Keys是排序的
  • 参数**spark.sql.join.prefersortmergeJoin (默认true)**设定为true

Cartesian Join

简介

如果 Spark 中两张参与 Join 的表没指定join

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号