赞
踩
paimon 官网学习记录,了解 paimon 是用来设计做什么,有什么优势,使用等
支持批流一体,主要用来做实时数据湖
Paimon 并使用 LSM 树结构来支持大量数据更新和高性能查询
和Flink CDC整合,实时入湖更简单
支持 mysql 等直接同步到 paimon,且支持表结构变更
Paimon 的变更日志还可以存储到外部日志系统(例如 Kafka)中或从外部日志系统(例如 Kafka)中消费
支持 hive presto spark flink 等多种计算引擎
支持hdfs oss s3等多种储存
Basic Concepts | Apache Paimon
整体看下来,很多地方和 icerberg 有相似之处,但是在 primary 表功能更强,且支持flink纯流式读取,和lookup join
借鉴了iceberg的文件数:File Layouts | Apache Paimon
学过 iceberg 这个就很熟悉了,不过有一些点需要注意
Bucket是读写的最小存储单元,每个Bucket目录中包含一棵LSM树。
Paimon 表只有一个桶,这意味着它只提供单并行读写
paimon 为什么有delete文件???
- CREATE TABLE MyTable (
- pk BIGINT PRIMARY KEY NOT ENFORCED,
- f1 DOUBLE,
- f2 BIGINT,
- ts TIMESTAMP
- ) WITH (
- -- 指定顺序字段,避免乱序
- 'sequence.field' = 'ts',
- --设置默认值
- 'fields.f2.default-value'='0' ,
- -- 合并引擎,仅限于 Primary Key Table
- 'merge-engine'='deduplicate',
-
- );
主键表是 Paimon 作为流式数据湖的核心,它可以接收上游来自数据库 CDC 或者 Flink Streaming 产生的 Changelog,里面包含 INSERT、UPDATE、DELTE 的数据。
主键由一组包含每个记录的唯一值的列组成。Paimon 通过对每个存储桶内的主键进行排序来强制数据排序,允许用户通过对主键应用过滤条件来实现高性能
注意:table.exec.sink.upsert-materialize 建议始终设置为NONE,当输入乱序时,我们建议您使用 序列字段来纠正乱序。
Merge Engines | ||
---|---|---|
deduplicate | 默认的合并引擎 | 使用主键的最新一条记录作为当前数据 |
partial-update | 部分更新,null 值不覆盖已有数据 | INSERT INTO T VALUES (1, 1,null,null); INSERT INTO T VALUES (1, null,null,1); SELECT * FROM T; -- output 1, 1, null, 1 |
aggregation | 聚合模型(比如doris),需要指定非主键列的聚合类型 | CREATE TABLE MyTable ( product_id BIGINT, price DOUBLE, sales BIGINT, PRIMARY KEY (product_id) NOT ENFORCED ) WITH ( 'merge-engine' = 'aggregation', 'fields.price.aggregate-function' = 'max', 'fields.sales.aggregate-function' = 'sum' ); |
first-row | 保留同一主键的第一行,不接受 DELETE 和 UPDATE_BEFORE | 与 deduplicate合并引擎不同的是,在 first-row合并引擎中,它将生成仅插入变更日志 |
默认的合并引擎,使用主键的最新一条记录作为当前数据
部分更新
注意:
stream 查询时,partial-update必须和 lookup or full-compaction changelog producer 一起使用
默认是不支持删除数据的
支持聚合(Aggregation 模式的所有函数)
这两个是什么意思
sequence-group解决多流更新时,不同流字段的乱序问题
- 'fields.g_1.sequence-group'='a,b',
- 'fields.g_2.sequence-group'='c,d'
默认是使用last_non_null_value聚合,支持以下
这个还有些复杂捏,后面再看看
只有 sum, product and count 支持retraction (UPDATE_BEFORE and DELETE)
如果您允许某些功能忽略撤回消息,您可以配置: 'fields.${field_name}.ignore-retract'='true'
注意:流读下,必须 lookup or full-compaction changelog producer,(‘input’ changelog producer is also supported, but only returns input records.)
保留同一主键的第一行,与 deduplicate合并引擎不同的是,在first-row合并引擎中,it will generate insert only changelog
paimon 支持对外提供changelog(比如flink stream read)
需要注意:
对读取paimon的source端可能有压力
另外paimon会合并数据,拿不到完整的changelog
但是大多数场景不需要完整的历史changelog,一般都是选默认的none
上游如果是changelog日志,所有输入记录将保存在单独的changelog文件(和lsm树不影响)
Lookup应该是为了在上游没有changelog时在commit前查找历史数据生成changelog
Paimon will generate changelog through 'lookup' before co
Full Compaction 解耦数据写入和changelog生成,Paimon will compare the results between full compactions and produce the differences as changelog。The latency(延迟) of changelog is affected(影响) by the frequency of full compactions.
延迟可能很高
sequence.field 解决数据乱序问题,sequence.field无论输入顺序如何,具有最大值的记录将是最后合并的记录。
Row Kind Field
这个怎么使用的???
如果表没有定义主键,则它是 Append 表,你只能插入 INSERT 的数据。Append 表的 Compaction 只是用来合并小文件。
目前 Append 表有两种模式:Scalable 表 与 Queue 表。推荐使用 Scalable 表,它更简单易懂。
- CREATE TABLE MyTable (
- user_id BIGINT,
- item_id BIGINT,
- behavior STRING,
- dt STRING
- ) WITH (
- 'bucket' = '-1'
- )
当你定义 bucket 为 -1,且没有主键时,你可以认为它就是一张增强的 Hive 表,它将没有桶的概念 (虽然这种模式把数据放到 bucket-0 目录中,但是所有的读写并没有并发限制,桶被忽略了),它就是一个普通的数仓表,支持批写批读,支持流写流读,只是它的流读会有一部分乱序 (并不是完全的输入顺序)。
它有如下应用场景:
简单替换 Hive 表,在 Hive 表的基础上拥有湖存储的 ACID 特性。
你可以流写流读它,它有着非常高的吞吐,非常好的易用性。
你也可以使用对它进行批的排序,结合 z-order 等手段,它可以提供非常快速的点查范围查询。
如果你想在传统 TPC-DS 中测试 Paimon 表,请使用此模式。
执行拓扑:
图的上面部分是 Source 数据写入表,可以看到,中间并没有 Shuffler 了,它可以有着非常高的吞吐。
图的下面部分是 Compaction 拓扑,它会监控文件,进行必要性的小文件合并,它是完全非阻塞,纯异步的模式,并不会阻塞正常数据的写。如果你希望关闭它,请配置 'write-only'。
我们的建议是:在一个流写任务中保持 Compaction 拓扑的打开 (或者启动 Dedicated Compaction 作业),其它写作业统统 write-only,这是非常方便的玩法,Compaction 拓扑不但会合并小文件,还会清理过期 Snapshots 等等。
-
- CREATE TABLE MyTable (
- user_id BIGINT,
- item_id BIGINT,
- behavior STRING,
- dt STRING
- ) WITH (
- 'bucket' = '10',
- 'bucket-key' = 'user_id'
- )
在这种模式下,你可以将 Append 表视为一个由 bucket 分隔的队列。同一存储桶中的每条记录都是严格排序的,流式读取会将记录准确地按写入顺序传输到下游。你需要还可以定义 bucket 个数和 bucket-key,以实现更大的并行性 (默认 bucket 为 1)。
目前 Queue 表只能通过设置 bucket-key 的方式,Flink Kafka 默认模式 (Fixed, each Flink task ends up in at most one Kafka partition) 暂时没被支持,但是这在 Roadmap 上。
整张表就像 Kafka 的 Queue 一样在工作,它与 Kafka 的优劣如下:
延时分钟级,而 Kafka 可以是秒级,但是 Paimon 的成本要低得多。
数据沉淀下来,可被计算引擎 AD-HOC 查询。
流读可以进行 Projection & Filter Pushdown,这可以大大降低成本。
另外,此模式的 Compaction 只会进行 bucket 内的小文件合并,所以会造成更多的小文件,你可以配置 'compaction.max.file-num' (默认 50) 更小的此参数来更多的合并小文件。
另外,并不仅仅是顺序,Paimon 的此模式还可以让你定义 Watermark,并且支持最新的 Watermark 对齐的策略。
使用上和其他数据湖差不多,一般也是外部catalog管理元数据,另外支持历史快照(批处理模式)或者从最新的偏移量(在流模式下)读取等多种方式
spark 尽管也可以流式读写paimon,但是基本上就是当做了hive表使用
- insert into paimon_catalog.user_log .....
- select * from paimon_catalog.user_log
支持update 和 merge into 语法
- UPDATE my_table SET v = 'new_value' WHERE id = 1;
-
- -- 尽管官方没说 MERGE INTO 不支持 first row 和 agg
- MERGE INTO target
- USING source
- ON target.a = source.a
- WHEN MATCHED THEN
- UPDATE SET *
- WHEN NOT MATCHED
- THEN INSERT *
Ps: UPDATE 和 MERGE INTO 是spark 自己读取数据合并在写入,还是提交给paimon处理(paimon没有单独的计算能力,所以只能是通过文件解决),怎么做的?
模式演化和Spark Procedure我没看太明白
paimon支持flink所有数据类型
查询时指定分区和主键过滤器,且为主键最左边的前缀指定范围过滤器,查询效率会好很多
- -- paimon requires checkpoint interval in streaming mode
- SET 'execution.checkpointing.interval' = '10 s';
- -- write streaming data to dynamic table
- INSERT INTO word_count SELECT word, COUNT(*) FROM user_log GROUP BY word;
- SET 'execution.runtime-mode' = 'streaming';
- SET 'execution.runtime-mode' = 'batch';
-
- -- track the changes of table and calculate the count interval statistics
- SELECT `interval`, COUNT(*) AS interval_cnt FROM
- (SELECT cnt / 10000 AS `interval` FROM user_log ) GROUP BY `interval`;
冲突是指???
Flink 托管内存 我也没看太懂
paimon 支持 looup join,并且重写了 flink 的looup join
目前来看性能还是可以的,比habse 稍微差一些
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。