当前位置:   article > 正文

hive(七) -- 拉链表、数据存储及优化配置_skew join适用场景

skew join适用场景

数据同步问题

Hive在实际工作中主要用于构建离线数据仓库,定期的从各种数据源中同步采集数据到Hive中,经过分层转换提供数据应用。比如每天需要从MySQL中同步最新的订单信息、用户信息、店铺信息等到数据仓库中,进行订单分析、用户分析。

例如:MySQL中有一张用户表:tb_user,每个用户注册完成以后,就会在用户表中新增该用户的信息.

由于每天都会有用户注册,产生新的用户信息,那么每天都需要将MySQL中的用户数据同步到Hive数据仓库中.

假如在1号已经在hive中创建了表并拉取了数据,但是在2号时MySQL中新增2条用户注册数据,并且有1条用户数据发生更新.

那么我们需要对2号的数据进行同步到hive中,新增的数据会直接加载到Hive表中,但是更新的数据如何存储在Hive表中?

方案一:直接覆盖

  1. 使用2号的数据 直接将1号的数据覆盖掉
  2. 优点:实现最简单,使用起来最方便
  3. 缺点:没有历史状态 想查询008之前的数据查看不到

方案二:根据日期构建一份全量的快照表

  1. 1号创建一张表拉取所有数据
  2. 2号再创建一张表拉取所有数据
  3. ... 每天都创建一张表
  4. 优点:记录了所有数据在不同时间的状态
  5. 缺点:冗余存储了很多没有发生变化的数据,导致存储的数据量过大

方案三:构建拉链表,通过时间标记发生变化的数据的每种状态的时间周期

拉链表的设计是将更新的数据进行状态记录,没有发生更新的数据不进行状态存储,用于存储所有数据在不同时间上的所有状态,通过时间进行标记每个状态的生命周期,查询时,根据需求可以获取指定时间范围状态的数据,默认用9999-12-31等最大值来表示最新状态。

4.2 拉链表实现原理

原理

  1. 增量采集变化数据,放入增量表中

  1. 将Hive中的拉链表与临时表的数据进行合并,合并结果写入临时表

  2. 将临时表的数据覆盖写入拉链表中

4.3 拉链表实现演示

创建拉链表

  1. -- 数据准备
  2. vi zipper.txt
  3. 001 186xxxx1234 laoda 0 sh 2021-01-01 9999-12-31
  4. 002 186xxxx1235 laoer 1 bj 2021-01-01 9999-12-31
  5. 003 186xxxx1236 laosan 0 sz 2021-01-01 9999-12-31
  6. 004 186xxxx1237 laosi 1 gz 2021-01-01 9999-12-31
  7. 005 186xxxx1238 laowu 0 sh 2021-01-01 9999-12-31
  8. 006 186xxxx1239 laoliu 1 bj 2021-01-01 9999-12-31
  9. 007 186xxxx1240 laoqi 0 sz 2021-01-01 9999-12-31
  10. 008 186xxxx1241 laoba 1 gz 2021-01-01 9999-12-31
  11. 009 186xxxx1242 laojiu 0 sh 2021-01-01 9999-12-31
  12. 010 186xxxx1243 laoshi 1 bj 2021-01-01 9999-12-31
  1. --创建拉链表
  2. create table dw_zipper
  3. (
  4. userid string,
  5. phone string,
  6. nick string,
  7. gender int,
  8. addr string,
  9. starttime string,
  10. endtime string
  11. ) row format delimited fields terminated by '\t';
  12. load data local inpath '/root/zipper.txt' into table dw_zipper;
  13. select * from dw_zipper;

创建增量表

  1. vi update.txt
  2. 008 186xxxx1241 laoba 1 sh 2021-01-02 9999-12-31
  3. 011 186xxxx1244 laoshi 1 jx 2021-01-02 9999-12-31
  4. 012 186xxxx1245 laoshi 0 zj 2021-01-02 9999-12-31
  1. create table ods_update
  2. (
  3. userid string,
  4. phone string,
  5. nick string,
  6. gender int,
  7. addr string,
  8. starttime string,
  9. endtime string
  10. ) row format delimited fields terminated by '\t';
  11. load data local inpath '/root/update.txt' overwrite into table ods_update;
  12. select * from ods_update;

创建临时表

  1. create table tmp_zipper
  2. (
  3. userid string,
  4. phone string,
  5. nick string,
  6. gender int,
  7. addr string,
  8. starttime string,
  9. endtime string
  10. ) row format delimited fields terminated by '\t';

合并数据到临时表

  1. insert overwrite table tmp_zipper
  2. select
  3. userid,
  4. phone,
  5. nick,
  6. gender,
  7. addr,
  8. starttime,
  9. endtime
  10. from ods_update
  11. union all
  12. --查询原来拉链表的所有数据,并将这次需要更新的数据的endTime更改为更新值的startTime
  13. select
  14. a.userid,
  15. a.phone,
  16. a.nick,
  17. a.gender,
  18. a.addr,
  19. a.starttime,
  20. --如果这条数据没有更新或者这条数据不是要更改的数据,就保留原来的值,否则就改为新数据的开始时间-1
  21. if(b.userid is null or a.endtime < '9999-12-31', a.endtime , date_sub(b.starttime,1)) as endtime
  22. from dw_zipper a left join ods_update b
  23. on a.userid = b.userid ;

覆盖拉链表数据

  1. insert overwrite table dw_zipper
  2. select * from tmp_zipper;

数据存储

Hive数据存储的本质还是HDFS,所有的数据读写都基于HDFS的文件来实现,为了提高对HDFS文件读写的性能,Hive中提供了多种文件存储格式:TextFile、SequenceFile、ORC、Parquet等。不同的文件存储格式具有不同的存储特点,有的可以降低存储空间,有的可以提高查询性能等,可以用来实现不同场景下的数据存储,以提高对于数据文件的读写效率。

5.1 列式存储和行式存储

要理解行式存储和列式存储以及他们之间的差异首先就得理解两种存储方式在结构上的差异.

举个例子,如下表所示为一张学生的学科表:

行式存储

列式存储

写操作差异

从他们的结构上可以看出,行存储在写入时比列存储要快〈eg:如果是机械硬盘,行存储时磁头在磁盘上只需要顺序写入,而列存储需要频繁的移动定位到下一个字段需要写入的地址,造成了时间上的开销)﹔同时由于行存储下表的数据是放在一起的,一次写入,所以数据的完整性可以确定;

读数据差异

1.查询某一列数据

  1. 假设我们查询sid列
  2. 行存储查询时会将课程名和年级一起查询出来,这样会造成数据冗余,如果列数比较多,影响比较大.
  3. 列存储是按照列来存储到一起的,每一列单独存放,查询起来方便很多,直接查询即可

2.查询所有数据

  1. 行存储查询所有数据直接查询就可以了
  2. 列存储,由于每列单独存放,先查询出来一列,然后在通过聚集运算,把列上每个数据拼接成行,没有行存储方便.

使用场景总结

5.2 主流文件格式特点

TextFile格式

  1. TextFile是Hive中默认的文件格式,存储形式为按行存储。工作中最常见的数据文件格式就是TextFile文件,几乎所有的原始数据生成都是TextFile格式.
  2. 建表时不指定存储格式即为textfile,导入数据时把数据文件拷贝至hdfs不进行处理。
  3. 优点
  4. 1.最简单的数据格式,不需要经过处理,可以直接cat查看
  5. 2.可以使用任意的分隔符进行分割
  6. 3.便于和其他工具(Pig, grep, sed, awk)共享数据
  7. 4.可以搭配Gzip、Bzip2、Snappy等压缩一起使用
  8. 缺点
  9. 1.耗费存储空间,I/O性能较低
  10. 2.结合压缩时Hive不进行数据切分合并,不能进行并行操作,查询效率低
  11. 3.按行存储,读取列的性能差
  12. 应用场景
  13. 1.适合于小量数据的存储查询
  14. 2.一般用于做第一层数据加载和测试使用

Parquet格式

  1. Parquet是一种支持嵌套结构的列式存储文件格式.是一种支持嵌套数据模型对的列式存储系统,作为大数据系统中OLAP查询的优化方案,它已经被多种查询引擎原生支持,并且部分高性能引擎将其作为默认的文件存储格式。通过数据编码和压缩,以及映射下推和谓词下推功能,Parquet的性能也较之其它文件格式有所提升。
  2. 优点
  3. 1.更高效的压缩和编码可压缩、可分割,优化磁盘利用率和I/O
  4. 2.可用于多种数据处理框架
  5. 缺点
  6. 1.不支持update, insert, delete, ACID
  7. 应用场景
  8. 1.适用于字段数非常多,无更新,只取部分列的查询

Orc格式

ORC(OptimizedRC File)文件格式也是一种Hadoop生态圈中的列式存储格式.它并不是一个单纯的列式存储格式,是首先根据行组分割整个表,在每一个行组内进行按列存储。.

  1. Index Data
  2. Index Data 包括每一列的最小值和最大值,以及每一列中的行位置。(也可以包含bit field or bloom filter ) 行索引项提供了偏移量,使您能够在解压缩块中查找正确的压缩块和字节。
  3. 注意,ORC索引仅用于选择stripes 和 row groups.
  4. 拥有相对频繁的行索引项可以在stripe 内跳过行,以便快速读取,尽管stripe 很大。默认情况下,可以跳过每10,000行。
  5. Row Data
  6. 存的是具体的数据,先取部分行,然后对这些行按列进行存储。对每个列进行了编码,分成多个Stream来存储。
  7. Stripe Footer
  8. 存的是各个Stream的类型,长度等信息。
  9. File Footer
  10. 每个文件有一个File Footer,这里面存的是每个Stripe的行数,每个Column的数据类型信息等;
  11. PostScript
  12. 每个文件的尾部是一个PostScript,这里面记录了整个文件的压缩类型以及FileFooter的长度信息等。
  13. 在读取文件时,会seek到文件尾部读PostScript,从里面解析到File Footer长度,再读FileFooter,从里面解析到各个Stripe信息,再读各个Stripe,即从后往前读。
  1. 优点
  2. 1.列式存储,存储效率非常高
  3. 2.可压缩,高效的列存取
  4. 3.查询效率较高,支持索引
  5. 4.支持各种复杂的数据类型
  6. 5.支持矢量化查询
  7. 缺点
  8. 1.加载时性能消耗较大
  9. 2.需要通过text文件转化生成
  10. 3.读取全量数据时性能较差
  11. 应用场景
  12. 适用于Hive中大型的存储、查询

矢量化查询(了解)

  1. Hive的默认查询执行引擎一次处理一行,而矢量化查询执行是一种Hive针对ORC文件操作的特性,目的是按照每批1024行读取数据,并且一次性对整个记录整合(而不是对单条记录)应用操作,提升了像过滤, 联合, 聚合等等操作的性能。
  2. 注意:要使用矢量化查询执行,就必须以ORC格式存储数据。
  3. -- 开启矢量化查询
  4. set hive.vectorized.execution.enabled = true;
  5. set hive.vectorized.execution.reduce.enabled = true;

5.3 主流文件格式对比

**TextFile **

  1. create table test_textfile
  2. (
  3. stime string,
  4. userid string,
  5. keyword string,
  6. clickorder string,
  7. url string
  8. )row format delimited fields terminated by '\t';
  9. load data local inpath '/root/sogou.txt' into table test_textfile;
  10. select * from test_textfile limit 10;
  11. desc formatted test_textfile;

**ORC 列式存储 **

  1. create table test_orc
  2. (
  3. stime string,
  4. userid string,
  5. keyword string,
  6. clickorder string,
  7. url string
  8. )
  9. stored as ORC ;
  10. -- 不能使用load加载数据
  11. insert into table test_orc select * from test_textfile ;
  12. select * from test_orc limit 10;

parquet 列式存储

  1. create table test_parquet
  2. (
  3. stime string,
  4. userid string,
  5. keyword string,
  6. clickorder string,
  7. url string
  8. )
  9. stored as parquet ;
  10. -- 不能使用load加载数据
  11. insert into table test_parquet select * from test_textfile ;
  12. select * from test_parquet limit 10;
  13. desc formatted test_parquet;

  • 文本文件是默认格式 行式存储 不压缩 效率最低 18.1M

  • ORC 列式存储 压缩比高 效率高 2.7M

  • parquet 列式存储 压缩比比ORC低 效率和ORC相当 , 兼容性比ORC好 13.1M

执行效率ORC和parquet格式类似 , 但是ORC的压缩比更好 ,parquet 兼容性好 ,

5.4 数据压缩(了解)

Hive底层运行MapReduce程序时,磁盘I/O操作、网络数据传输、shuffle和merge要花大量的时间,尤其是数据规模很大和工作负载密集的情况下。鉴于磁盘I/O和网络带宽是Hadoop的宝贵资源,数据压缩对于节省资源、最小化磁盘I/O和网络传输非常有帮助。Hive压缩实际上说的就是MapReduce的压缩。

  1. 压缩的优点
  2. 减小文件存储所占空间
  3. 加快文件传输效率,从而提高系统的处理速度
  4. 降低IO读写的次数
  5. 压缩的缺点
  6. 使用数据时需要先对文件解压,加重CPU负荷,压缩算法越复杂,解压时间越长

Hive中的压缩就是使用了Hadoop中的压缩实现的,所以Hadoop中支持的压缩在Hive中都可以直接使用。

要想在Hive中使用压缩,需要对MapReduce和Hive进行相应的配置

  1. 临时配置
  2. --配置MapReduce开启输出压缩及配置压缩类型
  3. --开启输出压缩
  4. set mapreduce.output.fileoutputformat.compress=true;
  5. --配置压缩类型为Snappy
  6. set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
  7. --配置Hive开启中间结果压缩和输出压缩及配置压缩类型
  8. -- 中间结果压缩
  9. set hive.exec.compress.intermediate=true;
  10. set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
  11. -- 输出结果压缩
  12. set hive.exec.compress.output=true;
  13. 永久配置
  14. 将以上MapReduce的配置写入mapred-site.xml中,重启Hadoop
  15. 将以上Hive的配置写入hive-site.xml中,重启Hive

压缩测试

TextFile压缩

  1. create table t_textfile_snappy
  2. stored as textfile
  3. as select * from test_textfile;

Orc压缩

  1. create table t_orc_snappy
  2. stored as orc tblproperties ("orc.compress"="SNAPPY")
  3. as select * from test_textfile;
  4. select * from t_orc_snappy limit 10;

Hive优化配置

Explain查询计划

explain命令可以帮助用户了解一条HQL语句在底层的实现过程。通俗来说就是Hive打算如何去做这件事。explain会解析HQL语句,将整个HQL语句的实现步骤、依赖关系、实现过程都会进行解析返回,可以了解一条HQL语句在底层是如何实现数据的查询及处理的过程,辅助用户对Hive进行优化。

官网:LanguageManual Explain - Apache Hive - Apache Software Foundation

explain  select * from test_textfile limit 10;

每个查询计划由以下几个部分组成

  • 抽象语法树(AST):Hive使用Antlr解析生成器,可以自动地将HQL生成为抽象语法树
  • Stage依赖关系:会列出运行查询划分的stage阶段以及之间的依赖关系
  • Stage内容:包含了每个stage非常重要的信息,比如运行时的operator和sort orders等具体的信息

explain  select stime,count(*) from test_textfile where stime > '00:00:00' group by  stime order by  stime limit  5;

Join优化

Hive Join的底层是通过MapReduce来实现的,Hive实现Join时,为了提高MapReduce的性能,提供了多种Join方案来实现;

例如适合小表Join大表的Map Join,大表Join大表的Reduce Join,以及大表Join的优化方案Bucket Join等。

Map Join

适用场景 : 小表join大表

原理

将小表的那份数据放到分布式缓存中,给每个MapTask的内存都放一份完整的数据,大的数据每个部分都可以与小数据的完整数据进行join,底层不需要经过shuffle,需要占用内存空间存放小的数据文件.

explain  select * from t_category left join t_product tp on t_category.cid = tp.cid;

使用

  1. 尽量使用Map Join来实现Join过程,Hive中默认自动开启了Map Join
  2. hive.auto.convert.join=true
  3. -- 如果大小表连接时小表数据小于该值,则自动开启mapjoin优化
  4. -- 2.0以前
  5. hive.smalltable.filesize or hive.mapjoin.smalltable.filesize: 默认值是25m
  6. -- 2.0以后
  7. hive.auto.convert.join.noconditionaltask.size :默认值10m
  8. hive.auto.convert.join.noconditionaltask: 默认值是true

Reduce Join

应用场景:适合于大表Join大表

原理:将两张表的数据在shuffle阶段利用shuffle的分组,将数据按照关联字段进行合并,必须经过shuffle,利用Shuffle过程中的分组来实现关联.

使用:Hive会自动判断是否满足Map Join,如果不满足Map Join,则自动执行Reduce Join

Bucket Join

应用场景:适合于大表Join大表

原理:将两张表按照相同的规则将数据划分,根据对应的规则的数据进行join,减少了比较次数,提高了性能

使用

  1. 使用Bucket Join
  2. 语法:clustered by colName
  3. 参数:set hive.optimize.bucketmapjoin = true;
  4. 要求:分桶字段 = Join字段 ,桶的个数相等或者成倍数
  1. 使用Sort Merge Bucket Join(SMB)
  2. 基于有序的数据Join
  3. 语法:clustered by colName sorted by (colName)
  4. 参数
  5. set hive.optimize.bucketmapjoin = true;
  6. set hive.auto.convert.sortmerge.join=true;
  7. set hive.optimize.bucketmapjoin.sortedmerge = true;
  8. set hive.auto.convert.sortmerge.join.noconditionaltask=true;
  9. 要求:分桶字段 = Join字段 = 排序字段 ,桶的个数相等或者成倍数

6.3 数据倾斜

分布式计算中最常见的,最容易遇到的问题就是数据倾斜;

现象:当提交运行一个程序时,这个程序的大多数的Task都已经运行结束了,只有某一个Task一直在运行,迟迟不能结束,导致整体的进度卡在99%或者100%,这时候就可以判定程序出现了数据倾斜的问题。

原因:数据分配不均衡

Group by、Count(distinct)造成数据倾斜

当程序中出现group by或者count(distinct)等分组聚合的场景时,如果数据本身是倾斜的,根据MapReduce的Hash分区规则,肯定会出现数据倾斜的现象。

根本原因是因为分区规则导致的,所以可以通过以下几种方案来解决group by导致的数据倾斜的问题

方案一:开启Map端聚合

  1. hive.map.aggr=true;
  2. 通过减少shuffle数据量和Reducer阶段的执行时间,避免每个Task数据差异过大导致数据倾斜
  3. 需要注意:求平均值不能使用 combiner

方案二:数据倾斜时自动负载均衡

  1. hive.groupby.skewindata=true;
  2. 开启该参数以后,当前程序会自动通过两个MapReduce来运行
  3. 第一个MapReduce自动进行随机分布到Reducer中,每个Reducer做部分聚合操作,输出结果
  4. 第二个MapReduce将上一步聚合的结果再按照业务(group by key)进行处理,保证相同的分布到一起,最终聚合得到结果
  5. 键随机数打散

Join造成数据倾斜

Join操作时,如果两张表比较大,无法实现Map Join,只能走Reduce Join,那么当关联字段中某一种值过多的时候依旧会导致数据倾斜的问题;

面对Join产生的数据倾斜,核心的思想是尽量避免Reduce Join的产生,优先使用Map Join来实现

但往往很多的Join场景不满足Map Join的需求,那么可以以下几种方案来解决Join产生的数据倾斜问题:

方案一:提前过滤,将大数据变成小数据,实现Map Join

  1. select a.id,a.value1,b.value2 from table1
  2. a join (select b.* from table2 b where b.ds>='20181201' and b.ds<'20190101') c
  3. on (a.id=c.id)

方案二:使用Bucket Join

  1. 方案二:使用Bucket Join
  2. 如果使用方案一,过滤后的数据依旧是一张大表,那么最后的Join依旧是一个Reduce Join
  3. 这种场景下,可以将两张表的数据构建为桶表,实现Bucket Map Join,避免数据倾斜.

方案三:使用Skew Join

  1. Skew Join是Hive中一种专门为了避免数据倾斜而设计的特殊的Join过程
  2. 这种Join的原理是将Map Join和Reduce Join进行合并,如果某个值出现了数据倾斜,就会将产生数据倾斜的数据单独使用Map Join来实现
  3. 其他没有产生数据倾斜的数据由Reduce Join来实现,这样就避免了Reduce Join中产生数据倾斜的问题
  4. 最终将Map Join的结果和Reduce Join的结果进行Union合并
  5. hive.optimize.skewjoin = true

原理

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/868012
推荐阅读
相关标签
  

闽ICP备14008679号