赞
踩
etl链路是解析kafka数据落到cos,这个时候会生成很多小文件,等一小时数据全部就绪,再将之前落盘的数据做合并后生成hive的小时级分区,这时才可以用这一小时的数据,这一小时内最早的数据从产生到可用几乎延迟2小时。
那我们如果想早一点查到前5分钟的数据,基于这条etl链路是做不到的,我们需要新起一条实时链路来直接消费kafka中的数据,那么整体链路就会变成这样
这样做存在的问题就是我们维护了两条相同的计算逻辑,一套在Spark,一套在Flink,每次改都需要改两个地方,维护成本高
数据库入hive每天都全量
湖格式将分区信息记录在了一个单独的表中,而不是仅仅依赖于存储目录,这样可以做到只有在任务完全跑成功后,才在元数据表中添加分区,可以避免只生成部分分区的问题。
例如Iceberg的原数据信息:
|file_path |file_format|partition |
-------------------------------------------------------------------------|-----------|---------------|
|action_logs/data/event_time_hour=2020-06-04-19/action=view/log1.parquet |PARQUET |[442027, view] |
|action_logs/data/event_time_hour=2020-06-04-19/action=click/log2.parquet|PARQUET |[442027, click]|
|action_logs/data/event_time_hour=2020-06-04-20/action=click/log3.parquet|PARQUET |[442028, click]|
上表中action_logs表的partition字段(event_time_hour,action),第一个文件的对应partition是[442027, view],即[event_time_hour=442027, action=“view”],其他文件对应的partition以此类推。
湖格式在重写数据的时候,并不是直接重写以前的目录,而是将新数据写入新的目录,从而避免重写数据导致读任务失败。
Iceberg的实现
写入引擎调用Iceberg的commit接口,Iceberg主要会做如下几个事情:
如下图是Iceberg文件的组织形式,在Catelog中记录了要指向的metadata file,metada file中记录了当前所有的版本snapshot,每个snapshot是独立的,一个snapshot对应着一个manifest list,这个list中包含了当前版本下所有提交的manifest file(多个list共用file),每个manifest file中存储着这次提交的data file。Iceberg的元数据相当于是有3层:manifest file -> manifest list -> metadata file。
Hudi的实现
Hudi upsert数据的流程如下:
二者对比
通过上文可以看到,在Iceberg中每次提交会生成新的snapshot,Hudi会每次生成新的Slice,那么在读的时候就可以指定要读取那个版本或者在什么时间以后提交的数据。
多版本的实现会和文件合并产生冲突,比如在Iceberg中,我们每次提交就会生成新manifest file,一个manifest file里又存储了这次提交的数据文件,如果提交频繁会产生很多小文件,所以必须定期将文件做合并,合并之后会生成新的manifest file,它可能是从之前多个manifest file生成出来的,然后再生成新的snapshot,这个时候就是一个新的版本。而之前的数据文件还不能删,因为可能有其他版本在用,所以必须定期清理过期的数据。
Hudi的slice同理。与Iceberg直接可以通过snapshot实现多版本不同的是,Hudi还需要一个时间轴来记录什么时候产生过提交以及影响的文件。
把对象存储上的热数据缓存到执行节点的SSD或者内存中,可以提高查询效率。在事务保证的机制下可以容易地确定哪些缓存数据是失效的。
试想,对于一个多级分区的大表来说,比如我们的点位日志表,一级分区是小时时间分区,二级分区是plat分区,有app和h5,三级分区是业务线定义的product分区,我们假设是10个,那么一天的分区个数就是480个。基于Metastore的partition方案,如果一个SQL想查这个表一天的数据的话,就需要向NameNode发480次list请求,如果是扫描一天或者一个月的数据,请求数就更夸张了。这样会导致两个问题,一方面是NameNode压力太大,另一方面是SQL请求响应延迟。我们写的Hive Sql经常会出现任务总执行时间是一小时,其中任务初始化就占了20多分钟。
在湖格式中,它在元数据表中存储了所有分区下的所有文件,这种就可以省去list操作。
min/max索引和布隆索引需要在读取出来文件后才能用。基于目前hive的功能,spark或者mr在启动任务切分task的时候,是基于文件级别切的。假设我们的日志数据是1T,在查询的point_name根据min/max索引过滤后只命中了10个文件,占1G,但是切文件的时候并不知道1G这个统计信息,它只知道总文件大小是1T,所以会产生 1T / 128MB = 8192个task,但其实只需要8个task。
在湖格式中,它在元数据表中存储了所有文件的文件统计信息以及列统计信息,包括文件大小、min/max索引和布隆过滤器,这样就可以在任务初始化生成task之前,就精准确定下来要读那些文件了。
上一节中提到的min/max索引是一个范围的查询,可能我们要查的key在很多文件中都满足[min, max]的范围,那我们怎么能精确的定位到这个key到底是在哪个文件中呢?这个需要模仿数据库的实现,就建立key到文件的索引(目前只有Hudi有)
建立key到文件的索引意味着我们需要把所有key存下来,那么在存储的时候就需要考虑读性能与索引代价的权衡,Hudi目前支持以下几种索引(用的最多的是BLOOM、BUCEKT、RECORD_INDEX):
BLOOM:使用record key来构建一个布隆过滤器,还可以支持min/max机制,从而使用record key的范围来减少候选文件。在每个分区内要求Key是唯一的。然后当有新key要插入的时候,通过元数据文件检索那些文件可以命中布隆过滤器,如果命中,再做真实查找,所以如果假阳性高的话,会导致查文件的次数变多。(字节实践中发现在5000亿条数据的情况下,假阳性严重拖垮任务)
参考:https://www.cnblogs.com/bytedata/p/15945254.html
GLOBAL_BLOOM:构建方式及min/max机制与BLOOM索引一样,区别在于GLOBAL BLOOM要求Key在整个表内唯一。
SIMPLE(default for Spark):Spark的默认索引。将已到达的Key存储在表中,新来的Key与历史Key做join。在每个分区内要求Key是唯一的。
GLOBAL_SIMPLE:与SIMPLE索引一样,区别在于GLOBAL SIMPLE要求Key在整个表内唯一。
HBASE:将索引存储在HBase表中,要求Key在整个表内唯一。
INMEMORY(default for Flink and Java):在Spark和Java程序中,使用内存中的hashmap;在Flink程序中,使用内存中的状态。
BUCKET:字节贡献的一个索引机制,它是一种基于哈希的索引,借鉴了数据库里的 Hash Index。给定 n 个桶, 用 Hash 函数决定某个记录属于哪个桶。最终所有分区被分成 N 个桶,每个桶对应一个 File Group。
相比较 Bloom Filter Index 来说,Hash Index 在逻辑层面提供了 Record Key 跟 File Group 的映射关系, 不存在假阳性问题。相同 key 的数据一定是落在同一个桶里面。
RECORD_INDEX:Hudi将元数据信息存到一个MOR表中,这个表的底层实现是HFile(数据文件和log文件都用HFile,而不是Parquet+Avro,可以提升查询效率),用来模拟HBASE来提供内置高效的索引服务。元数据表中存储record key和文件组的映射关系。Record index是一个全局索引,要求表中所有分区的key必须唯一。
索引 | 优点 | 缺点 |
---|---|---|
BLOOM | 轻量级,默认的索引方式,包含在数据文件的footer中,不依赖外部系统 | 可能发生假阳性,当假阳性高时,查询成本变大 |
SIMPLE | Spark默认,使用简单 | 当插入较为频繁时,join的次数太多,效率降低 |
HBASE | 对于小批次的keys,查询效率高 | 依赖外部系统,提升运维代价 |
INMEMORY | Flink默认,使用简单,在hashmap中存储查找效率也高 | 对内存的压力太大 |
BUCKET | 轻量级,spark和flink都支持,在大数据量下没有BLOOM假阳性的问题 | Shuffle数据量大 |
RECORD_INDEX | Hudi内置的高效的查询索引,可以替代HBASE | 按照官方所说,是综合性能做好的 需要把每个Key都存下来,存储占用大 |
Z-Order和Hilbert曲线,把二维数据映射到一维的一种思路
假设我们有数据
uid | score |
---|---|
1 | 10 |
1 | 20 |
1 | 30 |
2 | 10 |
2 | 20 |
2 | …很多行 |
3 | 10 |
3 | 20 |
3 | 30 |
4 | 20 |
按照上面的按照uid + score排序,我们在查询uid = 2 and score >= 10 and score <= 20
这样的语句时很有用,相同的uid已经放在一起了,但如果我们要查socre = 10 and uid >= 2 and uid <= 3
这样的语句时就不行了,因为相同的score间隔太远了;同理,如果按照score + uid做排序,如果查socre = 10 and uid >= 2 and uid <= 3
这样的语句很有用,但是如果查uid = 2 and score >= 10 and score <= 20
就不行了,因为相同的uid间隔太远了。那么有没有一种把相同的uid和相同的score都放的聚集的思路呢?就是把二维映射到一维做排序。
Z-order是一种折衷的排序算法,在用的时候需要考虑自己的场景
在2.1.1中,我们看到了湖格式会记录每次在提交后都会有相应的snapshot或者在时间轴上有记录,那么我们就可以通过这个点来实现增量式读取,比如读取从哪个版本以后的,或者是从哪个时间戳提交的以后的数据。
我目前的理解是在insert的场景下,Iceberg和Hudi都可以做到分钟级延迟,但是在upsert的场景下,Iceberg因为没有索引,会导致合并数据的成本很大,肯定无法做到分钟级延迟。
回到1.3中的架构图,我们会发现在实时场景下面,离线链路充当的作用是给实时数据做修数用的,修数的原因是可能有乱序问题导致实时数据算的不准,这种方案被称为Lambda架构。
可以看到Lambda其实就是一个重跑机制,为了保障数据的准确性我们需要维护两条链路,那么我们如果实时链路可以重跑的话,是不是可以把只保留一条链路呢?比如像是下图的这种,在kafka里保存历史数据,把离线链路砍掉,在重新运行的时候直接重跑实时程序,也可以实现Lambda架构的效果。参考:Kappa架构
但是这种存在问题:
因为上述的三个问题(主要是1和2)导致基于Kafka的流批一体没有做起来,现在有了湖格式,基于湖格式的增量读取,我们就可以实现一个近实时的数据拉取(近实时是因为对于insert数据源,它需要上游做commit才能落表;对于upsert数据源,既需要commit,又需要做merge)。如下图是目前主流的一种流批一体的实现
老架构:
新架构:
可以看到,上述的新架构中在存储层面实现了统一,但是在计算层面还是一套Flink、一套Spark,这是因为目前Flink对于批处理模式支持的还不算好,Flink今年的发布会也强调了后续要加强批处理模式的支持,争取做到业界一流的水平。Spark对于实时数据的处理不太好,但是我个人感觉在近实时的场景下,可以把Flink换成Spark Streaming。
特性 | Iceberg | Hudi | Paimon |
---|---|---|---|
Schema变更 | 支持 | 不支持 | 支持 |
Upsert | 支持,但是慢 | 支持且快 | 支持且快 |
对Flink集成度 | 一般 | 一般 | 好 |
存储成本 | 小 | 大 | 最大 |
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。