当前位置:   article > 正文

hive任务优化_skewed by (key)

skewed by (key)

hive是基于大数据开发的一个用于数据仓库的工具,其主要功能是将HQL(HIVE SQL)转换成mapreduce执行。所以对hive语句的优化几乎等于对mapreduce的优化,主要在io和数据倾斜方面进行优化。本文将从语句、任务、模型三个方面简单阐述优化方法和理论

1.语句优化

1.1合并小文件

map针对每一个文件产生一个或多个map任务,如果输入小文件过多,则会产生许多map任务处理每个小文件,严重耗费了资源。通过如下设置可以对输入小文件进行合并操作

  set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; 

1.2压缩选择

可以通过压缩中间文件减少io消耗,提高效率

hive中存储格式和压缩格式如下:

1.2.1存储格式

  • Text File text格式,此为默认的格式。可以使用Gzip或者Bzip2压缩格式

  • SequenceFile 二进制文件格式,支持NONE/RECORD/BLOCK压缩格式

  • RCFile

  • Avro Files

  • ORC Files 

  • Parquet

  • Custom INPUTFORMAT and OUTPUTFORMAT 用户自定义文件格式

推荐orc files 和 parquet。其中orc用的较多

1.2.2压缩格式

压缩格式主要有 bzip2、gzip、lzo、snappy等

在进行shuffle中,由于进行数据传输,会产生较大的io。此时对map输出文件进行压缩,能够减小数据文件大小,降低io,提高执行效率,一般建议采用SnappyCodec压缩格式,此格式有较高的压缩比和低cpu消耗

  1. set hive.exec.compress.intermediate=true;
  2. set mapreduce.map.output.compress=true
  3. set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;

1.3join优化

数据倾斜指由于数据表中某些值数据量较大时,导致某些reducer上数据量较大。在执行过程中会出现其它reducer都已完成,某些reducer还在执行且进度条一直呈现99%,严重影响了整个任务的执行效率(也称长尾),数据倾斜优化就是要解决某些值数据量较大的情况。

1.3.1 mapjoin

当大表和小表join出现数据倾斜时,可以将小表缓存至内存,在map端进行join操作,设置如下:

  1. set hive.auto.convert.join.noconditionaltask = true;
  2. set hive.auto.convert.join.noconditionaltask.size = 10000000;

我司可缓存内存默认大小为500M,最高可达2G

1.3.2 空值导致长尾

对空值进行随机处理

  1. select ...
  2. from (select *
  3. from tbcdm.dim_tb_itm
  4. where ds = '${bizdate}'
  5. )t
  6. left join (select *
  7. from tbods.s_standard_brand
  8. where ds='${bizdate}'
  9. and status=3
  10. ) t1
  11. on coalesce(t.org_brand_id,rand() * 9999 ) = t1.value_id; // 此处进行了随机处理,请确保随机后的数不和 value_id 碰撞

1.3.3 热点值导致长尾(手动处理)

如果是因为热点值导致长尾,并且JOIN的输入比较大无法用MAP JOIN,可以先将热点Key取出,对于主表数据用热点Key切分成热点数据和非热点数据两部分分别处理,最后合并。以淘宝的PV日志表关联商品维表取商品属性为例:

  1. 取出热点Key:将PV大于50000的商品ID取出到临时表。
    1. insert overwrite table topk_item partition (ds = '${bizdate}'
    2. select item_id
    3. , count(1) as cnt
    4. from dwd_tb_log_pv_di
    5. where ds = '${bizdate}'
    6. and url_type = 'ipv'
    7. and item_id is not null
    8. group by item_id
    9. having cnt >= cnt >= 50000;
  2. 取出非热点数据。将主表(sdwd_tb_log_pv_di)和热点key表(topk_item)外关联后通过条件b1.item_id is null,取出关联不到的数据即非热点商品的日志数据,此时需要用map join。再用非热点数据关联商品维表,因为已经排除了热点数据,不会存在长尾。
    1. select ...
    2. from (select *
    3. from dim_tb_itm
    4. where ds = '${bizdate}'
    5. ) a
    6. right join (select /* mapjoin(b1) */
    7. b2.*
    8. from (select item_id
    9. from topk_item
    10. where ds = '${bizdate}'
    11. ) b1
    12. right join (select *
    13. from dwd_tb_log_pv_di
    14. where ds = '${bizdate}'
    15. and url_type = 'ipv'
    16. ) b2
    17. on b1.item_id = coalesce(b2.item_id,concat("tbcdm",rand())
    18. where b1.item_id is null
    19. ) l
    20. on a.item_id = coalesce(l.item_id,concat("tbcdm",rand());
  3. 取出热点数据。将主表(sdwd_tb_log_pv_di)和热点Key表(topk_item)内关联,此时需要用MAP JOIN,取到热点商品的日志数据。同时,需要将商品维表(dim_tb_itm)和热点Key表(topk_item)内关联,取到热点商品的维表数据,然后将第一部分数据外关联第二部分数据,因为第二部分只有热点商品的维表,数据量比较小,可以用MAP JOIN避免长尾。
    1. select /* mapjoin(a) */
    2. ...
    3. from
    4. (select /* mapjoin(b1) */
    5. b2.*
    6. from
    7. (select item_id
    8. from topk_item
    9. where ds = '${bizdate}'
    10. )b1
    11. join
    12. (select *
    13. from dwd_tb_log_pv_di
    14. where ds = '${bizdate}'
    15. and url_type = 'ipv'
    16. and item_id is not null
    17. ) b2
    18. on (b1.item_id = b2.item_id)
    19. ) l
    20. left outer join
    21. (select /* mapjoin(a1) */
    22. a2.*
    23. from
    24. (select item_id
    25. from topk_item
    26. where ds = '${bizdate}'
    27. ) a1
    28. join
    29. (select *
    30. from dim_tb_itm
    31. where ds = '${bizdate}'
    32. ) a2
    33. on (a1.item_id = a2.item_id)
    34. ) a
    35. on a.item_id = l.item_id;
  4. 将步骤2和步骤3的数据通过union all合并后即得到完整的日志数据,并且关联了商品的信息。

注:代码来自阿里云官网。仅供参考,写的属实有点乱

1.3.4热点值导致长尾(自动处理)

对于skewjoin,平台在执行job时会自动进行如下操作

  1. 将它们存入临时的HDFS目录。其它数据正常执行
  2. 对倾斜数据开启map join操作,对非倾斜值采取普通join操作
  3. 将倾斜数据集和非倾斜数据集进行合并操作

hive.optimize.skewjoin.compiletime

如果建表语句元数据中指定了skew key,则使用set hive.optimize.skewjoin.compiletime=true开启skew join。

可以通过如下建表语句指定skewed key:

  1. create table list_bucket_single (key string, value string)
  2. skewed by (key) on (1,5,6) [stored as directories];

hive.optimize.skewjoin

该参数为在运行时动态指定数据进行skewjoin,一般和hive.skewjoin.key参数一起使用

  1. set hive.optimize.skewjoin=true;
  2. set hive.skewjoin.key=100000;

以上参数表示当key记录条数超过100000时采用skewjoin操作

  • 区别

    hive.optimize.skewjoin.compiletime和hive.optimize.skewjoin区别为前者为编译时参数(倾斜key事先知道),后者为运行时参数(运行时动态判断) 前者在生成执行计划时根据元数据生成skewjoin,此参数要求倾斜值一定;后者为运行过程中根据数据条数进行skewjoin优化。hive.optimize.skewjoin实际上应该重名为为hive.optimize.skewjoin.runtime参数,考虑兼容性没有进行重命名

参考文档: 文档一 文档二  参考文档  

1.3.5 大表join优化(未倾斜)

如果数据量超大,请 采用 bucket mapside join 或 sort-merge-bucket join ,具体参考  join详解

1.4group by

1.4.1map端聚合

  1. set hive.map.aggr=true;
  2. select count(*) from table2;

1.4.2热点值导致长尾(手动)

group by Key出现长尾,是因为某个Key内的计算量特别大。

对于确定的倾斜值,先均匀分布到各个reducer上,然后开启新一轮reducer进行统计操作。写法如下

  1. // 此处只是一个例,实际情况可以在map端开启聚(此处忽略)
  2.   -- 正常写法 //
  3.   select key
  4.        , count(1) as cnt
  5.     from tb_name
  6.    group  by  key;
  7.   ​
  8.   -- 改进后写法 ,假设热点key值为 key001
  9.   select a.key
  10.        , sum(cnt) as cnt
  11.    from (select key
  12.               , if(key = 'key001',random(),0)
  13.               , count(1) as cnt
  14.            from tb_name
  15.           group by key, 
  16.                    if(key = 'key001',random(),0)
  17.          ) t
  18.    group by t.key;

由上可见,这次的执行计划变成了M>R>R。虽然执行的步骤变长了,但是长尾的Key经过2个步骤的处理,整体的时间消耗可能反而有所减少

1.4.3热点值导致长尾(自动)

如果在不确定倾斜值的情况下,可以设置hive.groupby.skewindata参数

  1. set hive.groupby.skewindata=true;
  2. select key
  3.     , count(1) as cnt
  4.  from tb_name
  5. group by key;
其原理和上述写法调整中类似,是先对key值进行均匀分布,然后开启新一轮reducer求值

1.4.4 distinct 长尾

对于Distinct,把长Key进行拆分的策略已经不生效了。对这种场景,您可以考虑通过其它方式解决。

  1. --原始sql,不考虑uid为空。
  2. select count(uid) as pv
  3. , count(distinct uid) as uv
  4. from userlog;
  5. -- 改写后写法
  6. select sum(pv) as pv
  7. , count(*) as uv
  8. from (select count(*) as pv
  9. , uid
  10. from userlog
  11. group by uid
  12. )t;

参考文档

2.任务优化

一个任务中有多个语句,或者join时涉及到多张表且涉及表数据量较大,计算逻辑复杂,建议将各语句进行拆分,多表join也可以进行拆开。好处如下

  • 代码清晰,各任务各司其职

  • 有些任务可以串行执行,提供执行效率

  • 如果任务依赖于多张上游表,上游表产出时间有先后,可以将先产出表先进行计算,避免最后上游表产出后任务统一计算,大大延迟结果表产出时间(分散计算)

  • 方便观察发现瓶颈任务,针对瓶颈任务单独进行优化

3.模型优化

3.1一个栗子

在生产中需要对tb_a_df全量表和tb_b_df全量表进行关联,补齐A表中value1中信息,表数据信息如下

tb_a_df:1.2 亿条数据,存储4GB
tb_b_df:130 亿条数据,存储11TB

3.2优化

优化前

  1. -- yyyymmdd 为当天分区
  2. insert overwrite table result_table partition (ds = '${yyyymmdd}')
  3. select t.key
  4. , t.value
  5. , t1.value1
  6. from tb_a_df t
  7. left join tb_b_df t1
  8. on t.key = t1.key
  9. and t1.ds = '${yyyymmdd}'
  10. where t.ds = '${yyyymmdd}'

其中tb_b_df表数据量过大,join过程需要shuffle,极大消耗资源,以上任务运行时长为40分钟左右,且运行时长不稳定,影响数据产出时间

优化思路:

经验证发现:

tb_a_df表对应的增量表 tb_a_delta 最大更新35w条左右

tb_a_df 对应的增量表 tb_b_delta   每天更新量为9kw,100GB左右。

可以考虑将全量数据分为变动数据和非变动数据

于是我们从增量表入手将任务拆分为3个子task

3.3优化后

  1. -- taska
  2. -- 计算 tb_a 中有变动的数据
  3. -- tb_a_delta 表数据量很小,可以采用map join,极大降低了消耗
  4. insert overwrite table result_table_delta_a partition(ds = '${yyyymmdd}')
  5. select /*+ mapjoin(t) */
  6. t.key
  7. , t.value
  8. , t1.value1
  9. from tb_a_delta t -- 数据量 35w
  10. join tb_b_df t1 -- 130 亿条数据
  11. on t.key = t1.key
  12. and t1.ds = '${yyyymmdd}'
  13. where t.ds = '${yyyymmdd}'
  14. -- taskb
  15. -- 计算 tb_b 中有变动的数据,此时数据量小了很多,大大降低了shuffle的成本
  16. insert overwrite table result_table_delta_b partition(ds = '${yyyymmdd}')
  17. select t.key
  18. , t.value
  19. , t1.value1
  20. from tb_a_df t -- 数据量 1.2亿
  21. join tb_b_delta t1 -- 数据量 9kw
  22. on t.key = t1.key
  23. and t1.ds = '${yyyymmdd}'
  24. where t.ds = '${yyyymmdd}'
  25. -- taskc
  26. --
  27. with tmp as
  28. (
  29. -- a表变动数据
  30. select t.key
  31. , t.value
  32. , t.value1
  33. , 1 flg
  34. from result_table_delta_a t
  35. where t.ds = '${yyyymmdd}'
  36. union all
  37. -- b表变动数据
  38. select t.key
  39. , t.value
  40. , t.value1
  41. , 2 flg
  42. from result_table_delta_b t
  43. where t.ds = '${yyyymmdd}'
  44. ),
  45. tmp1 as
  46. (
  47. -- 去重
  48. select t.key
  49. , t.value
  50. , t.value1
  51. , row_number() over(partition by key order by flg desc) as rn -- 优先取b变动数据
  52. from tmp t
  53. )
  54. insert overwrite table result_table partition (ds = '${yyyymmdd}')
  55. -- 变动数据
  56. select key
  57. , value
  58. , value1
  59. from tmp1 t
  60. where rn = 1
  61. union all
  62. -- 非变动数据,用昨天分区来补全
  63. -- 注意,此处可以直接用昨天分区数据 left anti join tmp1 即可。由于生产中未作验证,因此此处没表述
  64. select t.key
  65. , t.value
  66. , t2.value1
  67. from tb_a_df t
  68. left anti join tmp1 t1 -- 排除变动数据
  69. on t.key = t1.key
  70. and t1.rn = 1
  71. left join tb_a_df t2
  72. on t.key = t2.key
  73. and t2.ds = '${yyyymmdd-1}' -- 昨天分区
  74. where t.ds = '${yyyymmdd}'

3.4效果&收益

效果

  • taska平均运行时长 5min

  • taskb平均运行时长 2.5min

  • taskc运行时长 7min

其中taska和taskb并行运行,待运行完成后再运行taskc,因此优化后任务时长为 5min + 7min = 12 min ,时长从 40min -> 12 min,时长降为原来的30%,也大大降低了计算资源

3.5其它方案

此处刚好 tb_a_delta 表较小,刚好可以采用 mapjoin,如果不能使用 mapjoin,需作何解?

可以采用hive中的buckted sort merge join方案,我司针对两个大表之间的join大量采用了此方案。

关于join介绍,请点击此处

4.系统级别

请参考 hive简介  9.企业级调优

5.其它拓展

5.1调整map数

5.1.1 map分割文件大小计算公式

计算公式 computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M

详情请看 mapreduce简介 -> 3.1.2 Job提交流程和切片源码 -> 2.FileInputFormat切片源码解析(input.getSplits(job))

5.1.2 hive设置

hive中调整 maxSize 最大值即可

set mapreduce.input.fileinputformat.split.maxsize=100;

5.2调整reduce数

reduce个数的设定极大影响任务执行效率,不指定reduce个数的情况下,Hive会猜测确定一个reduce个数,基于以下两个设定:

  1. hive.exec.reducers.bytes.per.reducer(每个reduce任务处理的数据量,默认为1000^3=1G)
  2. hive.exec.reducers.max(每个任务最大的reduce数,默认为999)

计算reducer数的公式N=min(参数2,总输入数据量/参数1)

即,如果reduce的输入(map的输出)总大小不超过1G,那么只会有一个reduce任务;
 

  1. select user_id
  2. , count(1)
  3. from dwd_tb_crm_trd_rfd_case_df
  4. where ds= '20201001'
  5. group by user_id;
  6. // dwd_tb_crm_trd_rfd_case_df 总大小为9G多,因此这句有10个reduce

感悟

通过这个栗子我们可以看到模型对日常设计的重要性(此处只是一个案例,还有其它案例笔者后续总结出来)。是采用增量还是全量计算,是否将核心字段和非核心字段剥离 是平时需要关注的重要点

以上优化方式为一般且常见的优化方式,对于具体问题应该进行具体分析

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/976840
推荐阅读
相关标签
  

闽ICP备14008679号