赞
踩
本文参考多篇有关大数据的数据倾斜的解决思路,总结作为工作参考使用,引用自 1,大数据 SQL 数据倾斜与数据膨胀的优化与经验总结 2,Hive SQL 优化(解决数据倾斜)
数据倾斜是指在分布式计算时,大量相同的key被分发到同一个reduce节点中。针对某个key值的数据量比较多,会导致该节点的任务数据量远大于其他节点的平均数据量,运行时间远高于其他节点的平均运行时间,拖累了整体SQL 执行时间。其主要原因是key值分布不均导致的Reduce处理数据不均匀。本文将从Map端优化,Reduce端优化和Join端优化三方面给出相应解决方案。
数据膨胀是指任务的输出条数/数据量级比输入条数/数据量级大很多,如100M的数据作为任务输入,最后输出1T的数据。这种情况不仅运行效率会降低,部分任务节点在运行key值量级过大时,有可能发生资源不足或失败情况。
本节主要关注于业务SQL本身引起的长时间运行或者失败,对于集群资源情况,平台故障本身暂不考虑在内。
1.首先检查输入数据量级。与其他天相比有无明显量级变化,是否因为数据量级的问题天然引起任务运行时间过长,如双11,双十二等大促节点。
2.观察执行任务拆分后各个阶段运行时间。与其他天相比有无明显量级变化;在整个执行任务中时间耗时占比情况。
3.最耗时阶段中,观察各个Task的运行情况。Task列表中,观察是否存在某几个Task实例耗时明显比平均耗时更长,是否存在某几个Task实例处理输入/输出数据量级比平均数据量级消费产出更多。
4.根据步骤3中定位代码行数,定位问题业务处理逻辑。
在数据源读取查询时,动态分区数过多可能造成小文件数过多,每个小文件至少都会作为一个块启动一个Map任务来完成。对于文件数量而言,等于 map数量 * 分区数。对于一个Map任务而言,其初始化的时间可能远远大于逻辑处理时间,因此通过调整Map参数把小文件合并成大文件进行处理,避免造成很大的资源浪费。
减少使用select * from table语句,过多选择无用列会增加数据在集群上传输的IO开销;对于数据选择,需要加上分区过滤条件进行筛选数据。
在不影响结果的情况下,尽可能将过滤条件表达式靠近数据源位置,使之提前执行。通过在map端过滤减少数据输出,降低集群IO传输,从而提升任务的性能。
在Map阶段做聚合时,使用随机分布函数distribute by rand(),控制Map端输出结果的分发,即map端如何拆分数据给reduce端(默认hash算法),打乱数据分布,至少不会在Map端发生数据倾斜。
计算公式:hashCode(key)% reduce 个数
优点:实现十分简单;
缺点:可能缓解数据倾斜,不一定有效果
Order by为全局排序,当表数据量过大时,性能可能会出现瓶颈;
Sort by为局部排序,确保Reduce任务内结果有序,全局排序不保证;
Distribute by按照指定字段进行Hash分片,把数据划分到不同的Reducer中;
CLUSTER BY:根据指定的字段进行分桶,并在桶内进行排序,可以认为cluster by是distribute by+sort by。
对于排序而言,尝试用distribute by+sort by确保reduce中结果有序,最后在全局有序。
-- 原始脚本 select * from user_pay_table where dt = '20221015' order by amt limit 500 ; -- 改进脚本 SELECT * FROM user_pay_table WHERE dt = '20221015' DISTRIBUTE BY ( CASE WHEN amt < 100 THEN 0 WHEN amt >= 100 AND age <= 2000 THEN 1 ELSE 2 END ) SORT BY amt LIMIT 500 ;
适用场景:大表 join 小表【10M】【不适用于大表 join 大表,如果广播的数据很大,可能内存溢出】
通过将需要join的小表分发至map端内存中,将Join操作提前至map端执行,避免因分发key值不均匀引发的长尾效应,复杂度从(M*N)降至(M+N),从而提高执行效率。
ODPS SQL与Hive SQL使用mapjoin,SPARK使用broadcast。
较小的 RDD 创建一个广播变量【数据压缩、高效的通信框架 Netty、BT 协议】,广播给所有的 executor 节点,然后利用 map 算子实现来进行 join 即可
有两种可能,第一种是 Join 的 2个表都是大表,且由于空值导致长尾,可将空值处理成随机值。
第二种是 Join 的 2个表都是大表,且由于热点值导致长尾,可以先将热点Key取出,对于主表数据用热点Key切分成热点数据和非热点数据两部分分别处理,最后合并。比如下述示例。
部分实例发生长尾效应,很大程度上由于null值,空值导致,使得Reduce时含有脏值的数据被分发到同一台机器中。针对这种问题SQL,首先确认包含无效值的数据源表是否可以在Map阶段直接过滤掉这些异常数据;如果后续SQL逻辑仍然需要这些数据,可以通过将空值转变成随机值,既不影响关联也可以避免聚集。
## 第一种方法,可以直接将空值转成随机数 SELECT ta.id FROM ta LEFT JOIN tb ON coalesce(ta.id , rand()) = tb.id; ## 第二种方法,对空值和非空分开处理,接着union在一起,这样Spark会起到并行写入,比分开成两个job要好。 insert overwrite table dbName.tableName SELECT t01.* FROM (SELECT * FROM dbName.tableName WHERE dt='20211113' AND businessdaydate >='20200101' AND coalesce(custcardnumber,'null') NOT IN ('null','')) t01 LEFT JOIN (SELECT erp_code, min(user_id) AS user_id FROM dbName.tableName WHERE dt='20211113' AND erp_code <> '' AND erp_code IS NOT NULL GROUP BY erp_code) t06 ON t01.custcardnumber = t06.erp_code union SELECT t01.* FROM (SELECT * FROM dbName.tableName WHERE dt='20211113' AND businessdaydate >='20200101' AND coalesce(custcardnumber,'null') IN ('null','')) t01 LEFT JOIN (SELECT erp_code, min(user_id) AS user_id FROM dbName.tableName WHERE dt='20211113' AND erp_code <> '' AND erp_code IS NOT NULL GROUP BY erp_code) t06 ON t01.custcardnumber = t06.erp_code
(1) Join 的 2个表都是大表,且由于热点值导致长尾,可以先将热点Key取出,对于主表数据用热点Key切分成热点数据和非热点数据两部分分别处理,最后合并。比如下述示例
# 1,取出热点Key:将PV大于50000的商品ID取出到临时表 insert overwrite table topk_item PARTITION (ds = '${bizdate}') select item_id from( select item_id, count(1) as cnt from dwd_tb_log_pv_di where ds = '${bizdate}' and url_type = 'ipv' and item_id is not null group by item_id ) a where cnt >= 50000; # 2. 取出非热点数据:将主表(sdwd_tb_log_pv_di)和热点key表(topk_item)外关联后通过条件b1.item_id is null,取出关联不到的数据即非热点商品的日志数据,此时需要用MAP JOIN。再用非热点数据关联商品维表,因为已经排除了热点数据,不会存在长尾。 select ... from( select * from dim_tb_itm where ds = '${bizdate}' ) a right outer join( select /* mapjoin(b1) */ b2.* from( select item_id from topk_item where ds = '${bizdate}' ) b1 right outer join( select * from dwd_tb_log_pv_di where ds = '${bizdate}' and url_type = 'ipv' ) b2 on b1.item_id = coalesce(b2.item_id, concat("tbcdm",rand()) where b1.item_id is null ) l on a.item_id = coalesce(l.item_id,concat("tbcdm", rand()); # 3. 取出热点数据:将主表(sdwd_tb_log_pv_di)和热点Key表(topk_item)内关联,此时需要用 MAP JOIN,取到热点商品的日志数据。同时,需要将商品维表(dim_tb_itm)和热点Key表(topk_item)内关联,取到热点商品的维表数据,然后将第一部分数据外关联第二部分数据,因为第二部分只有热点商品的维表,数据量比较小,可以用 MAP JOIN避免长尾。 select /* mapjoin(a) */ ... from( select /* mapjoin(b1) */ b2.* from( select item_id from topk_item where ds = '${bizdate}' )b1 join( select * from dwd_tb_log_pv_di where ds = '${bizdate}' and url_type = 'ipv' and item_id is not null ) b2 on (b1.item_id = b2.item_id) ) l left outer join( select /* mapjoin(a1) */ a2.* from( select item_id from topk_item where ds = '${bizdate}' ) a1 join( select * from dim_tb_itm where ds = '${bizdate}' ) a2 on (a1.item_id = a2.item_id) ) a on a.item_id = l.item_id; # 4. 将步骤2和步骤3的数据通过union all合并后即得到完整的日志数据,并且关联了商品的信息。
(2) 给key添加随机数强行打散数据
在 map 阶段先将 key 加上前缀或者后缀,shuffle 之后,先对打上随机数后的 key 进行局部聚合,再将各个 key 的随机数去掉后进行全局聚合,就得到了最终的结果。
在join的时候发生的数据倾斜,打散key之后,使得这些 key 分散到不同的 task 中;那么此时数据倾斜的 key 变了,如何 join 呢?于是我想到了将另外一份对应相同 key 的数据与随机前缀或者后缀作笛卡尔积,保证两个表可以 join。
但是如果出现的数据倾斜key较多的时候怎么办呢? 大表加盐,小表笛卡尔积扩容
Join关联条件有误,表Join进行笛卡尔积,造成数据量爆炸。
关注JoinKey区分度,key值区分度越低(distinct数量少),越有可能造成数据爆炸情况。如用户下的性别列,交易下的省市列等。
部分聚合操作需要将中间结果记录下来,最后再生成最终结果,这使得在select操作时,按照不同维度去重Distinct、不同维度开窗计算over Partition By可能会导致数据膨胀。针对这种业务逻辑,可以将一个SQL拆分成多个SQL分别进行处理操作。
如果以上优化执行后仍不能解决的 SQL 优化,那么还有 2个不得已而为之的技巧:
1)增加执行机器资源,有几个简单原则供借鉴:
2)阉割需求,业务的需求也有可能是不尽合理的(做好需求沟通):
不要过分给业务承诺、给老板承诺,业务或是老板的需求也很可能是不合理的。过分的接受需求,过分的消耗资源并不是一个好的现象。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。