赞
踩
目标:在有限的资源下提升执行效率;
hive表的优化:分区
hive查询优化:
1、join优化:
hive.optimize.skewjoin=true;如果是join过程中出现倾斜 应该设置为true;
set hive.skewjoin.key=100000; 这个是join的键对应的记录条数,超过这个值则会进行优化;
2、mapjoin优化
set hive.auto.convert.join=true;
hive.mapjoin.smalltable.filesize默认值是25mb;
select/*+mapjoin(A)*/f.a, f.b from A t join B f on (f.a=t.a);
3、bucket join
两个表以相同方式划分桶,两个表的桶个数是倍数关系
create table order(cid int, price float) clustered by(cid) into 32 buckets;
create table customer(id int, first string) clustered by(id) into 32 buckets;
select price fromorder t join customers s on t.cid = s.id;
4、group by优化
hive.group.skewindata=true; 如果是group by过程出现倾斜,应该设置为true;
set hive.groupby.mapaggr.checkinterval=100000; 这个是group的键对应的记录条数超过这个值则会进行优化
5、count distinct优化
优化前:
selectcount(distinctid) from tablename;
select a, sum(b), count(distinct c), count(distinct d) fromtestgroupby a;
优化后:
select count(1) from (selectidfrom tablename groupbyid) tmp;
select a, sum(b) as b, count(c) as c, count(d) as d
from(
select a,0 as b, c, null as d from test group by a,c
union all
select a,0 as b, null as c, d from test group by a, d
union all
select a,b,null as c, null as d from test
)tmp1 group by a;
6、Hive job优化
每个查询被Hive转化为多个阶段,有些阶段关联性不大,则可以并行化执行,减少执行时间
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=8;
job合并输出小文件
set hive.merge.smallfiles.avgsize=256000000;当输出文件平均大小小于该值,启动新job合并文件;
set hive.merge.size.per.task=64000000;合并之后的文件大小;
7、JVM重利用
set mapred.job.reuse.jvm.num.tasks=20;
JVM重利用可以是Job长时间保留slot,直到作业结束,这在对于有较多任务和较多小文件的任务时非常有意义的,减少执行时间。但是这个值不能设置过大,因为有些作业会有reduce任务,如果reduce任务没有完成,则map任务占用的slot不能释放,其他的作业可能就需要等待。
8、压缩数据
set hive.exec.compress.intermediate=true;
set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
set hive.intermediate.compression.type=BLOCK;
9、Hive Map优化
(1) 默认map个数
default_num = total_size / block_size;
(2)期望大小
goal_num = mapred.map.tasks;
(3)设置处理的文件大小
split_size=max(mapred.min.split.size, block_size)
split_num = total_size / split_size;
(4)计算map个数
compute_map_num = min(split_num, max(default_num, goal_num))
设置map个数总结如下:
(1) 如果想增加map个数,则设置mapred.map.tasks为一个较大的值
(2) 如果想减少map个数,则设置mapred.min.split.size为一个较大的值
情况1:输入文件size巨大,但不是小文件
增大mapred.min.split.size的值
情况2:输入文件数量巨大,且都是小文件,就是单个文件的size小于blockSize。
map端聚合
set hive.map.aggr=true;
10、Hive Shuffle优化
11、Hive Reduce优化
set mapred.reduce.tasks=10;直接设置
hive.exec.reducers.max 默认999
hive.exec.reducers.bytes.per.reducer 默认1G
numTasks = min(maxReducers, input.size / perReducer)
maxReducers = hive.exec.reducers.max
perReducer=hive.exec.reducers.bytes.per.reducer
12、列裁剪和分区裁剪
只获取我们需要的列和分区
13、优先过滤,把条件先放在where条件中,减少数据再进行关联
14、sort by 替换order by,配合distribute by一起使用
15、group by 代替distinct去重数据,会有多个mapreduce执行,大数据量情况下比较好用;
16、join优化
小表前置,因为hive在解析sql的时候会把第一个表放进内存;
17、多表关联时尽量key相同,会当成同一个mr任务执行
select a.event_type,a.event_code,a.event_desc,b.upload_time from calendar_event_code a inner join ( select event_type,upload_time from calendar_record_log where pt_date = 20190225 ) b on a.event_type = b.event_type inner join ( select event_type,upload_time from calendar_record_log_2 where pt_date = 20190225 ) c on a.event_type = c.event_type;
18、倾斜均衡配置项
这个配置与上面group by的倾斜均衡配置项异曲同工,通过hive.optimize.skewjoin
来配置,默认false。
如果开启了,在join过程中Hive会将计数超过阈值hive.skewjoin.key
(默认100000)的倾斜key对应的行临时写进文件中,然后再启动另一个job做map join生成结果。通过hive.skewjoin.mapjoin.map.tasks
参数还可以控制第二个job的mapper数量,默认10000。 再重复一遍,通过自带的配置项经常不能解决数据倾斜问题。join是数据倾斜的重灾区,后面还要介绍在SQL层面处理倾斜的各种方法。
19、优化SQL处理join数据倾斜
这其实是上面处理空值方法的拓展,不过倾斜的key变成了有意义的。一般来讲倾斜的key都很少,我们可以将它们抽样出来,对应的行单独存入临时表中,然后打上一个较小的随机数前缀(比如0~9),最后再进行聚合。SQL语句与上面的相仿,不再赘述。
不同数据类型
这种情况不太常见,主要出现在相同业务含义的列发生过逻辑上的变化时。 举个例子,假如我们有一旧一新两张日历记录表,旧表的记录类型字段是(event_type int),新表的是(event_type string)。为了兼容旧版记录,新表的event_type也会以字符串形式存储旧版的值,比如'17'。当这两张表join时,经常要耗费很长时间。其原因就是如果不转换类型,计算key的hash值时默认是以int型做的,这就导致所有“真正的”string型key都分配到一个reducer上。所以要注意类型转换:
select a.uid,a.event_type,b.record_data from calendar_record_log a left outer join ( select uid,event_type from calendar_record_log_2 where pt_date = 20190228 ) b on a.uid = b.uid and b.event_type = cast(a.event_type as string) where a.pt_date = 20190228;
有时,build table会大到无法直接使用map join的地步,比如全量用户维度表,而使用普通join又有数据分布不均的问题。这时就要充分利用probe table的限制条件,削减build table的数据量,再使用map join解决。代价就是需要进行两次join。举个例子:
select /*+mapjoin(b)*/ a.uid,a.event_type,b.status,b.extra_info from calendar_record_log a left outer join ( select /*+mapjoin(s)*/ t.uid,t.status,t.extra_info from (select distinct uid from calendar_record_log where pt_date = 20190228) s inner join user_info t on s.uid = t.uid ) b on a.uid = b.uid where a.pt_date = 20190228
MapReduce优化
调整mapper数
一般来讲,如果输入文件是少量大文件,就减少mapper数;如果输入文件是大量非小文件,就增大mapper数;至于大量小文件的情况,得参考下面“合并小文件”一节的方法处理。
调整reducer数
reducer数量与输出文件的数量相关。如果reducer数太多,会产生大量小文件,对HDFS造成压力。如果reducer数太少,每个reducer要处理很多数据,容易拖慢运行时间或者造成OOM。
reducer_num = MIN(total_input_size / reducers.bytes.per.reducer, reducers.max)
。
合并小文件
输入阶段合并 需要更改Hive的输入文件格式,即参数hive.input.format
,默认值是org.apache.hadoop.hive.ql.io.HiveInputFormat
,我们改成org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
。 这样比起上面调整mapper数时,又会多出两个参数,分别是mapred.min.split.size.per.node
和mapred.min.split.size.per.rack
,含义是单节点和单机架上的最小split大小。如果发现有split大小小于这两个值(默认都是100MB),则会进行合并。具体逻辑可以参看Hive源码中的对应类。
输出阶段合并 直接将hive.merge.mapfiles
和hive.merge.mapredfiles
都设为true即可,前者表示将map-only任务的输出合并,后者表示将map-reduce任务的输出合并。 另外,hive.merge.size.per.task
可以指定每个task输出后合并文件大小的期望值,hive.merge.size.smallfiles.avgsize
可以指定所有输出文件大小的均值阈值,默认值都是1GB。如果平均大小不足的话,就会另外启动一个任务来进行合并。
所谓严格模式,就是强制不允许用户执行3种有风险的HiveSQL语句,一旦执行会直接失败。这3种语句是:
查询分区表时不限定分区列的语句;
两表join产生了笛卡尔积的语句;
用order by来排序但没有指定limit的语句。
要开启严格模式,需要将参数hive.mapred.mode
设为strict
采用合适的存储格式
在HiveSQL的create table语句中,可以使用stored as ...
指定表的存储格式。Hive表支持的存储格式有TextFile、SequenceFile、RCFile、Avro、ORC、Parquet等。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。