当前位置:   article > 正文

Spark相关_spark full join

spark full join

1.Spark Shuffle实现原理及代码解析
Shuffle,简而言之,就是对数据进行重新分区,其中会涉及大量的网络io和磁盘io,为什么需要shuffle,以词频统计reduceByKey过程为例,
在这里插入图片描述
shuffle过程如下图:
在这里插入图片描述
spark的shuffle操作有之前的版本和现在优化后的版本,它可以通过一个参数来调节,具体我们后面会详述,本篇主要从以下几个方面来深入Shuffle原理:

  • 普通shuffle操作的原理剖析
  • 优化后的shuffle操作原理剖析
  • shuffle源码剖析

普通shuffle操作的原理剖析
首先来看看早期没有任何优化的shuffle操作的原理,如下图是shuffle简单原理图:
在这里插入图片描述
这里我们假设每一个节点上运行着两个ShuffleMapTask,每一个ShuffleMapTask都会为每一个ResultTask创建一个bucket缓存,并且接着会将bucket缓存中的数据刷新到磁盘文件shuffleBlockFile中,ShuffleMapTask刷新到磁盘中的数据信息会封装在MapStatus中,
发送到Driver的DAGScheduler的MapOutputTracker中,而每一个ResultTask会用BlockStoreShuffleFetcher去MapOutputTracker中获取自己所需要的数据的信息,然后通过底层的BlockManager将数据拉取过来。
将数据拉取过来之后,会将这些数据组成一个RDD,即ShuffleRDD,优先存入内存当中,其次再写入磁盘中。然后每一个ResultTask针对这些RDD数据执行自己的聚合操作或者算子函数生成MapPartitionRDD。
以上就是普通Shuffle操作的执行原理,从上图我们可以发现一个问题,每一个ShuffleMapTask都需要为每一个ResultTask生成一个文件和bucket缓存,假设有100个ShuffleMapTask,100个ResultTask,那么就需要总共生成10000个文件,此时会有大量的磁盘IO操作,严重的影响shuffle的性能。
因此,在后期的新版本的Spark中,加入了优化操作,具体原理我们来看看。

优化后的shuffle操作原理剖析
优化后的Shuffle操作原理图如下:
在这里插入图片描述
这里假设我们的服务器上有两个CPU cores,运行着4个ShuffleMapTask,因此每次可以并行执行两个两个ShuffleMapTask,在之前的版本中,当前并行执行的一批ShuffleMapTask执行完毕之后执行下一批时会重新生成bucket缓存,而且在刷新到磁盘上的时候也会重新生成ShuffleBlockFile。但是在优化后的Shuffle操作中它不会重新生成缓存和磁盘文件,而是将数据写入之前的缓存和磁盘文件中,即合并了多个ShuffleMpaTask产生的文件,这也叫做consolidation机制。在多个ShuffleMapTask合并产生的文件称为一组ShuffleGroup,里面存储了多个ShuffleMapTask的数据,每个ShuffleMapTask的数据称为一个segment,此外还会通过一些索引来标识每个ShuffleMapTask在ShuffleBlockFile中的位置以及偏移量,来进行区分不同的ShuffleMapTask产生的数据。

优化参数的设置只需在SparkConf中设置即可,即设置spark.shuffle.consolidateFiles参数为true即可,可以看出来,在优化后的shuffle操作,它产生的磁盘文件是
cpu core数量*ResultTask的数量,比如这里假设了2个cpu core,有100个ResultTask,因此会产生200个磁盘文件,相比之前没有优化的Shuffle操作,减少了20倍的磁盘文件,对系统的性能有很大的提升。

对Spark Shuffle操作有以下两个特点

  • 在早期版本中,bucket缓存十分重要,因为ShuffleMapTask只有将数据写入缓存中,然后才会刷新到磁盘中,但是如果缓存过多,有可能会导致OutOfMemory,因此,在新版中,进行了优化设置,缩小了缓存的大小,默认是100KB,当超过这个阀值时,就会将数据一点点写入磁盘中。但是这样也有一个缺点,当数据过多的时候 ,会有大量的磁盘IO操作。
  • 与MpaReduce的Shuffle不一样,MapReduce它必须将所有的数据写入磁盘文件之后才会去进行Reduce操作,因为MapReduce会对数据进行排序。但是Spark不会对数据进行排序,因此不需要等待全部数据写入磁盘就ResultTask就可以拉取数据进行计算。这样,明显比MapReduce快很多,但是MapReduce可以直接在Reduce端对每一个key的value进行计算,但是Spark由于实时拉取的机制,因此只有先执行action操作,进行Shuffle操作,生成对应的MapPartitionRDD,然后去进行计算。

2.Spark中join的实现原理
spark中join有两种,一种是RDD的join,一种是sql中的join,分别来看:

在此之前,先说一下join的基本要素
如下图所示,Join大致包括三个要素:Join方式、Join条件以及过滤条件。其中过滤条件也可以通过AND语句放在Join条件中。
在这里插入图片描述

Spark支持所有类型的Join,包括:

  • inner join
  • left outer join
  • right outer join
  • full outer join
  • left semi join
  • left anti join

先说一下Spark Sql的join:
SQL的所有操作,可以分为简单操作(如过滤where、限制次数limit等)和聚合操作(groupBy,join等)。
其中,join操作是最复杂、代价最大的操作类型,是大部分业务场景的性能瓶颈所在;
首先,我们需要知道数仓中表格的分类:按照是否会经常涉及到Join操作,可以简单分为低层次表和高层次表

低层次表:直接导入数仓的表,列数少,与其他表存在外键依赖,查询起来经常会用到大量Join算法,查询效率较低

高层次表:由低层次表加工而来,使用SQL将需要join的表预先合并,形成“宽表”。宽表上查询不需要大量Join,因此效率较高。但是,相对的是,宽表的数据存在大量冗余,同时生成滞后,查询不及时。

join的基本流程
总体上来说,Join的基本实现流程如下图所示,Spark将参与Join的两张表抽象为流式遍历表(streamIter)和查找表(buildIter),通常streamIter为大表,buildIter为小表,我们不用担心哪个表为streamIter,哪个表为buildIter,这个spark会根据join语句自动帮我们完成。
在这里插入图片描述
在实际计算时,spark会基于streamIter来遍历,每次取出streamIter中的一条记录rowA,根据Join条件计算keyA,然后根据该keyA去buildIter中查找所有满足Join条件(keyB==keyA)的记录rowBs,并将rowBs中每条记录分别与rowAjoin得到join后的记录,最后根据过滤条件得到最终join的记录。

从上述计算过程中不难发现,对于每条来自streamIter的记录,都要去buildIter中查找匹配的记录,所以buildIter一定要是查找性能较优的数据结构。spark提供了三种join实现:sort merge join、broadcast join以及hash join。

Join常见分类&实现机制
当前SparkSQL支持三种Join算法-shuffle hash join、broadcast hash join以及sort merge join。其中前两者归根到底都属于hash join,只不过在hash join之前需要先shuffle还是先broadcast。所以,首先我们来看一下内核hash join的机制。
Hash Join
先来看一个简单的SQL:select * from order,item where item.id = order.id

参与join的两张表是item和order,join key分别是item.id以及order.id,假设这个Join采用的是hash join算法,整个过程会经历三步:

  1. 确定Build Table(映射表、小表)以及Probe Table(探查表、大表)。其中Build Table用于构建Hash Table,而Probe会遍历自身所有key,映射到所生成的Hash Table上去匹配。

  2. Build Table构建Hash Table。依次读取Build Table(item)的数据,对于每一行数据根据join key(item.id)进行hash,hash到对应的Bucket,生成hash table中的一条记录。数据缓存在内存中,如果内存放不下需要dump到外存。

  3. Probe Table探测。依次扫描Probe Table(order)的数据,使用相同的hash函数映射Hash Table中的记录,映射成功之后再检查join条件(item.id= order.i_id),如果匹配成功就可以将两者join在一起。
    在这里插入图片描述

两点补充:

1 hash join的性能。从上面的原理图可以看出,hash join对两张表基本只扫描一次,算法效率是o(a+b),比起蛮力的笛卡尔积算法的a*b快了很多数量级。

2 为什么说Build Table要尽量选择小表呢?从原理上也看到了,构建的Hash Table是需要被频繁访问的,所以Hash Table最好能全部加载到内存里,这也决定了hash join只适合至少一个小表join的场景。

看完了hash join的内核,我们来看一下这种单机的算法,在大数据分布式情况下,应该如何去做。目前成熟的有两套算法:broadcast hash join和shuffler hash join。

Broadcast Hash Join
broadcast hash join是将其中一张小表广播分发到另一张大表所在的分区节点上,分别并发地与其上的分区记录进行hash join。broadcast适用于小表很小,可以直接广播的场景。
(为了能具有相同key的记录分到同一个分区,我们通常是做shuffle,而shuffle在Spark中是比较耗时的操作,我们应该尽可能的设计Spark应用使其避免大量的shuffle。。那么如果buildIter是一个非常小的表,那么其实就没有必要大动干戈做shuffle了,直接将buildIter广播到每个计算节点,然后将buildIter放到hash表中)
在执行上,主要可以分为以下两步:

  1. broadcast阶段:将小表广播分发到大表所在的所有主机。分发方式可以有driver分发,或者采用p2p方式。

  2. hash join阶段:在每个executor上执行单机版hash join,小表映射,大表试探;

需要注意的是,Spark中对于可以广播的小表,默认限制是10M以下。(参数是spark.sql.autoBroadcastJoinThreshold)
在这里插入图片描述
Broadcast Join的条件有以下几个:
在这里插入图片描述

Shuffle Hash Join
当join的一张表很小的时候,使用broadcast hash join,无疑效率最高。但是随着小表逐渐变大,广播所需内存、带宽等资源必然就会太大,所以才会有默认10M的资源限制。

所以,当小表逐渐变大时,就需要采用另一种Hash Join来处理:Shuffle Hash Join。

Shuffle Hash Join按照join key进行分区,根据key相同必然分区相同的原理,将大表join分而治之,划分为小表的join,充分利用集群资源并行化执行。

在执行上,主要可以分为以下两步:

  1. shuffle阶段:分别将两个表按照join key进行分区,将相同join key的记录重分布到同一节点,两张表的数据会被重分布到集群中所有节点。

  2. hash join阶段:每个分区节点上的数据单独执行单机hash join算法。
    在这里插入图片描述

刚才也说过,Hash Join适合至少有一个小表的情况,那如果两个大表需要Join呢?
要让两条记录能join到一起,首先需要将具有相同key的记录在同一个分区,所以通常来说,需要做一次shuffle,map阶段根据join条件确定每条记录的key,基于该key做shuffle write,将可能join到一起的记录分到同一个分区中,这样在shuffle read阶段就可以将两个表中具有相同key的记录拉到同一个分区处理(在shuffle read阶段不对记录排序,反正来自两格表的具有相同key的记录会在同一个分区,只是在分区内不排序,将来自buildIter的记录放到hash表中,以便查找。由于Spark是一个分布式的计算引擎,可以通过分区的形式将大批量的数据划分成n份较小的数据集进行并行计算。)。
前面我们也提到,对于buildIter一定要是查找性能较优的数据结构,通常我们能想到hash表,但是对于一张较大的表来说,不可能将所有记录全部放到hash表中,SparkSQL采用了一种全新的方案来对表进行Join,即Sort Merge Join。这种实现方式不用将一侧数据全部加载后再进行hash join,但需要在join前将数据排序

Sort-Merge Join
SparkSQL对两张大表join采用了全新的算法-sort-merge join,整个过程分为三个步骤:

  1. shuffle阶段:将两张大表根据join key进行重新分区,两张表数据会分布到整个集群,以便分布式并行处理

  2. sort阶段:对单个分区节点的两表数据,分别进行排序

  3. merge阶段:对排好序的两张分区表数据执行join操作。join操作很简单,分别遍历两个有序序列,碰到相同join key就merge输出,否则继续取更小一边的key(由于两个表都是排序的,每次处理完streamIter的一条记录后,对于streamIter的下一条记录,只需从buildIter中上一次查找结束的位置开始查找,所以说每次在buildIter中查找不必重头开始,整体上来说,查找性能还是较优的。)。
    在这里插入图片描述

仔细分析的话会发现,sort-merge join的代价并不比shuffle hash join小,反而是多了很多。那为什么SparkSQL还会在两张大表的场景下选择使用sort-merge join算法呢?

这和Spark的shuffle实现有关,目前spark的shuffle实现都适用sort-based shuffle算法,因此在经过shuffle之后partition数据都是按照key排序的。因此理论上可以认为数据经过shuffle之后是不需要sort的,可以直接merge。

join的使用结论
在这里插入图片描述

结论:如何优化
经过上文的分析,可以明确每种Join算法都有自己的适用场景。在优化的时候,除了要根据业务场景选择合适的join算法之外,还要注意以下几点:

1 数据仓库设计时最好避免大表与大表的join查询。

2 SparkSQL也可以根据内存资源、带宽资源适量将参数spark.sql.autoBroadcastJoinThreshold调大,让更多join实际执行为broadcast hash join。

3.Spark 支持的Join类型
1) inner join
inner join是一定要找到左右表中满足join条件的记录,我们在写sql语句或者使用DataFrmae时,可以不用关心哪个是左表,哪个是右表,在spark sql查询优化阶段,spark会自动将大表设为左表,即streamIter,将小表设为右表,即buildIter。这样对小表的查找相对更优。其基本实现流程如下图所示,在查找阶段,如果右表不存在满足join条件的记录,则跳过。
在这里插入图片描述

2) left outer join
left outer join是以左表为准,在右表中查找匹配的记录,如果查找失败,则返回一个所有字段都为null的记录。我们在写sql语句或者使用DataFrmae时,一般让大表在左边,小表在右边。其基本实现流程如下图所示。
在这里插入图片描述

3) right outer join
right outer join是以右表为准,在左表中查找匹配的记录,如果查找失败,则返回一个所有字段都为null的记录。所以说,右表是streamIter,左表是buildIter,我们在写sql语句或者使用DataFrmae时,一般让大表在右边,小表在左边。

4) full outer join
full outer join相对来说要复杂一点,总体上来看既要做left outer join,又要做right outer join,但是又不能简单地先left outer join,再right outer join,最后union得到最终结果,因为这样最终结果中就存在两份inner join的结果了。因为既然完成left outer join又要完成right outer join,所以full outer join仅采用sort merge join实现,左边和右表既要作为streamIter,又要作为buildIter,其基本实现流程如下图所示。
在这里插入图片描述

由于左表和右表已经排好序,首先分别顺序取出左表和右表中的一条记录,比较key,如果key相等,则joinrowA和rowB,并将rowA和rowB分别更新到左表和右表的下一条记录;如果keyA<keyB,则说明右表中没有与左表rowA对应的记录,那么joinrowA与nullRow,紧接着,rowA更新到左表的下一条记录;如果keyA>keyB,则说明左表中没有与右表rowB对应的记录,那么joinnullRow与rowB,紧接着,rowB更新到右表的下一条记录。如此循环遍历直到左表和右表的记录全部处理完。

5) left semi join
left semi join是以左表为准,在右表中查找匹配的记录,如果查找成功,则仅返回左边的记录,否则返回null,其基本实现流程如下图所示。
在这里插入图片描述

6) left anti join
left anti join与left semi join相反,是以左表为准,在右表中查找匹配的记录,如果查找成功,则返回null,否则仅返回左边的记录,其基本实现流程如下图所示。
在这里插入图片描述

总结
Join是数据库查询中一个非常重要的语法特性,在数据库领域可以说是“得join者的天下”,SparkSQL作为一种分布式数据仓库系统,给我们提供了全面的join支持,并在内部实现上无声无息地做了很多优化,了解join的实现将有助于我们更深刻的了解我们的应用程序的运行轨迹。

引申
union all和union的区别 怎么使用
union对两个结果集进行并集操作, 不包括重复行,相当于distinct, 同时进行默认规则百的排序;
union all只是合并查询结果,并不会进行去重和排序操作
在没有去答重的前提下,使用union all的执行效率要比union高

4.Map-side Join和Reduce-side Join
在大数据处理场景中,多表Join是非常常见的一类运算。为了便于求解,通常会将多表join问题转为多个两表连接问题。两表Join的实现算法非常多,一般我们会根据两表的数据特点选取不同的join算法,其中,最常用的两个算法是map-side join和reduce-side join。本文将介绍如何在apache spark中实现这两种算法。
Map-side Join
Map-side Join使用场景是一个大表和一个小表的连接操作,其中,“小表”是指文件足够小,可以加载到内存中。该算法可以将join算子执行在Map端,无需经历shuffle和reduce等阶段,因此效率非常高。

在Hadoop MapReduce中, map-side join是借助DistributedCache实现的
(因为reduce端会按map输出的key的分布处理相应的数据,在数据倾斜的情况下就会造成单个task压力过大,拖累整个job时间,甚至OOM等诸多问题。而如果能在map端完成join,就会极大的减小reduce端的压力,提升并行度。
map端的join适用于在join的表比较小的情况,另外如字典表这种的与其他表join时,因为本身数据就很少,势必会造成数据严重的倾斜,因而这种情况下使用map端的join就再适合不过。)
DistributedCache可以帮我们将小文件分发到各个节点的Task工作目录下,这样,我们只需在程序中将文件加载到内存中(比如保存到Map数据结构中),然后借助Mapper的迭代机制,遍历另一个大表中的每一条记录,并查找是否在小表中,如果在则输出,否则跳过。

在Apache Spark中,同样存在类似于DistributedCache的功能,称为“广播变量”(Broadcast variable)。其实现原理与DistributedCache非常类似,但提供了更多的数据/文件广播算法,包括高效的P2P算法,该算法在节点数目非常多的场景下,效率远远好于DistributedCache这种基于HDFS共享存储的方式.使用MapReduce DistributedCache时,用户需要显示地使用File API编写程序从本地读取小表数据,而Spark则不用,它借助Scala语言强大的函数闭包特性,可以隐藏数据/文件广播过程,让用户编写程序更加简单。

假设两个文件,一小一大,且格式类似为:
Key,value,value
Key,value,value
则利用Spark实现map-side的算法如下:

	var table1 = sc.textFile(args(1))
    var table2 = sc.textFile(args(2))
     
    // table1 is smaller, so broadcast it as a map<String, String>
    var pairs = table1.map { x =>
      var pos = x.indexOf(',')
      (x.substring(0, pos), x.substring(pos + 1))
    }.collectAsMap
    var broadCastMap = sc.broadcast(pairs) //save table1 as map, and broadcast it
    // table2 join table1 in map side
    var result = table2.map { x =>
      var pos = x.indexOf(',')
      (x.substring(0, pos), x.substring(pos + 1))
      //mapPartitions作用是通过向这个RDD的每个分区应用一个函数来返回一个新的RDD
    }.mapPartitions({ iter =>
      var m = broadCastMap.value
      for{
        (key, value) <- iter
        if(m.contains(key))
      } yield (key, (value, m.get(key).getOrElse("")))
    })
    result.saveAsTextFile(args(3)) //save result to local or HDFS
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

Reduce-side Join
当两个文件/目录中的数据非常大,难以将某一个存放到内存中时,Reduce-side Join是一种解决思路。该算法需要通过Map和Reduce两个阶段完成,在Map阶段,将key相同的记录划分给同一个Reduce Task(需标记每条记录的来源,便于在Reduce阶段合并),在Reduce阶段,对key相同的进行合并。

Spark提供了Join算子,可以直接通过该算子实现reduce-side join,但要求RDD中的记录必须是pair,即RDD[KEY, VALUE],同样前一个例利用Reduce-side join实现如下:

var table1 = sc.textFile(args(1))
    var table2 = sc.textFile(args(2))
     
    var pairs = table1.map{x =>
      var pos = x.indexOf(',')
      (x.substring(0, pos), x.substring(pos + 1))
    }
    var result = table2.map{x =>
      var pos = x.indexOf(',')
      (x.substring(0, pos), x.substring(pos + 1))
    }.join(pairs)
    result.saveAsTextFile(args(3))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

注:在使用这两种算法处理较大规模的数据时,通常需要对多个参数进行调优,否则可能会产生OOM问题。通常需要调优的相关参数包括,map端数据输出buffer大小,reduce端数据分组方法(基于map还是基于sort),等等。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/771934
推荐阅读
相关标签
  

闽ICP备14008679号