当前位置:   article > 正文

Hive优化_hive sql 查询分区数据量大如何优化

hive sql 查询分区数据量大如何优化

1、本质

Hive 是一种基于 Hadoop 的数据仓库工具,所以本质上是使用hdfs和MapReduce进行计算。


2、问题来源

(1)数据倾斜

数据倾斜是指在处理大量数据时,数据被不均匀地分配到不同的节点上,导致某些节点工作负载过重,而其他节点则相对空闲。可以通过以下方式来应对:

  • 1.1 内容倾斜: 优化数据倾斜可以通过重新设计数据的分区和分桶策略来进行。例如,对于经常一起查询的数据,可以探索基于业务逻辑的自定义分区键。

  • 1.2 Group by优化: 优化GROUP BY操作时,可以使用DISTRIBUTE BY 和 SORT BY子句而不是ORDER BY,这样可以减少对单一 Reducer 的压力。

  • 1.3 小表 join 大表: 在执行小表与大表的JOIN操作时,可能会使用map-side join或者bucketed join来优化执行计划。map-side join可以将小表完全载入内存,减少数据通过网络传输的需要。

(2)过度Resources使用和其优化

过多的资源使用往往通过以下几个方面表现:

  • 2.1 Join过多: 过多的JOIN操作通常意味着多个MapReduce任务。优化这个问题通常需要重新设计查询或数据模型,减少JOIN的数量,或者使用subquery来减少MapReduce任务的分配。

  • 2.2 小文件过多: 大量小文件会导致Hadoop的NameNode压力过大,同时因为每个文件处理时都需要启动一个单独的Map任务。可通过合并小文件或调整数据生成策略来减少其数量。

  • 2.3 Mapper或Reducer过多: 调整MapReduce作业的配置参数,如mapreduce.job.mapsmapreduce.job.reduces来控制Map和Reduce的数量,可以减少资源消耗,提高作业执行效率。

(3)使用不当导致的性能问题

不当使用HiveQL也可能导致性能问题:

  • 3.1 COUNT(DISTINCT): 使用COUNT(DISTINCT)可能会导致性能下降,因为它需要大量的数据移动来进行去重计算。可以考虑拆分查询或使用近似计算方法如HyperLogLog。

  • 3.2 JOIN ... ON ... WHERE: 在JOIN之后立即使用WHERE子句可能会限制优化器的优化能力。更合理的做法是尽可能在JOIN之前进行过滤,减少join操作的数据量。

  • 3.3 SELECT SUM(field) FROM TABLE;: 如果表非常大,这种全表聚合将非常低效。可以通过增加适当的索引,调用map-side aggregation或在可能的情况下使用分区查询来优化此类查询。

3、解决方案

数据模型设计

目的:整体最优,考虑全局

数据建模:星型模型(主要)、雪花模型、星座模型建模

  • 由维度表(描述维度信息)和事实表(存储事实和度量)构成。其中事实表中的数据是动态的,用4W1H(谁、什么时候、在哪里、做什么、怎么做)来描述。
  • 在数据加载(从ODS到DWD层)过程中,可通过insert into ... select ... join ...的方式对源数据进行加工,建立维度表和事实表。

充分了解业务,提前设计好预聚合

分层=>轻量聚合,分区=>避免交换,分桶=>拉链表和抽样,压缩=>减少体量,压缩格式和表存储格式

hadoop内存管理

MapReduce参数
  1. mapreduce.map.memory.mb 和 mapreduce.reduce.memory.mb:

    • 这些参数设置Map和Reduce任务可以使用的最大内存量(单位为MB)。
    • mapreduce.map.memory.mb=256 指定每个Map任务可以使用256 MB内存。
    • mapreduce.reduce.memory.mb=512 指定每个Reduce任务可以使用512 MB内存。
    • 设置这些参数可以防止各个任务因内存不足而OOM(Out of Memory)。
  2. mapreduce.map.java.opts 和 mapreduce.reduce.java.opts:

    • 这些参数用于设置Map和Reduce任务的JVM启动选项。
    • 常用的设置包括JVM的初始和最大堆内存等。
    • 例如,mapreduce.map.java.opts=-Xmx200m 表明Map任务的JVM最大可以使用200 MB的堆内存(注意,这应该小于 mapreduce.map.memory.mb,因为还需要留一部分内存给任务执行中的非堆使用)。
    • Reduce任务同理,但通常会设置更高的值,因为Reduce任务在处理数据汇总和输出时,可能会需要更多的内存。
YARN参数
  1. yarn.nodemanager.resource.memory-mb:

    • 这是配置NodeManager管理的资源总量。如果设置为-1,则意味着不对其进行限制(不推荐)。通常应该设置为该节点物理内存的一个合理百分比。
  2. yarn.scheduler.minimum-allocation-mb 与 yarn.scheduler.maximum-allocation-mb:

    • yarn.scheduler.minimum-allocation-mb=1024 表示YARN调度器为每个容器可以分配的最小内存量设置为1024 MB。
    • yarn.scheduler.maximum-allocation-mb=8192 表示可分配给单个容器的最大内存量为8192 MB。
    • 这些参数的设置需要根据集群的具体内存容量和作业的需求来确定。它们设定了容器可能获取的内存范围,过小可能导致资源不足,过大则可能导致资源闲置。

数据倾斜处理与优化

Join操作数据倾斜
  • 手动处理方法
    • 连接键的选择:优选那些分布均匀的键作为连接键,可以显著减少数据倾斜问题。
    • 连接键拆分或随机映射:通过对连接键进行拆分或添加随机数映射,使得数据在各个Reducer间的分布更为均匀。
    • 引入hash分区或分桶:通过hash分区或使用分桶技术,可以控制数据如何分配到不同的分区或桶中,从而实现更均匀的数据分布。
    • 增加或减少任务的并行度:根据实际数据的处理需求和集群的能力,适当调整MapReduce作业的并行度,均衡各节点的负载。
  • 自动处理方法
    • set hive.optimize.skewjoin=true;:启用Hive针对倾斜数据的处理,自动优化Join操作中的数据分布。
    • set hive.skewjoin.key=100000;:设定触发倾斜优化的键值记录数阈值。超过此阈值的键会被视为倾斜,适用特殊处理。
    • set hive.skewjoin.mapjoin.map.tasks=10000;:指定用于处理数据倾斜的mapper数量。这个数值需要根据实际情况调整,以免产生过多小任务。
    • set hive.skewjoin.mapjoin.min.split=32M;:设置处理数据倾斜时,每个map任务处理的最小数据块大小,防止产生过多小任务增加调度和管理的开销。
Map Join优化
  • set hive.auto.convert.join=true;:自动将适合的join操作转换为map join,减少数据在网络中的传输。
  • set hive.mapjoin.smalltable.filesize=25M;:定义小表的最大大小,只有小于此大小的表才会被考虑用于map join,这是因为小表可以完整加载进内存。
  • set hive.optimize.bucketmapjoin=true;:启用对于分桶表的map join优化,当两个表都进行了相同方式的分桶时,可以实现更高效的连接操作。
Combiner优化
  • set hive.map.aggr=true;:允许在map阶段进行局部聚合,这可以显著减少传输到reduce阶段的数据量,从而减少网络传输和加速查询处理。
GroupBy数据倾斜优化
  • set hive.groupby.skewindata=true;:当启用时,Hive会在执行group by操作时自动调整处理策略,以应对数据倾斜问题。
数据倾斜的SQL手动优化技巧
  • 抽样统计:首先通过抽样了解数据是否存在倾斜,倾斜的程度,例如20%的键是否占用了80%的数据量。
  • 加盐处理:对倾斜严重的键增加随机前缀(即“加盐”),在reducer端进行初步聚集后再去除盐值,进行二次聚集,以此方式分散数据,减轻特定reducer的压力。

Map或Reduce输出过多小文件合并

在Hive作业执行过程中,输出过多的小文件会导致后续处理效率低下,因此合并这些小文件是提高效率的一个重要方面。

  • set hive.merge.mapfiles=true;:对于map-only任务的输出,默认启用文件合并机制。
  • set hive.merge.mapredfiles=true;:对于有reduce阶段的任务输出,控制是否合并输出文件。
  • set hive.merge.size.per.task=256M;:设置合并文件的操作阈值。只有当任务的输出数据量超过此阈值时,才会触发合并操作。
  • set hive.merge.smallfiles.avgsize=16M;:设置合并后每个文件的平均大小期望值。如果多个小文件合并后的平均大小小于该值,则会触发合并操作以形成较大的文件。

控制Mapper和Reducer数量

合理控制Mapper和Reducer的数量对于平衡任务负载、优化资源使用和提高作业执行效率至关重要。

Mapper

Mapper的数量通常由输入数据的量和HDFS块大小决定。合理调整以下参数可以有效控制Mapper的启动数量,避免过多的Mapper造成资源浪费:

  • set mapreduce.job.maps=2;:新版Hadoop中设置任务的Mapper数量。
  • set dfs.block.size=128M;:设置HDFS块的大小,默认128MB。
  • set mapred.max.split.size=256M;set mapred.min.split.size=1;:分别控制单个Mapper处理数据的最大和最小大小。
  • set mapred.min.split.size.per.node=1;默认单个节点处理的数据下限1字节
  • set mapred.min.split.size.per.rack=1;默认单个机架处理的数据下限1字节
  • set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;:合并多个小文件作为一个Mapper的输入,避免产生大量针对小文件的Mapper任务。
Reducer

Reducer的数量会直接影响到作业的处理速度和资源消耗。可通过调整以下参数,根据数据量和计算复杂性来合理设置Reducer的数量:

  • set mapreduce.job.reduces=n;:设置作业的Reducer数量。
  • set hive.exec.reducers.bytes.per.reducer=256M;:控制每个Reducer处理的数据量,进而影响Reducer的数量。
  • set hive.exec.reducers.max=1009;:设置单个作业中Reducer的最大数量限制。
当Reducer只能为1的情况

1. 直接使用聚合函数,但没有group by

  • 在某些情况下,如果查询只包含聚合操作(如sum, count, max等),而没有group by语句,Hive可能会默认使用单个Reducer来保证全局聚合的准确性。
  • 优化方案:可以通过在外层对内层查询的结果再次进行聚合的方式来优化此类查询。例如,select sum(sum_a) from (select sum(a) from A group by STH)T,这种方法允许内层查询在多个Reducer上并行执行,外层查询则在单个Reducer上完成最终的聚合,从而提高效率。

2. 使用了order by

  • 当查询中使用了order by语句时,为了保证全局排序的正确性,Hive通常会限制Reducer的数量为1。
  • 优化方案
    • 如果查询同时包含了group by,可以使用distribute by来在多个Reducer上分散数据,然后使用sort by来在每个Reducer内部进行局部排序。这种方法通过增加Reducer的数量来并行处理,从而提高查询性能。
    • set mapreduce.job.reduces=N; 允许用户指定Reducer数量以达到优化目的。
    • 示例:select * from (select * from A distribute by a sort by a)T order by a;,在这里,distribute bysort by结合使用可以实现更高效的排序处理。

3. 存在笛卡尔积

  • 查询中包含笛卡尔积操作时(尤其是小表与大表的笛卡尔积),往往会对性能产生重大影响,因为它会生成大量数据并且通常需要大量的计算资源。
  • 优化方案
    • 尽量避免使用笛卡尔积,特别是在不同大小的表之间。
    • 如果必须使用,可以考虑通过“加盐”的方式来优化。具体来说,对小表的每条记录生成多份副本,每份副本的“盐值”不同;同时,大表的每条记录也根据这些“盐值”进行扩展,并在join时使用这个扩展的“盐值”。
    • 关闭自动MapJoin以避免自动转化成MapJoin导致的性能问题:set hive.auto.convert.join=false
    • 通过set mapreduce.job.reduces=DN_COUNT;指定Reducer的数量,这里的DN_COUNT应该与小表扩展的副本数一致,以保证数据均匀分布到每个Reducer上。

调整存储格式

存储格式的选择对于数据如何在磁盘上存储、读写性能有重大影响。Hive支持多种文件格式,包括但不限于TextFile, ORC, Parquet, SequenceFile和RCFile。

  • set hive.default.fileformat=orc;:设置Hive表的默认存储格式为ORC(Optimized Row Columnar)。ORC格式特别适合于进行大量读操作的大数据应用,因为它支持高效的压缩和编码方式,提供了优秀的查询性能及更好的数据压缩比。

数据压缩设置

启用数据压缩可以减少存储空间需求,同时减少在Mapper和Reducer过程中数据传输的网络开销。

Mapper压缩
  • set mapreduce.map.output.compress=true;:开启Mapper阶段输出数据的压缩功能。当Map任务的输出作为Reduce任务的输入时,数据将先被压缩,有助于减少数据传输量。
  • set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;:设置使用Snappy压缩方式压缩Map阶段的输出。Snappy是一种快速的压缩解压框架,适用于大数据处理场景。
Reducer压缩
  • set hive.exec.compress.output=true;set mapreduce.output.fileoutputformat.compress=true;:启用Reducer输出压缩功能,可显著降低存储数据所需的磁盘空间。
  • set mapreduce.output.fileoutputformat.compress.type=BLOCK;:指定压缩方式为块压缩(BLOCK),与记录压缩(RECORD)相比,块压缩可以更好地利用磁盘空间和IO,提高读写性能。
  • set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;:使用Snappy作为最终的数据输出压缩方式。相较其他压缩方式,Snappy提供合理的压缩率和较好的性能,适用于对压缩解压速度有较高要求的场景。

动态分区 (Dynamic Partitioning)

动态分区是处理大型、分区化的数据集时极为有用的功能,特别是当数据需要根据某些字段来自动划分为多个分区时。Hive通过动态分区允许用户在执行插入操作时动态创建分区,这意味着用户无需预先知道所有可能的分区值。

  • set hive.exec.dynamic.partition=true;:这个设置启用了动态分区功能。
  • set hive.exec.dynamic.partition.mode=nonstrict;:在非严格模式下,Hive允许全部分区字段都可以动态插入,而不必指定至少一个分区作为静态分区。
  • set hive.exec.max.dynamic.partitions=1000; 和 set hive.exec.max.dynamic.partitions.pernode=100;:这些参数帮助控制动态分区操作时产生的分区数量,防止过多的分区导致的性能问题或是管理上的困难。

数据去重策略

在处理聚合查询时,去重是一个非常常见的需求。标准的COUNT(DISTINCT)操作在处理大数据集时可能会变得低效。使用子查询和分组可以作为一种更加高效的方式来避免数据的不必要扫描。

  • 示例:select count(b) from (select a,b from TAB group by a,b) group by a; 此查询通过先对a,b进行分组,然后再对外层的a进行计数,这样可以有效地减少处理的数据量,进而提高查询的性能。

基于成本的优化器 (CBO)

  • set hive.cbo.enable=true;:Hive中的CBO通过评估不同执行计划的成本来选择最优的执行路径。成本的评估依赖于表的统计信息,如数据分布、行数、数据大小等。因此,定期收集和更新统计信息对于CBO来说非常重要。

CBO尤其在处理复杂查询和大数据集时表现出其优化性能,能够极大提升查询效率、减少资源消耗。

分区裁剪 (Partition Pruning)

分区裁剪是查询优化中的一项关键技术。它允许系统只查询包含相关数据的分区,从而减少数据扫描量。对于存储在多重分区表中的数据,这种方法通过利用WHEREON子句中的条件直接定位到包含所需数据的分区,有效减少查询时间和资源消耗。

  • 当设置hive.execution.engine=tez时,Hive通过使用Tez引擎进一步优化分区裁剪,特别是在动态分区剪裁的场景下。

并行执行

Hive可以在没有数据依赖关系的情况下并行执行多个任务,这大大减少了作业执行时间。

  • set hive.exec.parallel=true; 允许Hive执行并行操作。
  • set hive.exec.parallel.thread.number=8; 定义了可以并行执行的最大任务数。

JVM重用

在Hive 3之前版本中,可以通过JVM重用来提高MapReduce作业的执行效率。尽管Hive 3取消了这一特性,但理解它的本质对于优化前版本的Hive作业仍然很有价值。通过重用JVM实例,Hive可以减少任务初始化和销毁的开销,从而提高执行效率。

本地化运算

对于小数据集,Hive可以配置为在单个节点上以本地模式执行任务,避免了在Hadoop集群中启动和管理任务的开销。

  • 设置mapreduce.job.reduces=01以及mapreduce.framework.name=local,可以将特定的作业在本地化模式下运行,当数据量小于设定的阈值时,这可以显著提高执行速度。
  • hive.exec.mode.local.auto=true 和相应的文件大小及数量限制配置,可以使Hive根据任务的特性自动选择是否本地化执行。

LLAP (Live Long and Process)

LLAP为长时间运行的守护进程服务,它大大加快了Hive查询的速度,特别是对于交互式和增量加载任务。

  • 设置hive.execution.mode=llap将使得部分或全部查询在LLAP模式下运行,可显著减少查询延迟。

Fetch任务优化

对于简单的查询,Hive可以直接从存储系统获取结果而无需通过MapReduce或Tez等框架处理,这显著降低了执行时间。

  • set hive.fetch.task.conversion=more; 表示开启更广泛的fetch任务优化,使得更多类型的查询可以直接获取数据。

谓词下推 (Predicate Pushdown)

谓词下推优化允许Hive将过滤操作尽可能早地应用于数据读取过程中,从而降低需要处理的数据量。在进行JOIN操作时,根据连接类型(如左连接、内连接等),Hive可以智能地将过滤条件下推,优化查询性能。

  • set hive.optimize.ppd=true; 开启谓词下推优化使得查询在数据读取阶段就尽可能地过滤掉不需要的记录,减少了之后操作的负担。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/木道寻08/article/detail/976997
推荐阅读
相关标签
  

闽ICP备14008679号