当前位置:   article > 正文

hive--从底层梳理优化_hsh.hive

hsh.hive

一、前言

Hadoop的MapReudce(分而治之

  • Input→程序的输入
  • Map→分,将任务在逻辑上划分多个小的任务
  • Shuffle→分组、排序
  • Reduce→合,将shuffle处理的结果,进行聚合
  • Output→程序的输出

Hive(翻译官

  • 将HDFS文件映射成表中的数据
  • 将SQL解析为MapReduce程序
  • 优化
    • 减少数据量(存储/计算)
    • 让mr处理适当

SQL

  • 针对一段sql,我们可以了解一下他的执行顺序
(7) SELECT
(8) DISTINCT <select_list>
(1) FROM <left_table>
(3) <join_type> JOIN <right_table>
(2) ON <join_condition>
(4) WHERE <where_condition>
(5) GROUP BY <group_by_list>
(6) HAVING <having_condition>
(9) ORDER BY <order_by_condition>
(10) LIMIT <limit_number>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

FROM (→WHERE )→ ON → JOIN (→WHERE )(→ SELECT)→ GROUP BY (→ SELECT)→ HAVING (→ SELECT) → DISTINCT → UNION → ORDER BY (→ SELECT)→ LIMIt

这里其实select 由于要为下一步输出做选择,其实比较特殊,不同写法会存在很多次不同的顺序中
where 由于谓词下推也会比较特殊,也可能会存在多次
所以有兴趣的可以进一步思考谓词下推,什么情况能下推,什么情况不会下推?
tips:这里只是初略看下,有个初步概念,详细的也可以查看这篇文章(会有一些差异,可以思考下怎么回事)

  • 这些关键字决定了什么
select 1 from 2 where 3 group by 4 having 5 order by 6 limit 7;
  • 1

1→决定了结果有哪些列:要么是已存在的列,要么是函数生成的列,列的过滤
2→决定了读取数据的数据源
3→决定了对哪些行进行过滤
4→按照什么条件进行分组
5→分组以后对哪些行进行过滤
6→按照什么条件进行排序
7→限制输出

二、优化方向

  • 数据层面的优化
    • 模型设计
    • 存储
    • 。。。
  • 操作层面的优化
    • 查询sql的优化
    • 控制参数的优化
    • 。。。

三、从mr角度看优化

Input

  • 减少数据量
    • 分区表
      • 指定分区
      • →尽量在where中使用分区字段
    • 逻辑中有临时表
      • →做好列裁剪(即只保留我们需要的列)
    • join
      • →先过滤再join

Map

  • 可以开启map端的推测执行
  • 可以开启mapjoin
-- 是否自动转换为mapjoin
set hive.auto.convert.join=true

-- 小表的最大文件大小,默认为25000000,即25M
set hive.mapjoin.smalltable.filesize=25000000

-- 是否将多个mapjoin合并为一个
set hive.auto.convert.join.noconditionaltask=true

-- 合并mapjoin有啥好处呢?
-- 因为每个mapjoin都要执行一次map,需要读写一次数据,所以多个mapjoin就要做多次的数据读写。
-- 合并mapjoin后只用读写一次,自然能大大加快速度。
-- 但是执行map是内存大小是有限制的,在一次map里对多个小表做mapjoin就必须把多个小表都加入内存
-- ,为了防止内存溢出,所以加了hive.auto.convert.join.noconditionaltask.size参数来做限制。
-- 不过,这个值只是限制输入的表文件的大小,并不代表实际mapjoin时hashtable的大小。 
-- 多个mapjoin转换为1个时,所有小表的文件大小总和的最大值。
set hive.auto.convert.join.noconditionaltask.size=20971520

-- 使用mapjoin时,会先执行一个本地任务(mapreduce local task)将小表转成hashtable并序列化为文件再压缩
--,随后这些hashtable文件会被上传到hadoop缓存,提供给各个mapjoin使用。这里有三个参数我们需要注意:

-- 将小表转成hashtable的本地任务的最大内存使用率,默认0.9
set hive.mapjoin.localtask.max.memory.usage=0.9

-- 如果mapjoin后面紧跟着一个group by任务,这种情况下 本地任务的最大内存使用率,默认是0.55
set hive.mapjoin.followby.gby.localtask.max.memory.usage=0.55

-- localtask每处理完多少行,就执行内存检查。默认为100000
set hive.mapjoin.check.memory.rows=100000

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 数量控制
  • 过多→需要调少
    • 一个小文件会启动一个map,如果有很多个小文件,可能一个map任务启动和初始化的时间远远大于逻辑处理的时间
    • 如何控制
      • 配置combine参数可以执行前进行小文将合并(一般为默认)
      • splitSize的公式:max(minSize, Math.min(maxSize, blockSize))
        • 一般配置这两个就行了(即下面的第二第三个参数)
        • 如果不可以,确定是否启动了压缩,且压缩的算法是否支持文件切分
-- hive0.5开始就是默认值,执行map前进行小文件合并
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat 
set mapred.min.split.size= 10000000;
set mapred.max.split.size= 256000000;
-- 一个节点上split的至少的大小 ,决定了多个data node上的文件是否需要合并
set mapred.min.split.size.per.node= 100000000;
-- 一个交换机下split的至少的大小,决定了多个交换机上的文件是否需要合并
set mapred.min.split.size.per.rack= 100000000;

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 过少→需要调多
    • 当然也有128m左右(允许有10%的溢出)的文件只启动一个map,但是逻辑十分复杂
    • min(totalsize/mapred.reduce.tasks,块大小)
      • set mapred.reduce.tasks=100;
    • 利用中间表
desc formatted tmpdb.hzy_0701; -- numFiles为1
set hive.merge.mapredfiles = false; -- 关闭merge
set mapred.reduce.tasks=100; -- 设置reduce数量
DROP TABLE IF EXISTS tmpdb.hzy_0701_2;
create table tmpdb.hzy_0701_2 as
select * from tmpdb.hzy_0701
distribute by CAST(RAND() *100 AS INT)
;
desc formatted tmpdb.hzy_0701_2; -- numFiles为100
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

Shuffle

  • 对于on中某些key过多,可以加前缀

  • 空值的key变字符串+随机数分配到不同的reduce中,null关联不上,不影响结果

  • map输出结果压缩

-- 开启中间压缩(map输出结果压缩)
set hive.exec.compress.intermediate = true;
  • 1
  • 2
  • 配置压缩参数,减少网络输出
  • 关联时尽量指定条件,减少笛卡尔积的产生
  • 可以设计分桶表
  • 使数据尽量均匀分布到各个reduce中
    • 启动两个mr,通过参数skewjoin,先随机,再聚合
set hive.groupby.skewindata = true;
  • 1
  • 开启倾斜关联(运行时/编译时)、开启union的优化(避免二次读写),并设置判断key倾斜的阈值条数
set hive.optimize.skewjoin=true;
set hive.optimize.skewjoin.compiletime=true;
-- 使用hive.optimize.union.remove优化的时候必须设置mapred.input.dir.recursive=true。
set hive.optimize.union.remove=true;

set hive.skewjoin.key=100000; -- 默认值100000。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 对于on中某些key过多,可以加前缀
  • 空值的key变字符串+随机数分配到不同的reduce中,null关联不上,不影响结果
select *
from log a
left join users b
on case when a.user_id is null
		then concat('hive',rand())
		else a.user_id
	end = b.user_id

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

Reduce

  • map端聚合
set hive.map.aggr = true;
  • 1
  • merge参数执行mr结束时合并小文件
-- 在map-reduce的任务结束时合并小文件
set hive.merge.mapredfiles = true;
-- 合并文件的大小,设置为块大小的两倍256m
set hive.merge.size.per.task = 256000000;
-- 当输出文件的平均大小小于该值时,启动一个独立的map-reduce任务进行文件merge
set hive.merge.smallfiles.avgsize=128000000;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 配置压缩参数,减少数据存储空间

Output

  • 开启mr的推测执行
  • 开启mr的jvm重用
  • 配置压缩
  • 开启并行执行

四、其他常用优化&参数

  • 下面的一些可能融合到第三章,但是考虑到过于臃肿,所以单独列一章

动态分区

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
  • 1
  • 2

分区数量

set hive.exec.max.dynamic.partitions=100000;
set hive.exec.max.dynamic.partitions.pernode=100000;
  • 1
  • 2

开启并行

  • 默认只能编译一段HiveSQL,并上锁
  • 可以开启并行度,保证可以同时编译,及最大并行度
  • 同步执行hive的多个阶段,hive在执行过程,将一个查询转化成一个或者多个阶段。某个特定的job可能包含众多的阶段,而这些阶段可能并非完全相互依赖的,也就是说可以并行执行的,这样可能使得整个job的执行时间缩短
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=8;
  • 1
  • 2

map端/reduce端内存

set mapreduce.map.memory.mb=8048;
-- mapreduce.map.java.opts一定要小于mapreduce.map.memory.mb
set mapreduce.map.java.opts=-Xmx8000m; -- 启动 JVM 虚拟机时,传递给虚拟机的启动参数,而默认值 -Xmx200m 表示这个 Java 程序可以使用的最大堆内存数,一旦超过这个大小,JVM 就会抛出 Out of Memory 异常,并终止进程。
set mapreduce.reduce.memory.mb=4096;
  • 1
  • 2
  • 3
  • 4

设置作业优先级(VERY_HIGH,HIGH,NORMAL,LOW)

set mapred.job.priority = NORMAL;
  • 1

Bucket-MapJoin开启

  • 一个表的Bucket数是另一个Bucket的整数倍,可以对两个表均做hash再进行join
set hive.optimize.bucketmapjoin = true;
-- 一个表的bucket数是另一个表bucket数的整数倍
-- bucket列 == join列
-- 必须是应用在map join的场景中
-- 如果表不是bucket的,则只是做普通join。
  • 1
  • 2
  • 3
  • 4
  • 5

SMBJoin开启

  • 基于有序桶表(针对bucket mapjoin 的一种优化)
set hive.optimize.bucketmapjoin = true;
set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.auto.convert.sortmerge.join.noconditionaltask=true;
-- 小表的bucket数=大表bucket数
-- Bucket 列 == Join 列 == sort 列
-- 必须是应用在bucket mapjoin 的场景中

-- hive并不检查两个join的表是否已经做好bucket且sorted,需要用户自己去保证join的表
-- ,否则可能数据不正确。有两个办法
-- 1)hive.enforce.sorting 设置为true。
-- 2)手动生成符合条件的数据,通过在sql中用distributed c1 sort by c1 或者 cluster by c1
-- 表创建时必须是CLUSTERED且SORTED,如下
create table test_smb_2(mid string,age_id string)
CLUSTERED BY(mid) SORTED BY(mid) INTO 500 BUCKETS;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

压缩配置优化

  • map端输出减少网络传输-是否可切分
  • shuffle过程减少网络传输-快
  • reduce端输出减少存储空间-压缩比

本地模式

  • 有些任务较小,无需使用mr进行处理,开启job的时间过长,也会影响整体的查询效率。Hive 可以通过本地模式在单台机器上处理所有的任务。对于小数据集,执行时间可以明显被缩短。
set hive.exec.mode.local.auto=true;
  • 1

JVM重用

  • 对于很难避免小文件的场景或者task特别多的场景,这类场景大多数执行时间都很短。
  • hadoop默认配置是使用派生JVM来执行map和reduce任务的,这时jvm的启动过程可能会造成相当大的开销,尤其是执行的job包含有成千上万个task任务的情况。
  • JVM重用可以使得JVM实例在同一个JOB中重新使用N次
  • JVM的一个缺点是,开启JVM重用将会一直占用使用到的task插槽,以便进行重用,直到任务完成后才能释放。如果某个“不平衡“的job中有几个reduce task 执行的时间要比其他reduce task消耗的时间多得多的话,那么保留的插槽就会一直空闲着却无法被其他的job使用,直到所有的task都结束了才会释放。
set  mapred.job.reuse.jvm.num.tasks=10;
  • 1

推测执行

  • 缺点
    • Hadoop推测执行可能触发执行一些重复的任务。
    • 对重复的数据进行计算而导致消耗更多的计算资源。
-- 是否为Map Task打开推测执行机制,默认为true。
-- 如果为true,如果Map执行时间比较长,那么集群就会推测这个Map已经卡住了
-- ,会重新启动同样的Map进行并行的执行,哪个先执行完了,就采取哪个的结果来作为最终结果
-- ,一般直接关闭推测执行
set mapreduce.map.speculative=true;
-- 是否为Reduce Task打开推测执行机制,默认为true。
-- 如果reduce执行时间比较长,那么集群就会推测这个reduce已经卡住了
-- ,会重新启动同样的reduce进行并行的执行,哪个先执行完了
-- ,就采取哪个的结果来作为最终结果,一般直接关闭推测执行
set mapreduce.reduce.speculative=true;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

分桶

  • 分区进一步拆分成桶,获取更高的查询效率
  • 不能直接load进去,需要先开启分桶参数,再创建临时表,再insert

索引

  • 行组索引
    • 非等值连接
  • 布隆过滤器索引
    • 等值连接

小文件的处理

--是否和并Map输出文件,默认true
set hive.merge.mapfiles = true;
-- 在map-reduce的任务结束时(reduce端输出)合并小文件,默认为false 
set hive.merge.mapredfiles = true;  
-- 设置合并文件的大小,默认256000000字节
set hive.merge.size.per.task = 256000000;
--当输出文件的平均大小小于该值时,启动一个独立的map-reduce任务进行文件merge,默认16000000字节
set hive.merge.smallfiles.avgsize=256000000;
-- orc的表同时需要下面两个,其它文件可以去掉
set hive.exec.orc.default.block.size=256000000;
set hive.merge.orcfile.stripe.level=false;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

GROUP BY和DISTINCT去重

  • GROUP BY:在进行去重的时候,会将数据分发到不同的reduce中,在每个reduce中分别去重后再输出结果
  • DISTINCT:在去重的时候会将数据分发到一个节点进行去重,可能会产生数据放大的问题,当然在count(distinct)的时候也会存在这样的问题,并且该过程只产生一个reduce任务。
  • 速度对比
    • 小数据量或者重复数据少时,DISTINCT在一定程度上处理速度会优于GROUP BY。
    • 数据量较大时,尽量不要使用DISTINCT方式去重,尤其是尽量不要使用COUNT(DISTINCT)方式处理数据。
  • 优化1
    • size(collect_set(id))
    • 风险
      • collect_set底层源码的数据结构为linkhashset,当存入海量数据的时候,linkhashset会占用超级大量的空间,导致数据处理速度很慢。最后使得数据占用资源的同时,效率也不见得能得到很大的提升,而且还有OOM的风险。
  • 优化2
    • row_number
    • 对需要处理的字段进行去重,再通过group by和count(if)筛选出每个去重字段所需要的数值进行数量累加,就可以计算出多个去重数据的数值。
    • 还可参考Hive–count(distinct)优化
  • 优化3
    • 先group by 小维度,再count distinct,或者sum

谓词下推

  • 解释
    • 在hive优化过程中,会尽可能的将where提前执行,以减少处理的数据量。具体可以参考前言的sql执行顺序。
  • 举例
    • Join(Inner Join),条件写在on后面(会谓词下推),还是where后面,性能上面没有区别;
    • 对于Left outer Join
-- 左侧的表写在where后面
select  *
from a
left join b
	on (a.id=b.id and b.name= 'zhangsan')
;

-- 右侧的表写在on后面
select  *
from a
left join b
	on a.id=b.id
where a.name= 'zhangsan'
;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 参数
set hive.optimize.ppd = true;
  • 1
  • 失效情况

    • Full outer Join都不会谓词下推;

    • 特殊函数可能会导致不进行谓词下推

select * 
from a 
join b 
	on a.id = b.id 
where a.dd =2019-10-09and a.create_time = unix_timestamp ()
;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

数据倾斜

参数调节
  • map端预聚合,相当于combiner
set hive.map.aggr=true;
  • 1
  • 启动两个map job,第一个随机分配map结果,局部聚合;第二个最终聚合
set hive.groupby.skewindata=true;
  • 1
  • 开启倾斜关联(运行时/编译时)、开启union的优化(避免二次读写),并设置判断key倾斜的阈值条数
set hive.optimize.skewjoin=true;
set hive.optimize.skewjoin.compiletime=true;
-- 使用hive.optimize.union.remove优化的时候必须设置mapred.input.dir.recursive=true。
set hive.optimize.union.remove=true;
set hive.skewjoin.key=100000;默认值100000
  • 1
  • 2
  • 3
  • 4
  • 5
SQL调节
  • join
    • 将分布均匀的表做为驱动表,做好列裁剪和过滤
    • 表小key集中
      • 小表先进内存
    • 表大也分桶了但是特殊值过多
      • 空值的key变字符串+随机数分配到不同的reduce中,null关联不上,不影响结果
  • group by–维度过小,某些值过多
  • count distinct–特殊值过多
    • 值为null的单独处理,在union

mapjoin的原理:

  • 在两个表做join操作的时候,一般会使用两个不通的mapper在join的key上排序然后生成临时文件,reduce使用这些文件作为input,做join操作。当一张表很大,另一张表很小的情况下,不够优。此时会有上千个mapper去读取大表的不同数据,同时这些mapper还需要去hdfs读取小表数据到本地内存,可能会引起性能瓶颈。
  • mapjoin的优化在于,在mapreduce task开始之前,创建一个local task, 小表以hshtable的形式加载到内存,然后序列化到磁盘,把内存的hashtable压缩为tar文件。然后把文件分发到 Hadoop Distributed Cache,然后传输给每一个mapper,mapper在本地反序列化文件并加载进内存再做join

mapreduce内存分配

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/718772
推荐阅读
相关标签
  

闽ICP备14008679号