赞
踩
1、SQL 代码层面优化
Step1、阻塞调度的长耗时 SQL 提取
Step2、划分长耗时 SQL
Step3、对长耗时 SQL 逐个分析
1.1、数据倾斜优化
1.1.1、sqoop 导入数据时的倾斜:
经过分析发现,数据倾斜是因为sqoop 同步数据时,采用的是sys_time字段进行数据进行split的,客户可能在某一时间对业务数据库进行过导入,从而导致很多数据的sys_time 在同一时间。所以sqoop 导入的数据,即存在了数据倾斜
解决办法:
将sqoop split_by 的字段由sys_time,修改为id,或者其他不倾斜的字段,从而保证数据源在同步时,不会出现数据倾斜。从而保证spark 程序在scan数据时,不会出现数据倾斜。
假如还存在倾斜的话,可以考虑适当的增加 num-mappers 的数量,将数据尽可能的分散开。
如果通过修改 num-mappers 数量和修改 split_by 字段都没办法解决,可以通过自定义 sqoop 的 query sql ,生成一个严格均匀分布的字段,然后指定为分割字段,从而解决数据倾斜。
1.1.2、spark 运行时产生的数据倾斜:
2.1、增加spark job 的并行度。
2.2、大表 join 小表 时产生的数据倾斜:可以通过开启 broadcast join 通过driver 将小表进行广播出去,避免shuffle得产生,从而避免大表产生的倾斜。
其实很多场景,通过以上两种方案都可以适当的缓解或者避免掉数据倾斜,接下来,我们简单的聊一聊通过2.1和2.2 避免不了的问题。
1.1.3、过滤少数导致倾斜的key
方案适用场景:
如果发现导致倾斜的key就少数几个,而且对计算本身的影响并不大的话,那么很适合使用这种方案。比如99%的key就对应10条数据,但是只有一个key对应了100万数据,从而导致了数据倾斜。
方案实现思路:
如果判断那少数几个数据量特别多的key,对作业的执行和计算结果不是特别重要的话,那么干脆就直接过滤掉那少数几个key。比如,在Spark SQL中可以使用where子句过滤掉这些key或者在Spark Core中对RDD执行filter算子过滤掉这些key。如果需要每次作业执行时,动态判定哪些key的数据量最多然后再进行过滤,那么可以使用sample算子对RDD进行采样,然后计算出每个key的数量,取数据量最多的key过滤掉即可。
方案实现原理:
将导致数据倾斜的key给过滤掉之后,这些key就不会参与计算了,自然不可能产生数据倾斜。
1.1.4、双重聚合
方案适用场景:
对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合时,比较适用这种方案。
方案实现思路:
这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每个key都打上一个随机数,比如10以内的随机数,此时原先一样的key就变成不一样的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合,那么局部聚合结果,就会变成了(1_hello, 2) (2_hello, 2)。然后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次进行全局聚合操作,就可以得到最终结果了,比如(hello, 4)。
方案实现原理:
将原本相同的key通过附加随机前缀的方式,变成多个不同的key,就可以让原本被一个task处理的数据分散到多个task上去做局部聚合,进而解决单个task处理数据量过多的问题。接着去除掉随机前缀,再次进行全局聚合,就可以得到最终的结果。
1.1.5、采样倾斜key并分拆join操作
方案适用场景:
两个RDD/Hive表进行join的时候,如果数据量都比较大,那么此时可以看一下两个RDD/Hive表中的key分布情况。如果出现数据倾斜,是因为其中某一个RDD/Hive表中的少数几个key的数据量过大,而另一个RDD/Hive表中的所有key都分布比较均匀,那么采用这个解决方案是比较合适的。
方案实现思路:
对包含少数几个数据量过大的key的那个RDD,通过sample算子采样出一份样本来,然后统计一下每个key的数量,计算出来数据量最大的是哪几个key。然后将这几个key对应的数据从原来的RDD中拆分出来,形成一个单独的RDD,并给每个key都打上n以内的随机数作为前缀,而不会导致倾斜的大部分key形成另外一个RDD。接着将需要join的另一个RDD,也过滤出来那几个倾斜key对应的数据并形成一个单独的RDD,将每条数据膨胀成n条数据,这n条数据都按顺序附加一个0~n的前缀,不会导致倾斜的大部分key也形成另外一个RDD。再将附加了随机前缀的独立RDD与另一个膨胀n倍的独立RDD进行join,此时就可以将原先相同的key打散成n份,分散到多个task中去进行join了。而另外两个普通的RDD就照常join即可。最后将两次join的结果使用union算子合并起来即可,就是最终的join结果 。
1.1.6、使用随机前缀和扩容RDD进行join
方案适用场景:
如果在进行join操作时,RDD中有大量的key导致数据倾斜,那么进行分拆key也没什么意义,此时就只能使用最后一种方案来解决问题了。
方案实现思路:
该方案的实现思路基本和1.1.5类似,首先查看RDD/Hive表中的数据分布情况,找到那个造成数据倾斜的RDD/Hive表,比如有多个key都对应了超过1万条数据。然后将该RDD的每条数据都打上一个n以内的随机前缀。同时对另外一个正常的RDD进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都依次打上一个0~n的前缀。最后将两个处理后的RDD进行join即可。
1.2、复杂 sql 逻辑优化
复杂 sql 部分嵌套逻辑
从org.apache.spark.sql.execution.SparkStrategyz注释可以看到,BroadcastNestedLoopJoin是最终解决方案,当 sql 复杂度太高,spark无法优化,就会选择低效的
BroadcastNestedLoopJoin
这时候可以将sql子查询逻辑拆分为外层的join,进行优化,从而让spark 能选择高效的join方式
但是难免每层的job都会存在部分表数据量比较大,内存不够,从而导致落盘,于是我们针对一些数据量比较大的sql,再进行单独资源分配,确保每一层的job不会出现特别长时间的job
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。