当前位置:   article > 正文

初识Apache Paimon

apache paimon

第 0 章 前言

0.1 主流数据湖工具

Delta:由Spark背后商业公司Databricks出品,因此很多功能强兼容Spark,对其他计算引擎不是很友好。

Apache Hudi:由 Uber 的工程师为满足其内部数据分析的需求而设计的数据湖项目,功能很丰富,但是体系设计也很复杂,目前在国内落地场景较多。

Apache Iceberg:由Netflix设计的一种数据湖项目,其采用了异于Hudi的文件布局方式,自身定位为一种新的数据湖表结构设计范式,提供一种数据湖设计的思想,Apache Paimon的文件布局方式就参考了Iceberg的文件布局方式。

0.2 传统流批一体架构挑战

面对流批场景,很多用户会采用经典的 Lambda 架构来构建他们的流批处理场景,但是Lambda架构会存在以下问题。

第一、批量导入到文件系统的数据一般都缺乏全局的严格 schema 规范,下游的 Spark 作业做分析时碰到格式混乱的数据会很麻烦,每一个分析作业都要过滤处理错乱缺失的数据,成本较大;

第二、数据写入文件系统这个过程没有 ACID 保证,用户可能读到导入中间状态的数据。所以上层的批处理作业为了躲开这个坑,只能调度避开数据导入时间段,可以想象这对业务方是多么不友好;同时也无法保证多次导入的快照版本,例如业务方想读最近 5 次导入的数据版本,其实是做不到的。

第三、用户无法高效 upsert/delete 历史数据,parquet 文件一旦写入 HDFS 文件,要想改数据,就只能全量重新写一份的数据,成本很高。事实上,这种需求是广泛存在的,例如由于程序问题,导致错误地写入一些数据到文件系统,现在业务方想要把这些数据纠正过来;线上的 MySQL binlog 不断地导入 update/delete 增量更新到下游数据湖中;某些数据审查规范要求做强制数据删除,例如欧洲出台的 GDPR 隐私保护等等。

第四、频繁地数据导入会在文件系统上产生大量的小文件,导致文件系统不堪重负,尤其是 HDFS 这种对文件数有限制的文件系统。

0.3 数据湖工具公共特点

针对上述传统Lambda架构在应对流批一体场景时的问题,一个标准数据湖工具应该具备以下四个最基本的特点。

随着数据湖的发展,当下数据湖工具的新标准为以下几个特点

0.4 数据湖工具对比

第一、ACID 和隔离级别支持

这里主要解释下,对数据湖来说三种隔离分别代表的含义。

  1. Serialization 是说所有的 reader 和 writer 都必须串行执行;
  2. Write Serialization: 是说多个 writer 必须严格串行,reader 和 writer 之间则可以同时跑;
  3. Snapshot Isolation: 是说如果多个 writer 写的数据无交集,则可以并发执行;否则只能串行。Reader 和 writer 可以同时跑。

综合起来看,Snapshot Isolation 隔离级别的并发性是相对比较好的。

第二、Schema 变更支持和设计

这里有两个对比项,一个是 schema 变更的支持情况,我的理解是 Hudi 仅支持添加可选列和删除列这种向后兼容的 DDL 操作,而其他方案则没有这个限制。另外一个是数据湖是否自定义 schema 接口,以期跟计算引擎的 schema 解耦。这里 Iceberg 是做的比较好的,抽象了自己的 schema,不绑定任何计算引擎层面的 schema。

第三、流批接口支持

目前 Iceberg 和 Hive 暂时不支持流式消费,不过 Iceberg 社区正在 issue 179 上开发支持。

第四、接口抽象程度和插件化

这里主要从计算引擎的写入和读取路径、底层存储可插拔、文件格式四个方面来做对比。

Iceberg 是抽象程度做得最好的数据湖方案,四个方面都做了非常干净的解耦。Delta 是 databricks 背后主推的,必须天然绑定 Spark;Hudi 的代码跟 Delta 类似,也是强绑定 Spark。存储可插拔的意思是说,是否方便迁移到其他分布式文件系统上(例如 S3),这需要数据湖对文件系统 API 接口有最少的语义依赖,例如若数据湖的 ACID 强依赖文件系统 rename 接口原子性的话,就难以迁移到 S3 这样廉价存储上,目前来看只有 Hive 没有太考虑这方面的设计;文件格式指的是在不依赖数据湖工具的情况下,是否能读取和分析文件数据,这就要求数据湖不额外设计自己的文件格式,统一用开源的 parquet 和 avro 等格式。这里,有一个好处就是,迁移的成本很低,不会被某一个数据湖方案给绑死。

第五、查询性能优化

第六、其他功能

这里 One line demo 指的是,示例 demo 是否足够简单,体现了方案的易用性,Iceberg 稍微复杂一点(我认为主要是 Iceberg 自己抽象出了 schema,所以操作前需要定义好表的 schema)。做得最好的其实是 Delta,因为它深度跟随 Spark 易用性的脚步。

Python 支持其实是很多基于数据湖之上做机器学习的开发者会考虑的问题,可以看到 Iceberg 和 Delta 是做的很好的两个方案。

出于数据安全的考虑,Iceberg 还提供了文件级别的加密解密功能,这是其他方案未曾考虑到的一个比较重要的点。

在了解到其他数据湖具备的基本功能和特性后,正式开始了解Apache Paimon吧~

第 1 章 概述

1.0 写在前面

Apache Paimon得益于Flink在流计算的迅猛发展,集百家长,避百家短,充分融合了Hudi和Iceberg很多优秀的设计,区别于过去类似于Hudi\Iceberg数据湖计算框架以批为主的思想,Paimon的愿景是打造一个真正的流式数据湖。

1.1 简介

Flink 社区希望能够将 Flink 的 Streaming 实时计算能力和 Lakehouse 新架构优势进一步结合,推出新一代的 Streaming Lakehouse 技术,促进数据在数据湖上真正实时流动起来,并为用户提供实时离线一体化的开发体验。Flink 社区内部孵化了 Flink Table Store (简称 FTS )子项目,一个真正面向 Streaming 以及 Realtime的数据湖存储项目。2023年3月12日,FTS进入 Apache 软件基金会 (ASF) 的孵化器,改名为 Apache Paimon (incubating)。

Apache Paimon是一个流数据湖平台,具有高速数据摄取、变更日志跟踪和高效的实时分析的能力。

1)读/写:Paimon 支持多种读/写数据和执行 OLAP 查询的方式。

(1)对于读取,它支持以下方式消费数据:

  • 从历史快照(批处理模式),
  • 从最新的偏移量(在流模式下),或
  • 以混合方式读取增量快照。

(2)对于写入,它支持来自数据库变更日志(CDC)的流式同步或来自离线数据的批量插入/覆盖。

2)生态系统

除了Apache Flink之外,Paimon还支持Apache Hive、Apache Spark、Trino等其他计算引擎的读取。

3)内部

在底层,Paimon 将列式文件存储在文件系统/对象存储上,并使用 LSM 树结构来支持大量数据更新和高性能查询。

4)统一存储

对于 Apache Flink 这样的流引擎,通常有三种类型的连接器:

  • 消息队列:例如 Apache Kafka,在源阶段和中间阶段都使用它,以保证延迟保持在秒级。
  • OLAP系统:例如Clickhouse,它以流方式接收处理后的数据并为用户的即席查询提供服务。
  • 批量存储:例如Apache Hive,它支持传统批处理的各种操作,包括INSERT OVERWRITE。

Paimon 提供表抽象。它的使用方式与传统数据库没有什么区别:

  • 在批处理执行模式下,它就像一个Hive表,支持Batch SQL的各种操作。查询它以查看最新的快照。
  • 在流执行模式下,它的作用就像一个消息队列。查询它的行为就像从历史数据永不过期的消息队列中查询流更改日志。

1.2 核心特性

1)统一批处理和流处理

批量写入和读取、流式更新、变更日志生成,全部支持。

2)数据湖能力

低成本、高可靠性、可扩展的元数据。 Apache Paimon 具有作为数据湖存储的所有优势。

3)各种合并引擎

按照您喜欢的方式更新记录。保留最后一条记录、进行部分更新或将记录聚合在一起,由您决定。

4)变更日志生成

Apache Paimon 可以从任何数据源生成正确且完整的变更日志,从而简化您的流分析。

5)丰富的表类型(Primary Table \ Append-Only Table) 

除了主键表之外,Apache Paimon还支持append-only表,提供有序的流式读取来替代消息队列。

6)模式演化(Schema Evolution)

Apache Paimon 支持完整的模式演化。您可以重命名列并重新排序。

1.3 基本概念

1.3.1 Snapshot

快照捕获表在某个时间点的状态。用户可以通过最新的快照来访问表的最新数据。通过时间旅行,用户还可以通过较早的snapshot访问表的先前状态。

1.3.2 Partition

Paimon 采用与 Apache Hive 相同的分区概念来分离数据。

分区是一种可选方法,可根据日期、城市和部门等特定列的值将表划分为相关部分。每个表可以有一个或多个分区键来标识特定分区。

通过分区,用户可以高效地操作表中的一片记录。

如果定义了主键,则分区键必须是主键的子集。

1.3.3 Bucket

未分区表或分区表中的分区被细分为存储桶,以便为可用于更有效查询的数据提供额外的结构。

桶的范围由记录中的一列或多列的哈希值确定。用户可以通过提供bucket-key选项来指定分桶列。如果未指定bucket-key选项,则主键(如果已定义)或完整记录将用作存储桶键。

桶是读写的最小存储单元,因此桶的数量限制了最大处理并行度。不过这个数字不应该太大,因为它会导致大量小文件和低读取性能。一般来说,建议每个桶的数据大小为1GB左右。

1.3.4 Consistency Guarantees一致性保证

Paimon writer使用两阶段提交协议以原子方式将一批记录提交到表中。每次提交在提交时最多生成两个快照。

对于任意两个同时修改表的writer,只要他们不修改同一个存储桶,他们的提交都是可序列化的。如果他们修改同一个存储桶,则仅保证快照隔离。也就是说,最终表状态可能是两次提交的混合,但不会丢失任何更改。

1.4 文件布局

一张表的所有文件都存储在一个基本目录下。 Paimon 文件以分层方式组织。下图说明了文件布局。从快照文件开始,Paimon 读者可以递归地访问表中的所有记录。

1.4.1 Snapshot Files

所有快照文件都存储在快照目录中。

快照文件是一个 JSON 文件,包含有关此快照的信息,包括:

正在使用的Schema文件

包含此快照的所有更改的清单列表(manifest list)

1.4.2 Manifest Files

所有清单列表(manifest list)和清单文件(manifest file)都存储在清单(manifest)目录中。

清单列表(manifest list)是清单文件名(manifest file)的列表。

清单文件(manifest file)是包含有关 LSM 数据文件和更改日志文件的文件信息。例如对应快照中创建了哪个LSM数据文件、删除了哪个文件。

1.4.3 Data Files

数据文件按分区和存储桶分组。每个存储桶目录都包含一个 LSM 树及其变更日志文件。目前,Paimon 支持使用 orc(默认)、parquet 和 avro 作为数据文件格式。

1.4.4 LSM Trees

Paimon 采用 LSM 树(日志结构合并树)作为文件存储的数据结构。

1.4.4.1 Sorted Runs

LSM 树将文件组织成多个Sorted Run。Sorted Run由一个或多个数据文件组成,并且每个数据文件恰好属于一个Sorted Run。注:Sorted Run这个概念是Rocksdb里面的

数据文件中的记录按其主键排序。在Sorted Run中,数据文件的主键范围永远不会重叠。

正如您所看到的,不同的Sorted Run可能具有重叠的主键范围,甚至可能包含相同的主键。查询LSM树时,必须合并所有Sorted Run,并且必须根据用户指定的合并引擎和每条记录的时间戳来合并具有相同主键的所有记录。

写入LSM树的新记录将首先缓存在内存中。当内存缓冲区满时,内存中的所有记录将被排序并刷新到磁盘。

1.4.4.2 Compaction

当越来越多的记录写入LSM树时,Sorted Run的数量将会增加。由于查询LSM树需要将所有Sorted Run合并起来,太多Sorted Run将导致查询性能较差,甚至内存不足。

为了限制Sorted Run的数量,我们必须偶尔将多个Sorted Run合并为一个大的Sorted Run。这个过程称为Compaction。

然而,Compaction是一个资源密集型过程,会消耗一定的CPU时间和磁盘IO,因此过于频繁的Compaction可能会导致写入速度变慢。这是查询和写入性能之间的权衡。 Paimon 目前采用了类似于 Rocksdb 通用压缩的Compaction策略。

默认情况下,当Paimon将记录追加到LSM树时,它也会根据需要执行Compaction。用户还可以选择在“专用Compaction作业”中独立执行所有Compaction。

1.4.5 Hudi的文件布局

Hudi-基本概念(时间轴、文件布局、索引、表类型、查询类型、数据写、数据读、Compaction)_hudi表-CSDN博客

1.4.6 Iceberg文件布局

iceberg的文件布局基本和Paimon的文件布局相同,Paimon的文件布局设计思想就是从Iceberg演变过来的,paimon snapshot思想从hudi过度过来的。

第 2 章 表类型

2.1 主键表

主键表是创建表时的默认表类型。用户可以在表中插入、更新或删除记录。

主键由一组列组成,这些列包含每个记录的唯一值。Paimon通过对每个bucket中的主键进行排序来实现数据排序,允许用户通过对主键应用过滤条件来实现更高的性能。

2.1.1 数据分布

默认情况下,Paimon表只有一个bucket,这意味着它此时的并行度是1。在使用的时候需要为表配置桶策略,提高并行度。

桶是进行读写操作的最小存储单元,每个桶目录包含一个LSM树。

2.1.2 Fixed Bucket

配置数量大于0的桶数目,使用Fixed bucket模式,根据Math.abs(key_hashcode % numBuckets)来计算真正要记录的桶。

减少桶数量只能在离线情况下(停掉作业)进行,桶数量太多会产生“小文件”问题,数量太少会导致“性能欠缺”

2.1.3 Dynamic Bucket

配置“bucket”=“-1”。最先到达的键将落入旧的桶中,新键将落入新桶中,桶和键的分布取决于数据到达的顺序。Paimon维护一个索引来确定哪个键对应于哪个bucket。

Paimon将自动扩展桶的数量。

Option1:“dynamic-bucket.target-row-num':控制一个bucket的行数,当bucket中的行数到达阈值将创建新的bucket。

Option2:“dynamic-bucket.initial-buckets ':控制初始化桶的数量。

动态bucket只支持单作业写模式。请不要启动多个作业来写入同一个分区(这可能导致重复数据)。即使启用“write-only”并启动专用的压缩作业,它也不会起作用

2.1.4 Normal dynamic bucket模式

当您的更新操作没有跨分区(未分区模式,或者主键包含所有分区字段)时,动态桶模式使用HASH索引来维护从键到桶的映射,它比固定桶模式需要更多的内存。

性能:

  1. 一般来说,没有性能损失,但会有一些额外的内存消耗,一个分区中的1亿个条目会多占用1 GB的内存,不活跃的分区不会占用内存。

  2. 对于更新率较低的表,建议使用该模式,以显著提高性能。

普通动态bucket 模式支持排序压缩以加快查询速度

2.1.5 Cross Partitions upsert dynamic bucket模式

当需要跨分区upsert(主键不包含所有分区字段)时,Dynamic Bucket模式直接维护键到分区和桶的映射,使用本地磁盘,并在开始流写作业时通过读取表中所有现有键来初始化索引。不同的合并引擎有不同的行为:

  1. Deduplicate:删除旧分区中的数据,并将新数据插入新分区。

  2. PartialUpdate & Aggregation:将新数据插入旧分区。

  3. FirstRow:如果有旧值,则忽略新数据。

性能:对于具有大量数据的表,将会有很大的性能损失。而且,初始化需要很长时间。

如果您的upsert不依赖于太旧的数据,您可以考虑配置索引TTL来减少索引和初始化时间:

'cross-partition-upsert.index-ttl':在rocksdb中设置TTL索引和初始化,这样可以避免维护过多的索引而导致性能越来越差。

但请注意,这也可能导致数据重复

2.1.6 Merge Engines

当Paimon sink接收到具有相同主键的两条或多条记录时,它会将它们合并到一条记录中,以保持主键的唯一性。通过指定merge-engine表属性,用户可以选择如何将记录合并在一起。

在flink sql中设置‘table.exec.sink.upsert-materialize=NONE’,可能会产生一些不可预估的现象。当输入是无序的,我们建议您使用序列字段(sequence fields) 来纠正混乱。

deduplicate

deduplicate引擎是默认的合并引擎。Paimon将只保留最新的记录,并丢弃具有相同主键的其他记录。

具体来说,如果最新的记录是一条DELETE记录,那么所有具有相同主键的记录都将被删除。

Partial Update

通过设置 'merge-engine' = 'partial-update':用户可以通过多次更新来更新记录的列,直到记录完成。这是通过使用相同主键下的最新数据逐个更新值字段来实现的。但是,在此过程中不会覆盖空值。

例如,假设Paimon接收到三条记录:

  • <1, 23.0, 10, NULL>-

  • <1, NULL, NULL, 'This is a book'>

  • <1, 25.2, NULL, NULL>

假设第一列是主键,那么最终结果将是< 1,25.2,10,'This is a book'>。

对于流式读取场景Partial-Update 合并引擎必须与 lookup 或者 full-compaction changelog producer. 一起使用。(也支持' input ' changelog producer,但只返回输入记录。)

默认情况下,Partial Update不接受删除记录,您可以选择以下解决方案之一:

  • 配置‘partial-update.ignore-delete’ 忽略删除记录

  • 配置“sequence-group”来撤回部分列。

sequence group

`squence fields` 可能无法解决具有多个流更新的partial update 表的无序问题,因为在多流更新期间,序列字段可能被另一个流的最新数据覆盖。

因此,我们引入了Partial Update表的序列组机制。它可以解决:

  1. 多流更新过程中的混乱。每个流定义自己的序列组。

  2. 真正的部分更新,而不仅仅是非空更新。

  1. CREATE TABLE T (
  2.   k INT,
  3.   a INT,
  4.   b INT,
  5.   g_1 INT,
  6.   c INT,
  7.   d INT,
  8.   g_2 INT,
  9.    PRIMARY KEY (k) NOT ENFORCED
  10. ) WITH (
  11.    'merge-engine'='partial-update',
  12.    'fields.g_1.sequence-group'='a,b',
  13.    'fields.g_2.sequence-group'='c,d'
  14. );
  15. INSERT INTO T VALUES (1, 1, 1, 1, 1, 1, 1);
  16. -- g_2 is null, c, d should not be updated
  17. INSERT INTO T VALUES (1, 2, 2, 2, 2, 2, CAST(NULL AS INT));
  18. SELECT * FROM T; -- output 1, 2, 2, 2, 1, 1, 1
  19. -- g_1 is smaller, a, b should not be updated
  20. INSERT INTO T VALUES (1, 3, 3, 1, 3, 3, 3);
  21. SELECT * FROM T; -- output 1, 2, 2, 2, 3, 3, 3

对于序列组配置支持的字段类型为:DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP, and TIMESTAMP_LTZ.

Partial Update的更新

您可以为输入字段指定聚合函数,aggregation中的所有函数都支持。

  1. CREATE TABLE T (
  2.         k INT,
  3.         a INT,
  4.         b INT,
  5.         c INT,
  6.         d INT,
  7.          PRIMARY KEY (k) NOT ENFORCED
  8. ) WITH (
  9.     'merge-engine'='partial-update',
  10.     'fields.a.sequence-group' = 'b',
  11.     'fields.b.aggregate-function' = 'first_value',
  12.     'fields.c.sequence-group' = 'd',
  13.     'fields.d.aggregate-function' = 'sum'
  14. );
  15. INSERT INTO T VALUES (1, 1, 1, CAST(NULL AS INT), CAST(NULL AS INT));
  16. INSERT INTO T VALUES (1, CAST(NULL AS INT), CAST(NULL AS INT), 1, 1);
  17. INSERT INTO T VALUES (1, 2, 2, CAST(NULL AS INT), CAST(NULL AS INT));
  18. INSERT INTO T VALUES (1, CAST(NULL AS INT), CAST(NULL AS INT), 2, 2);
  19. SELECT * FROM T; -- output 1, 2, 1, 2, 3
Aggregation 聚合函数

在flink sql中table.exec.sink.upsert-materialize要设置为NONE

有时用户只关心聚合的结果。聚合合并引擎根据聚合函数,在相同的主键下,将具有最新数据的每个值字段逐一聚合。

每个非主键字段都可以指定一个聚合函数,通过fields.<field-name>.aggregate-function,如果没有设定将默认采用最新的一个非空值。

例子:

  1. CREATE TABLE MyTable (
  2.   product_id BIGINT,
  3.   price DOUBLE,
  4.   sales BIGINT,
  5.    PRIMARY KEY (product_id) NOT ENFORCED
  6. ) WITH (
  7.    'merge-engine' = 'aggregation',
  8.    'fields.price.aggregate-function' = 'max',
  9.    'fields.sales.aggregate-function' = 'sum'
  10. );

字段价格将通过max函数进行汇总,字段销售额将通过sum函数进行汇总。给定两个输入记录< 1,23.0,15>和< 1,30.2,20>,最终结果将是< 1,30.2,35>。

当前支持的聚合函数和数据类型有:

  • sum:sum函数将跨多行的值聚合起来。当前支持的聚合函数和数据类型有,DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, and DOUBLE

  • product:product函数可以跨多个行计算乘积值,DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, and DOUBLE

  • count:count函数对跨多行的值进行计数。INTEGER, BIGINT

  • max:max函数识别并保留最大值。 CHAR, VARCHAR, DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP, and TIMESTAMP_LTZ

  • min:min函数识别并保留最小值。CHAR, VARCHAR, DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP, and TIMESTAMP_LTZ

  • last_value:last_value函数用最近导入的值替换之前的值,支持所有数据类型。

  • last_non_null_value:last_non_null_value函数用最近的非空值替换之前的值,支持所有数据类型

  • listtagg:listagg函数将多个字符串值连接成一个字符串,支持string类型

  • bool_and:求与运算,支持BOOLEAN类型给

  • bool_or:求或运算,支持BOOLEAN类型。

  • first_value:first_value函数从数据集中检索第一个空值,支持所有类型

  • first_not_null_value:first_not_null_value函数选择数据集中的第一个非空值,支持所有数据类型。

  • nested_update:nested_update函数将多行收集到一个数组中(所谓的“嵌套表”),支持ARRAY类型

    使用fields.<field-name>.nested-key=pk0,pk1,...指定嵌套表的主键。如果没有键,行将被追加到数组。

    例子

    1. -- orders table
    2. CREATE TABLE orders (
    3. order_id BIGINT PRIMARY KEY NOT ENFORCED,
    4. user_name STRING,
    5. address STRING
    6. );
    7. -- sub orders that have the same order_id
    8. -- belongs to the same order
    9. CREATE TABLE sub_orders (
    10. order_id BIGINT,
    11. sub_order_id INT,
    12. product_name STRING,
    13. price BIGINT,
    14.  PRIMARY KEY (order_id, sub_order_id) NOT ENFORCED
    15. );
    16. -- wide table
    17. CREATE TABLE order_wide (
    18. order_id BIGINT PRIMARY KEY NOT ENFORCED,
    19. user_name STRING,
    20. address STRING,
    21. sub_orders ARRAY<ROW<sub_order_id BIGINT, product_name STRING, price BIGINT>>
    22. ) WITH (
    23.  'merge-engine' = 'aggregation',
    24.  'fields.sub_orders.aggregate-function' = 'nested_update',
    25.  'fields.sub_orders.nested-key' = 'sub_order_id'
    26. );
    27. -- widen
    28. INSERT INTO order_wide
    29. SELECT
    30. order_id,
    31. user_name,
    32. address,
    33. CAST (NULL AS ARRAY<ROW<sub_order_id BIGINT, product_name STRING, price BIGINT>>)
    34. FROM orders
    35. UNION ALL
    36.  
    37. SELECT
    38. order_id,
    39. CAST (NULL AS STRING),
    40. CAST (NULL AS STRING),
    41. ARRAY[ROW(sub_order_id, product_name, price)]
    42. FROM sub_orders;
    43. -- query using UNNEST
    44. SELECT order_id, user_name, address, sub_order_id, product_name, price
    45. FROM order_wide, UNNEST(sub_orders) AS so(sub_order_id, product_name, price)

  • collect:collect函数将元素收集到Array中。您可以设置字段:field.<field-name>.distinct =true去重元素,只支持array类型,元素类型可以是 ARRAY, MULTISET, MAP, and ROW.

  • merge_map:merge_map函数用于合并输入映射,只支持map

只有sum和product支持回退(UPDATE_BEFORE和DELETE),其他聚合函数不支持回退。如果允许某些函数忽略撤回消息,则可以配置'fields.${field_name}.ignore-retract'='true'.

对于流式读取场景Partial-Update 合并引擎必须与 lookup 或者 full-compaction changelog producer. 一起使用。(也支持' input ' changelog producer,但只返回输入记录。)

first row

通过设置'merge-engine' = 'first-row',用户可以保留相同主键的第一行。它与重复数据删除合并引擎的不同之处在于,在第一行合并引擎中,它将生成只插入的更改日志

  • 您可以不能指定 sequence.field。

  • 不接受DELETE和UPDATE_BEFORE消息。您可以配置first-row.ignore-delete。忽略-删除忽略这两种记录。

这对于取代流计算中的日志重复数据删除有很大的帮助。

2.2 Append Table

如果表没有定义主键,则默认情况下它是一个追加表

在流式中,只能将完整的记录插入到表中。这种类型的表适用于不需要流更新的用例(例如日志数据同步)。

强烈推荐使用append scalable table模式,设置bucket=-1

Append Scalable Table 

定义

通过在表属性中定义'bucket' = '-1',您可以为这个表指定一个特殊模式(我们称之为“无感知桶模式”)(参见示例)。在这种模式下,所有的东西都是不同的。我们不再有bucket的概念,也不保证流读的顺序。我们将此表视为批处理脱机表(尽管我们仍然可以流式读取和写入)。所有记录都将进入一个目录(为了兼容性,我们将它们放在bucket-0中),并且不再维护顺序。由于我们没有桶的概念,所以我们不会按桶对输入记录进行洗牌,这会加快数据插入速度。

使用这种模式,您可以将Hive表替换为湖表。

compaction

在非感知桶模式下,我们不使用writer进行压缩,而是使用Compact Coordinator扫描小文件并将压缩任务提交给Compact Worker。通过这种方式,我们可以轻松地并行地对一个简单的数据目录进行压缩。在流模式下,如果你在flink中运行insert sql,拓扑将是这样的:

它将尽最大努力压缩小文件,但是当一个分区中的单个小文件长时间存在并且没有向分区添加新文件时,compact Coordinator将从内存中删除它以减少内存使用。重新启动作业后,它将扫描小文件并再次将其添加到内存中。控制压缩行为的选项与Append For queue完全相同。如果将write-only 设置为true,则将在compact coordinator和compact worker。

自动压缩仅在Flink引擎流模式下支持。您还可以在paimon中通过flink动作启动一个压缩作业,并通过设置 write-only禁用所有其他压缩作业

sort compact

每个分区中的数据乱序将导致select操作变得缓慢,而前面说的compaction操作可能会导致插入操作变慢。为插入作业设置write-only是一个不错的选择,并且在完成每个分区的数据之后,触发分区Sort Compact操作。参见排序压缩。

streaming source

无感知桶模式append only表支持流读写,但不再保证顺序。你不能把它看作一个队列,而是一个有储物箱的湖泊。每次提交都会生成一个新的记录仓,我们可以通过读取新的记录仓来读取增量,但是一个记录仓中的记录会流向它们想要的任何地方,并且我们以任何可能的顺序获取它们。在Append For Queue模式下,记录不存储在箱子中,而是存储在记录管道中。我们可以看到下面的区别。

streaming multiple partitions write (流式多分区写入)

因为Paimon-sink需要处理的写任务数量是:将数据写入的分区数量*每个分区的桶数量。因此,我们需要尽量控制每个paimon-sink任务的写任务数,使其分布在合理的范围内。如果每个sink任务处理太多的写任务,不仅会导致小文件过多的问题,还可能导致内存不足错误。

此外,写失败会产生僵尸文件,这无疑会增加维护paimon的成本。我们需要尽可能避免这个问题。

对于启用了自动合并的flink作业,我们建议尝试遵循以下公式来调整paimon-sink的并行度(这不仅适用于追加表,它实际上适用于大多数场景):

  1. (N*B)/P < 100 (This value needs to be adjusted according to the actual situation)
  2. N(the number of partitions to which the data is written)
  3. B(bucket number)
  4. P(parallelism of paimon-sink)
  5. 100 (This is an empirically derived threshold,For flink-jobs with auto-merge disabled, this value can be reduced.
  6. However, please note that you are only transferring part of the work to the user-compaction-job, you still have to deal with the problem in essence,
  7. the amount of work you have to deal with has not been reduced, and the user-compaction-job still needs to be adjusted according to the above formula.)

您也可以将`write-buffer- spilable`设置为true,写入器可以将记录溢出到磁盘。这可以尽可能地减少小文件。要使用此选项,您需要为flink集群提供一定大小的本地磁盘。这对那些在k8上使用flink的人来说尤其重要。 

对于追加表,可以为追加表设置write-buffer-for -append 选项。将此参数设置为true,写入器将使用段池缓存记录以避免OOM。

例子

下面是创建追加表并指定桶键的示例。

  1. CREATE TABLE MyTable (
  2. product_id BIGINT,
  3. price DOUBLE,
  4. sales BIGINT
  5. ) WITH (
  6. 'bucket' = '-1'
  7. );

append queue table

定义

在这种模式下,您可以将追加表视为以桶分隔的队列。同一bucket中的每条记录都是严格排序的,流式读取将按照写入的顺序将记录传输到下游。要使用这种模式,您不需要配置特殊配置,所有数据都将作为队列放入一个bucket中。您还可以定义bucket和bucket-key以启用更大的并行性和分散数据(参见示例)。

compaction(合并)

缺省情况下,汇聚节点将自动执行压缩以控制文件数量。以下选项控制压缩策略:

key  

defaulttypedescription
write-only
falseBoolean如果设置为true,将跳过compactions和snapshot过期。此选项与专用compact作业一起使用。
compaction.min.file-num
5Integer对于文件集[f_0,…],f_N],满足sum(size(f_i)) >= targetFileSize触发追加表compaction的最小文件数。这个值避免了几乎整个文件被compaction,因为那样是不划算的。
compaction.max.file-num
50Integer对于文件集[f_0,…],f_N],触发追加表compaction的最大文件数,即使sum(size(f_i)) < targetFileSize。此值可避免挂起过多的小文件,以免降低性能。
full-compaction.delta-commits
(none)Integer在增量提交之后,将不断触发全表compaction。

streaming source

流源行为目前仅支持Flink引擎。

streaming read order(流式顺序读)

对于流式读取,记录按以下顺序生成:

  • 对于来自两个不同分区的任意两条记录
    • 如果scan.plan-sort-partition 设置为true ,分区值较小的记录将首先生成。
    • 否则,将首先生成分区创建时间较早的记录
  • 对于来自同一分区和同一桶的任意两条记录,将根据写入顺序决定
  • 对于来自同一分区但不同桶的任意两条记录,不同桶由不同的任务处理,它们之间没有顺序保证

watermark 定义

您可以为读取Paimon表定义水印:

  1. CREATE TABLE T (
  2. `user` BIGINT,
  3. product STRING,
  4. order_time TIMESTAMP(3),
  5. WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
  6. ) WITH (...);
  7. -- launch a bounded streaming job to read paimon_table
  8. SELECT window_start, window_end, COUNT(`user`) FROM TABLE(
  9. TUMBLE(TABLE T, DESCRIPTOR(order_time), INTERVAL '10' MINUTES)) GROUP BY window_start, window_end;

你也可以启用Flink水印对齐,这将确保没有源/分割/碎片/分区将其水印间隔增加得太远。

keydefaulttypedescription
scan.watermark.alignment.group
(none)String一组用于对齐水印的源。
scan.watermark.alignment.max-drift
noneDuration在我们暂停从源/任务/分区消费之前,对齐水印的最大漂移。

Bounded Stream(有界流)

streaming source也可以被限定,你可以指定' scan.bounded.watermark. '定义有界流模式的结束条件,直到遇到更大的水印快照,流读取才会结束。

快照中的水印是由writer生成的,例如可以指定一个kafka源,并声明水印的定义。当使用这个kafka源对Paimon表进行写操作时,Paimon表的快照会生成相应的水印,这样流式读取这个Paimon表时就可以使用有界水印的特性

  1. CREATE TABLE kafka_table (
  2. `user` BIGINT,
  3. product STRING,
  4. order_time TIMESTAMP(3),
  5. WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
  6. ) WITH ('connector' = 'kafka'...);
  7. -- launch a streaming insert job
  8. INSERT INTO paimon_table SELECT * FROM kakfa_table;
  9. -- launch a bounded streaming job to read paimon_table
  10. SELECT * FROM paimon_table /*+ OPTIONS('scan.bounded.watermark'='...') */;

例子

下面是创建追加表并指定桶键的示例。

  1. CREATE TABLE MyTable (
  2. product_id BIGINT,
  3. price DOUBLE,
  4. sales BIGINT
  5. ) WITH (
  6. 'bucket' = '8',
  7. 'bucket-key' = 'product_id'
  8. );

第 3 章 引擎

3.1 概览

Paimon不仅支持Flink SQL编写和本地查询,还提供来自其他流行引擎(如Apache Spark和Apache Hive)的查询。

EngineVersionBatch ReadBatch WriteCreate TableAlter TableStreaming WriteStreaming ReadBatch Overwrite
Flink1.14 - 1.18✅(1.17+)
Spark3.1 - 3.5✅(3.3+)
Hive2.1 - 3.1
Spark2.4
Trino358 - 422
Presto0.236 - 0.280
StarRocks3.1+
Doris2.0+

推荐 versions are Flink 1.17.2, Spark 3.5.0, Hive 2.3.9

3.2 flink

安装步骤省略

3.2.1 savepoint and recover

因为Paimon有自己的快照管理,这可能会与Flink的检查点管理冲突,导致从保存点恢复时出现异常(不要担心,它不会导致存储损坏)。
建议使用以下方法保存点:

  • 使用 stop with savepoint 

大家都知道 Flink 会周期性的进行 Checkpoint,并且维护了一个全局的状态快照。假如我们碰到这种场景:用户在两个Checkpoint 周期中间主动暂停了作业,然后过一会又进行重启。这样,Flink 会自动读取上一次成功保存的全局状态快照,并开始计算上一次全局快照之后的数据。虽然这么做能保证状态数据的不多不少,但是输出到 Sink 的却已经有重复数据了。有了这个功能之后,Flink 会在暂停作业的同时做一次全局快照,并存储到Savepoint。下次启动时,会从这个 Savepoint 启动作业,这样 Sink 就不会收到预期外的重复数据了。不过,这个做法并不能解决作业在运行过程中自动Failover而引起的输出到 Sink 数据重复问题。  

3.2.2 数据类型

paimo支持所有flink的数据类型除了MULTISET、主键不支持MAP

3.2.3 使用flink进行内存管理

Paimon任务可以基于执行器内存创建内存池,这些内存将由Flink执行器管理,例如Flink TM的托管内存。它将通过executor管理多个任务的写器缓冲区,从而提高接收器的稳定性和性能。

如果使用Flink托管内存,可以设置以下属性:

OptionDefaultDescription
sink.use-managed-memory-allocatorfalse如果为true, flink sink将为合并树使用托管内存;否则,它将创建一个独立的内存分配器,这意味着每个任务分配和管理自己的内存池(堆内存),如果一个Executor中有太多的任务,可能会导致性能问题甚至OOM。
sink.managed.writer-buffer-memory256M

托管内存中写入器缓冲区的权重,Flink会根据权重计算出写入器的内存大小,实际使用的内存取决于运行环境。现在,在这个属性中定义的内存大小等于在运行时分配给写缓冲区的确切内存。

用户可以在SQL中为Flink托管内存设置内存权重,然后Flink sink操作符将获得内存池大小并为Paimon writer创建分配器。

  1. INSERT INTO paimon_table /*+ OPTIONS('sink.use-managed-memory-allocator'='true', 'sink.managed.writer-buffer-memory'='256M') */
  2. SELECT * FROM ....;

其他引擎看官方文档吧~

第 4 章 File System

4.1 概览

Apache Paimon使用与Apache Flink相同的可插拔文件系统。如果使用Flink作为计算引擎,用户可以按照标准的插件机制来配置插件结构。然而,对于其他引擎,如Spark或Hive,提供的opt jar(由Flink)可能会得到冲突,不能直接使用。由于用户不方便修复类冲突,因此Paimon提供了自包含的引擎统一的FileSystem pluggable jar供用户从Spark/Hive端查询表。

支持的文件系统

FileSystemURI SchemePluggableDescription
Local File Systemfile://NBuilt-in Support
HDFShdfs://NBuilt-in Support, ensure that the cluster is in the hadoop environment
Aliyun OSSoss://Y
S3s3://Y

4.2 catalog

Paimon目录目前支持两种类型的介元:

  • filesytem metastore(默认),它在文件系统中存储元数据和表文件。
  • hive metastore,它还将元数据存储在Hive metastore中。用户可以直接从Hive访问这些表

第 5 章 CDC ingestion

5.0 前言

5.0.1 当前的问题

Apache Paimon 最典型的场景是解决了 CDC (Change Data Capture) 数据的入湖;CDC 数据来自数据库。一般来说,分析需求是不会直接查询数据库的。

  1. 容易对业务造成影响,一般分析需求会查询全表,这可能导致数据库负载过高,影响业务
  2. 分析性能不太好,业务数据库一般不是列存,查询部分列 Projection 性能太差
  3. 没有 Immutable 的视图,离线数仓里面需要根据 Immutable 的一个分区来计算

所以需要通过 CDC 的方式同步数据库的数据到数据仓库或数据湖里。

CDC可以理解为是Changelog数据流。

目前典型的同步方式依然是 Hive 的全量与增量的离线合并同步方式。

image.png

在 Hive 数仓里维护两张表:增量分区表和全量分区表,通过:

  1. (按需) 初始化时使用 DataX 或 Sqoop 等工具同步整张数据库表到 Hive 全量表的分区中。
  2. 每天定时 (比如凌晨0点30分) 同步增量数据 (通过 Kafka) 到 Hive 增量分区表,形成一个增量分区 T。
  3. 将 增量分区 T 与 全量分区 T-1 进行合并,产出今天的 全量表 分区 T。

这个流程在今天也是主流的同步方式,离线数据提供一个 Immutable 的视图,让数据的可靠性大大增加。 但是它的问题不少:

  1. 架构链路复杂度高:由于链路复杂,每天产出全量分区容易有问题导致不能按时产出,新增业务也比较复杂,全量和增量割裂。
  2. 时延高:至少 T + 1 延时,而且需要等全量和增量合并完成。
  3. 存储成本高:每天全量表一个分区存储所有数据,意味着 100 天就需要 100 倍的存储成本。
  4. 计算成本高:每天需要读取全量数据,与增量数据进行全量合并,在增量数据不多时浪费严重。

5.0.2 引入Paimon

和其它数据湖不同的是,Paimon 是从流世界里面诞生的数据湖,所以它在对接流写流读、对接 Flink 方面都要比其它数据湖做得更好。 Flink 结合 Paimon 打造的入湖架构如下:

image.png

image.png

步骤如下:

  1. 通过 Flink CDC 一键全增量一体入湖到 Paimon,此任务可以配置 Tag 的自动创建,然后通过 Paimon 的能力,将 Tag 映射为 Hive 的分区,完全兼容原有 Hive SQL 的用法。

只需一步。

Paimon 的每一次写都会生成一个 Immutable 的快照,快照可以被 Time Travel 的读取,但是快照会有过期被删除的问题,因此要解决此问题,可以基于快照创建 Tag;Tag 就是快照集合,通过Tag提供离线历史数据的访问。

流式入湖方式可以有如下多种方式:

  1. Flink SQL 入湖,SQL 处理,可以有函数等 Streaming SQL 的处理
  2. Paimon 一键 Schema Evolution 入湖,好处是 Schema 也会同步到下游 Paimon 表里:详见 https://paimon.apache.org/docs/master/cdc-ingestion/overview/

它的好处是:

  1. 架构链路复杂度低,不再因为各种组件的问题导致链路延时,你只用运维这一个流作业,而且可以完全兼容原有 Hive SQL 用法。
  2. 时延低:延时取决于流作业的 Checkpoint Interval,数据最低1分钟实时可见 (建议1-5分钟)。不但如此,Paimon 也提供了流读的能力,让你完成分钟级的 Streaming 计算,也可以写到下游别的存储。
  3. 存储成本低:得益于湖格式的 Snapshot 管理,加上 LSM 的文件复用,比如同样是存储 100天的快照,原有 Hive 数仓 100 天需要 100 份的存储,Paimon 在某些增量数据不多的场景只需要 2 份的存储,大幅节省存储资源。
  4. 计算成本低:得益于 LSM 的增量合并能力,此条链路只有增量数据的处理,没有全量的合并。可能有用户会担心,常驻的流作业会消耗更多的资源,对 Paimon 来说,你可以打开纯异步 Compaction 的机制,以 Paimon 优异的性能表现,只用少量的资源即可完成同步,Paimon 另有整库同步等能力帮助你节省资源。

5.1 概览

通过模式演化,Paimon支持多种方式将数据摄取到Paimon表中。这意味着添加的列将实时同步到Paimon表,并且不会为此重新启动同步作业。
我们目前支持以下同步方式:

  1. MySQL Synchronizing Table: 同步一个或多个表从MySQL到一个Paimon表。
  2. MySQL Synchronizing Database: 将整个MySQL数据库同步到一个Paimon数据库。
  3. Program API Sync: 将自定义数据流输入同步到一个Paimon表中。
  4. Kafka Synchronizing Table: 同步一个Kafka主题的表到一个Paimon表
  5. Kafka Synchronizing Database: 同步一个包含多个表的Kafka主题或多个包含一个表的主题到一个Paimon数据库。
  6. MongoDB Synchronizing Collection: 从MongoDB同步一个Collection到一个Paimon表。
  7. MongoDB Synchronizing Database: 将整个MongoDB数据库同步到一个Paimon数据库
  8. Pulsar Synchronizing Table: 同步一个pulsar主题表到一个Paimon表
  9. Pulsar Synchronizing Database: 同步一个包含多个表的Pulsar主题或多个包含一个表的主题到一个Paimon数据库。

5.2 什么是模式演变(Schema Evolution)

假设我们有一个名为tableA的MySQL表,它有三个字段:field_1, field_2, field_3。当我们想要加载这个MySQL表到Paimon时,我们可以在Flink SQL中这样做,或者使用MySqlSyncTableAction。

flink sql

MySqlSyncTableAction:

5.3 模式变更(Schema Change Evolution)

Cdc摄取支持有限数量的模式更改。目前,框架不能重命名表,删除列,所以rename table和drop COLUMN的行为将被忽略,rename COLUMN将添加一个新的列。当前支持的模式更改包括:

  • 添加列
  • 修改列类型
    • 将字符串类型(char, varchar, text)转换为另一种长度更长的字符串类型,
    • 从二进制类型(binary, varbinary, blob)转换为另一种长度更长的二进制类型,
    • 从整数类型(tinyint, smallint, int, bigint)转换为另一个范围更大的整数类型,
    • 从浮点类型(float, double)转换为另一种范围更大的浮点类型;

 参考

Apache Paimon 流式数据湖 V 0.4 与后续展望 - 知乎
当流计算邂逅数据湖:Paimon 的前生今世 - 知乎

流数据湖平台Apache Paimon(一)概述-腾讯云开发者社区-腾讯云

深度对比delta、iceberg和hudi三大开源数据湖方案 - 知乎

Apache Paimon | Apache Paimon

未完待续……

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/煮酒与君饮/article/detail/1001536
推荐阅读
相关标签
  

闽ICP备14008679号