赞
踩
分而治之
)翻译官
)将HDFS文件映射成表中的数据
将SQL解析为MapReduce程序
(7) SELECT
(8) DISTINCT <select_list>
(1) FROM <left_table>
(3) <join_type> JOIN <right_table>
(2) ON <join_condition>
(4) WHERE <where_condition>
(5) GROUP BY <group_by_list>
(6) HAVING <having_condition>
(9) ORDER BY <order_by_condition>
(10) LIMIT <limit_number>
FROM (→WHERE )→ ON → JOIN (→WHERE )(→ SELECT)→ GROUP BY (→ SELECT)→ HAVING (→ SELECT) → DISTINCT → UNION → ORDER BY (→ SELECT)→ LIMIt
这里其实select 由于要为下一步输出做选择,其实比较特殊,不同写法会存在很多次不同的顺序中
where 由于谓词下推也会比较特殊,也可能会存在多次
所以有兴趣的可以进一步思考谓词下推,什么情况能下推,什么情况不会下推?
tips:这里只是初略看下,有个初步概念,详细的也可以查看这篇文章(会有一些差异,可以思考下怎么回事)
select 1 from 2 where 3 group by 4 having 5 order by 6 limit 7;
1→决定了结果有哪些列:要么是已存在的列,要么是函数生成的列,列的过滤
2→决定了读取数据的数据源
3→决定了对哪些行进行过滤
4→按照什么条件进行分组
5→分组以后对哪些行进行过滤
6→按照什么条件进行排序
7→限制输出
分区表
指定分区
临时表
列裁剪
(即只保留我们需要的列)join
过滤
再join推测执行
mapjoin
-- 是否自动转换为mapjoin set hive.auto.convert.join=true -- 小表的最大文件大小,默认为25000000,即25M set hive.mapjoin.smalltable.filesize=25000000 -- 是否将多个mapjoin合并为一个 set hive.auto.convert.join.noconditionaltask=true -- 合并mapjoin有啥好处呢? -- 因为每个mapjoin都要执行一次map,需要读写一次数据,所以多个mapjoin就要做多次的数据读写。 -- 合并mapjoin后只用读写一次,自然能大大加快速度。 -- 但是执行map是内存大小是有限制的,在一次map里对多个小表做mapjoin就必须把多个小表都加入内存 -- ,为了防止内存溢出,所以加了hive.auto.convert.join.noconditionaltask.size参数来做限制。 -- 不过,这个值只是限制输入的表文件的大小,并不代表实际mapjoin时hashtable的大小。 -- 多个mapjoin转换为1个时,所有小表的文件大小总和的最大值。 set hive.auto.convert.join.noconditionaltask.size=20971520 -- 使用mapjoin时,会先执行一个本地任务(mapreduce local task)将小表转成hashtable并序列化为文件再压缩 --,随后这些hashtable文件会被上传到hadoop缓存,提供给各个mapjoin使用。这里有三个参数我们需要注意: -- 将小表转成hashtable的本地任务的最大内存使用率,默认0.9 set hive.mapjoin.localtask.max.memory.usage=0.9 -- 如果mapjoin后面紧跟着一个group by任务,这种情况下 本地任务的最大内存使用率,默认是0.55 set hive.mapjoin.followby.gby.localtask.max.memory.usage=0.55 -- localtask每处理完多少行,就执行内存检查。默认为100000 set hive.mapjoin.check.memory.rows=100000
combine
参数可以执行前进行小文将合并(一般为默认)-- hive0.5开始就是默认值,执行map前进行小文件合并
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
set mapred.min.split.size= 10000000;
set mapred.max.split.size= 256000000;
-- 一个节点上split的至少的大小 ,决定了多个data node上的文件是否需要合并
set mapred.min.split.size.per.node= 100000000;
-- 一个交换机下split的至少的大小,决定了多个交换机上的文件是否需要合并
set mapred.min.split.size.per.rack= 100000000;
set mapred.reduce.tasks=100;
desc formatted tmpdb.hzy_0701; -- numFiles为1
set hive.merge.mapredfiles = false; -- 关闭merge
set mapred.reduce.tasks=100; -- 设置reduce数量
DROP TABLE IF EXISTS tmpdb.hzy_0701_2;
create table tmpdb.hzy_0701_2 as
select * from tmpdb.hzy_0701
distribute by CAST(RAND() *100 AS INT)
;
desc formatted tmpdb.hzy_0701_2; -- numFiles为100
对于on中某些key过多,可以加前缀
空值的key变字符串+随机数
分配到不同的reduce中,null关联不上,不影响结果
map输出结果压缩
-- 开启中间压缩(map输出结果压缩)
set hive.exec.compress.intermediate = true;
压缩参数
,减少网络输出减少笛卡尔积的产生
skewjoin,先随机,再聚合
set hive.groupby.skewindata = true;
set hive.optimize.skewjoin=true;
set hive.optimize.skewjoin.compiletime=true;
-- 使用hive.optimize.union.remove优化的时候必须设置mapred.input.dir.recursive=true。
set hive.optimize.union.remove=true;
set hive.skewjoin.key=100000; -- 默认值100000。
select *
from log a
left join users b
on case when a.user_id is null
then concat('hive',rand())
else a.user_id
end = b.user_id
set hive.map.aggr = true;
-- 在map-reduce的任务结束时合并小文件
set hive.merge.mapredfiles = true;
-- 合并文件的大小,设置为块大小的两倍256m
set hive.merge.size.per.task = 256000000;
-- 当输出文件的平均大小小于该值时,启动一个独立的map-reduce任务进行文件merge
set hive.merge.smallfiles.avgsize=128000000;
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions=100000;
set hive.exec.max.dynamic.partitions.pernode=100000;
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=8;
set mapreduce.map.memory.mb=8048;
-- mapreduce.map.java.opts一定要小于mapreduce.map.memory.mb
set mapreduce.map.java.opts=-Xmx8000m; -- 启动 JVM 虚拟机时,传递给虚拟机的启动参数,而默认值 -Xmx200m 表示这个 Java 程序可以使用的最大堆内存数,一旦超过这个大小,JVM 就会抛出 Out of Memory 异常,并终止进程。
set mapreduce.reduce.memory.mb=4096;
set mapred.job.priority = NORMAL;
set hive.optimize.bucketmapjoin = true;
-- 一个表的bucket数是另一个表bucket数的整数倍
-- bucket列 == join列
-- 必须是应用在map join的场景中
-- 如果表不是bucket的,则只是做普通join。
set hive.optimize.bucketmapjoin = true;
set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.auto.convert.sortmerge.join.noconditionaltask=true;
-- 小表的bucket数=大表bucket数
-- Bucket 列 == Join 列 == sort 列
-- 必须是应用在bucket mapjoin 的场景中
-- hive并不检查两个join的表是否已经做好bucket且sorted,需要用户自己去保证join的表
-- ,否则可能数据不正确。有两个办法
-- 1)hive.enforce.sorting 设置为true。
-- 2)手动生成符合条件的数据,通过在sql中用distributed c1 sort by c1 或者 cluster by c1
-- 表创建时必须是CLUSTERED且SORTED,如下
create table test_smb_2(mid string,age_id string)
CLUSTERED BY(mid) SORTED BY(mid) INTO 500 BUCKETS;
set hive.exec.mode.local.auto=true;
很难避免小文件的场景
或者task特别多
的场景,这类场景大多数执行时间都很短。JVM重用可以使得JVM实例在同一个JOB中重新使用N次
set mapred.job.reuse.jvm.num.tasks=10;
-- 是否为Map Task打开推测执行机制,默认为true。
-- 如果为true,如果Map执行时间比较长,那么集群就会推测这个Map已经卡住了
-- ,会重新启动同样的Map进行并行的执行,哪个先执行完了,就采取哪个的结果来作为最终结果
-- ,一般直接关闭推测执行
set mapreduce.map.speculative=true;
-- 是否为Reduce Task打开推测执行机制,默认为true。
-- 如果reduce执行时间比较长,那么集群就会推测这个reduce已经卡住了
-- ,会重新启动同样的reduce进行并行的执行,哪个先执行完了
-- ,就采取哪个的结果来作为最终结果,一般直接关闭推测执行
set mapreduce.reduce.speculative=true;
--是否和并Map输出文件,默认true
set hive.merge.mapfiles = true;
-- 在map-reduce的任务结束时(reduce端输出)合并小文件,默认为false
set hive.merge.mapredfiles = true;
-- 设置合并文件的大小,默认256000000字节
set hive.merge.size.per.task = 256000000;
--当输出文件的平均大小小于该值时,启动一个独立的map-reduce任务进行文件merge,默认16000000字节
set hive.merge.smallfiles.avgsize=256000000;
-- orc的表同时需要下面两个,其它文件可以去掉
set hive.exec.orc.default.block.size=256000000;
set hive.merge.orcfile.stripe.level=false;
size(collect_set(id))
row_number
-- 左侧的表写在where后面
select *
from a
left join b
on (a.id=b.id and b.name= 'zhangsan')
;
-- 右侧的表写在on后面
select *
from a
left join b
on a.id=b.id
where a.name= 'zhangsan'
;
set hive.optimize.ppd = true;
失效情况
Full outer Join都不会谓词下推;
特殊函数可能会导致不进行谓词下推
select *
from a
join b
on a.id = b.id
where a.dd = ‘2019-10-09’
and a.create_time = unix_timestamp ()
;
map端预聚合,相当于combiner
set hive.map.aggr=true;
启动两个map job,第一个随机分配map结果,局部聚合;第二个最终聚合
set hive.groupby.skewindata=true;
开启倾斜关联
(运行时/编译时)、开启union的优化
(避免二次读写),并设置判断key倾斜的阈值条数set hive.optimize.skewjoin=true;
set hive.optimize.skewjoin.compiletime=true;
-- 使用hive.optimize.union.remove优化的时候必须设置mapred.input.dir.recursive=true。
set hive.optimize.union.remove=true;
set hive.skewjoin.key=100000;默认值100000。
做好列裁剪和过滤
空值的key变字符串+随机数分配到不同的reduce中,null关联不上,不影响结果
hshtable的形式加载到内存
,然后序列化到磁盘,把内存的hashtable压缩为tar文件。然后把文件分发到 Hadoop Distributed Cache
,然后传输给每一个mapper,mapper在本地反序列化文件并加载进内存再做joinCopyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。