当前位置:   article > 正文

大数据数据倾斜与数据膨胀的优化与经验总结

数据膨胀

大数据 SQL 数据倾斜与数据膨胀的优化与经验总结

本文参考多篇有关大数据的数据倾斜的解决思路,总结作为工作参考使用,引用自 1,大数据 SQL 数据倾斜与数据膨胀的优化与经验总结 2,Hive SQL 优化(解决数据倾斜)

问题描述

数据倾斜

数据倾斜是指在分布式计算时,大量相同的key被分发到同一个reduce节点中。针对某个key值的数据量比较多,会导致该节点的任务数据量远大于其他节点的平均数据量,运行时间远高于其他节点的平均运行时间,拖累了整体SQL 执行时间。其主要原因是key值分布不均导致的Reduce处理数据不均匀。本文将从Map端优化,Reduce端优化和Join端优化三方面给出相应解决方案。

数据膨胀

数据膨胀是指任务的输出条数/数据量级比输入条数/数据量级大很多,如100M的数据作为任务输入,最后输出1T的数据。这种情况不仅运行效率会降低,部分任务节点在运行key值量级过大时,有可能发生资源不足或失败情况。

排查定位篇(重要)

本节主要关注于业务SQL本身引起的长时间运行或者失败,对于集群资源情况,平台故障本身暂不考虑在内。

1.首先检查输入数据量级。与其他天相比有无明显量级变化,是否因为数据量级的问题天然引起任务运行时间过长,如双11,双十二等大促节点。

2.观察执行任务拆分后各个阶段运行时间。与其他天相比有无明显量级变化;在整个执行任务中时间耗时占比情况。

3.最耗时阶段中,观察各个Task的运行情况。Task列表中,观察是否存在某几个Task实例耗时明显比平均耗时更长,是否存在某几个Task实例处理输入/输出数据量级比平均数据量级消费产出更多。

4.根据步骤3中定位代码行数,定位问题业务处理逻辑。

优化篇

(1)数据倾斜

1,Map端优化
1.1 读取数据合并

在数据源读取查询时,动态分区数过多可能造成小文件数过多,每个小文件至少都会作为一个块启动一个Map任务来完成。对于文件数量而言,等于 map数量 * 分区数。对于一个Map任务而言,其初始化的时间可能远远大于逻辑处理时间,因此通过调整Map参数把小文件合并成大文件进行处理,避免造成很大的资源浪费。

1.2 列裁剪

减少使用select * from table语句,过多选择无用列会增加数据在集群上传输的IO开销;对于数据选择,需要加上分区过滤条件进行筛选数据。

1.3 谓词下推

在不影响结果的情况下,尽可能将过滤条件表达式靠近数据源位置,使之提前执行。通过在map端过滤减少数据输出,降低集群IO传输,从而提升任务的性能。

1.4 数据重分布

在Map阶段做聚合时,使用随机分布函数distribute by rand(),控制Map端输出结果的分发,即map端如何拆分数据给reduce端(默认hash算法),打乱数据分布,至少不会在Map端发生数据倾斜。

2,Reduce端优化
2.1 增大 reduce 并行度

计算公式:hashCode(key)% reduce 个数

优点:实现十分简单;

缺点:可能缓解数据倾斜,不一定有效果

2.3 排序优化

Order by为全局排序,当表数据量过大时,性能可能会出现瓶颈;

Sort by为局部排序,确保Reduce任务内结果有序,全局排序不保证;

Distribute by按照指定字段进行Hash分片,把数据划分到不同的Reducer中;

CLUSTER BY:根据指定的字段进行分桶,并在桶内进行排序,可以认为cluster by是distribute by+sort by。

对于排序而言,尝试用distribute by+sort by确保reduce中结果有序,最后在全局有序。

-- 原始脚本
select *
from user_pay_table
where dt = '20221015'
order by amt
limit 500
;

-- 改进脚本
SELECT  *
FROM    user_pay_table
WHERE   dt = '20221015'
DISTRIBUTE BY ( CASE
                   WHEN amt < 100                  THEN 0
                   WHEN amt >= 100 AND age <= 2000 THEN 1
                   ELSE 2
                 END )
 SORT BY amt
LIMIT 500
;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
3. Join端优化
3.1 大表join小表

适用场景:大表 join 小表【10M】【不适用于大表 join 大表,如果广播的数据很大,可能内存溢出】

通过将需要join的小表分发至map端内存中,将Join操作提前至map端执行,避免因分发key值不均匀引发的长尾效应,复杂度从(M*N)降至(M+N),从而提高执行效率。

ODPS SQL与Hive SQL使用mapjoin,SPARK使用broadcast。

较小的 RDD 创建一个广播变量【数据压缩、高效的通信框架 Netty、BT 协议】,广播给所有的 executor 节点,然后利用 map 算子实现来进行 join 即可

3.2 大表join大表

有两种可能,第一种是 Join 的 2个表都是大表,且由于空值导致长尾,可将空值处理成随机值。

第二种是 Join 的 2个表都是大表,且由于热点值导致长尾,可以先将热点Key取出,对于主表数据用热点Key切分成热点数据和非热点数据两部分分别处理,最后合并。比如下述示例。

3.2.1 关联key空值检验

部分实例发生长尾效应,很大程度上由于null值,空值导致,使得Reduce时含有脏值的数据被分发到同一台机器中。针对这种问题SQL,首先确认包含无效值的数据源表是否可以在Map阶段直接过滤掉这些异常数据;如果后续SQL逻辑仍然需要这些数据,可以通过将空值转变成随机值,既不影响关联也可以避免聚集。

## 第一种方法,可以直接将空值转成随机数
SELECT  ta.id
FROM    ta
LEFT JOIN tb
ON      coalesce(ta.id , rand()) = tb.id;

## 第二种方法,对空值和非空分开处理,接着union在一起,这样Spark会起到并行写入,比分开成两个job要好。
insert overwrite table dbName.tableName
SELECT t01.*
FROM
  (SELECT *
   FROM dbName.tableName
   WHERE dt='20211113'
     AND businessdaydate >='20200101'
     AND coalesce(custcardnumber,'null') NOT IN ('null','')) t01
LEFT JOIN
  (SELECT erp_code,
          min(user_id) AS user_id
   FROM dbName.tableName
   WHERE dt='20211113'
     AND erp_code <> ''
     AND erp_code IS NOT NULL
   GROUP BY erp_code) t06 ON t01.custcardnumber = t06.erp_code
   
union

SELECT t01.*
FROM
  (SELECT *
   FROM dbName.tableName
   WHERE dt='20211113'
     AND businessdaydate >='20200101'
     AND coalesce(custcardnumber,'null') IN ('null','')) t01
LEFT JOIN
  (SELECT erp_code,
          min(user_id) AS user_id
   FROM dbName.tableName
   WHERE dt='20211113'
     AND erp_code <> ''
     AND erp_code IS NOT NULL
   GROUP BY erp_code) t06 ON t01.custcardnumber = t06.erp_code

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
3.2.2 长尾效应由热点数据导致

(1) Join 的 2个表都是大表,且由于热点值导致长尾,可以先将热点Key取出,对于主表数据用热点Key切分成热点数据和非热点数据两部分分别处理,最后合并。比如下述示例

# 1,取出热点Key:将PV大于50000的商品ID取出到临时表
insert   overwrite table topk_item PARTITION (ds = '${bizdate}'select   item_id
from(
         select   item_id, count(1) as cnt
         from     dwd_tb_log_pv_di
         where    ds = '${bizdate}'
         and      url_type = 'ipv'
         and      item_id is not null
         group by item_id
) a
where    cnt >= 50000;
# 2. 取出非热点数据:将主表(sdwd_tb_log_pv_di)和热点key表(topk_item)外关联后通过条件b1.item_id is null,取出关联不到的数据即非热点商品的日志数据,此时需要用MAP JOIN。再用非热点数据关联商品维表,因为已经排除了热点数据,不会存在长尾。
select   ...
from(
         select   *
         from     dim_tb_itm
         where    ds = '${bizdate}'
) a
right outer join(
         select   /*  mapjoin(b1) */
                  b2.*
         from(
                  select   item_id
                  from     topk_item
                  where    ds = '${bizdate}'
         ) b1
         right outer join(
                  select   *
                  from     dwd_tb_log_pv_di
                  where    ds = '${bizdate}'
                  and      url_type = 'ipv'
         ) b2 on b1.item_id = coalesce(b2.item_id, concat("tbcdm",rand())
         where    b1.item_id is null
) l on a.item_id = coalesce(l.item_id,concat("tbcdm", rand());

# 3. 取出热点数据:将主表(sdwd_tb_log_pv_di)和热点Key表(topk_item)内关联,此时需要用 MAP JOIN,取到热点商品的日志数据。同时,需要将商品维表(dim_tb_itm)和热点Key表(topk_item)内关联,取到热点商品的维表数据,然后将第一部分数据外关联第二部分数据,因为第二部分只有热点商品的维表,数据量比较小,可以用 MAP JOIN避免长尾。
                            
select   /*  mapjoin(a) */
         ...
from(
         select   /*  mapjoin(b1) */
                  b2.*
         from(
                  select   item_id
                  from     topk_item
                  where    ds = '${bizdate}'
         )b1
         join(
                  select   *
                  from     dwd_tb_log_pv_di
                  where    ds = '${bizdate}'
                  and      url_type = 'ipv'
                  and      item_id is not null
         ) b2 on       (b1.item_id = b2.item_id)
) l
left outer join(
         select   /*  mapjoin(a1) */
                  a2.*
         from(
                  select   item_id
                  from     topk_item
                  where    ds = '${bizdate}'
         ) a1
         join(
                  select   *
                  from     dim_tb_itm
                  where    ds = '${bizdate}'
         ) a2 on       (a1.item_id = a2.item_id)
) a on a.item_id = l.item_id;
                            
# 4. 将步骤2和步骤3的数据通过union all合并后即得到完整的日志数据,并且关联了商品的信息。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72

(2) 给key添加随机数强行打散数据

在 map 阶段先将 key 加上前缀或者后缀,shuffle 之后,先对打上随机数后的 key 进行局部聚合,再将各个 key 的随机数去掉后进行全局聚合,就得到了最终的结果。

在join的时候发生的数据倾斜,打散key之后,使得这些 key 分散到不同的 task 中;那么此时数据倾斜的 key 变了,如何 join 呢?于是我想到了将另外一份对应相同 key 的数据与随机前缀或者后缀作笛卡尔积,保证两个表可以 join。

但是如果出现的数据倾斜key较多的时候怎么办呢? 大表加盐,小表笛卡尔积扩容

(2)数据膨胀

1. 避免笛卡尔积

Join关联条件有误,表Join进行笛卡尔积,造成数据量爆炸。

2. 关联key区分度校验

关注JoinKey区分度,key值区分度越低(distinct数量少),越有可能造成数据爆炸情况。如用户下的性别列,交易下的省市列等。

3. 聚合操作误用

部分聚合操作需要将中间结果记录下来,最后再生成最终结果,这使得在select操作时,按照不同维度去重Distinct、不同维度开窗计算over Partition By可能会导致数据膨胀。针对这种业务逻辑,可以将一个SQL拆分成多个SQL分别进行处理操作。

如果以上优化执行后仍不能解决的 SQL 优化,那么还有 2个不得已而为之的技巧:

1)增加执行机器资源,有几个简单原则供借鉴:

  1. 增加机器资源时,优先 instance 个数:在没有出现数据倾斜的情况下,如果通过设置Cpu参数(含Memory参数)和设置Instance个数两种方式都能调优的话,最好是先设置Instance个数。因为如果Cpu/Memory参数设置不合理,执行任务的机器满足不了参数的要求,要重新找机器的,这样反而会影响效率。
  2. 执行日志中出现Dump,最好是Instance个数/Memory都增大一下:如何选择合适的参数个数?用二分法寻找最合适instance 个数,如果一个instance处理的数据量降到了1亿以下,或者instance的执行时间小于15-20Min,那么就说明当前的资源设置已经比较恰当了。
  3. 默认的Reduce instance一般是Map instance 的三分之一,一般Join instance个数一般是Reduce instance的个数之和

2)阉割需求,业务的需求也有可能是不尽合理的(做好需求沟通):

不要过分给业务承诺、给老板承诺,业务或是老板的需求也很可能是不合理的。过分的接受需求,过分的消耗资源并不是一个好的现象。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/854379
推荐阅读
相关标签
  

闽ICP备14008679号