赞
踩
Flink 社区希望能够将 Flink 的 Streaming 实时计算能力和 Lakehouse 新架构优势进一步结合,推出新一代的 Streaming Lakehouse 技术,促进数据在数据湖上真正实时流动起来,并为用户提供实时离线一体化的开发体验。Flink 社区内部孵化了 Flink Table Store (简称 FTS )子项目,一个真正面向 Streaming 以及 Realtime的数据湖存储项目。2023年3月12日,FTS进入 Apache 软件基金会 (ASF) 的孵化器,改名为 Apache Paimon (incubating)。
Apache Paimon (incubating) 是一项流式数据湖存储技术,可以为用户提供高吞吐、低延迟的数据摄入、流式订阅以及实时查询能力。Paimon 采用开放的数据格式和技术理念,可以与 Apache Flink / Spark / Trino 等诸多业界主流计算引擎进行对接,共同推进 Streaming Lakehouse 架构的普及和发展。
Apache Paimon的官方网站是:Apache Paimon
整体架构:
读/写:Paimon 支持多种读/写数据和执行 OLAP 查询的方式。
(1)对于读取,它支持以下方式消费数据
从历史快照(批处理模式)、从最新的偏移量(在流模式下),或以混合方式读取增量快照。
(2)对于写入,它支持来自数据库变更日志(CDC)的流式同步或来自离线数据的批量插入/覆盖。
生态:除了Apache Flink之外,Paimon还支持Apache Hive、Apache Spark、Trino等其他计算引擎的读取。
底层:Paimon 将列式文件存储在文件系统/对象存储上,并使用 LSM 树结构来支持大量数据更新和高性能查询。
统一存储:
对于 Apache Flink 这样的流引擎,通常有三种类型的连接器:
Paimon 提供表抽象。它的使用方式与传统数据库没有什么区别:
核心特性
基本概念:
Snapshot:
快照捕获表在某个时间点的状态。用户可以通过最新的快照来访问表的最新数据。通过时间旅行,用户还可以通过较早的快照访问表的先前状态。
Partition:
Paimon 采用与 Apache Hive 相同的分区概念来分离数据。分区是一种可选方法,可根据日期、城市和部门等特定列的值将表划分为相关部分。每个表可以有一个或多个分区键来标识特定分区。通过分区,用户可以高效地操作表中的一片记录。如果定义了主键,则分区键必须是主键的子集。
Bucket:
未分区表或分区表中的分区被细分为存储桶,以便为可用于更有效查询的数据提供额外的结构。桶的范围由记录中的一列或多列的哈希值确定。用户可以通过提供bucket-key选项来指定分桶列。如果未指定bucket-key选项,则主键(如果已定义)或完整记录将用作存储桶键。桶是读写的最小存储单元,因此桶的数量限制了最大处理并行度。不过这个数字不应该太大,因为它会导致大量小文件和低读取性能。一般来说,建议每个桶的数据大小为1GB左右。
Consistency Guarantees一致性保证
Paimon writer使用两阶段提交协议以原子方式将一批记录提交到表中。每次提交在提交时最多生成两个快照。对于任意两个同时修改表的writer,只要他们不修改同一个存储桶,他们的提交都是可序列化的。如果他们修改同一个存储桶,则仅保证快照隔离。也就是说,最终表状态可能是两次提交的混合,但不会丢失任何更改。
Paimon表格式:
文件布局:
一张表的所有文件都存储在一个基本目录下。Paimon 文件以分层方式组织。从快照文件开始,Paimon 可以递归地访问表中的所有记录。
其中,清单列表(manifest list)是清单文件名(manifest file)的列表。清单文件(manifest file)是包含有关 LSM 数据文件和更改日志文件的文件信息。例如对应快照中创建了哪个LSM数据文件、删除了哪个文件。
Data Files:数据文件按分区和存储桶分组。每个存储桶目录都包含一个 LSM 树及其变更日志文件。目前,Paimon 支持使用 orc(默认)、parquet 和 avro 作为数据文件格式。
LSM Trees
Paimon 采用 LSM 树(日志结构合并树)作为文件存储的数据结构。
Sorted Runs
LSM 树将文件组织成多个Sorted Run。Sorted Run由一个或多个数据文件组成,并且每个数据文件恰好属于一个Sorted Run。数据文件中的记录按其主键排序。在Sorted Run中,数据文件的主键范围永远不会重叠。
不同的Sorted Run可能具有重叠的主键范围,甚至可能包含相同的主键。查询LSM树时,必须合并所有Sorted Run,并且必须根据用户指定的合并引擎和每条记录的时间戳来合并具有相同主键的所有记录。
写入LSM树的新记录将首先缓存在内存中。当内存缓冲区满时,内存中的所有记录将被排序并刷新到磁盘。
Compaction
当越来越多的记录写入LSM树时,Sorted Run的数量将会增加。由于查询LSM树需要将所有Sorted Run合并起来,太多Sorted Run将导致查询性能较差,甚至内存不足。
为了限制Sorted Run的数量,我们必须偶尔将多个Sorted Run合并为一个大的Sorted Run。这个过程称为Compaction。
然而,Compaction是一个资源密集型过程,会消耗一定的CPU时间和磁盘IO,因此过于频繁的Compaction可能会导致写入速度变慢。这是查询和写入性能之间的权衡。Paimon 目前采用了类似于 Rocksdb 通用压缩的Compaction策略。
默认情况下,当Paimon将记录追加到LSM树时,它也会根据需要执行Compaction。用户还可以选择在“专用Compaction作业”中独立执行所有Compaction。
Paimon 创新地结合了 湖存储 + LSM + 列式格式(ORC, Parquet),为湖存储带来大规模实时更新能力。Paimon 的 LSM 的文件组织结构如下:
LSM 是一个面向写友好的格式,它在写入的时候可以看到整个流程,但它不用理解具体的流程。大致的思路是,写入发生在 Flink Sink 中,当检查点到达时,它会对内存中的数据进行排序,并将记录刷新到 Level0 文件中。
得益于 LSM 这种原生异步的 Minor Compaction,它可以通过异步 Compaction 落到最下层,也可以在上层就发生一些 Minor 的 Compaction 和 Minor 的合并,这样压缩之后它可以保持 LSM 不会有太多的 level。保证了读取 merge read 的性能,且不会带来很大的写放大。
另外,Flink Sink 会自动清理过期的快照和文件,还可以配置分区的清理策略。所以整个 Paimon 提供了吞吐大的 Append 写,消耗低的局部 Compaction,全自动的清理以及有序的合并。所以它的写吞吐很大,merge read 不会太慢。
阿里云智能开源表存储负责人,Founder of Paimon,Flink PMC 成员李劲松在云栖大会开源大数据专场的分享中对Paimon和Hudi的性能比较进行了详细的阐述。
在阿里云上测试 Apache Paimon 和 Hudi 的性能,测试湖存储的 MergeOnRead 的更新性能,可以看到左边是大致是 5 亿条数据入湖,按照类似的配置、相同的索引来入湖,我们来评估 5 亿条入湖需要多少时间。经过测试发现 Paimon 入湖的过程中,吞吐或者耗时能达到 Hudi 的 4 倍,但是查询相同的数据,发现 Paimon 的查询性能是 Hudi 的 10 倍甚至 20 倍,Hudi 还会碰到因内存变小而无法读取的情况。
为什么呢?分析其原因,Hudi MOR 是纯 Append,虽然后台有 Compaction,但是完全不等 Compaction。所以在测试中 Hudi 的 Compaction 只做了一点点,读取的时候性能特别差。
基于这点,又做了右边的 benchmark,就是 1 亿条数据的 CopyOnWrite,来测试合并性能,测试 CopyOnWrite 情况下的 Compaction 性能。测试的结果是发现不管是 2 分钟、1 分钟还是 30 秒,Paimon 性能都是大幅领先的,是 12 倍的性能差距。在 30 秒的时候,Hudi 跑不出来,Paimon 还是能比较正常地跑出来。(Checkpoint 到 10s 后,Paimon 也跑不出来了)
总结来看,Paimon 到底能做到什么?
第一,低延时、低成本的流式数据湖。如果你有用过 Hudi,我们希望你替换到 Paimon 之后以 1/3 的资源来运行它。
第二,使用简单、入湖简单、开发效率高。可以轻松地把数据库的数据以 CDC 的方式同步到数据湖 Paimon 中。
第三,与 Flink 集成强大,数据流起来。
Flink+Paimon 的流式 CDC 更新,只要定义 Paimon 的主键表,不分区。它的定义就非常像 MySQL 表的定义。
通过 Flink CDC、Flink 作业把 CDC 数据全增量一体到 Paimon 中就够了,就可以实时看到这张表的状态,并且实时地查到这张表。数据被实时的同步,但是离线数仓是需要每天的 View,Paimon 要提供 Tag 技术。今天打了一个 Tag 就记住了今天的状态,每次读到这个 Tag 都是相同的数据,这个状态是不可变的。所以通过 Tag 技术能等同取代 Hive 全量表分区的作用,Flink、Spark 可以通过 Time Travel 的语法访问到 Tag 的数据。
传统的 Hive 表那是分区表,Hive SQL 也没有 Time Travel 的语义,怎么办?在 Paimon 中也提供了 Tag 映射成 Hive 分区表的能力,还是可以在 Hive SQL 中通过分区查询,查询多天的数据。Hive SQL 是完全兼容一行不改的状态来查询到 Paimon 的组件表,所以经过这样的架构改造之后,你可以看到整个数据分钟级实时可见,各整个全增量一体化,存储是复用,比较简单稳定而且一键同步,这里不管是存储成本还是计算成本都可以大幅降低。
存储成本通过 Paimon 的文件复用机制,你会发现打十天的 Tag 其实存储成本只有一两天的全量成本,所以保留 100 天的分区,最后存储成本可以达到 50 倍的节省。
在计算成本上虽然需要维护 24 小时都在跑的流作业,但是你可以通过 Paimon 的异步 Compaction 的方式,尽可能地缩小同步的资源消耗,甚至 Paimon 也提供整库同步的类似功能给到你,可以通过一个作业同步上百张或者几百张表。所以整个链路能做到三低:时延低、成本低和链路复杂度低。
同样,Hudi、Iceberg 也能流读,为实现更好的性能,Paimon在数据流读上做了大量的工作,主要分为两个方面:
Consumer 机制
在流读中经常碰到非常头疼的东西就是 FileNotFoundException,这个机制是什么样的呢?在数据产出过程当中,需要不断地产生 Snapshot。太多的 Snapshot 会导致大量的文件、导致数据存储非常地冗余,所以需要有 Snapshot 的清理机制。但是另外流读的作业可不知道这些,万一正在流读的 Snapshot 被 Snapshot Expiration 给删了,那不就会出现 FileNotFoundException,怎么办?而且更为严重的是,流读作业可能会 Failover,万一它挂了 2 个小时,重新恢复后,它正在流读的 Snapshot 已经被删除了,再也恢复不了。
所以 Paimon 在这里提出了 Consumer 机制。Consumer 机制就是在 Paimon 里用了这个机制之后,会在文件系统中记一个进度,当我再读这个 Snapshot,Expiration 就不会删这个 Snapshot,它能保证这个流读的安全,也能做到像类似 Kafka Group Id 流读进度的保存。重启一个作业无状态恢复还是这个进度。所以 Consumer 机制可以说是流读的基本机制。
Changelog 生成
假设有这样一张 Paimon 的 PK 表,Key 是名字,Value 是 Count,上游在不断地流写,下游在不断地流读。流写可能会同一个组件写相同的数据,比如说先前写的 Jason 是 1,后面又写一个 Jason 是 2。你会发现流读的作业在做一个正确流处理的时候,比如说做一个 sum,sum 结果应该是 2 还是 3,如果没有这个 Changelog 的生成就不知道这是同一个主键,我要先把 Jason -> 1 给 retract 掉,再写 Jason -> 2。所以这里也对我们湖存储本身要表现得像一个数据库生成 Binlog 的方式,下游的流读计算才能更好、更准确。
Changelog 生成有哪些技术呢?在 Flink 实时流计算中,大家如果写过作业的话,也可能写过大量用 State 的方式来去重。但是这样的方式 State 的成本比较高,而且数据会存储多份,一致性也很难保障。或者你可以通过全量合并的方式,比如说 Delta、Hudi、Paimon 都提供了这样的方式,可以在全量合并的时候生成对应的 Changelog,这个可以,但是每次生成 Changelog 都需要全量合并,这个代价也会非常大。
Lookup
Paimon 这边独有的方式,它有 Changelog-Producer=lookup,因为它是 LSM。LSM 是有点查的能力,所以你可以配置这样一个点查的方式在写入的时候能通过批量高效率的点查生成对应的 Changelog 让下游的流处理能够正确地流处理。
一个准实时流式湖仓的架构如下。通过 Flink 实时入湖入到 ODS 层 Paimon 表,通过流式流起来流到 DWD,再流到 DWS,再到 ADS,这样一整套完整的流式湖仓。
基于实时计算Flink版和流式数据湖仓Paimon搭建流式湖仓可以解决上述传统离线数仓的问题。利用Flink的实时计算能力,数据可以在数仓分层之间实时流动。同时,利用Paimon高效的更新能力,数据变更可以在分钟级的延时内传递给下游消费者。因此,流式湖仓在延时和成本上具有双重优势。
具体特色:Paimon连接器_实时计算 Flink版(Flink)-阿里云帮助中心
该方案有如下优势:
Paimon的每一层数据都可以在分钟级的延时内将变更传递给下游,将传统离线数仓的延时从小时级甚至天级降低至分钟级。
Paimon的每一层数据都可以直接接受变更数据,无需覆写分区,极大地降低了传统离线数仓数据更新与订正的成本,解决了中间层数据不易查、不易更新、不易修正的问题。
模型统一,架构简化。ETL链路的逻辑是基于Flink SQL实现的;ODS层、DWD层和DWS层的数据统一存储在Paimon中,可以降低架构复杂度,提高数据处理效率。
同程选择Apache Paimon的原因如下:
应用场景:
优化:
问题一:Paimon 在写入任务的二次初始化慢
原因:Paimon 在写入任务不依赖于 checkpoint 重启的时候,它需要从 Manifest 里面加载已有的分区、bucket 和 LSM 树 Level 信息,用于后面的续写。在实践过程中,随着 commit 数量的增加,Manifest 数量的上涨也非常明显。同程实践下来单分区加载最大要花费 18 秒左右。并发写分区越多的情况下,初始化加载比较慢的情况就会比较严重。
解决方案:主要有两个方向,第一个是分缓存,第二个是增加一些筛选。
通过缓存减少与底层文件交互的次数,提升加载性能。同时在加载过程中,根据当前写入的分区和 Bucket 的信息裁剪掉不必要的 Manifest 文件,减少加载的实体数量,加快加载速度。碰到类似的问题,可以尝试去配置'write-manifest-cache'这个参数,增加它的缓存,然后根据实际应用大小去调整它的大小。实践下来,上千个分区并发写 200 多个分区情况下,配置 2 个 G 左右的 Manifest 缓存就够了。
问题二:commit 阶段的内存控制
原因:这个和 Manifest 的加载也有一定的关系,因为单次 commit 的变更分区和桶数是不固定的,commit 阶段可能会加载非常多的 Manifest,就会导致内存的占用过高,甚至出现 OOM 的情况。
解决办法:社区通过更改整个加载 Manifest 里的一个并发框架,将原来的 stream 模式改为了消费队列的模式,更加细粒度的控制 commit 阶段的内存使用。在实践过程中,commit 阶段出现类似的问题,可以尝试去配置'scan.manifest.parallelism'这个参数,目前配置的是 15,相对来说比较保守,任务基本能稳定运行。需要在实践过程中不断尝试,去找到一个比较合适的配置。
问题三:流批 Split 大小控制
原因:随着 Paimon 表数据量不断的增长,流批读 Paimon 的时候,他的 Split 下发阶段可能会触发 akka.framesize 的限制,导致任务报错。
解决办法:尝试去调整'scan.split-enumerator.batch-size'这个参数,控制流读的时候下发 Split 的大小。
问题四:加速时间戳旅行读
原因:在时间旅行过程中,比如读取某一个时间戳的数据量的时候会发现,初始化 Jobmanager 耗时非常久。经过分析主要是在根据时间戳定位 Snapshot 这个阶段,原来用的是遍历的模式,耗时基本上会在 10 分钟左右。
解决办法:后来改成了二分的模式,耗时从原来 10 分钟缩短到了 1 分钟不到,就能根据具体的时间戳定位到某一个快照信息,快速的将整个数据读取出来。
问题五:中文 Comment 乱码的问题
原因:编码问题
解决办法:目前在 Flink 1.17.2 上已经有 Flink 的 pr 进行修复了,可以升级到 1.17.2 上去解决。
参考:
基于 Flink SQL 和 Paimon 构建流式湖仓新方案_flinkcdc同步mysql到paimon-CSDN博客
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。