赞
踩
总结一下工作中常用的hive调优策略
目录
(2)空key转换(将空值key转换为其他随机数,避免数据倾斜)
5. Count(distinct key)对key去重后求总行数
6. 行列过滤(分区过滤,先where再join嵌套子查询)
Hive中某些情况的查询可以不使用MR计算,因为MR速度比较慢,例如select * from table;这种情况下Hive可以简单读取table对应的存储目录下的文件,然后输出查询结果到控制台,效率提高。
具体的操作方式:
在hive/conf/hive-default.xml.template文件中,2637行,修改hive.fetch.task.conversion为more;
这样全局查找,字段查找,filter查找,limit查找等都不走MR,直接Fetch。
- <property>
- <name>hive.fetch.task.conversion</name>
- <value>more</value>
- <description>
- Expects one of [none, minimal, more].
- Some select queries can be converted to single FETCH task minimizing latency.
- Currently the query should be single sourced not having any subquery and should not have
- any aggregations or distincts (which incurs RS), lateral views and joins.
- 0. none : disable hive.fetch.task.conversion
- 1. minimal : SELECT STAR, FILTER on partition columns, LIMIT only
- 2. more : SELECT, FILTER, LIMIT only (support TABLESAMPLE and virtual columns)
- </description>
- </property>
当小表Join大表时,如果不指定MapJoin,那么hive解析器会将join操作转换为Common Join操作,在Reduce端完成join,容易发生数据倾斜。开启MapJoin后可以将小表全部加载到内存中,在map端进行join,避免reducer处理。
优点:
(1)没有reducer处理,就不会产生数据倾斜;
(2)没有Map -> Reduce中间的shuffle操作,避免了IO。
- 1. 开启MapJoin设置
- (1)设置自动选择MapJoin,默认是true
- set hive.auto.convert.join = true;
-
- (2)设置小表阈值,默认是25M
- set hive.mapjoin.smalltable.filesize=25000000;
-
- 2. 再大表join小表,与小表join大表,执行速率几乎相等
Hive中mapjoin的原理:
map-side join:
小表数据映射成一张hashtable,再上传到分布式节点的内存中;
大表进行分片,每个节点一部分数据,大表数据文件作为map端输入,对map()函数每一对输入的kv都与已加载到内存中的小表数据连接,
把连接结果按key输出,有多少个map task,产生多少个结果文件;
由于join操作在map task中完成,所以无需启动reduce task,没有shuffle操作和reduce端,避免io和数据倾斜reduce-side join:
map端把结果按key输出,并在value中标记出数据来源于table1还是table2
因为在shuffle阶段已经按key分组,reduce阶段会判断每个value来自哪张表,然后两表相同key的记录连接
join操作在reduce task中完成缺点1:在map阶段没有对数据瘦身,shuffle的网络传输和排序性能很低
缺点2:reduce对2个集合做城际计算,很耗内存,容易造成oom
有时join超时是因为某些key对应的数据太多了。由于相同的key对应的数据都会发送到相同的reducer上,如果出现数据倾斜可能导致内存不足。
常见对于key对应字段为空,可以采取的优化手段包括空值key过滤和空值key转换。
hive查询的MR流程可以在历史服务器的19888端口查看
- 1. 首先配置mapred-site.xml
- <property>
- <name>mapreduce.jobhistory.address</name>
- <value>hadoop100:10020</value>
- </property>
- <property>
- <name>mapreduce.jobhistory.webapp.address</name>
- <value>hadoop100:19888</value>
- </property>
-
- 2. 在shell端启动历史服务器
- sbin/mr-jobhistory-daemon.sh start historyserver
-
- 3. 查看jobhistory端口
- http://hadoop100:19888/jobhistory
- 不过滤空id列
- insert overwrite table A
- select b.* from B b left join C c on b.id = c.id;
-
- 过滤掉空id列
- insert overwrite table A
- select b.* from (select * from B where id is not null) b left join C c on b.id = c.id;
过滤掉空id列后,耗费时间会降低很多。
有时虽然某个key对应的null很多,但null并不是异常数据,不能过滤掉,必须包含在join的结果中,这样就可以考虑把表中key为null的字段赋一个随机值,使得数据随机均匀分到不同的reducer上。
适合无效字段(eg:id=-99,null等)产生的数据倾斜问题。因为空值不参与关联,即使分到不同的reduce上,也不影响最终的结果。
- 首先设置reduce个数
- set mapreduce.job.reduces = 5;
-
- 然后join两张表,随机设置null值
- insert overwrite table A
- select b.* from B b full join C c on
- case when b.id is null then concat ('hive',rand()) else b.id end = c.id;
原理:当表b的字段id为null时,如果null过多所有null对应同一个key即id,都挤到一个reduce上,通过优化将表b的key=id换成key=hive随机数,这样null分配到不同的key上,避免数据倾斜。
case when A then B else C end语法:
当b表的字段id为null时,对id取值为拼接字符串(hive+随机数),否则依然取b.id;然后on条件为:hive随机数=c.id或者b.id=c.id。
对于group by聚合,默认情况下,Map阶段同一Key的数据发给一个reduce,若某个key的数据量太大,可能会造成数据倾斜。
如果在Map端就直接完成部分聚合,最后在Reduce端得出最终结果,就可以避免数据倾斜。
需要在Map端进行聚合参数设置:
- 1. 是否在Map端进行聚合,默认是true
- set hive.map.aggr = true
-
- 2. 在Map端进行聚合操作的条目数,默认10w条
- set hive.groupby.mapaggr.checkinterval = 100000
-
- 3. 有数据倾斜时进行负载均衡,默认false
- set hive.groupby.skewindata = true
原理是当set hive.groupby.skewindata = true后,会生成两个MR Job,启两个任务。
job1将group by的key,相同的key可能随机分发到不同的Reduce中,然后Reduce依据key对数据进行聚合,此时每一个Reduce中每个数据的key不一定相同,但是经过这一步聚合后,大大减少了数据量。
job2是真正意义上MR的底层实现,将相同的key分配到同一个reduce中,进行key的聚合操作。
第一步job1实现负载均衡,第二步job2实现聚合需求。
如果skewindata参数=false,也就是默认情况下,只会进行job2操作,进行一次MapReduce。
当数据量大的时候,由于count(distinct key)去重聚合是全聚合操作,即便是设定了reduce tasks的个数,例如set mapred.reduce.tasks=100;hive也只会启动一个reducer(order by也是这个情况),这就造成一个reducer处理的数据量太大,导致整个Job完成的很慢。
可以将count(distinct key)的方式,改为先group by 再count的方式,也就是将distinct换成group by。
这种优化可以增加reducer的个数,虽然会用多个Job完成,但是适合处理数据量大的情况。
- 原始: select count(distinct id) from table;
-
- 优化后:select count(id) from (select id from table group by id) a;
列处理:在select中,只拿需要的列,如果有分区,尽量使用分区字段查询(分区过滤),避免使用select *全表扫描
select key from tablename where 分区字段 = '~'
行处理:两表连接时,对一个表的数据先where过滤,再join(如果先join再过滤,过滤的数据量会很大),即嵌套子查询
- eg:join两张表,A表joinB表,要求是把B表中id<10的过滤掉后,只查询联结表的id列
- 原始: select a.id from A a join B b on a.id=b.id where b.id<10
-
- 优化后:select a.id from A a join (select id from B where id<10) b on a.id=b.id
原理:因为执行顺序是先join再where,所以把where放到子查询中优先执行。
分区列是表的一个伪列,它对应HDFS的一个分区文件夹,并且分区列存在于表的最后。
如果不设定动态分区,往分区表中导入数据的方式如下:
- 1. 创建分区表
- create table stu_par(id int,name string)
- partitioned by (month string)
- row format delimited fields terminated by '\t';
-
- 2. 往分区中导入数据
- load data local inpath '/opt/module/datas/student.txt' into table stu_par partition(month='10');
如果设定动态分区,导入数据就不再需要指定分区字段了
1. 设置为非严格模式(默认strict,表示必须指定至少一个分区为静态分区;nonstrict表示允许所有分区可使用动态分区)
hive.exec.dynamic.partition.mode=nonstrict
2. 默认配置
(1)开启动态分区功能(默认true,开启)
hive.exec.dynamic.partition=true
(2)在所有执行MR节点上,最大一共可以创建多少个动态分区,默认1000个
hive.exec.max.dynamic.partitions=1000
(3)在每个执行MR的节点上,最大可以创建多少个分区,默认值100
eg:若源数据包含一年的数据,按照天数分区,day字段应该有365个值,这里的默认值100就应该修改为大于365的数。
hive.exec.max.dynamic.partitions.pernode=100
(4)整个MR job中,最大可以创建多少个HDFS文件,默认100000
hive.exec.max.created.files=100000
- 1. 创建分区表
- create table dept_par(id string,name string) partitioned by (location int)
- row format delimited fields terminated by '\t';
-
- 2. 设置动态分区非严格模式
- set hive.exec.dynamic.partition.mode = nonstrict;
-
- 3. 查看dept表
- +--------------+-------------+-----------+--+
- | dept.deptno | dept.dname | dept.loc |
- +--------------+-------------+-----------+--+
- | 10 | ACCOUNTING | 1700 |
- | 20 | RESEARCH | 1800 |
- | 30 | SALES | 1900 |
- | 40 | OPERATIONS | 1700 |
- +--------------+-------------+-----------+--+
-
- 4. 分区表动态导入数据(并未指定分区字段的值)
- insert into table dept_par partition(location) select deptno,dname,loc from dept;
- ...
- Loading partition {location=1900}
- Loading partition {location=1800}
- Loading partition {location=1700}
- ...
-
- 5. 查看分区表的分区情况
- hive (hive_db1)> show partitions dept_par;
- OK
- partition
- location=1700
- location=1800
- location=1900
-
- 6. 查询分区表
- select * from dept_par where location='1700';
- +--------------+----------------+--------------------+--+
- | dept_par.id | dept_par.name | dept_par.location |
- +--------------+----------------+--------------------+--+
- | 10 | ACCOUNTING | 1700 |
- | 40 | OPERATIONS | 1700 |
- +--------------+----------------+--------------------+--+
-
- select * from dept_par where location='1800';
- +--------------+----------------+--------------------+--+
- | dept_par.id | dept_par.name | dept_par.location |
- +--------------+----------------+--------------------+--+
- | 20 | RESEARCH | 1800 |
- +--------------+----------------+--------------------+--+
-
- select * from dept_par where location='1900';
- +--------------+----------------+--------------------+--+
- | dept_par.id | dept_par.name | dept_par.location |
- +--------------+----------------+--------------------+--+
- | 30 | SALES | 1900 |
- +--------------+----------------+--------------------+--+
原理:
将select的最后一列,认为是分区列(因为分区列在表的最后),将最后一列字段值相同的行,导入同一个分区中;
好处是避免了指定分区字段的值,直接动态的将值相同的行导入同一个分区中,加大效率。
ps:如果把deptno放在select最后一列,那么会生成四个分区
- +--------------+--+
- | partition |
- +--------------+--+
- | location=10 |
- | location=20 |
- | location=30 |
- | location=40 |
- +--------------+--+
适合对数据进行抽样查询的情况,clustered by 字段 into x buckets,将表数据以(字段hash值%分桶数)按照取模结果,对数据进行分桶,也就是随机分布成几块。
ps:分区列是伪列,需要指明字段类型;分桶列是实际列,不需要指明字段类型。
分桶前,需要设置属性set hive.enforce.bucketing = true
主要目的是提高hive查询效率,避免全表查询 select * from tablename where 分区字段 = '~'
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。