赞
踩
hive是基于大数据开发的一个用于数据仓库的工具,其主要功能是将HQL(HIVE SQL)转换成mapreduce执行。所以对hive语句的优化几乎等于对mapreduce的优化,主要在io和数据倾斜方面进行优化。本文将从语句、任务、模型三个方面简单阐述优化方法和理论
map针对每一个文件产生一个或多个map任务,如果输入小文件过多,则会产生许多map任务处理每个小文件,严重耗费了资源。通过如下设置可以对输入小文件进行合并操作
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
可以通过压缩中间文件减少io消耗,提高效率
hive中存储格式和压缩格式如下:
Text File text格式,此为默认的格式。可以使用Gzip或者Bzip2压缩格式
SequenceFile 二进制文件格式,支持NONE/RECORD/BLOCK压缩格式
Custom INPUTFORMAT and OUTPUTFORMAT 用户自定义文件格式
推荐orc files 和 parquet。其中orc用的较多
压缩格式主要有 bzip2、gzip、lzo、snappy等
在进行shuffle中,由于进行数据传输,会产生较大的io。此时对map输出文件进行压缩,能够减小数据文件大小,降低io,提高执行效率,一般建议采用SnappyCodec压缩格式,此格式有较高的压缩比和低cpu消耗
- set hive.exec.compress.intermediate=true;
- set mapreduce.map.output.compress=true
- set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
数据倾斜指由于数据表中某些值数据量较大时,导致某些reducer上数据量较大。在执行过程中会出现其它reducer都已完成,某些reducer还在执行且进度条一直呈现99%,严重影响了整个任务的执行效率(也称长尾),数据倾斜优化就是要解决某些值数据量较大的情况。
当大表和小表join出现数据倾斜时,可以将小表缓存至内存,在map端进行join操作,设置如下:
- set hive.auto.convert.join.noconditionaltask = true;
- set hive.auto.convert.join.noconditionaltask.size = 10000000;
我司可缓存内存默认大小为500M,最高可达2G
对空值进行随机处理
- select ...
- from (select *
- from tbcdm.dim_tb_itm
- where ds = '${bizdate}'
- )t
- left join (select *
- from tbods.s_standard_brand
- where ds='${bizdate}'
- and status=3
- ) t1
- on coalesce(t.org_brand_id,rand() * 9999 ) = t1.value_id; // 此处进行了随机处理,请确保随机后的数不和 value_id 碰撞
如果是因为热点值导致长尾,并且JOIN的输入比较大无法用MAP JOIN,可以先将热点Key取出,对于主表数据用热点Key切分成热点数据和非热点数据两部分分别处理,最后合并。以淘宝的PV日志表关联商品维表取商品属性为例:
- insert overwrite table topk_item partition (ds = '${bizdate}')
- select item_id
- , count(1) as cnt
- from dwd_tb_log_pv_di
- where ds = '${bizdate}'
- and url_type = 'ipv'
- and item_id is not null
- group by item_id
- having cnt >= cnt >= 50000;
b1.item_id is null
,取出关联不到的数据即非热点商品的日志数据,此时需要用map join。再用非热点数据关联商品维表,因为已经排除了热点数据,不会存在长尾。 - select ...
- from (select *
- from dim_tb_itm
- where ds = '${bizdate}'
- ) a
- right join (select /* mapjoin(b1) */
- b2.*
- from (select item_id
- from topk_item
- where ds = '${bizdate}'
- ) b1
- right join (select *
- from dwd_tb_log_pv_di
- where ds = '${bizdate}'
- and url_type = 'ipv'
- ) b2
- on b1.item_id = coalesce(b2.item_id,concat("tbcdm",rand())
- where b1.item_id is null
- ) l
- on a.item_id = coalesce(l.item_id,concat("tbcdm",rand());
- select /* mapjoin(a) */
- ...
- from
- (select /* mapjoin(b1) */
- b2.*
- from
- (select item_id
- from topk_item
- where ds = '${bizdate}'
- )b1
- join
- (select *
- from dwd_tb_log_pv_di
- where ds = '${bizdate}'
- and url_type = 'ipv'
- and item_id is not null
- ) b2
- on (b1.item_id = b2.item_id)
- ) l
- left outer join
- (select /* mapjoin(a1) */
- a2.*
- from
- (select item_id
- from topk_item
- where ds = '${bizdate}'
- ) a1
- join
- (select *
- from dim_tb_itm
- where ds = '${bizdate}'
- ) a2
- on (a1.item_id = a2.item_id)
- ) a
- on a.item_id = l.item_id;
union all
合并后即得到完整的日志数据,并且关联了商品的信息。注:代码来自阿里云官网。仅供参考,写的属实有点乱
对于skewjoin,平台在执行job时会自动进行如下操作
➢ hive.optimize.skewjoin.compiletime
如果建表语句元数据中指定了skew key,则使用set hive.optimize.skewjoin.compiletime=true开启skew join。
可以通过如下建表语句指定skewed key:
- create table list_bucket_single (key string, value string)
- skewed by (key) on (1,5,6) [stored as directories];
➢ hive.optimize.skewjoin
该参数为在运行时动态指定数据进行skewjoin,一般和hive.skewjoin.key参数一起使用
- set hive.optimize.skewjoin=true;
- set hive.skewjoin.key=100000;
以上参数表示当key记录条数超过100000时采用skewjoin操作
区别
hive.optimize.skewjoin.compiletime和hive.optimize.skewjoin区别为前者为编译时参数(倾斜key事先知道),后者为运行时参数(运行时动态判断) 前者在生成执行计划时根据元数据生成skewjoin,此参数要求倾斜值一定;后者为运行过程中根据数据条数进行skewjoin优化。hive.optimize.skewjoin实际上应该重名为为hive.optimize.skewjoin.runtime参数,考虑兼容性没有进行重命名
如果数据量超大,请 采用 bucket mapside join 或 sort-merge-bucket join ,具体参考 join详解
- set hive.map.aggr=true;
- select count(*) from table2;
group by Key出现长尾,是因为某个Key内的计算量特别大。
对于确定的倾斜值,先均匀分布到各个reducer上,然后开启新一轮reducer进行统计操作。写法如下
// 此处只是一个例,实际情况可以在map端开启聚(此处忽略) -- 正常写法 // select key , count(1) as cnt from tb_name group by key; -- 改进后写法 ,假设热点key值为 key001 select a.key , sum(cnt) as cnt from (select key , if(key = 'key001',random(),0) , count(1) as cnt from tb_name group by key, if(key = 'key001',random(),0) ) t group by t.key;
由上可见,这次的执行计划变成了M>R>R。虽然执行的步骤变长了,但是长尾的Key经过2个步骤的处理,整体的时间消耗可能反而有所减少
如果在不确定倾斜值的情况下,可以设置hive.groupby.skewindata参数
- set hive.groupby.skewindata=true;
- select key
- , count(1) as cnt
- from tb_name
- group by key;
其原理和上述写法调整中类似,是先对key值进行均匀分布,然后开启新一轮reducer求值
对于Distinct,把长Key进行拆分的策略已经不生效了。对这种场景,您可以考虑通过其它方式解决。
- --原始sql,不考虑uid为空。
- select count(uid) as pv
- , count(distinct uid) as uv
- from userlog;
-
-
- -- 改写后写法
- select sum(pv) as pv
- , count(*) as uv
- from (select count(*) as pv
- , uid
- from userlog
- group by uid
- )t;
一个任务中有多个语句,或者join时涉及到多张表且涉及表数据量较大,计算逻辑复杂,建议将各语句进行拆分,多表join也可以进行拆开。好处如下
代码清晰,各任务各司其职
有些任务可以串行执行,提供执行效率
如果任务依赖于多张上游表,上游表产出时间有先后,可以将先产出表先进行计算,避免最后上游表产出后任务统一计算,大大延迟结果表产出时间(分散计算)
方便观察发现瓶颈任务,针对瓶颈任务单独进行优化
在生产中需要对tb_a_df全量表和tb_b_df全量表进行关联,补齐A表中value1中信息,表数据信息如下
tb_a_df:1.2 亿条数据,存储4GB
tb_b_df:130 亿条数据,存储11TB
优化前
- -- yyyymmdd 为当天分区
- insert overwrite table result_table partition (ds = '${yyyymmdd}')
- select t.key
- , t.value
- , t1.value1
- from tb_a_df t
- left join tb_b_df t1
- on t.key = t1.key
- and t1.ds = '${yyyymmdd}'
- where t.ds = '${yyyymmdd}'
其中tb_b_df表数据量过大,join过程需要shuffle,极大消耗资源,以上任务运行时长为40分钟左右,且运行时长不稳定,影响数据产出时间
优化思路:
经验证发现:
tb_a_df表对应的增量表 tb_a_delta 最大更新35w条左右
tb_a_df 对应的增量表 tb_b_delta 每天更新量为9kw,100GB左右。
可以考虑将全量数据分为变动数据和非变动数据
于是我们从增量表入手将任务拆分为3个子task
- -- taska
- -- 计算 tb_a 中有变动的数据
- -- tb_a_delta 表数据量很小,可以采用map join,极大降低了消耗
- insert overwrite table result_table_delta_a partition(ds = '${yyyymmdd}')
- select /*+ mapjoin(t) */
- t.key
- , t.value
- , t1.value1
- from tb_a_delta t -- 数据量 35w
- join tb_b_df t1 -- 130 亿条数据
- on t.key = t1.key
- and t1.ds = '${yyyymmdd}'
- where t.ds = '${yyyymmdd}'
-
- -- taskb
- -- 计算 tb_b 中有变动的数据,此时数据量小了很多,大大降低了shuffle的成本
- insert overwrite table result_table_delta_b partition(ds = '${yyyymmdd}')
- select t.key
- , t.value
- , t1.value1
- from tb_a_df t -- 数据量 1.2亿
- join tb_b_delta t1 -- 数据量 9kw
- on t.key = t1.key
- and t1.ds = '${yyyymmdd}'
- where t.ds = '${yyyymmdd}'
-
- -- taskc
- --
- with tmp as
- (
- -- a表变动数据
- select t.key
- , t.value
- , t.value1
- , 1 flg
- from result_table_delta_a t
- where t.ds = '${yyyymmdd}'
-
- union all
-
- -- b表变动数据
- select t.key
- , t.value
- , t.value1
- , 2 flg
- from result_table_delta_b t
- where t.ds = '${yyyymmdd}'
- ),
- tmp1 as
- (
- -- 去重
- select t.key
- , t.value
- , t.value1
- , row_number() over(partition by key order by flg desc) as rn -- 优先取b变动数据
- from tmp t
- )
- insert overwrite table result_table partition (ds = '${yyyymmdd}')
- -- 变动数据
- select key
- , value
- , value1
- from tmp1 t
- where rn = 1
-
- union all
-
- -- 非变动数据,用昨天分区来补全
- -- 注意,此处可以直接用昨天分区数据 left anti join tmp1 即可。由于生产中未作验证,因此此处没表述
- select t.key
- , t.value
- , t2.value1
- from tb_a_df t
- left anti join tmp1 t1 -- 排除变动数据
- on t.key = t1.key
- and t1.rn = 1
- left join tb_a_df t2
- on t.key = t2.key
- and t2.ds = '${yyyymmdd-1}' -- 昨天分区
- where t.ds = '${yyyymmdd}'
效果
taska平均运行时长 5min
taskb平均运行时长 2.5min
taskc运行时长 7min
其中taska和taskb并行运行,待运行完成后再运行taskc,因此优化后任务时长为 5min + 7min = 12 min ,时长从 40min -> 12 min,时长降为原来的30%,也大大降低了计算资源
此处刚好 tb_a_delta 表较小,刚好可以采用 mapjoin,如果不能使用 mapjoin,需作何解?
可以采用hive中的buckted sort merge join方案,我司针对两个大表之间的join大量采用了此方案。
关于join介绍,请点击此处
请参考 hive简介 9.企业级调优
计算公式 computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M
详情请看 mapreduce简介 -> 3.1.2 Job提交流程和切片源码 -> 2.FileInputFormat切片源码解析(input.getSplits(job))
hive中调整 maxSize 最大值即可
set mapreduce.input.fileinputformat.split.maxsize=100;
reduce个数的设定极大影响任务执行效率,不指定reduce个数的情况下,Hive会猜测确定一个reduce个数,基于以下两个设定:
计算reducer数的公式N=min(参数2,总输入数据量/参数1)
即,如果reduce的输入(map的输出)总大小不超过1G,那么只会有一个reduce任务;
- select user_id
- , count(1)
- from dwd_tb_crm_trd_rfd_case_df
- where ds= '20201001'
- group by user_id;
- // dwd_tb_crm_trd_rfd_case_df 总大小为9G多,因此这句有10个reduce
通过这个栗子我们可以看到模型对日常设计的重要性(此处只是一个案例,还有其它案例笔者后续总结出来)。是采用增量还是全量计算,是否将核心字段和非核心字段剥离 是平时需要关注的重要点
以上优化方式为一般且常见的优化方式,对于具体问题应该进行具体分析
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。