当前位置:   article > 正文

spark做两张大表的join操作,mapPartition和重分区算子的使用策略_spark两张千万的表join

spark两张千万的表join

Spark中做两个大hive表的join操作,先读取过来处理成两个数据量很大的RDD,如果两个RDD直接进行join操作,势必会造成shuffle等导致运行非常缓慢,那么怎么优化呢?方法如下:

首先,对每个大hive表生成RDD进行优化

1. 对RDD进行repartition重分区

先依据Executor数和内存情况估算出对RDD分为多少个partition比较合适,因为一个partition对应一个task,会分发给一个Executor的core去执行运算操作。

所以首先对RDD进行重分区,将数据打散。

2. 采用mapPartition算子,一次性处理一个分区的数据

在这一步进行group by操作,将相同key的value值进行加和

3. reduceByKey算子,对不同的key进行聚合

经过上面的重分区,以及预聚合(group by,func加和),此时reduce端的计算压力就小了很多。

然后,对两个hive表对应的<经步骤一处理后的RDD>,进行重分区操作

这一步是因为如果两个RDD的分区数一致,join的时候就不会出现一个分区的key需要关联另一边多个分区的key的情况,也就是不会出现宽依赖,即不会出现shuffle操作;

这样也可以加快join的速度。

最后,两个RDD进行join即可。

-----------------------以下为mapPartition算子的使用技巧---------------------

因为mapPartition算子里func的函数类型:Iterator[T] => Iterator[U],也就是需要返回一个迭代器类型

所以代码如下,需要加容器

  1. //先分组,解决数据倾斜问题,
  2. val vertexPairRddGroup1: RDD[(String, String)] = vertexPairRdd.map {
  3. case ((srcId, dstId), (_, _)) => (srcId, dstId)
  4. }.mapPartitions(iter => {
  5. //对每个分区进行分组,groupbyKey
  6. val list = iter.toList
  7. val data = list.groupBy(x => x._1)
  8. //分组后的容器
  9. val buffer = new ListBuffer[(String, String)]
  10. data.foreach(x => {
  11. var str = ""
  12. x._2.foreach(y => {
  13. str = str + "\001" + y._2
  14. })
  15. if (!"".equals(str)) {
  16. val ids = str.trim
  17. buffer.+=((x._1, ids))
  18. }
  19. })
  20. buffer.toIterator
  21. }).reduceByKey((V1, V2) => V1 + "\001" + V2, 5000)

by 大牛王伟大牛

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/727531
推荐阅读
相关标签
  

闽ICP备14008679号