赞
踩
explain hql
来查看执行计划set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
join
的效率:若join
的两张表有相同的列,且该列均进行了分桶,则join
时将相同值的桶进行join
操作即可,大大减少join
的数据量set hive.enforce.bucketing=true;
set hive.enforce.sorting=true; # 开启强制排序,插数据到表中会进行强制排序,默认false
set hive.optimize.skewjoin=true; # 如果是join过程中出现倾斜 应该设置为true
set hive.skewjoin.key=100000; # 这个是join的键对应的记录条数,超过这个值则会进行优化
set hive.group.skewindata=true; # 如果是group by过程出现倾斜,应该设置为true
set hive.groupby.mapaggr.checkinterval=100000; # 这个是group的键对应的记录条数超过这个值则会进行优化
MR
是以GroupBy
分组,再对distinct
列排序,然后输出交给Reduce
,所以,在reduce
之前,本地的map
已经完成预计算,并且提前做了一次聚合运算,如果此时的distinct
造成了数据不平衡,则reduce
时就会出现计算的数据量有大有小,即数据倾斜
group by
的子查询来替换count(distinct)
# 替换前 select count(distinct id) from tablename; select a, sum(b), count(distinct c), count(distinct d) from test group by a; # 替换后 select count(1) from (select id from tablename group by id) tmp; select a, sum(b) as b, count(c) as c, count(d) as d from( select a,0 as b, c, null as d from test group by a,c union all select a,0 as b, null as c, d from test group by a, d union all select a,b,null as c, null as d from test )tmp1 group by a; ```
null
, 0, 1, -1, -99等默认值):如果常见值的占比比较大时,较容易出现数据倾斜MR
从后往前构建数据Map
搞定,如果不是同列,就会新开一个MR
set hive.auto.convert.join=true; # hive.mapjoin.smalltable.filesize默认值是25mb,小表小于25mb自动启动mapjoin
# 优化前
select m.cid, u.id
from order m join customer u on m.cid = u.id
where m.dt='2018-06-08'
# 优化后: where条件放在map端,而不是reduce端
select m.cid, u.id
from (select cid from order where dt = '2018-06-08') m join customer u on m.cid = u.id;
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=8;
set hive.exec.mode.local.auto=true;
join
满足如下条件才可用本地模式
hive.exec.mode.local.auto.inputbytes.max(默认128MB)
map
数必须小于参数:hive.exec.mode.local.auto.tasks.max(默认4)
reduce
数必须为0或1mapred.max.split.size
限制的大小决定
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
set hive.merge.smallfiles.avgsize=256000000; # 当输出文件平均大小小于该值,启动新job合并文件
set hive.merge.size.per.task=64000000; # 合并之后的文件大小
reduce
任务,一旦没有完成,则map
占用slot不会释放,其它作业需要等待set mapred.job.reuse.jvm.num.tasks=20;
set hive.exec.compress.intermediate=true;
set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
set hive.intermediate.compression.type=BLOCK;
set hive.exec.compress.output=true;
set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
set mapred.output.compression.type=BLOCK;
map
个数,则设置mapred.map.tasks
为一个较大的值map
个数,则设置mapred.min.split.size
为一个较大的值size
巨大,但不是小文件:增大mapred.min.split.size
的值size
小于blockSize
:增大mapred.min.split.size
不可行,需要使用CombineFileInputFormat
将多个input path
合并成一个InputSplit
送给mapper
处理,从而减少mapper
的数量# 默认map个数
default_num = total_size / block_size;
# 期望大小
goal_num = mapred.map.tasks;
# 设置处理的文件大小
split_size=max(mapred.min.split.size, block_size)
split_num = total_size / split_size;
# 计算map个数
compute_map_num = min(split_num, max(default_num, goal_num)) ```
set hive.map.aggr=true;
mapred.map.tasks.speculative.execution # 默认为true
# 默认100M。map节点运行没完成时,若内存数据过多,该设置就是内存缓冲的大小,在shuffle之前该项定了map输出结果在内存占用buffer的大小,当buffer达到阈值,则启动后台线程对buffer内存sort,然后spill到硬盘
io.sort.mb
# 上面参数buffer的阈值,默认0.8(80%)
io.sort.spill.percent
# 默认值3。当spill数量不低于该值时,则combiner函数会在merge产生结果文件之间运行
min.num.spill.for.combine
# 默认10。当一个map task执行完成后,本地磁盘上有若干spill文件,map task最后一件事就是执行merge sort,执行时每次同时打开多个spill文件,同时打开的文件数量由该值决定。说明:打开的文件越多,不一定merge sort就越快,也要根据数据情况适当的调整
io.sort.factor
# 默认值0.05。io.sort.mb中用来保存map output记录边界的百分比,其他缓存用来保存数据
io.sort.record.percent
# 默认5,reduce copy数据的线程数
mapred.reduce.parallel.copies
# 默认 300(s)。reduce下载线程最大等待时间
mapred.reduce.copy.backoff
# 默认0.7(70%)。Reduce用来存放从Map节点取过来的数据所用的内存占堆内存的比例
mapred.job.shuffle.input.buffer.percent
# 默认值0。sort完成后reduce计算阶段用来缓存数据的百分比
mapred.job.reduce.input.buffer.percent
true
mapred.reduce.tasks.speculative.execution # hadoop
hive.mapred.reduce.tasks.speculative.execution # hive,二者效果一样,二选一即可
set mapred.reduce.tasks=10; # 直接设置
hive.exec.reducers.max # 默认999
hive.exec.reducers.bytes.per.reducer # 默认1G
numTasks = min(maxReducers, input.size / perReducer)
maxReducers = hive.exec.reducers.max
perReducer=hive.exec.reducers.bytes.per.reducer
# Hive 自动转换联接无条件(mapjoin)大小,默认20M hive.auto.convert.join.noconditionaltask.size # 小文件平均大小合并阈值。默认16M hive.merge.smallfiles.avgsize # Spark 执行程序最大 Java 堆栈大小。默认256M spark.executor.memory # Spark 驱动程序最大 Java 堆栈大小。默认256M spark.driver.memory # Spark 驱动程序内存开销。默认26M spark.yarn.driver.memoryOverhead # Spark 驱动程序内存开销。默认26M spark.yarn.executor.memoryOverhead
# 每个作业的 Reduce 任务的默认数量,默认1
mapreduce.job.reduces
# 容器内存。默认8G
yarn.nodemanager.resource.memory-mb
# 最大容器内存。默认64G
yarn.scheduler.maximum-allocation-mb
# 最大容器虚拟 CPU 内核数量。默认32
yarn.scheduler.maximum-allocation-vcores
MR
explain
select date, week, week_num, quarter, rank, pay_amt, pct_rank
from home_page_pay_rank
where year='2020'
1 STAGE DEPENDENCIES: 2 Stage-0 is a root stage 3 4 STAGE PLANS: 5 Stage: Stage-0 6 Fetch Operator 7 limit: -1 8 Processor Tree: 9 TableScan 10 alias: home_page_pay_rank 11 filterExpr: (year = '2020') (type: boolean) 12 Select Operator 13 expressions: date (type: string), week (type: string), week_num (type: string), quarter (type: string), rank (type: int), pay_amt (type: string), pct_rank (type: string) 14 outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6 15 ListSink 16
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。