当前位置:   article > 正文

Spark数据倾斜解决方案

Spark数据倾斜解决方案

数据开发指导策略

​ 数据开发规范的第一性原则是:首先保证人效,然后才是提升机器性能

​ 这一原则强调,数据开发的首要任务并非单纯追求机器性能的极致,而是要确保人效能得到最大化提升。这是因为,无论技术如何进步,机器始终是为人服务的工具,人的智慧与创造力才是推动数据开发不断前行的核心力量。当为了提升机器性能而丧失人效就变得得不偿失,机器性能的提升固然重要,但如果没有人的有效参与和精准操作,这些性能的提升也只是空中楼阁。数据开发规范的第一性原则要求我们在追求技术进步的同时,更多的要去关注代码的语义规范,尽量保证更多的开发同学可以在更短的时间内理解代码语义,这才是最重要的原则。

​ 在深入进入数据倾斜场景优化之前,我们先讨论数据开发工作开展的基本原则是有意义的。一般来说,为了解决数据倾斜问题我们需要涉及改造代码逻辑,往往机器性能提升的代码对开发人员来说并不友好,同时也提升了后续代码维护的成本。在实际工作中我们要随时评估机器性能和人效之间的均衡点,不到必要时刻,不能抛弃人效来提高机器性能。

​ 综上所述,不谈人效、仅谈优化的工程师对于岗位来说明显是不合格的。

二 数据倾斜&问题代码定位

1 定位数据倾斜

1) 日志监控

  • 99%:key分布倾斜
  • 67%:笛卡尔积倾斜

2) Spark UI

  • 执行进度

    总任务执行进度75%~100%执行时间明显比前边长,一般可以判断为数据倾斜;

  • 当前Stage中某些task一直处于执行状态,一般可定位为数据倾斜;

3) 自动监控

​ 监控spark task执行时长,当某task执行时间明显高于其他task执行时长,基本可以定位为数据倾斜。

2 定位问题代码

​ Spartk UI 中在当前Stage执行详情页面,根据stageid可以在DAG执行图和SQL执行图共同定位问题SQL片段;

三 解决数据倾斜

1 key倾斜

1) 开启map端预聚合

参数开启:

set spark.sql.map.aggregates.enabled = true;
  • 1

​ 通过map端预聚合可以降低reduce端数据量,从而解决数据倾斜。

2) count distinct倾斜

  • 非去重计数聚合优化

    • 倾斜代码如下:

      select 
      	t1.city_id        as city_id,
      	count(1)          as order_num, -- 订单量 
      	sum(t1.order_amt) as order_amt  -- 订单金额 
      from tbl_order t1 
      group by 
      	t1.city_id 
      ;
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
    • 问题分析:如果业务城市分布不均,可能一线城市订单量和三四线城市订单量差异巨大,从而造成单点task数据量巨大,从而引发数据倾斜。

      select
      	tmp.city_id           as city_id,
      	sum(tmp.order_num)    as order_num, -- 订单量 
      	sum(tmp.order_amt)    as order_amt  -- 订单金额 
      from (
          select 
              t1.city_id        as city_id,
              count(1)          as order_num, -- 订单量 
              sum(t1.order_amt) as order_amt  -- 订单金额 
          from tbl_order t1 
          group by 
              t1.city_id,
              ceil(rand()+10)
      ) tmp 
      group by 
      	tmp.city_id 
      ;
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
  • count distinct优化

    • 倾斜代码如下:

      select 
      	count(distinct t1.user_id) as user_cnt 
      from tbl_visit_user_log t1 
      ;
      
      • 1
      • 2
      • 3
      • 4
    • 问题分析:这段SQL造成数据倾斜主要应为由一个reduce任务来处理去重计数造成的,我们通过改写SQL绕过由一个reduce任务去重计数即可以解决数据倾斜问题。

      select 
      	count(1) as user_cnt
      from (
          select 
              t1.user_id 
          from tbl_visit_user_log t1 
          group by 
              t1.user_id
      ) tmp 
      ;
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
  • 分组count distinct优化

    • 倾斜代码如下:

      select 
      	t1.city_id                            as city_id,
      	max(t1.city_name)                     as city_name,
      	count(distinct t1.user_id)            as user_cnt 
      from tbl_visit_user_log t1 
      group by 
      	t1.city_id
      ;
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
    • 问题分析:城市业务分布不均造成数据倾斜,比如北京上海业务突出,三四线城市业务一般

      select  
          t.city_id        as city_id,
          max(t.city_name) as city_name,
          sum(user_cnt)    as user_cnt
      from (
          select 
              city_id                          as city_id,
              max(city_name)                   as city_name,
              count(distinct user_id)          as user_cnt 
          from tbl_visit_user_log
          group by 
          	city_id,
          	cast(hash(user_id) % 100 as int)
      ) t
      group by 
          city_id
      ;
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17

2 Join倾斜

1) MapJoin解决大小表关联倾斜

​ MapJoin 通过将小表加载到内存中,并在 Map 端将其与大表进行关联,从而减少了数据的传输量和节点间的数据倾斜。

  • 倾斜代码

    -- reduce端Join 
    select 
    	large_table.*, 
    	small_table.*
    from large_table
    join small_table on large_table.id = small_table.id
    ;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
  • 解决方案

    -- 使用 MapJoin 进行关联操作
    select /*+ MAPJOIN(small_table) */ 
    	large_table.*, 
    	small_table.*
    from large_table
    join small_table on large_table.id = small_table.id
    ;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

2) 拆解SQL,绕过数据倾斜

​ 通过建立临时表、提前去重、cache table等方式将复杂SQL分割成多个任务,从而解决数据倾斜。

3) 热点key分离解决数据倾斜

​ 比如热点新闻和普通新闻访问量差异巨大,我们可以考虑将热点新闻过滤出来,让普通新闻集合走Join,然后热点新闻单独处理,最后union两个结果集方式来解决Join倾斜。

4) 脚本方式切分大小表绕过倾斜

​ 通过python脚本将小表拆分成n个表,然后分别让n个表与大表进行MapJoin,最后将结果union到一起方式解决数据倾斜,这种版本一般用在原始代码执行失败的场景,可以最终解决数据倾斜。

四 结语

​ 数据倾斜场景千千万,这里列举常见的一些数据倾斜场景进行简单的描述,更多示例,后续更新。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/349278
推荐阅读
相关标签
  

闽ICP备14008679号