赞
踩
Hive 是一种基于 Hadoop 的数据仓库工具,所以本质上是使用hdfs和MapReduce进行计算。
数据倾斜是指在处理大量数据时,数据被不均匀地分配到不同的节点上,导致某些节点工作负载过重,而其他节点则相对空闲。可以通过以下方式来应对:
1.1 内容倾斜: 优化数据倾斜可以通过重新设计数据的分区和分桶策略来进行。例如,对于经常一起查询的数据,可以探索基于业务逻辑的自定义分区键。
1.2 Group by优化: 优化GROUP BY
操作时,可以使用DISTRIBUTE BY
和 SORT BY
子句而不是ORDER BY
,这样可以减少对单一 Reducer 的压力。
1.3 小表 join 大表: 在执行小表与大表的JOIN
操作时,可能会使用map-side join或者bucketed join来优化执行计划。map-side join可以将小表完全载入内存,减少数据通过网络传输的需要。
过多的资源使用往往通过以下几个方面表现:
2.1 Join过多: 过多的JOIN
操作通常意味着多个MapReduce任务。优化这个问题通常需要重新设计查询或数据模型,减少JOIN
的数量,或者使用subquery来减少MapReduce任务的分配。
2.2 小文件过多: 大量小文件会导致Hadoop的NameNode压力过大,同时因为每个文件处理时都需要启动一个单独的Map任务。可通过合并小文件或调整数据生成策略来减少其数量。
2.3 Mapper或Reducer过多: 调整MapReduce作业的配置参数,如mapreduce.job.maps
,mapreduce.job.reduces
来控制Map和Reduce的数量,可以减少资源消耗,提高作业执行效率。
不当使用HiveQL也可能导致性能问题:
3.1 COUNT(DISTINCT)
: 使用COUNT(DISTINCT)
可能会导致性能下降,因为它需要大量的数据移动来进行去重计算。可以考虑拆分查询或使用近似计算方法如HyperLogLog。
3.2 JOIN ... ON ... WHERE
: 在JOIN
之后立即使用WHERE
子句可能会限制优化器的优化能力。更合理的做法是尽可能在JOIN
之前进行过滤,减少join操作的数据量。
3.3 SELECT SUM(field) FROM TABLE;
: 如果表非常大,这种全表聚合将非常低效。可以通过增加适当的索引,调用map-side aggregation或在可能的情况下使用分区查询来优化此类查询。
目的:整体最优,考虑全局
数据建模:星型模型(主要)、雪花模型、星座模型建模
insert into ... select ... join ...
的方式对源数据进行加工,建立维度表和事实表。充分了解业务,提前设计好预聚合
分层=>轻量聚合,分区=>避免交换,分桶=>拉链表和抽样,压缩=>减少体量,压缩格式和表存储格式
mapreduce.map.memory.mb 和 mapreduce.reduce.memory.mb:
mapreduce.map.memory.mb=256
指定每个Map任务可以使用256 MB内存。mapreduce.reduce.memory.mb=512
指定每个Reduce任务可以使用512 MB内存。mapreduce.map.java.opts 和 mapreduce.reduce.java.opts:
mapreduce.map.java.opts=-Xmx200m
表明Map任务的JVM最大可以使用200 MB的堆内存(注意,这应该小于 mapreduce.map.memory.mb
,因为还需要留一部分内存给任务执行中的非堆使用)。yarn.nodemanager.resource.memory-mb:
yarn.scheduler.minimum-allocation-mb 与 yarn.scheduler.maximum-allocation-mb:
yarn.scheduler.minimum-allocation-mb=1024
表示YARN调度器为每个容器可以分配的最小内存量设置为1024 MB。yarn.scheduler.maximum-allocation-mb=8192
表示可分配给单个容器的最大内存量为8192 MB。set hive.optimize.skewjoin=true;
:启用Hive针对倾斜数据的处理,自动优化Join操作中的数据分布。set hive.skewjoin.key=100000;
:设定触发倾斜优化的键值记录数阈值。超过此阈值的键会被视为倾斜,适用特殊处理。set hive.skewjoin.mapjoin.map.tasks=10000;
:指定用于处理数据倾斜的mapper数量。这个数值需要根据实际情况调整,以免产生过多小任务。set hive.skewjoin.mapjoin.min.split=32M;
:设置处理数据倾斜时,每个map任务处理的最小数据块大小,防止产生过多小任务增加调度和管理的开销。set hive.auto.convert.join=true;
:自动将适合的join操作转换为map join,减少数据在网络中的传输。set hive.mapjoin.smalltable.filesize=25M;
:定义小表的最大大小,只有小于此大小的表才会被考虑用于map join,这是因为小表可以完整加载进内存。set hive.optimize.bucketmapjoin=true;
:启用对于分桶表的map join优化,当两个表都进行了相同方式的分桶时,可以实现更高效的连接操作。set hive.map.aggr=true;
:允许在map阶段进行局部聚合,这可以显著减少传输到reduce阶段的数据量,从而减少网络传输和加速查询处理。set hive.groupby.skewindata=true;
:当启用时,Hive会在执行group by操作时自动调整处理策略,以应对数据倾斜问题。在Hive作业执行过程中,输出过多的小文件会导致后续处理效率低下,因此合并这些小文件是提高效率的一个重要方面。
set hive.merge.mapfiles=true;
:对于map-only任务的输出,默认启用文件合并机制。set hive.merge.mapredfiles=true;
:对于有reduce阶段的任务输出,控制是否合并输出文件。set hive.merge.size.per.task=256M;
:设置合并文件的操作阈值。只有当任务的输出数据量超过此阈值时,才会触发合并操作。set hive.merge.smallfiles.avgsize=16M;
:设置合并后每个文件的平均大小期望值。如果多个小文件合并后的平均大小小于该值,则会触发合并操作以形成较大的文件。合理控制Mapper和Reducer的数量对于平衡任务负载、优化资源使用和提高作业执行效率至关重要。
Mapper的数量通常由输入数据的量和HDFS块大小决定。合理调整以下参数可以有效控制Mapper的启动数量,避免过多的Mapper造成资源浪费:
set mapreduce.job.maps=2;
:新版Hadoop中设置任务的Mapper数量。set dfs.block.size=128M;
:设置HDFS块的大小,默认128MB。set mapred.max.split.size=256M;
与set mapred.min.split.size=1;
:分别控制单个Mapper处理数据的最大和最小大小。set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
:合并多个小文件作为一个Mapper的输入,避免产生大量针对小文件的Mapper任务。Reducer的数量会直接影响到作业的处理速度和资源消耗。可通过调整以下参数,根据数据量和计算复杂性来合理设置Reducer的数量:
set mapreduce.job.reduces=n;
:设置作业的Reducer数量。set hive.exec.reducers.bytes.per.reducer=256M;
:控制每个Reducer处理的数据量,进而影响Reducer的数量。set hive.exec.reducers.max=1009;
:设置单个作业中Reducer的最大数量限制。1. 直接使用聚合函数,但没有group by
select sum(sum_a) from (select sum(a) from A group by STH)T
,这种方法允许内层查询在多个Reducer上并行执行,外层查询则在单个Reducer上完成最终的聚合,从而提高效率。2. 使用了order by
order by
语句时,为了保证全局排序的正确性,Hive通常会限制Reducer的数量为1。group by
,可以使用distribute by
来在多个Reducer上分散数据,然后使用sort by
来在每个Reducer内部进行局部排序。这种方法通过增加Reducer的数量来并行处理,从而提高查询性能。set mapreduce.job.reduces=N;
允许用户指定Reducer数量以达到优化目的。select * from (select * from A distribute by a sort by a)T order by a;
,在这里,distribute by
和sort by
结合使用可以实现更高效的排序处理。3. 存在笛卡尔积
set hive.auto.convert.join=false
。set mapreduce.job.reduces=DN_COUNT;
指定Reducer的数量,这里的DN_COUNT应该与小表扩展的副本数一致,以保证数据均匀分布到每个Reducer上。存储格式的选择对于数据如何在磁盘上存储、读写性能有重大影响。Hive支持多种文件格式,包括但不限于TextFile, ORC, Parquet, SequenceFile和RCFile。
set hive.default.fileformat=orc;
:设置Hive表的默认存储格式为ORC(Optimized Row Columnar)。ORC格式特别适合于进行大量读操作的大数据应用,因为它支持高效的压缩和编码方式,提供了优秀的查询性能及更好的数据压缩比。启用数据压缩可以减少存储空间需求,同时减少在Mapper和Reducer过程中数据传输的网络开销。
set mapreduce.map.output.compress=true;
:开启Mapper阶段输出数据的压缩功能。当Map任务的输出作为Reduce任务的输入时,数据将先被压缩,有助于减少数据传输量。set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
:设置使用Snappy压缩方式压缩Map阶段的输出。Snappy是一种快速的压缩解压框架,适用于大数据处理场景。set hive.exec.compress.output=true;
及set mapreduce.output.fileoutputformat.compress=true;
:启用Reducer输出压缩功能,可显著降低存储数据所需的磁盘空间。set mapreduce.output.fileoutputformat.compress.type=BLOCK;
:指定压缩方式为块压缩(BLOCK),与记录压缩(RECORD)相比,块压缩可以更好地利用磁盘空间和IO,提高读写性能。set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
:使用Snappy作为最终的数据输出压缩方式。相较其他压缩方式,Snappy提供合理的压缩率和较好的性能,适用于对压缩解压速度有较高要求的场景。动态分区是处理大型、分区化的数据集时极为有用的功能,特别是当数据需要根据某些字段来自动划分为多个分区时。Hive通过动态分区允许用户在执行插入操作时动态创建分区,这意味着用户无需预先知道所有可能的分区值。
set hive.exec.dynamic.partition=true;
:这个设置启用了动态分区功能。set hive.exec.dynamic.partition.mode=nonstrict;
:在非严格模式下,Hive允许全部分区字段都可以动态插入,而不必指定至少一个分区作为静态分区。set hive.exec.max.dynamic.partitions=1000;
和 set hive.exec.max.dynamic.partitions.pernode=100;
:这些参数帮助控制动态分区操作时产生的分区数量,防止过多的分区导致的性能问题或是管理上的困难。在处理聚合查询时,去重是一个非常常见的需求。标准的COUNT(DISTINCT)
操作在处理大数据集时可能会变得低效。使用子查询和分组可以作为一种更加高效的方式来避免数据的不必要扫描。
select count(b) from (select a,b from TAB group by a,b) group by a;
此查询通过先对a,b
进行分组,然后再对外层的a
进行计数,这样可以有效地减少处理的数据量,进而提高查询的性能。set hive.cbo.enable=true;
:Hive中的CBO通过评估不同执行计划的成本来选择最优的执行路径。成本的评估依赖于表的统计信息,如数据分布、行数、数据大小等。因此,定期收集和更新统计信息对于CBO来说非常重要。CBO尤其在处理复杂查询和大数据集时表现出其优化性能,能够极大提升查询效率、减少资源消耗。
分区裁剪是查询优化中的一项关键技术。它允许系统只查询包含相关数据的分区,从而减少数据扫描量。对于存储在多重分区表中的数据,这种方法通过利用WHERE
和ON
子句中的条件直接定位到包含所需数据的分区,有效减少查询时间和资源消耗。
hive.execution.engine=tez
时,Hive通过使用Tez引擎进一步优化分区裁剪,特别是在动态分区剪裁的场景下。Hive可以在没有数据依赖关系的情况下并行执行多个任务,这大大减少了作业执行时间。
set hive.exec.parallel=true;
允许Hive执行并行操作。set hive.exec.parallel.thread.number=8;
定义了可以并行执行的最大任务数。在Hive 3之前版本中,可以通过JVM重用来提高MapReduce作业的执行效率。尽管Hive 3取消了这一特性,但理解它的本质对于优化前版本的Hive作业仍然很有价值。通过重用JVM实例,Hive可以减少任务初始化和销毁的开销,从而提高执行效率。
对于小数据集,Hive可以配置为在单个节点上以本地模式执行任务,避免了在Hadoop集群中启动和管理任务的开销。
mapreduce.job.reduces=0
或1
以及mapreduce.framework.name=local
,可以将特定的作业在本地化模式下运行,当数据量小于设定的阈值时,这可以显著提高执行速度。hive.exec.mode.local.auto=true
和相应的文件大小及数量限制配置,可以使Hive根据任务的特性自动选择是否本地化执行。LLAP为长时间运行的守护进程服务,它大大加快了Hive查询的速度,特别是对于交互式和增量加载任务。
hive.execution.mode=llap
将使得部分或全部查询在LLAP模式下运行,可显著减少查询延迟。对于简单的查询,Hive可以直接从存储系统获取结果而无需通过MapReduce或Tez等框架处理,这显著降低了执行时间。
set hive.fetch.task.conversion=more;
表示开启更广泛的fetch任务优化,使得更多类型的查询可以直接获取数据。谓词下推优化允许Hive将过滤操作尽可能早地应用于数据读取过程中,从而降低需要处理的数据量。在进行JOIN操作时,根据连接类型(如左连接、内连接等),Hive可以智能地将过滤条件下推,优化查询性能。
set hive.optimize.ppd=true;
开启谓词下推优化使得查询在数据读取阶段就尽可能地过滤掉不需要的记录,减少了之后操作的负担。Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。