赞
踩
Spark中做两个大hive表的join操作,先读取过来处理成两个数据量很大的RDD,如果两个RDD直接进行join操作,势必会造成shuffle等导致运行非常缓慢,那么怎么优化呢?方法如下:
首先,对每个大hive表生成RDD进行优化
1. 对RDD进行repartition重分区
先依据Executor数和内存情况估算出对RDD分为多少个partition比较合适,因为一个partition对应一个task,会分发给一个Executor的core去执行运算操作。
所以首先对RDD进行重分区,将数据打散。
2. 采用mapPartition算子,一次性处理一个分区的数据
在这一步进行group by操作,将相同key的value值进行加和
3. reduceByKey算子,对不同的key进行聚合
经过上面的重分区,以及预聚合(group by,func加和),此时reduce端的计算压力就小了很多。
然后,对两个hive表对应的<经步骤一处理后的RDD>,进行重分区操作
这一步是因为如果两个RDD的分区数一致,join的时候就不会出现一个分区的key需要关联另一边多个分区的key的情况,也就是不会出现宽依赖,即不会出现shuffle操作;
这样也可以加快join的速度。
最后,两个RDD进行join即可。
因为mapPartition算子里func的函数类型:Iterator[T] => Iterator[U],也就是需要返回一个迭代器类型
所以代码如下,需要加容器
- //先分组,解决数据倾斜问题,
- val vertexPairRddGroup1: RDD[(String, String)] = vertexPairRdd.map {
- case ((srcId, dstId), (_, _)) => (srcId, dstId)
- }.mapPartitions(iter => {
- //对每个分区进行分组,groupbyKey
- val list = iter.toList
- val data = list.groupBy(x => x._1)
- //分组后的容器
- val buffer = new ListBuffer[(String, String)]
- data.foreach(x => {
- var str = ""
- x._2.foreach(y => {
- str = str + "\001" + y._2
- })
- if (!"".equals(str)) {
- val ids = str.trim
- buffer.+=((x._1, ids))
- }
- })
- buffer.toIterator
- }).reduceByKey((V1, V2) => V1 + "\001" + V2, 5000)
by 大牛王伟大牛
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。