当前位置:   article > 正文

Hive调优策略——Fetch抓取 & 表的各种优化策略(mapjoin原理)_hive fetch任务 桶表

hive fetch任务 桶表

总结一下工作中常用的hive调优策略

目录

1. Fetch抓取

2. 小表Join大表(mapjoin的原理)

3. 大表Join大表

(0)前提:配置历史服务器

(1)空key过滤

(2)空key转换(将空值key转换为其他随机数,避免数据倾斜)

4. Group By发生数据倾斜

5. Count(distinct key)对key去重后求总行数

6. 行列过滤(分区过滤,先where再join嵌套子查询)

7. 动态分区(将数据动态插入分区表)

8. 分桶

9. 分区表


1. Fetch抓取

Hive中某些情况的查询可以不使用MR计算,因为MR速度比较慢,例如select * from table;这种情况下Hive可以简单读取table对应的存储目录下的文件,然后输出查询结果到控制台,效率提高。

具体的操作方式:

在hive/conf/hive-default.xml.template文件中,2637行,修改hive.fetch.task.conversion为more

这样全局查找,字段查找,filter查找,limit查找等都不走MR,直接Fetch。

  1. <property>
  2. <name>hive.fetch.task.conversion</name>
  3. <value>more</value>
  4. <description>
  5. Expects one of [none, minimal, more].
  6. Some select queries can be converted to single FETCH task minimizing latency.
  7. Currently the query should be single sourced not having any subquery and should not have
  8. any aggregations or distincts (which incurs RS), lateral views and joins.
  9. 0. none : disable hive.fetch.task.conversion
  10. 1. minimal : SELECT STAR, FILTER on partition columns, LIMIT only
  11. 2. more : SELECT, FILTER, LIMIT only (support TABLESAMPLE and virtual columns)
  12. </description>
  13. </property>

2. 小表Join大表(mapjoin的原理)

当小表Join大表时,如果不指定MapJoin,那么hive解析器会将join操作转换为Common Join操作,在Reduce端完成join,容易发生数据倾斜。开启MapJoin后可以将小表全部加载到内存中,在map端进行join,避免reducer处理。

优点:

(1)没有reducer处理,就不会产生数据倾斜;

(2)没有Map -> Reduce中间的shuffle操作,避免了IO。

  1. 1. 开启MapJoin设置
  2. 1)设置自动选择MapJoin,默认是true
  3. set hive.auto.convert.join = true;
  4. 2)设置小表阈值,默认是25M
  5. set hive.mapjoin.smalltable.filesize=25000000;
  6. 2. 再大表join小表,与小表join大表,执行速率几乎相等

Hive中mapjoin的原理:

map-side join:

小表数据映射成一张hashtable,再上传到分布式节点的内存中;
大表进行分片,每个节点一部分数据,大表数据文件作为map端输入,对map()函数每一对输入的kv都与已加载到内存中的小表数据连接,
把连接结果按key输出,有多少个map task,产生多少个结果文件;
由于join操作在map task中完成,所以无需启动reduce task,没有shuffle操作和reduce端,避免io和数据倾斜

reduce-side join:

map端把结果按key输出,并在value中标记出数据来源于table1还是table2
因为在shuffle阶段已经按key分组,reduce阶段会判断每个value来自哪张表,然后两表相同key的记录连接
join操作在reduce task中完成

缺点1:在map阶段没有对数据瘦身,shuffle的网络传输和排序性能很低
缺点2:reduce对2个集合做城际计算,很耗内存,容易造成oom

3. 大表Join大表

有时join超时是因为某些key对应的数据太多了。由于相同的key对应的数据都会发送到相同的reducer上,如果出现数据倾斜可能导致内存不足。

常见对于key对应字段为空,可以采取的优化手段包括空值key过滤和空值key转换。

(0)前提:配置历史服务器

hive查询的MR流程可以在历史服务器的19888端口查看

  1. 1. 首先配置mapred-site.xml
  2. <property>
  3. <name>mapreduce.jobhistory.address</name>
  4. <value>hadoop100:10020</value>
  5. </property>
  6. <property>
  7. <name>mapreduce.jobhistory.webapp.address</name>
  8. <value>hadoop100:19888</value>
  9. </property>
  10. 2. 在shell端启动历史服务器
  11. sbin/mr-jobhistory-daemon.sh start historyserver
  12. 3. 查看jobhistory端口
  13. http://hadoop100:19888/jobhistory

(1)空key过滤

  1. 不过滤空id列
  2. insert overwrite table A
  3. select b.* from B b left join C c on b.id = c.id;
  4. 过滤掉空id列
  5. insert overwrite table A
  6. select b.* from (select * from B where id is not null) b left join C c on b.id = c.id;

过滤掉空id列后,耗费时间会降低很多。

(2)空key转换(将空值key转换为其他随机数,避免数据倾斜)

有时虽然某个key对应的null很多,但null并不是异常数据,不能过滤掉,必须包含在join的结果中,这样就可以考虑把表中key为null的字段赋一个随机值,使得数据随机均匀分到不同的reducer上。

适合无效字段(eg:id=-99,null等)产生的数据倾斜问题。因为空值不参与关联,即使分到不同的reduce上,也不影响最终的结果。

  1. 首先设置reduce个数
  2. set mapreduce.job.reduces = 5;
  3. 然后join两张表,随机设置null
  4. insert overwrite table A
  5. select b.* from B b full join C c on
  6. case when b.id is null then concat ('hive',rand()) else b.id end = c.id;

原理:当表b的字段id为null时,如果null过多所有null对应同一个key即id,都挤到一个reduce上,通过优化将表b的key=id换成key=hive随机数,这样null分配到不同的key上,避免数据倾斜。

case when A then B else C end语法:

当b表的字段id为null时,对id取值为拼接字符串(hive+随机数),否则依然取b.id;然后on条件为:hive随机数=c.id或者b.id=c.id。

4. Group By发生数据倾斜

对于group by聚合,默认情况下,Map阶段同一Key的数据发给一个reduce,若某个key的数据量太大,可能会造成数据倾斜。

如果在Map端就直接完成部分聚合,最后在Reduce端得出最终结果,就可以避免数据倾斜。

需要在Map端进行聚合参数设置:

  1. 1. 是否在Map端进行聚合,默认是true
  2. set hive.map.aggr = true
  3. 2. 在Map端进行聚合操作的条目数,默认10w条
  4. set hive.groupby.mapaggr.checkinterval = 100000
  5. 3. 有数据倾斜时进行负载均衡,默认false
  6. set hive.groupby.skewindata = true

原理是当set hive.groupby.skewindata = true后,会生成两个MR Job,启两个任务。

job1将group by的key,相同的key可能随机分发到不同的Reduce中,然后Reduce依据key对数据进行聚合,此时每一个Reduce中每个数据的key不一定相同,但是经过这一步聚合后,大大减少了数据量。

job2是真正意义上MR的底层实现,将相同的key分配到同一个reduce中,进行key的聚合操作。

第一步job1实现负载均衡,第二步job2实现聚合需求。

如果skewindata参数=false,也就是默认情况下,只会进行job2操作,进行一次MapReduce。

5. Count(distinct key)对key去重后求总行数

当数据量大的时候,由于count(distinct key)去重聚合是全聚合操作,即便是设定了reduce tasks的个数,例如set mapred.reduce.tasks=100;hive也只会启动一个reducer(order by也是这个情况),这就造成一个reducer处理的数据量太大,导致整个Job完成的很慢。

可以将count(distinct key)的方式,改为先group by 再count的方式,也就是将distinct换成group by。

这种优化可以增加reducer的个数,虽然会用多个Job完成,但是适合处理数据量大的情况。

  1. 原始: select count(distinct id) from table;
  2. 优化后:select count(id) from (select id from table group by id) a;

6. 行列过滤(分区过滤,先where再join嵌套子查询)——谓词下推

列处理:在select中,只拿需要的列,如果有分区,尽量使用分区字段查询(分区过滤),避免使用select *全表扫描

              select key from tablename where 分区字段 = '~'

行处理:两表连接时,对一个表的数据先where过滤,再join(如果先join再过滤,过滤的数据量会很大),即嵌套子查询

  1. eg:join两张表,A表joinB表,要求是把B表中id<10的过滤掉后,只查询联结表的id列
  2. 原始: select a.id from A a join B b on a.id=b.id where b.id<10
  3. 优化后:select a.id from A a join (select id from B where id<10) b on a.id=b.id

原理:因为执行顺序是先join再where,所以把where放到子查询中优先执行。

7. 动态分区(将数据动态插入分区表)

分区列是表的一个伪列,它对应HDFS的一个分区文件夹,并且分区列存在于表的最后。

如果不设定动态分区,往分区表中导入数据的方式如下:

  1. 1. 创建分区表
  2. create table stu_par(id int,name string)
  3. partitioned by (month string)
  4. row format delimited fields terminated by '\t';
  5. 2. 往分区中导入数据
  6. load data local inpath '/opt/module/datas/student.txt' into table stu_par partition(month='10');

如果设定动态分区,导入数据就不再需要指定分区字段了

1. 设置为非严格模式(默认strict,表示必须指定至少一个分区为静态分区;nonstrict表示允许所有分区可使用动态分区)

hive.exec.dynamic.partition.mode=nonstrict

2. 默认配置

(1)开启动态分区功能(默认true,开启)

hive.exec.dynamic.partition=true

(2)在所有执行MR节点上,最大一共可以创建多少个动态分区,默认1000个

hive.exec.max.dynamic.partitions=1000

(3)在每个执行MR的节点上,最大可以创建多少个分区,默认值100

eg:若源数据包含一年的数据,按照天数分区,day字段应该有365个值,这里的默认值100就应该修改为大于365的数。

hive.exec.max.dynamic.partitions.pernode=100

(4)整个MR job中,最大可以创建多少个HDFS文件,默认100000

hive.exec.max.created.files=100000

  1. 1. 创建分区表
  2. create table dept_par(id string,name string) partitioned by (location int)
  3. row format delimited fields terminated by '\t';
  4. 2. 设置动态分区非严格模式
  5. set hive.exec.dynamic.partition.mode = nonstrict;
  6. 3. 查看dept表
  7. +--------------+-------------+-----------+--+
  8. | dept.deptno | dept.dname | dept.loc |
  9. +--------------+-------------+-----------+--+
  10. | 10 | ACCOUNTING | 1700 |
  11. | 20 | RESEARCH | 1800 |
  12. | 30 | SALES | 1900 |
  13. | 40 | OPERATIONS | 1700 |
  14. +--------------+-------------+-----------+--+
  15. 4. 分区表动态导入数据(并未指定分区字段的值)
  16. insert into table dept_par partition(location) select deptno,dname,loc from dept;
  17. ...
  18. Loading partition {location=1900}
  19. Loading partition {location=1800}
  20. Loading partition {location=1700}
  21. ...
  22. 5. 查看分区表的分区情况
  23. hive (hive_db1)> show partitions dept_par;
  24. OK
  25. partition
  26. location=1700
  27. location=1800
  28. location=1900
  29. 6. 查询分区表
  30. select * from dept_par where location='1700';
  31. +--------------+----------------+--------------------+--+
  32. | dept_par.id | dept_par.name | dept_par.location |
  33. +--------------+----------------+--------------------+--+
  34. | 10 | ACCOUNTING | 1700 |
  35. | 40 | OPERATIONS | 1700 |
  36. +--------------+----------------+--------------------+--+
  37. select * from dept_par where location='1800';
  38. +--------------+----------------+--------------------+--+
  39. | dept_par.id | dept_par.name | dept_par.location |
  40. +--------------+----------------+--------------------+--+
  41. | 20 | RESEARCH | 1800 |
  42. +--------------+----------------+--------------------+--+
  43. select * from dept_par where location='1900';
  44. +--------------+----------------+--------------------+--+
  45. | dept_par.id | dept_par.name | dept_par.location |
  46. +--------------+----------------+--------------------+--+
  47. | 30 | SALES | 1900 |
  48. +--------------+----------------+--------------------+--+

原理:

将select的最后一列,认为是分区列(因为分区列在表的最后),将最后一列字段值相同的行,导入同一个分区中

好处是避免了指定分区字段的值,直接动态的将值相同的行导入同一个分区中,加大效率。

ps:如果把deptno放在select最后一列,那么会生成四个分区

  1. +--------------+--+
  2. | partition |
  3. +--------------+--+
  4. | location=10 |
  5. | location=20 |
  6. | location=30 |
  7. | location=40 |
  8. +--------------+--+

8. 分桶

适合对数据进行抽样查询的情况,clustered by 字段 into x buckets,将表数据以(字段hash值%分桶数)按照取模结果,对数据进行分桶,也就是随机分布成几块。

ps:分区列是伪列,需要指明字段类型;分桶列是实际列,不需要指明字段类型。

分桶前,需要设置属性set hive.enforce.bucketing = true

分桶表的创建、抽样查询

9. 分区表

主要目的是提高hive查询效率,避免全表查询 select * from tablename where 分区字段 = '~'

分区表的创建、查询

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/148567
推荐阅读
相关标签
  

闽ICP备14008679号