当前位置:   article > 正文

干货分享 | Hive调优小技巧_hive 调优,容器数量多

hive 调优,容器数量多

         Hive作为大数据领域常用的数据仓库组件,在设计和开发阶段需要注意效率。
         影响Hive效率的不仅仅是数据量过大;数据倾斜数据冗余job或I/O过多、MapReduce分配不合理等因素都对Hive的效率有影响。

对Hive的调优既包含对HiveQL语句本身的优化,也包含Hive配置项和MR方面的调整。

  1. 架构优化
  2. 参数优化
  3. SQL优化

一、架构优化

1.1执行引擎

Hive支持多种执行引擎,分别是 MapReduceTezSparkFlink。可以通过hivesite.xml文件中的hive.execution.engine属性控制。
Tez是一个构建于YARN之上的支持复杂的DAG(有向无环图)任务的数据处理框架。

由Hontonworks开源,将MapReduce的过程拆分成若干个子过程,同时可以把多个mapreduce任务组合成一个较大的DAG任务,减少了MapReduce之间的文件存储,同时合理组合其子过程从而大幅提升MR作业的性能

1.2 优化器

与关系型数据库类似,Hive会在真正执行计算之前,生成和优化逻辑执行计划与物理执行计划。

Hive有两种优化器:

  • Vectorize(矢量化优化器)
  • Cost-Based Optimization (CBO 成本优化器)。
1.2.1 矢量化查询执行

矢量化查询(要求执行引擎为Tez)执行通过一次批量执行1024行而不是每行一行来提高扫描,聚合,过滤器和连接等操作的性能,这个功能一显着缩短查询执行时间。

set hive.vectorized.execution.enabled = true;
-- 默认 false
set hive.vectorized.execution.reduce.enabled = true;
-- 默认 false
  • 1
  • 2
  • 3
  • 4

备注:要使用矢量化查询执行,必须用ORC格式存储数据

1.2.2 成本优化器

         Hive的CBO是基于apache Calcite的,Hive的CBO通过查询成本(有analyze收集的统计信息)会生成有效率的执行计划,最终会减少执行的时间和资源的利用,使用CBO
的配置如下:

SET hive.cbo.enable=true; --从 v0.14.0默认true
SET hive.compute.query.using.stats=true; -- 默认false
SET hive.stats.fetch.column.stats=true; -- 默认false
SET hive.stats.fetch.partition.stats=true; -- 默认true
  • 1
  • 2
  • 3
  • 4

定期执行表(analyze)的分析,分析后的数据放在元数据库中。

1.3分区表

         对于一张比较大的表,将其设计成分区表可以提升查询的性能,对于一个特定分区的查询,只会加载对应分区路径的文件数据,所以执行速度会比较快。

分区字段的选择是影响查询性能的重要因素,尽量避免层级较深的分区,这样会造成太多的子文件夹。一些常见的分区字段可以是:

  • 日期或时间
             如year、month、day或者hour,当表中存在时间或者日期字段时
  • 地理位置
             如国家、省份、城市等
  • 业务逻辑
             如部门、销售区域、客户等等

1.4 分桶表

与分区表类似,分桶表的组织方式是将HDFS上的文件分割成多个文件。

分桶可以加快数据采样,也可以提升join的性能(join的字段是分桶字段),因为分桶可以确保某个key对应的数据在一个特定的桶内(文件),巧妙地选择分桶字段可以大幅度提升join的性能。
通常情况下,分桶字段可以选择经常用在过滤操作或者join操作的字段。

分区与分桶区别

(1) 分区和分桶最大的区别就是分桶随机分割数据库,分区是非随机分割数据库。
         分桶是按照列的哈希函数进行分割的,相对比较平均
         分区是按照列的值来进行分割的,容易造成数据倾斜
(2) 其次两者的另一个区别
         就是分桶是对应不同的文件(细粒度)
         分区是对应不同的文件夹(粗粒度)

1.5文件格式

         存储格式一般需要根据业务进行选择,生产环境中绝大多数表都采用TextFileORCParquet存储格式之一。

         TextFile是最简单的存储格式,它是纯文本记录,也是Hive的默认格式。其磁盘开销大,查询效率低,更多的是作为跳板来使用。

RCFileORCParquet等格式的表都不能由文件直接导入数据,必须由TextFile来做中转。

ParquetORC都是Apache旗下的开源列式存储格式。列式存储比起传统的行式存储更适合批量OLAP查询,并且也支持更好的压缩编码

选择Parquet的原因主要是它支持Impala查询引擎,并且对updatedelete和事务性操作需求很低。

选择ORC,因为需要支持事务。

1.6 数据压缩

压缩技术可以减少map与reduce之间的数据传输,从而可以提升查询性能,关于压缩的配置可以在hive的命令行中或者hive-site.xml文件中进行配置。

SET hive.exec.compress.intermediate=true
  • 1

关于压缩的编码器可以通过mapred-site.xml, hive-site.xml进行配置,也可以通过命
令行进行配置,如:
 
-- 中间结果压缩
SET hive.intermediate.compression.codec=org.apache.hadoop.io.compre
ss.SnappyCodec ;
 
-- 输出结果压缩
SET hive.exec.compress.output=true;
SET mapreduce.output.fileoutputformat.compress.codec =
org.apache.hadoop.io.compress.SnappyCodc
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

二、参数优化

2.1本地模式(默认开启)

         当Hive处理的数据量较小时,启动分布式去处理数据会有点浪费,因为可能启动的时间比数据处理的时间还要长。
Hive支持将作业动态地转为本地模式,需要使用下面的配置:

SET hive.exec.mode.local.auto=true; -- 默认 false
SET hive.exec.mode.local.auto.inputbytes.max=50000000;
SET hive.exec.mode.local.auto.input.files.max=5; -- 默认 4
  • 1
  • 2
  • 3

备注

一个作业只要满足下面的条件,会启用本地模式

输入文件的大小小于hive.exec.mode.local.auto.inputbytes.max 配置的大小
map任务的数量小于hive.exec.mode.local.auto.input.files.max 配置的大小
reduce任务的数量是1或者0

2.2 严格模式

所谓严格模式,就是强制不允许用户执行3种有风险的HiveQL语句,一旦执行会直接失败。这3种语句是:

  • 查询分区表时不限定分区列的语句;
  • 两表join产生了笛卡尔积的语句;
  • 用order by来排序,但没有指定limit的语句。

要开启严格模式,需要将参数hive.mapred.mode 设为strict(缺省值)。
该参数可以不在参数文件中定义,在执行SQL之前设置(set hive.mapred.mode=nostrict )

2.3 JVM重用

         默认情况下,Hadoop会为为一个map或者reduce启动一个JVM,这样可以并行执行map和reduce。
当map或者reduce是那种仅运行几秒钟的轻量级作业时,JVM启动进程所耗费的时间会比作业执行的时间还要长。

Hadoop可以重用JVM,通过共享JVM以串行而非并行的方式运行map或者reduce。

JVM的重用适用于同一个作业的map和reduce,对于不同作业的task不能够共享JVM。如果要开启JVM重用,需要配置一个作业最大task数量,默认值为1,如果设置为-1,则表示不限制:

# 代表同一个MR job中顺序执行的5个task重复使用一个JVM,减少启动和关闭的开销
SET mapreduce.job.jvm.numtasks=5;
  • 1
  • 2

这个功能的缺点是,开启JVM重用将一直占用使用到的task插槽,以便进行重用,直到任务完成后才能释放。

如果某个“不平衡的”job中有某几个reduce task执行的时间要比其他Reduce task消耗的时间多的多的话,那么保留的插槽就会一直空闲着却无法被其他的job使用,直到所有的task都结束了才会释放。

2.4 并行执行

Hive的查询通常会被转换成一系列的stage,这些stage之间并不是一直相互依赖的,可以并行执行这些stage,通过下面的方式进行配置:

SET hive.exec.parallel=true; -- 默认false
SET hive.exec.parallel.thread.number=16; -- 默认8
  • 1
  • 2

并行执行可以增加集群资源的利用率,如果集群的资源使用率已经很高了,那么并行执行的效果不会很明显。

2.5 推测执行(默认开启)

在分布式集群环境下,因为程序Bug、负载不均衡、资源分布不均等原因,会造成同一个作业的多个任务之间运行速度不一致,有些任务的运行速度可能明显慢于其他任
务(比如一个作业的某个任务进度只有50%,而其他所有任务已经运行完毕),则这些任务会拖慢作业的整体执行进度。

为了避免这种情况发生,Hadoop采用了推测执行机制,它根据一定的规则推测出“拖后腿”的任务,并为这样的任务启动一个备份任务,让该任务与原始任务同时处理同一份数据,

并最终选用最先成功运行完成任务的计算结果作为最终结果

set mapreduce.map.speculative=true
set mapreduce.reduce.speculative=true
set hive.mapred.reduce.tasks.speculative.execution=true
  • 1
  • 2
  • 3

2.6 合并小文件(默认开启)

  • 在map执行前合并小文件,减少map数
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
  • 1
  • 在Map-Reduce的任务结束时合并小文件
# 在 map-only 任务结束时合并小文件,默认true
SET hive.merge.mapfiles = true;
 
# 在 map-reduce 任务结束时合并小文件,默认false
SET hive.merge.mapredfiles = true;
 
# 合并文件的大小,默认256M
SET hive.merge.size.per.task = 268435456;
 
# 当输出文件的平均大小小于该值时,启动一个独立的map-reduce任务进行文件merge
SET hive.merge.smallfiles.avgsize = 16777216;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

2.7 Fetch模式(默认开启)

Fetch模式是指Hive中对某些情况的查询可以不必使用MapReduce计算。selectcol1, col2 from tab ;

可以简单地读取表对应的存储目录下的文件,然后输出查询结果到控制台。在开启fetch模式之后,在全局查找、字段查找、limit查找等都不启动 MapReduce 。

set hive.fetch.task.conversion=more
  • 1

三、SQL优化

列裁剪和分区裁剪

列裁剪是在查询时只读取需要的列;分区裁剪就是只读取需要的分区。

简单的说:
select 中不要有多余的列,坚决避免 select * from tab;
查询分区表,不读多余的数据;

select uid, event_type, record_data
from calendar_record_log
where pt_date >= 20190201 and pt_date <= 20190224
and status = 0;
  • 1
  • 2
  • 3
  • 4

sort by 代替 order by

order by
将结果按某字段全局 排序,这会导致所有map 端数据都进入一个 reducer 中,在数据量大时可能会长时间 计算不完。
sort by
那么还是会视情况启动多个 reducer 进行排序,并且保证每个 reducer内局部有序。
为了控制 map 端数据分配到 reducer 的 key ,往往还要配合 distribute by 一同使用。如果不加 distribute by 的话, map 端数据就会随机分配到 reducer

group by 代替 count(distinct)

当要统计某一列的去重数时,如果数据量很大, count(distinct) 会非常慢。
原因与 order by类似, count(distinct) 逻辑只会有很少的 reducer 来处理。
此时可以用 group by 来改写

-- 原始SQL 
select count(distinct uid) from tab; 
 
-- 优化后的SQL 
select count(1) from (select uid from tab group by uid) tmp;
  • 1
  • 2
  • 3
  • 4
  • 5

这样写会启动两个 MR job (单纯 distinct 只会启动一个),所以要确保数据量大到启 动job 的 overhead 远小于计算耗时,才考虑这种方法。
当数据集很小或者 key 的倾斜 比较明显时,group by 还可能会比 distinct 慢。

group by 配置调整

map端预聚合

-- 默认为true 
set hive.map.aggr = true
 
--Map端进行聚合操作的条目数
set hive.groupby.mapaggr.checkinterval = 100000
 
设置map端预聚合的行数阈值,超过该值就会分拆job,默认值10W。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

倾斜均衡配置项

group by 时如果某些 key 对应的数据量过大,就会发生数据倾斜。 Hive 自带了一个均
衡数据倾斜的配置项 hive.groupby.skewindata ,默认值 false 。

set hive.groupby.skewindata=true;
  • 1

JSON基础优化

1、common join

普通连接,在 SQL 中不特殊指定连接方式使用的都是这种普通连接。
缺点:性能较差 ( 要将数据分区,有 shuffle)
优点:操作简单,普适性强

2、 map join

map 端连接,与普通连接的区别是这个连接中不会有 reduce 阶段存在,连接在 map 端完成
适用场景:大表与小表连接,小表数据量应该能够完全加载到内存,否则不适用
优点:在大小表连接时性能提升明显
缺点:使用范围较小,只针对大小表且小表能完全加载到内存中的情况。

map join 的配置项是 hive.auto.convert.join ,默认值 true 。
当 build table 大小小于 hive.mapjoin.smalltable.filesize 会启用 map join ,默认值25000000 (约 25MB )。
还有 hive.mapjoin.cache.numrows ,表示缓存 build table的多少行数据到内存,默认值 25000 。

3、bucket map join

分桶连接: Hive 建表的时候支持 hash 分区通过指定 clustered by (col_name,xxx ) into number_buckets buckets 关键字 . 当连接的两个表的 join key 就是 bucket column 的时候,
就可以通过设置 hive.optimize.bucketmapjoin= true 来执行优 化。
原理:通过两个表分桶在执行连接时会将小表的每个分桶映射成 hash 表,每个 task节点都需要这个小表的所有hash 表,但是在执行时只需要加载该 task 所持有大表分 桶对应的小表部分的hash 表就可以,
所以对内存的要求是能够加载小表中最大的 hash块即可。
注意点:小表与大表的分桶数量需要是倍数关系,这个是因为分桶策略决定的,分桶时会根据分桶字段对桶数取余后决定哪个桶的,所以要保证成倍数关系。
优点:比 map join 对内存的要求降低,能在逐行对比时减少数据计算量(不用比对小表全量)
缺点:只适用于分桶表

4 、倾斜均衡配置项

这个配置与 group by 的倾斜均衡配置项异曲同工,通过 hive.optimize.skewjoin 来配置,默认false 。
如果开启了,在 join 过程中 Hive 会将计数超过阈值 hive.skewjoin.key (默认 100000)的倾斜 key 对应的行临时写进文件中,然后再启动另一个 job 做 map join 生成结果。
通过 hive.skewjoin.mapjoin.map.tasks 参数还可以控制第二个 job 的 mapper数量,默认 10000 。

5、调整 Map 数

对于小文件采用的策略是合并,减少 Map 数。
对于复杂文件采用的策略是增加 Map 数。

set computeSliteSize(max(minSize, min(maxSize, blocksize))) = blocksize
minSize : mapred.min.split.size (默认值1)
maxSize : mapred.max.split.size (默认值256M)
调整maxSize最大值。让maxSize最大值低于blocksize就可以增加map的个数。
建议用set的方式,针对SQL语句进行调整。

6、调整 Reduce 数

参数 hive.exec.reducers.bytes.per.reducer 用来设定每个 reducer 能够处 理的最大数据量,默认值256M
参数 hive.exec.reducers.max 用来设定每个 job 的最大 reducer 数量,默认值 999( 1.2 版本之前)或 1009 ( 1.2 版本之后)
即: min( 输入总数据量 / 256M, 1009)
reducer 数量与输出文件的数量相关。如果 reducer 数太多,会产生大量小文件,对
HDFS 造成压力。如果 reducer 数太少,每个 reducer 要处理很多数据,容易拖慢运行
时间或者造成 OOM 。

小结

优化可以从几个方面着手:

  • 好的模型设计,事半功倍
  • 解决数据倾斜问题。
    仅仅依靠参数解决数据倾斜,是通用的优化手段,收获有 限。开发人员应该熟悉业务,了解数据规律,通过业务逻辑解决数据倾斜往往更 可靠
  • 减少 job 数
  • 设置合理的map、reduce task数
  • 对小文件进行合并,是行之有效的提高Hive效率的方法
  • 优化把握整体,单一作业的优化不如整体最优

彩蛋

资源获取 获取Flink面试题,Spark面试题,程序员必备软件,hive面试题,Hadoop面试题,Docker面试题,简历模板等资源请去
GitHub自行下载 https://github.com/lhh2002/Framework-Of-BigData
Gitee 自行下载 https://gitee.com/li_hey_hey/dashboard/projects

扫码关注

大数据老哥
希望这篇文章可以帮到你~
记得点赞收藏哦
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/241066
推荐阅读
相关标签
  

闽ICP备14008679号