当前位置:   article > 正文

2024年最全3 Paimon数据湖中的表类型详解_paimon表和hive区别(1),2024年最新大数据开发开发全套学习

paimon表

img
img

网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。

需要这份系统化资料的朋友,可以戳这里获取

一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!

//获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
val tEnv = StreamTableEnvironment.create(env)


//创建Paimon类型的Catalog
tEnv.executeSql(
  """
    |CREATE CATALOG paimon_catalog WITH (
    |    'type'='paimon',
    |    'warehouse'='hdfs://bigdata01:9000/paimon'
    |)
    |""".stripMargin)
tEnv.executeSql("USE CATALOG paimon_catalog")

//执行查询,并且打印输出结果
tEnv.executeSql(
  """
    |SELECT
    |    *
    |FROM `paimon_catalog`.`default`.`user_par`
    |WHERE dt = '20230101' AND hh in ('10','11')
    |""".stripMargin)
  .print()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

}

}


执行代码,可以看到如下结果:



  • 1
  • 2
  • 3
  • 4
  • 5

±—±------------±-------------------------------±-------------------------------±-------------------------------+
| op | id | name | dt | hh |
±—±------------±-------------------------------±-------------------------------±-------------------------------+
| +I | 1 | jack | 20230101 | 10 |
| +I | 2 | tom | 20230101 | 11 |


这就是Paimon分区表的使用了。


#### 3.1.4 临时表


最后我们来看一下临时表,前面我们已经用过临时表了。


临时表是由`Paimon Catalog`临时进行记录,但是不由它管理。删除临时表时也不会删除表中的数据文件。


也就是说这个临时表的元数据信息只会临时存储在`Paimon Catalog`里面,当Flink SQL任务执行结束之后,这个临时表的元数据信息就会被删除了。



> 
> 注意:其实这个临时表的特性是Flink SQL提供的,所以目前只能在Flink SQL中使用临时表。
> 
> 
> 


临时表的典型应用场景是这样的,就是我们想要在`Paimon Catalog`里面使用`其他类型`的表。  
 因为在Paimon Catalog里面定义表的时候是不允许指定connector属性的,所以说如果我们想要通过connector指定kafka或者其他类型的数据存储系统,就需要定义临时表了。


下面我们来看一个例子,加深我们对临时表应用场景的理解:


首先我们创建了Paimon Catalog,并且进入了这个Catalog里面。



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

CREATE CATALOG paimon_catalog WITH (
type’ = ‘paimon’,
‘warehouse’ = ‘hdfs:///path/to/warehouse’
);

USE CATALOG paimon_catalog;


然后我们在Paimon Catalog里面创建了两个表:`t1`和`t2`。



  • 1
  • 2
  • 3
  • 4
  • 5

CREATE TABLE t1(…) WITH (…);

CREATE TEMPORARY TABLE t2(
id INT,
name STRING
) WITH (
‘connector’ = ‘filesystem’,
‘path’ = ‘hdfs://…/data.json’,
‘format’ = ‘json’
);


注意:t1是Paimon类型的表。t2是临时表。


t2表中的数据来源于hdfs中的json数据文件,所以此时我们在Paimon Catalog里面创建t2表的时候就需要使用临时表了,因为我们需要通过connector指定数据的存储位置为filesystem。  
 如果不使用临时表,在Paimon Catalog里面创建t2的时候就不能使用connector属性了。


最后就可以在Paimon Catalog里面直接操作这两个表了。



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

SELECT t1.id,t2.name,t1.age FROM t1 JOIN t2 ON t1.id = t2.id;


如果我不想使用临时表,但是还想实现这个需求,应该怎么做呢?


其实也很简单,我们只需要在默认的`default catalog`里面创建表`t2`即可。


但是需要注意,后续我们在同时使用表`t1`和`t2`的时候,就需要指定表的完整名称了。  
 类似于`FlinkSQLWriteToPaimon`这个案例。



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

//向目的地表中写入数据
tEnv.executeSql(
“”"
|INSERT INTO paimon_catalog.default.wc_sink_sql
|SELECT
| word,
| COUNT(*) as cnt
|FROM default_catalog.default_database.word_source
|GROUP BY word
|“”".stripMargin).print()


* 如果是在paimon catalog里面执行这个SQL,则需要给`word_source`这个表指定完整名称。
* 如果是在的default catalog里面执行这个SQL,则需要给`wc_sink_sql`这个表指定完整名称。


这就是临时表的典型应用场景。


### 3.2 存储维度


从存储维度来看,Paimon中的表可以分为两种:


* `Primary Key`表,也可以称之为主键表。
* `Append Only`表,也可以称之为仅追加表。


这两种表其实很好区分,如果表中定义的有主键字段,则是主键表;如果表中没有定义主键字段,则是仅追加表。


下面我们来详细分析一下。


#### 3.2.1 Primary Key表(主键表)


主键表中包含主键字段,可以支持新增、更新和删除表中的数据。



> 
> 注意:主键可以由一个或者多个字段组成。
> 
> 
> 


主键表其实我们前面已经使用过了,就是在建表语句中通过`PRIMARY KEY`来指定主键字段。


主键表中还包含了多个高级特性:


* Bucket
* Changelog Producers
* Merge Engines
* Sequence Field


下面我们来具体看一下这些高级特性。


##### 3.2.1.1 Bucket


Bucket:可以翻译为桶。


Bucket是表中数据读写的最小存储单元,所以Bucket的数量限制了读写的并行度,最终会影响读写性能,每个Bucket目录内部会包含一棵LSM树。



> 
> 注意:LSM树是一种数据结构,Paimon采用了LSM树作为其文件存储的数据结构。
> 
> 
> 


主键表目前支持两种`Bucket mode`(模式):


* 1:`Fixed Bucket mode`:属于固定Bucket数量模式,也就是需要我们手工指定Bucket的数量。我们在建表时默认使用的就是这种模式,Bucket参数的值默认为1。我们只需要给Bucket设置一个大于0的数值即可。但是需要注意:Bucket数量过大会导致小文件过多,影响读取性能;Bucket数量过小会影响写入性能。一般情况下,每个Bucket中的数据量推荐为1G左右。
* 2:`Dynamic Bucket mode`:属于动态Bucket数量模式,也就是说Bucket的数量是动态变化的。此时我们需要在建表时指定`'bucket' = '-1'`,此时会由Paimon动态维护索引,将每个Bucket中的数据条数控制在2000000(2百万)以下,这个数值是由`dynamic-bucket.target-row-num`这个参数控制的。但是需要注意,目前这种模式属于实验性质,暂时不建议在生产环境下使用。


下面我们通过一个案例来具体感受一下Bucket。


创建package:`tech.xuwei.paimon.bucket`  
 创建object:`BucketDemo`


代码如下:



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88

package tech.xuwei.paimon.bucket

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**

  • 验证Bucket特性

  • Created by xuwei
    */
    object BucketDemo {
    def main(args: Array[String]): Unit = {
    //创建执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(env)

    //创建Paimon类型的Catalog
    tEnv.executeSql(
    “”"
    |CREATE CATALOG paimon_catalog WITH (
    | ‘type’=‘paimon’,
    | ‘warehouse’=‘hdfs://bigdata01:9000/paimon’
    |)
    |“”".stripMargin)
    tEnv.executeSql(“USE CATALOG paimon_catalog”)

    //创建Paimon表
    tEnv.executeSql(
    “”"
    |CREATE TABLE IF NOT EXISTS bucket_test (
    | word STRING,
    | cnt BIGINT,
    | PRIMARY KEY (word) NOT ENFORCED
    |)WITH(
    | ‘bucket’ = ‘2’ – 手工指定bucket的值,默认为1
    |)
    |“”".stripMargin)

    //查看最完整的建表语句
    tEnv.executeSql(“SHOW CREATE TABLE bucket_test”).print()

    //向表中添加数据
    tEnv.executeSql(
    “”"
    |INSERT INTO bucket_test(word,cnt)
    |VALUES(‘a’,1) , (‘b’,2) , (‘c’,1) , (‘d’,3)
    |“”".stripMargin)

}

}


执行代码,可以看到输出的完整建表语句信息:



  • 1
  • 2
  • 3
  • 4
  • 5

CREATE TABLE paimon_catalog.default.bucket_test (
word VARCHAR(2147483647) NOT NULL,
cnt BIGINT,
CONSTRAINT 53dc473d-3e56-4e13-ac8b-a4f3c9abb72b PRIMARY KEY (word) NOT ENFORCED
) WITH (
‘bucket’ = ‘2’,
‘path’ = ‘hdfs://bigdata01:9000/paimon/default.db/bucket_test’
)


接下来我们到hdfs中查看一个这个表的Bucket信息:



  • 1
  • 2
  • 3
  • 4
  • 5

[root@bigdata04 ~]# hdfs dfs -ls /paimon/default.db/bucket_test
Found 5 items
drwxr-xr-x - yehua supergroup 0 2028-11-29 17:54 /paimon/default.db/bucket_test/bucket-0
drwxr-xr-x - yehua supergroup 0 2028-11-29 17:54 /paimon/default.db/bucket_test/bucket-1
drwxr-xr-x - yehua supergroup 0 2028-11-29 17:54 /paimon/default.db/bucket_test/manifest
drwxr-xr-x - yehua supergroup 0 2028-11-29 17:54 /paimon/default.db/bucket_test/schema
drwxr-xr-x - yehua supergroup 0 2028-11-29 17:54 /paimon/default.db/bucket_test/snapshot


此时可以看到,这个表中包含2个bucket目录,这两个bucket目录中存储的就是这个表中的所有数据。


bucket目录内部都是一些data数据文件,里面就是真实的数据内容了:



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

[root@bigdata04 ~]# hdfs dfs -ls /paimon/default.db/bucket_test/bucket-0
Found 1 items
-rw-r–r-- 3 yehua supergroup 545 2028-11-29 17:54 /paimon/default.db/bucket_test/bucket-0/data-8475e141-1725-489f-bd09-13606fd2302f-0.orc


咱们之前创建的表没有手工指定bucket,那么bucket默认为1,可以到表wc\_sink\_sql里面看一下,这个表里面存储了多条数据,但是只有1个bucket:



  • 1
  • 2
  • 3
  • 4
  • 5

[root@bigdata04 ~]# hdfs dfs -ls /paimon/default.db/wc_sink_sql
Found 4 items
drwxr-xr-x - yehua supergroup 0 2028-11-29 11:53 /paimon/default.db/wc_sink_sql/bucket-0
drwxr-xr-x - yehua supergroup 0 2028-11-29 11:53 /paimon/default.db/wc_sink_sql/manifest
drwxr-xr-x - yehua supergroup 0 2028-11-29 11:44 /paimon/default.db/wc_sink_sql/schema
drwxr-xr-x - yehua supergroup 0 2028-11-29 11:53 /paimon/default.db/wc_sink_sql/snapshot


所以在实际工作中,设置合适的bucket数量就是非常重要的了。  
 但是在不同的时期,bucket的数量大概率是需要调整的,因为合适的bucket数量不可能是一成不变的,随着数据量的增加,bucket的数量也需要增大。


官方支持后期手工调整表中的bucket数量,但是想要对bucket中的数据进行重新分布,则只能通过离线流程来完成,也就是说需要跑一个离线任务来重新对bucket中的数据进行分布。


##### 3.2.1.2 Changelog Producers


Changelog Producers:可以翻译为变更日志生产者。


Paimon表中存储数据的时候,除了存储数据本身,还可以选择存储数据的变更日志,也就是Changelog。


Changelog 主要应用在流读场景,在构建实时数据仓库的过程中,我们需要通过流读取上游的数据写入到下游,完成数仓各层之间的数据同步,让整个数仓的数据实时地流动起来。


如果上游数据来源于MySQL的 Binlog 日志,这样是可以直接提供完整的 Changelog 以供流来读取的。


但是针对湖仓一体架构,数仓分层是在Paimon里面实现的,数据会以表格的形式存储在文件系统中。  
 如果下游的Flink任务要通过流读取Paimon表中的数据,则需要Paimon的存储系统帮助生成 Changelog,以便下游流读。  
 此时就需要我们在建表时指定参数`changelog-producer`来决定在何时以何种方式生成Changelog。


如果不指定Changelog Producer则不会向Paimon表中写入数据的时候生成 Changelog,那么下游任务需要在流读时生成一个Changelog Normalize物化节点来产生Changelog。  
 这种方式的成本相对比较高,并且官方也不建议这样使用,因为下游任务会在状态中维护一份全量的数据,也就是说每条数据都需要保存在状态中便于任务在执行时生成Changelog。



> 
> 可能大家在这会有一个疑问,为什么一定需要Changelog呢?
> 
> 
> 


因为通过Changelog可以记录数据的中间变化,针对某些计算逻辑,我们需要知道数据之前的历史值是什么,这样才能得到正确的结果。


例如:我们接收到的数据中包含了相同主键的多条 INSERT 数据,这样会导致下游的流聚合任务有问题,因为相同主键的多条数据应该被认为是更新,而不是重复累加计算。


Paimon 支持的 `Changelog Produers`主要包括这几种:`None、Input、Lookup和Full Compaction`。


下面我们来详细分析一下:


###### (1)None


如果不指定`changelog-producer`,默认就是 `none`,此时存储数据的时候不会存储数据的Changelog,后期读取数据时会动态生成Changelog,成本较高,不建议使用。  
 ![在这里插入图片描述](https://img-blog.csdnimg.cn/5d013c4235fd46849ddcd367821b3fa9.png#pic_center)


看这个图,此时这个数据源可以是任意类型的数据源,假设数据源依次产生了`+I,+U,-D`类型的数据,其实这里面缺少了`-U`类型的数据,我们通过Paimon 的SinkWriter组件将这些数据写入到了Paimon表中。


注意:此时这个Paimon表中配置的`changelog-producer`参数的值为`none`。


此时在向Paimon表中写入数据的时候,这个表只会存储数据本身,不会存储数据的Changelog。


当我们再通过一个任务从这个Paimon表中读取数据的时候,这个任务只能读取到`+I、+U和-D`类型的数据,但是这个任务会产生一个`Changelog Normalize`物化节点来自己生成数据的Changelog,但是这个操作是非常昂贵的,因为它需要在状态中维护数据的所有历史变化情况来生成数据的Changelog。最终是可以获取到完整的`+I、-U、+U、-D`类型的数据的。


下面我们来通过一个案例具体演示一下建表语句中指定`changelog-producer=none`时的效果。


创建package:`tech.xuwei.paimon.changelogproducer.none`


创建object:`FlinkDataStreamWriteToPaimonForNone`


这个Object负责向Paimon表中模拟写入数据。


代码如下:



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87

package tech.xuwei.paimon.changelogproducer.none

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{DataTypes, Schema}
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.types.{Row, RowKind}

/**

  • 使用Flink DataStream API向Paimon表中写入数据

  • Created by xuwei
    */
    object FlinkDataStreamWriteToPaimonForNone {
    def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(env)

    //手工构造一个Changelog DataStream 数据流
    val dataStream = env.fromElements(
    Row.ofKind(RowKind.INSERT, “jack”, Int.box(10))//+I
    //Row.ofKind(RowKind.UPDATE_AFTER, “jack”, Int.box(11))//+U
    //Row.ofKind(RowKind.DELETE, “jack”, Int.box(11))//-D
    )(Types.ROW_NAMED(Array(“name”, “age”),Types.STRING,Types.INT))

    //将DataStream转换为Table
    val schema = Schema.newBuilder()
    .column(“name”, DataTypes.STRING().notNull())//主键非空
    .column(“age”, DataTypes.INT())
    .primaryKey(“name”)//指定主键
    .build()
    val table = tEnv.fromChangelogStream(dataStream,schema,ChangelogMode.all())

    //创建Paimon类型的Catalog
    tEnv.executeSql(
    “”"
    |CREATE CATALOG paimon_catalog WITH (
    | ‘type’=‘paimon’,
    | ‘warehouse’=‘hdfs://bigdata01:9000/paimon’
    |)
    |“”".stripMargin)
    tEnv.executeSql(“USE CATALOG paimon_catalog”)

    //注册临时表
    tEnv.createTemporaryView(“t1”,table)

    //创建Paimon类型的表
    tEnv.executeSql(
    “”"
    |-- 注意:这里的表名使用反引号进行转义,否则会导致SQL DDL语句解析失败。
    |CREATE TABLE IF NOT EXISTS changelog_none (
    | name STRING,
    | age INT,
    | PRIMARY KEY (name) NOT ENFORCED
    |) WITH (
    | ‘changelog-producer’ = ‘none’ – 注意:值为none时这一行配置可以省略不写
    |)
    |“”".stripMargin)

    //向Paimon表中写入数据
    tEnv.executeSql(
    “”"
    |INSERT INTO changelog_none
    |SELECT name,age FROM t1
    |“”".stripMargin)
    }

}


注意:在执行代码的时候通过修改`env.fromElements(...)`中的注释来实现实时产生多种类型数据的效果。


接下来创建Object:`FlinkDataStreamReadFromPaimonForNone`


这个Object负责从Paimon表中实时读取数据。


注意:为了便于在本地观察Flink读取数据任务中自动生成的`Changelog Normalize`物化节点,所以我们需要在代码中开启本地WebUI功能。


先在`pom.xml`中引入相关的依赖:



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
org.apache.flink flink-runtime-web 1.15.0

代码如下:



  • 1
  • 2
  • 3
  • 4
  • 5

package tech.xuwei.paimon.changelogproducer.none

import java.time.ZoneId

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**

  • 使用Flink DataStream API从Paimon表中读取数据

  • Created by xuwei
    */
    object FlinkDataStreamReadFromPaimonForNone {
    def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    //指定WebUI界面的访问端口,默认就是8081
    conf.setString(RestOptions.BIND_PORT,“8081”)
    //为了便于在本地通过页面观察任务执行情况,所以开启本地WebUI功能
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)

    //禁用Chain,把多个算子拆分开单独执行,便于在开发和测试阶段观察,正式执行时不需要禁用Chain
    env.disableOperatorChaining()

    val tEnv = StreamTableEnvironment.create(env)

    //创建Paimon类型的Catalog
    tEnv.executeSql(
    “”"
    |CREATE CATALOG paimon_catalog WITH (
    | ‘type’=‘paimon’,
    | ‘warehouse’=‘hdfs://bigdata01:9000/paimon’
    |)
    |“”".stripMargin)
    tEnv.executeSql(“USE CATALOG paimon_catalog”)

    //执行SQL查询,打印输出结果
    val execSql =
    “”"
    |SELECT * FROM changelog_none – 此时默认只能查到数据的最新值
    |-- /*+ OPTIONS(‘scan.mode’=‘from-snapshot’,‘scan.snapshot-id’ = ‘1’) */ – 通过动态表选项来指定数据读取(扫描)模式,以及从哪里开始读取
    |“”".stripMargin
    val table = tEnv.sqlQuery(execSql)
    table.execute().print()

}

}


接下来先运行`FlinkDataStreamWriteToPaimonForNone`向Paimon表中写入`+I`类型的数据。


再运行`FlinkDataStreamReadFromPaimonForNone`负责读取数据。  
 此时可以看到控制台输出如下结果:



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

±—±-------------------------------±------------+
| op | name | age |
±—±-------------------------------±------------+
| +I | jack | 10 |


修改`FlinkDataStreamWriteToPaimonForNone`中的代码,继续执行,向Paimon表中写入`+U`类型的数据。



  • 1
  • 2
  • 3
  • 4
  • 5

//手工构造一个Changelog DataStream 数据流
val dataStream = env.fromElements(
//Row.ofKind(RowKind.INSERT, “jack”, Int.box(10))//+I
Row.ofKind(RowKind.UPDATE_AFTER, “jack”, Int.box(11))//+U
//Row.ofKind(RowKind.DELETE, “jack”, Int.box(11))//-D
)(Types.ROW_NAMED(Array(“name”, “age”),Types.STRING,Types.INT))


此时可以在`FlinkDataStreamReadFromPaimonForNone`的控制台看到如下结果:



  • 1
  • 2
  • 3
  • 4
  • 5

| -U | jack | 10 |
| +U | jack | 11 |


注意:虽然我们向Paimon表中只写入了`+U`类型的数据,但是此时从Paimon表中读取数据的时候是可以产生`-U类`型的数据的。


为什么会这样呢?  
 此时这个Paimon表中设置的`'changelog-producer' = 'none'`,我们在向这个表中写入数据的时候,他只会存储数据本身,不会存储changelog数据,可以到这个表对应的hdfs 数据目录中确认一下:



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

[root@bigdata04 ~]# hdfs dfs -ls /paimon/default.db/changelog_none/bucket-0
Found 2 items
-rw-r–r-- 3 yehua supergroup 582 2028-12-10 17:19 /paimon/default.db/changelog_none/bucket-0/data-2449a680-5a0b-4496-970a-5a0fdac78cfc-0.orc
-rw-r–r-- 3 yehua supergroup 566 2028-12-10 17:16 /paimon/default.db/changelog_none/bucket-0/data-c3e8ff17-6c86-4c0b-95d7-82a1f2091a5e-0.orc


注意:此时只能看到两个data开头的数据文件,没有changelog开头的数据文件,这说明我们的配置生效了。


咱们之前在创建user表的时候指定了`'changelog-producer' = 'input'`,这种情况下是会在数据目录中保存changelog数据文件的,可以到user表的hdfs目录中看一下:



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

[root@bigdata04 ~]# hdfs dfs -ls /paimon/default.db/user/bucket-0 Found 2 items
-rw-r–r-- 3 yehua supergroup 617 2028-11-28 17:50 /paimon/default.db/user/bucket-0/changelog-eb48583e-f8c9-424f-9000-05a2bbce7a2b-0.orc
-rw-r–r-- 3 yehua supergroup 581 2028-11-28 17:50 /paimon/default.db/user/bucket-0/data-eb48583e-f8c9-424f-9000-05a2bbce7a2b-1.orc


注意:这里面有一个changelog开头的文件,所以这里面存储的就是changelog数据。


那我们现在创建的`changelog_none`这个表设置的是`'changelog-producer' = 'none'`,所以它不会存储changelog数据,那为什么在读取数据的时候也可以读取到changelog数据呢?


咱们前面解释过,如果`changelog-producer`设置为none或者不设置,那么下游任务会在流读时生成一个`Changelog Normalize`物化节点来产生Changelog。


其实这个`Changelog Normalize`物化节点我们也可以到Flink任务的Web UI界面中查看验证一下:  
 ![在这里插入图片描述](https://img-blog.csdnimg.cn/fcd551d0e04c47deb85d5a5c18e704da.png#pic_center)


在这可以看到,Flink任务中确实会产生一个`Changelog Normalize`物化节点,所以此时我们看到的`-U`类型的Changelog变更数据就是这个物化节点产生的。


他具体是如何实现这种效果的呢?  
 其实也很简单,他只需要在状态中维护接收到的每一条历史数据即可,如果接收到了相同主键的多条数据,那么它就知道是发生了数据更新这种行为,对应的就会补全`-U和+U`这种形式的数据。但是我们前面说了,这种方式成本较高,不建议使用,因为它需要在状态中维护数据的所有历史值。


接下来我们继续运行`FlinkDataStreamWriteToPaimonForNone`向Paimon表中写入`-D`类型的数据。



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

val dataStream = env.fromElements(
//Row.ofKind(RowKind.INSERT, “jack”, Int.box(10))//+I
//Row.ofKind(RowKind.UPDATE_AFTER, “jack”, Int.box(11))//+U
Row.ofKind(RowKind.DELETE, “jack”, Int.box(11))//-D
)(Types.ROW_NAMED(Array(“name”, “age”),Types.STRING,Types.INT))


此时可以在`FlinkDataStreamReadFromPaimonForNone`的控制台看到如下结果:



  • 1
  • 2
  • 3
  • 4
  • 5

| -D | jack | 11 |


到目前为止,我们向这个Paimon表中写入了3条数据,`+I,+U和-D`。


下面我们停止`FlinkDataStreamReadFromPaimonForNone`这个实时读取任务。


停止了之后,再重新运行`FlinkDataStreamReadFromPaimonForNone`这个实时读取任务,可以看到如下结果:



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11


结果什么也没有看到,为什么呢?


因为此时我们执行的SQL查询语句默认只能查到数据的最新值,但是数据最新的情况是被删除了,所以什么也没有查到,这是正常的,也是正确的。


如果想要从头查看,需要指定`scan`相关的参数:



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

val execSql =
“”"
|SELECT * FROM changelog_none – 此时默认只能查到数据的最新值
|/*+ OPTIONS(‘scan.mode’=‘from-snapshot’,‘scan.snapshot-id’ = ‘1’) */ – 通过动态表选项来指定数据读取(扫描)模式,以及从哪里开始读取
|“”".stripMargin


重新运行`FlinkDataStreamReadFromPaimonForNone`这个实时读取任务,此时可以看到如下结果:



  • 1
  • 2
  • 3
  • 4
  • 5

±—±-------------------------------±------------+
| op | name | age |
±—±-------------------------------±------------+
| +I | jack | 10 |
| -U | jack | 10 |
| +U | jack | 11 |
| -D | jack | 11 |


因为是从头读取的数据,所以借助于`Changelog Normalize`物化节点,在状态中维护历史接收到的数据,这样就可以获取到完整的Changelog数据了。


###### (2)Input


如果将`changelog-producer`指定为input,表示在向Paimon表中存储数据的时候会将数据源中的Changelog也存储到Paimon表中的Changelog文件中。


典型应用场景是这样的:数据源是MySQL的binlog日志,此时数据源中具有完整的Changelog,所以可以完全依赖数据源中的Changelog,并且后续可以将这份Changelog提供给下游任务读取时使用。这样下游任务读取数据时就不需要产生`Changelog Normalize`物化节点了。


注意:如果我们把MySQL的binlog日志实时写入到了Kafka中,那么Kafka中存储的数据也相当于具有了完整的Changelog,此时在从Kafka这个数据源中读取数据的时候也是可以将`changelog-producer`设置为input的。  
 ![在这里插入图片描述](https://img-blog.csdnimg.cn/bb5112c4f837442985d76a4c429da44c.png#pic_center)


看这个图,当我们通过Flink CDC去采集数据库中的数据的时候,是可以获取到数据库中的所有Changelog变更日志数据的,所以里面会包含完整的`+I、-U、+U、-D`这些类型的数据。


此时在Paimon中创建表的时候,就可以指定`changelog-producer=input`,这样在存储数据的时候就会单独存储一份Changelog File。


下游任务在从Paimon表中读取数据的时候就不需要再产生`Changelog Normalize`物化节点生成Changelog了,直接从Paimon表中读取Changelog File即可获取到完整的Changelog数据。


下面我们来具体演示一下建表语句中指定`changelog-producer=input`时的效果


创建package:`tech.xuwei.paimon.changelogproducer.input`


创建object:`FlinkDataStreamWriteToPaimonForInput`


这个Object负责向Paimon表中模拟写入数据。


代码如下:



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

package tech.xuwei.paimon.changelogproducer.input

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{DataTypes, Schema}
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.types.{Row, RowKind}

/**

  • 使用Flink DataStream API向Paimon表中写入数据

  • Created by xuwei
    */
    object FlinkDataStreamWriteToPaimonForInput {
    def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(env)

    //手工构造一个Changelog DataStream 数据流
    val dataStream = env.fromElements(
    Row.ofKind(RowKind.INSERT, “jack”, Int.box(10))//+I
    //Row.ofKind(RowKind.UPDATE_BEFORE, “jack”, Int.box(10))//-U
    //Row.ofKind(RowKind.UPDATE_AFTER, “jack”, Int.box(11))//+U
    //Row.ofKind(RowKind.DELETE, “jack”, Int.box(11))//-D
    )(Types.ROW_NAMED(Array(“name”, “age”),Types.STRING,Types.INT))

    //将DataStream转换为Table
    val schema = Schema.newBuilder()
    .column(“name”, DataTypes.STRING().notNull())//主键非空
    .column(“age”, DataTypes.INT())
    .primaryKey(“name”)//指定主键
    .build()
    val table = tEnv.fromChangelogStream(dataStream,schema,ChangelogMode.all())

    //创建Paimon类型的Catalog
    tEnv.executeSql(
    “”"
    |CREATE CATALOG paimon_catalog WITH (
    | ‘type’=‘paimon’,
    | ‘warehouse’=‘hdfs://bigdata01:9000/paimon’
    |)
    |“”".stripMargin)
    tEnv.executeSql(“USE CATALOG paimon_catalog”)

    //注册临时表
    tEnv.createTemporaryView(“t1”,table)

    //创建Paimon类型的表
    tEnv.executeSql(
    “”"
    |-- 注意:这里的表名使用反引号进行转义,否则会导致SQL DDL语句解析失败。
    |CREATE TABLE IF NOT EXISTS changelog_input (
    | name STRING,
    | age INT,
    | PRIMARY KEY (name) NOT ENFORCED
    |) WITH (
    | ‘changelog-producer’ = ‘input’
    |)
    |“”".stripMargin)

    //向Paimon表中写入数据
    tEnv.executeSql(
    “”"
    |INSERT INTO changelog_input
    |SELECT name,age FROM t1
    |“”".stripMargin)
    }

}


注意:在执行代码的时候通过修改`env.fromElements(...)`中的注释来实现实时产生多种类型数据的效果。


接下来创建Object:`FlinkDataStreamReadFromPaimonForInput`


这个Object负责从Paimon表中实时读取数据。


代码如下:



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

package tech.xuwei.paimon.changelogproducer.input

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**

  • 使用Flink DataStream API从Paimon表中读取数据

  • Created by xuwei
    */
    object FlinkDataStreamReadFromPaimonForInput {
    def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    //指定WebUI界面的访问端口,默认就是8081
    conf.setString(RestOptions.BIND_PORT,“8081”)
    //为了便于在本地通过页面观察任务执行情况,所以开启本地WebUI功能
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)

    //禁用Chain,把多个算子拆分开单独执行,便于在开发和测试阶段观察,正式执行时不需要禁用Chain
    env.disableOperatorChaining()

    val tEnv = StreamTableEnvironment.create(env)

    //创建Paimon类型的Catalog
    tEnv.executeSql(
    “”"
    |CREATE CATALOG paimon_catalog WITH (
    | ‘type’=‘paimon’,
    | ‘warehouse’=‘hdfs://bigdata01:9000/paimon’
    |)
    |“”".stripMargin)
    tEnv.executeSql(“USE CATALOG paimon_catalog”)

    //执行SQL查询,打印输出结果
    val execSql =
    “”"
    |SELECT * FROM changelog_input – 此时默认只能查到数据的最新值
    |-- /*+ OPTIONS(‘scan.mode’=‘from-snapshot’,‘scan.snapshot-id’ = ‘1’) */ – 通过动态表选项来指定数据读取(扫描)模式,以及从哪里开始读取
    |“”".stripMargin
    val table = tEnv.sqlQuery(execSql)
    table.execute().print()

}

}


接下来先运行`FlinkDataStreamWriteToPaimonForInput`向Paimon表中写入`+I`类型的数据。


再运行`FlinkDataStreamReadFromPaimonForInput`负责读取数据。  
 此时可以看到控制台输出如下结果:



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

±—±-------------------------------±------------+
| op | name | age |
±—±-------------------------------±------------+
| +I | jack | 10 |


来看一下这个Flink任务的Web UI界面  
 ![在这里插入图片描述](https://img-blog.csdnimg.cn/426d219dcf2844b0a56ca8dac531ec25.png#pic_center)


在这可以发现,此时这个任务中没有产生Changelog Normalize物化节点,因为我们在Paimon表中指定了`changelog-producer=input`,所以这个Paimon表内部会自己存储Changelog数据。


此时到这个Paimon表的hdfs数据目录中查看一下:



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

[root@bigdata04 ~]# hdfs dfs -ls /paimon/default.db/changelog_input/bucket-0
Found 2 items
-rw-r–r-- 3 yehua supergroup 566 2028-12-11 11:14 /paimon/default.db/changelog_input/bucket-0/changelog-bc3740e4-6adf-4e94-9d4e-c1ece10ed114-0.orc
-rw-r–r-- 3 yehua supergroup 566 2028-12-11 11:14 /paimon/default.db/changelog_input/bucket-0/data-bc3740e4-6adf-4e94-9d4e-c1ece10ed114-1.orc


在这里可以发现里面有两个文件,一个以data开头的文件,里面存储的是数据自身。还有一个以changelog开头的文件,里面存储的是changelog变更数据。


修改`FlinkDataStreamWriteToPaimonForInput`中的代码,继续执行,向Paimon表中写入`-U`类型的数据。



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

//手工构造一个Changelog DataStream 数据流
val dataStream = env.fromElements(
//Row.ofKind(RowKind.INSERT, “jack”, Int.box(10))//+I
Row.ofKind(RowKind.UPDATE_BEFORE, “jack”, Int.box(10))//-U
//Row.ofKind(RowKind.UPDATE_AFTER, “jack”, Int.box(11))//+U
//Row.ofKind(RowKind.DELETE, “jack”, Int.box(11))//-D
)(Types.ROW_NAMED(Array(“name”, “age”),Types.STRING,Types.INT))


此时可以在`FlinkDataStreamReadFromPaimonForInput`的控制台看到如下结果:



  • 1
  • 2
  • 3
  • 4
  • 5

| -U | jack | 10 |


再修改`FlinkDataStreamWriteToPaimonForInput`中的代码,继续执行,向Paimon表中写入`+U`类型的数据。



  • 1
  • 2
  • 3
  • 4
  • 5

//手工构造一个Changelog DataStream 数据流
val dataStream = env.fromElements(
//Row.ofKind(RowKind.INSERT, “jack”, Int.box(10))//+I
//Row.ofKind(RowKind.UPDATE_BEFORE, “jack”, Int.box(10))//-U
Row.ofKind(RowKind.UPDATE_AFTER, “jack”, Int.box(11))//+U
//Row.ofKind(RowKind.DELETE, “jack”, Int.box(11))//-D
)(Types.ROW_NAMED(Array(“name”, “age”),Types.STRING,Types.INT))


此时可以在`FlinkDataStreamReadFromPaimonForInput`的控制台看到如下结果:



  • 1
  • 2
  • 3
  • 4
  • 5

| +U | jack | 11 |


再修改`FlinkDataStreamWriteToPaimonForInput`中的代码,继续执行,向Paimon表中写入`-D`类型的数据。



  • 1
  • 2
  • 3
  • 4
  • 5

//手工构造一个Changelog DataStream 数据流
val dataStream = env.fromElements(
//Row.ofKind(RowKind.INSERT, “jack”, Int.box(10))//+I
//Row.ofKind(RowKind.UPDATE_BEFORE, “jack”, Int.box(10))//-U
//Row.ofKind(RowKind.UPDATE_AFTER, “jack”, Int.box(11))//+U
Row.ofKind(RowKind.DELETE, “jack”, Int.box(11))//-D
)(Types.ROW_NAMED(Array(“name”, “age”),Types.STRING,Types.INT))


此时可以在`FlinkDataStreamReadFromPaimonForInput`的控制台看到如下结果:



  • 1
  • 2
  • 3
  • 4
  • 5

| -D | jack | 11 |


下面我们停止`FlinkDataStreamReadFromPaimonForInput`这个实时读取任务。


停止了之后,修改一下代码,因为默认只会读取最新的数据快照



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

val execSql =
“”"
|SELECT * FROM changelog_input – 此时默认只能查到数据的最新值
|/*+ OPTIONS(‘scan.mode’=‘from-snapshot’,‘scan.snapshot-id’ = ‘1’) */ – 通过动态表选项来指定数据读取(扫描)模式,以及从哪里开始读取
|“”".stripMargin


再重新运行`FlinkDataStreamReadFromPaimonForNone`这个实时读取任务,可以看到这个结果:



  • 1
  • 2
  • 3
  • 4
  • 5

±—±-------------------------------±------------+
| op | name | age |
±—±-------------------------------±------------+
| +I | jack | 10 |
| -U | jack | 10 |
| +U | jack | 11 |
| -D | jack | 11 |


注意:此时可以看到完整的数据变更情况,这是依赖于Paimon表中存储的changelog文件实现的,没有依赖于Flink任务中的`Changelog Normalize`物化节点。


所以说,如果我们数据源中可以提供完整的changelog数据,那么建议给存储数据的Paimon表设置`changelog-producer=input`,这样下游任务读取这个Paimon表的时候就可以直接从表中changelog文件里面获取变更数据了,不需要自己维护,效率比较高。


###### (3)Lookup


如果数据源中没有提供完整的 Changelog,并且我们也不想让下游任务在读取数据时通过Changelog Normalize物化节点来生成,那么这个时候我们可以考虑在Paimon表中配置 `changelog-producer=lookup`。


这样可以通过Lookup(查找)的方式在向Paimon表中写入数据的时候生成 Changelog。


但是需要注意:Lookup这种方式目前处于实验阶段,还没有经过大量的生产环境验证。


![在这里插入图片描述](https://img-blog.csdnimg.cn/1fbd5376a423421d8e721e725743ceb7.png#pic_center)


看这个图,此时这个数据源中没有提供完整的Changelog,这个数据源可以是任意类型的数据源,数据源中可能只有`+I、+U、-D`类型的数据,缺少了`-U`类型的数据。


但是由于我们在Paimon表中设置了`changelog-producer=lookup`,所以在通过`SinkWriter`向Paimon表中写入数据的时候,底层会通过Lookup的方式查找表中已有的数据,自动生成`Changelog File`,补全`-U`类型的变更日志。


这样下游任务在读取这个Paimon表的时候就可以直接从表对应的`Changelog File`中读取到完整的`+I、-U、+U、-D`类型的数据了。


下面我们来具体演示一下建表语句中指定`changelog-producer=lookup`时的效果


创建package:`tech.xuwei.paimon.changelogproducer.lookup`


基于创建Object:`FlinkDataStreamWriteToPaimonForLookup`


这个Object负责向Paimon表中模拟写入数据。


代码如下:



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

package tech.xuwei.paimon.changelogproducer.lookup

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{DataTypes, Schema}
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.types.{Row, RowKind}

/**

  • 使用Flink DataStream API向Paimon表中写入数据

  • Created by xuwei
    */
    object FlinkDataStreamWriteToPaimonForLookup {
    def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(env)

    //手工构造一个Changelog DataStream 数据流
    val dataStream = env.fromElements(
    Row.ofKind(RowKind.INSERT, “jack”, Int.box(10))//+I
    //Row.ofKind(RowKind.UPDATE_AFTER, “jack”, Int.box(11))//+U
    //Row.ofKind(RowKind.DELETE, “jack”, Int.box(11))//-D
    )(Types.ROW_NAMED(Array(“name”, “age”),Types.STRING,Types.INT))

    //将DataStream转换为Table
    val schema = Schema.newBuilder()
    .column(“name”, DataTypes.STRING().notNull())//主键非空
    .column(“age”, DataTypes.INT())
    .primaryKey(“name”)//指定主键
    .build()
    val table = tEnv.fromChangelogStream(dataStream,schema,ChangelogMode.all())

    //创建Paimon类型的Catalog
    tEnv.executeSql(
    “”"
    |CREATE CATALOG paimon_catalog WITH (
    | ‘type’=‘paimon’,
    | ‘warehouse’=‘hdfs://bigdata01:9000/paimon’
    |)
    |“”".stripMargin)
    tEnv.executeSql(“USE CATALOG paimon_catalog”)

    //注册临时表
    tEnv.createTemporaryView(“t1”,table)

    //创建Paimon类型的表
    tEnv.executeSql(
    “”"
    |-- 注意:这里的表名使用反引号进行转义,否则会导致SQL DDL语句解析失败。
    |CREATE TABLE IF NOT EXISTS changelog_lookup (
    | name STRING,
    | age INT,
    | PRIMARY KEY (name) NOT ENFORCED
    |) WITH (
    | ‘changelog-producer’ = ‘lookup’
    |)
    |“”".stripMargin)

    //向Paimon表中写入数据
    tEnv.executeSql(
    “”"
    |INSERT INTO changelog_lookup
    |SELECT name,age FROM t1
    |“”".stripMargin)
    }

}


注意:在执行代码的时候通过修改`env.fromElements(...)`中的注释来实现实时产生多种类型数据的效果。


接下来创建Object:`FlinkDataStreamReadFromPaimonForLookup`


这个Object负责从Paimon表中实时读取数据。


代码如下:



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

package tech.xuwei.paimon.changelogproducer.lookup

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**

  • 使用Flink DataStream API从Paimon表中读取数据

  • Created by xuwei
    */
    object FlinkDataStreamReadFromPaimonForLookup {
    def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    //指定WebUI界面的访问端口,默认就是8081
    conf.setString(RestOptions.BIND_PORT,“8081”)
    //为了便于在本地通过页面观察任务执行情况,所以开启本地WebUI功能
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)

    //禁用Chain,把多个算子拆分开单独执行,便于在开发和测试阶段观察,正式执行时不需要禁用Chain
    env.disableOperatorChaining()

    val tEnv = StreamTableEnvironment.create(env)

    //创建Paimon类型的Catalog
    tEnv.executeSql(
    “”"
    |CREATE CATALOG paimon_catalog WITH (
    | ‘type’=‘paimon’,
    | ‘warehouse’=‘hdfs://bigdata01:9000/paimon’
    |)
    |“”".stripMargin)
    tEnv.executeSql(“USE CATALOG paimon_catalog”)

    //执行SQL查询,打印输出结果
    val execSql =
    “”"
    |SELECT * FROM changelog_lookup – 此时默认只能查到数据的最新值
    |-- /*+ OPTIONS(‘scan.mode’=‘from-snapshot’,‘scan.snapshot-id’ = ‘1’) */ – 通过动态表选项来指定数据读取(扫描)模式,以及从哪里开始读取
    |“”".stripMargin
    val table = tEnv.sqlQuery(execSql)
    table.execute().print()

}

}


接下来先运行`FlinkDataStreamWriteToPaimonForLookup`向Paimon表中写入+I类型的数据。


再运行`FlinkDataStreamReadFromPaimonForLookup`负责读取数据。  
 此时可以看到控制台输出如下结果:



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

±—±-------------------------------±------------+
| op | name | age |
±—±-------------------------------±------------+
| +I | jack | 10 |


来看一下这个Flink任务的Web UI界面  
 ![在这里插入图片描述](https://img-blog.csdnimg.cn/896ac30e704548078972651a826ca196.png#pic_center)


在这可以发现,此时这个任务中没有产生`Changelog Normalize`物化节点,因为我们在Paimon表中指定了`changelog-producer=lookup`,Changelog数据会在我们向Paimon表中写入数据的时候通过Lookup产生。


到这个Paimon表的hdfs数据目录里面查看一下:



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

[root@bigdata04 ~]# hdfs dfs -ls /paimon/default.db/changelog_lookup/bucket-0
Found 3 items
-rw-r–r-- 3 yehua supergroup 566 2028-12-11 12:01 /paimon/default.db/changelog_lookup/bucket-0/changelog-edb23cdc-09be-4437-b2ac-716e06e25c6d-1.orc
-rw-r–r-- 3 yehua supergroup 566 2028-12-11 12:01 /paimon/default.db/changelog_lookup/bucket-0/data-edb23cdc-09be-4437-b2ac-716e06e25c6d-0.orc
-rw-r–r-- 3 yehua supergroup 566 2028-12-11 12:01 /paimon/default.db/changelog_lookup/bucket-0/data-f07e00b5-a815-4d64-b8d6-1b8a2e64dab6-0.orc


在这可以发现,里面有1个changelog开头的文件,这个就是Lookup产生的。


修改`FlinkDataStreamWriteToPaimonForLookup`中的代码,继续执行,向Paimon表中写入`+U`类型的数据。



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

//手工构造一个Changelog DataStream 数据流
val dataStream = env.fromElements(
//Row.ofKind(RowKind.INSERT, “jack”, Int.box(10))//+I
Row.ofKind(RowKind.UPDATE_AFTER, “jack”, Int.box(11))//+U
//Row.ofKind(RowKind.DELETE, “jack”, Int.box(11))//-D
)(Types.ROW_NAMED(Array(“name”, “age”),Types.STRING,Types.INT))


此时可以在`FlinkDataStreamReadFromPaimonForLookup`的控制台看到如下结果:



  • 1
  • 2
  • 3
  • 4
  • 5

| -U | jack | 10 |
| +U | jack | 11 |


注意:虽然我们向Paimon表中只写入了+U类型的数据,但是Lookup在生成changelog的时候会自动补全`-U`类型的数据。


后面的`-D`类型的数据就不再演示了,效果和前面是一样的。


所以说,Lookup这种方式属于一种折中方案,数据源里面无法提供完整的changelog变更日志,所以无法使用Input,但是我们还想摆脱昂贵的`Changelog Normalize`物化节点,这个时候就可以考虑Lookup了。


最后还需要注意,Lookup这种方式虽然不需要产生`Changelog Normalize`物化节点,但是他在生成Changelog的时候依然会消耗一部分资源的,因为它需要触发数据查找这个过程,只不过消耗的资源比`Changelog Normalize`物化节点这种方式低一些。


###### (4)Full Compaction


如果你的数据源无法提供完整的changelog变更日志数据,并且你觉得Lookup这种方式还是比较消耗资源,此时可以考虑使用`Full Compaction`这种方式,在创建Paimon表的时候指定`changelog-producer=full-compaction`。


Full Compaction这种方式可以解耦写入数据和生成changelog这两个步骤。  
 也就是说我们会先把数据写入到Paimon表中,当表中的数据触发完全压缩之后,Paimon 会比较两次完全压缩之间的结果并生成差异作为changelog(变更日志),生成changelog的延迟会受到完全压缩频率的影响。


通过指定`full-compaction.delta-commits`表属性,表示在增量提交Checkpoint后将会触发完全压缩。默认情况下值为1,所以每次提交Checkpoint都会进行完全压缩并生成changelog。  
 这样其实对生成changelog的延迟没有特别大的影响。


Full Compaction这种方式可以为任何类型的数据源生成完整的changelog变更日志。但是它没有Input方式的效率高,并且生成changelog的延迟可能会比较高。


不过Full Compaction这种方式解耦了写入数据和生成changelog这两个步骤,他的资源消耗比Lookup这种方式要低一些。  
 ![在这里插入图片描述](https://img-blog.csdnimg.cn/5973104d317d4ca4a04174ad93ede03d.png#pic_center)


看这个图,此时这个数据源中没有提供完整的Changelog,这个数据源可以是任意类型的数据源,数据源中可能只有`+I、+U、-D`的数据,缺少了`-U`类型的数据。


但是由于我们在Paimon表中设置了`changelog-producer=full-compaction`,所以Paimon会周期性的比较两次完全压缩(Full Compaction)之间的结果并生成差异作为changelog(变更日志),并且在Changelog中补全缺失的变更日志。


这样下游任务在读取这个Paimon表的时候就可以从表对应的Changelog File中读取到完整的`+I、-U、+U、-D`类型的数据了。


下面我们来具体演示一下建表语句中指定`changelog-producer=full-compaction`时的效果


创建package:`tech.xuwei.paimon.changelogproducer.fullcompaction`


创建object:`FlinkDataStreamWriteToPaimonForFullcompaction`


这个Object负责向Paimon表中模拟写入数据。


代码如下:



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59

package tech.xuwei.paimon.changelogproducer.fullcompaction

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{DataTypes, Schema}
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.types.{Row, RowKind}

/**

  • 使用Flink DataStream API向Paimon表中写入数据

  • Created by xuwei
    */
    object FlinkDataStreamWriteToPaimonForFullcompaction {
    def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(env)

    //手工构造一个Changelog DataStream 数据流
    val dataStream = env.fromElements(
    Row.ofKind(RowKind.INSERT, “jack”, Int.box(10))//+I
    //Row.ofKind(RowKind.UPDATE_AFTER, “jack”, Int.box(11))//+U
    //Row.ofKind(RowKind.DELETE, “jack”, Int.box(11))//-D
    )(Types.ROW_NAMED(Array(“name”, “age”),Types.STRING,Types.INT))

    //将DataStream转换为Table
    val schema = Schema.newBuilder()
    .column(“name”, DataTypes.STRING().notNull())//主键非空
    .column(“age”, DataTypes.INT())
    .primaryKey(“name”)//指定主键
    .build()
    val table = tEnv.fromChangelogStream(dataStream,schema,ChangelogMode.all())

    //创建Paimon类型的Catalog
    tEnv.executeSql(
    “”"
    |CREATE CATALOG paimon_catalog WITH (
    | ‘type’=‘paimon’,
    | ‘warehouse’=‘hdfs://bigdata01:9000/paimon’
    |)
    |“”".stripMargin)
    tEnv.executeSql(“USE CATALOG paimon_catalog”)

    //注册临时表
    tEnv.createTemporaryView(“t1”,table)

    //创建Paimon类型的表
    tEnv.executeSql(
    “”"
    |-- 注意:这里的表名使用反引号进行转义,否则会导致SQL DDL语句解析失败。
    |CREATE TABLE IF NOT EXISTS changelog_fullcompaction (
    | name STRING,
    | age INT,
    | PRIMARY KEY (name) NOT ENFORCED
    |) WITH (
    | ‘changelog-producer’ = ‘full-compaction’,
    | ‘full-compaction.delta-commits’ = ‘1’
    |)
    |“”".stripMargin)

    //向Paimon表中写入数据
    tEnv.executeSql(
    “”"
    |INSERT INTO changelog_fullcompaction
    |SELECT name,age FROM t1
    |“”".stripMargin)
    }

}


注意:在执行代码的时候通过修改`env.fromElements(...)`中的注释来实现实时产生多种类型数据的效果。


接下来创建Object:`FlinkDataStreamReadFromPaimonForFullcompaction`


这个Object负责从Paimon表中实时读取数据。


代码如下:



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

package tech.xuwei.paimon.changelogproducer.fullcompaction

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**

  • 使用Flink DataStream API从Paimon表中读取数据

  • Created by xuwei
    */
    object FlinkDataStreamReadFromPaimonForFullcompaction {
    def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    //指定WebUI界面的访问端口,默认就是8081
    conf.setString(RestOptions.BIND_PORT,“8081”)
    //为了便于在本地通过页面观察任务执行情况,所以开启本地WebUI功能
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)

    //禁用Chain,把多个算子拆分开单独执行,便于在开发和测试阶段观察,正式执行时不需要禁用Chain
    env.disableOperatorChaining()

    val tEnv = StreamTableEnvironment.create(env)

    //创建Paimon类型的Catalog
    tEnv.executeSql(
    “”"
    |CREATE CATALOG paimon_catalog WITH (
    | ‘type’=‘paimon’,
    | ‘warehouse’=‘hdfs://bigdata01:9000/paimon’
    |)
    |“”".stripMargin)
    tEnv.executeSql(“USE CATALOG paimon_catalog”)

    //执行SQL查询,打印输出结果
    val execSql =
    “”"
    |SELECT * FROM changelog_fullcompaction – 此时默认只能查到数据的最新值
    |–/*+ OPTIONS(‘scan.mode’=‘from-snapshot’,‘scan.snapshot-id’ = ‘1’) */ – 通过动态表选项来指定数据读取(扫描)模式,以及从哪里开始读取
    |“”".stripMargin
    val table = tEnv.sqlQuery(execSql)
    table.execute().print()

}

}


接下来先运行`FlinkDataStreamWriteToPaimonForFullcompaction`向Paimon表中写入`+I`类型的数据。


再运行`FlinkDataStreamReadFromPaimonForFullcompaction`负责读取数据。  
 此时可以看到控制台输出如下结果:



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

±—±-------------------------------±------------+
| op | name | age |
±—±-------------------------------±------------+
| +I | jack | 10 |


来看一下这个Flink任务的Web UI界面  
 ![在这里插入图片描述](https://img-blog.csdnimg.cn/50edb2b1d63141cf836e64cfbbdf7f61.png#pic_center)


在这可以发现,此时这个任务中没有产生`Changelog Normalize`物化节点,其实只有我们把`Changelog Producer`设置为`none`的时候Flink任务才会产生`Changelog Normalize`物化节点。


那此时我们到这个Paimon表的hdfs数据目录里面查看一下有没有产生changelog文件:



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

[root@bigdata04 ~]# hdfs dfs -ls /paimon/default.db/changelog_fullcompaction/bucket-0
Found 3 items
-rw-r–r-- 3 yehua supergroup 566 2028-12-11 16:20 /paimon/default.db/changelog_fullcompaction/bucket-0/changelog-264c4b74-10dd-493d-95e0-8f5760e90dc8-1.orc
-rw-r–r-- 3 yehua supergroup 566 2028-12-11 16:20 /paimon/default.db/changelog_fullcompaction/bucket-0/data-264c4b74-10dd-493d-95e0-8f5760e90dc8-0.orc
-rw-r–r-- 3 yehua supergroup 566 2028-12-11 16:20 /paimon/default.db/changelog_fullcompaction/bucket-0/data-d7adcc2a-804a-4a13-876a-fb77dc4a0952-0.orc


在这可以发现,里面有1个changelog开头的文件,这个就是Full Compaction这种方式产生的。


修改`FlinkDataStreamWriteToPaimonForFullcompaction`中的代码,继续执行,向Paimon表中写入`+U`类型的数据。



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

//手工构造一个Changelog DataStream 数据流
val dataStream = env.fromElements(
//Row.ofKind(RowKind.INSERT, “jack”, Int.box(10))//+I
Row.ofKind(RowKind.UPDATE_AFTER, “jack”, Int.box(11))//+U
//Row.ofKind(RowKind.DELETE, “jack”, Int.box(11))//-D
)(Types.ROW_NAMED(Array(“name”, “age”),Types.STRING,Types.INT))


此时可以在`FlinkDataStreamReadFromPaimonForFullcompaction`的控制台看到如下结果:



  • 1
  • 2
  • 3
  • 4
  • 5

| -U | jack | 10 |
| +U | jack | 11 |


注意:这块可能会有一些延迟,具体的延迟程度要看完全压缩触发的频率,我们前面指定了`full-compaction.delta-commits`的值为1,表示在每次提交Checkpoint都会进行完全压缩并生成changelog,所以目前的延迟是比较低的。


但是我们需要注意:完全压缩是一个资源密集型的过程,会消耗一定的`CPU`和`磁盘IO`,因此过于频繁的完全压缩可能会导致写入速度变慢,所以这块也需要均衡考虑。


后面的`-D`类型的数据就不再演示了,效果和前面是一样的。


(5)总结  
 咱们前面一共讲了4种Changelog Producer。


* 在实际工作中None这种方式基本上是不使用的,成本太高。
* 如果数据源是完整的CDC数据,直接使用Input这种方式即可,成本最低,效率最高。
* 如果数据源中无法提供完整的Changelog,此时可以考虑使用Lookup和Full Compaction。
* 如果你觉得使用Lookup来实时生成 Changelog 成本过大,可以考虑通过Full Compaction和对应较大的延迟,以非常低的成本生成 Changelog。


##### 3.2.1.3 Merge Engines


Merge Engines:可以翻译为合并引擎。


针对多条相同主键的数据,Paimon主键表收到之后,应该如何进行合并处理?


针对这块的处理逻辑,Paimon提供了参数`merge-engine`,通过这个参数来指定如何合并数据。


`merge-engine`一共支持3种取值:


* deduplicate:默认值,表示去重,也就是说主键表默认只会保留相同主键最新的数据。
* partial-update:表示局部更新,通过相同主键的多条数据来更新不同字段的值。
* aggregation:表示聚合,可以对相同主键的多条数据根据指定的字段进行聚合。


下面我们来详细分析一下这几种合并引擎。


###### (1)Deduplicate


如果我们在Paimon中创建主键表时不指定`merge-engine`参数,那么默认值就是`deduplicate` 。


此时只保留主键最新的数据,之前表中相同主键的数据会被丢弃。


注意:如果主键最新的数据是`-D`类型的,那么这个主键的所有数据都会被删除。


下面我们来具体演示一下。  
 核心的思路是这样的:我们通过数据源模拟产生2条相同主键的+I类型的数据,依次写入到主键表中,最终发现主键表中只会保留最新的那一条数据。


创建package:`tech.xuwei.paimon.mergeengine.deduplicate`  
 创建object:`FlinkDataStreamWriteToPaimonForDeduplicate`


这个Object负责向Paimon表中模拟写入数据。  
 代码如下:



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68

package tech.xuwei.paimon.mergeengine.deduplicate

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{DataTypes, Schema}
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.types.{Row, RowKind}

/**

  • 使用Flink DataStream API向Paimon表中写入数据

  • Created by xuwei
    */
    object FlinkDataStreamWriteToPaimonForDeduplicate {
    def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(env)

    //手工构造一个Changelog DataStream 数据流
    val dataStream = env.fromElements(
    Row.ofKind(RowKind.INSERT, “jack”, Int.box(10))//+I
    //Row.ofKind(RowKind.INSERT, “jack”, Int.box(12))//+I
    )(Types.ROW_NAMED(Array(“name”, “age”),Types.STRING,Types.INT))

    //将DataStream转换为Table
    val schema = Schema.newBuilder()
    .column(“name”, DataTypes.STRING().notNull())//主键非空
    .column(“age”, DataTypes.INT())
    .primaryKey(“name”)//指定主键
    .build()
    val table = tEnv.fromChangelogStream(dataStream,schema,ChangelogMode.all())

    //创建Paimon类型的Catalog
    tEnv.executeSql(
    “”"
    |CREATE CATALOG paimon_catalog WITH (
    | ‘type’=‘paimon’,
    | ‘warehouse’=‘hdfs://bigdata01:9000/paimon’
    |)
    |“”".stripMargin)
    tEnv.executeSql(“USE CATALOG paimon_catalog”)

    //注册临时表
    tEnv.createTemporaryView(“t1”,table)

    //创建Paimon类型的表
    tEnv.executeSql(
    “”"
    |-- 注意:这里的表名使用反引号进行转义,否则会导致SQL DDL语句解析失败。
    |CREATE TABLE IF NOT EXISTS merge_engine_deduplicate (
    | name STRING,
    | age INT,
    | PRIMARY KEY (name) NOT ENFORCED
    |) WITH (
    | ‘merge-engine’ = ‘deduplicate’ – 注意:值为deduplicate时这一行配置可以省略不写
    |)
    |“”".stripMargin)

    //向Paimon表中写入数据
    tEnv.executeSql(
    “”"
    |INSERT INTO merge_engine_deduplicate
    |SELECT name,age FROM t1
    |“”".stripMargin)
    }

}


注意:在执行代码的时候通过修改`env.fromElements(...)`中的注释来实现实时产生多条`+I`类型数据的效果。


接下来创建Object:`FlinkDataStreamReadFromPaimonForDeduplicate`


这个Object负责从Paimon表中实时读取数据。


代码如下:



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

package tech.xuwei.paimon.mergeengine.deduplicate

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**

  • 使用Flink DataStream API从Paimon表中读取数据

  • Created by xuwei
    */
    object FlinkDataStreamReadFromPaimonForDeduplicate {
    def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    //指定WebUI界面的访问端口,默认就是8081
    conf.setString(RestOptions.BIND_PORT,“8081”)
    //为了便于在本地通过页面观察任务执行情况,所以开启本地WebUI功能
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)

    //禁用Chain,把多个算子拆分开单独执行,便于在开发和测试阶段观察,正式执行时不需要禁用Chain
    env.disableOperatorChaining()

    val tEnv = StreamTableEnvironment.create(env)

    //创建Paimon类型的Catalog
    tEnv.executeSql(
    “”"
    |CREATE CATALOG paimon_catalog WITH (
    | ‘type’=‘paimon’,
    | ‘warehouse’=‘hdfs://bigdata01:9000/paimon’
    |)
    |“”".stripMargin)
    tEnv.executeSql(“USE CATALOG paimon_catalog”)

    //执行SQL查询,打印输出结果
    val execSql =
    “”"
    |SELECT * FROM merge_engine_deduplicate – 此时默认只能查到数据的最新值
    |-- /*+ OPTIONS(‘scan.mode’=‘from-snapshot’,‘scan.snapshot-id’ = ‘1’) */ – 通过动态表选项来指定数据读取(扫描)模式,以及从哪里开始读取
    |“”".stripMargin
    val table = tEnv.sqlQuery(execSql)
    table.execute().print()

}

}


接下来先运行`FlinkDataStreamWriteToPaimonForDeduplicate`向Paimon表中写入一条`+I`类型的数据。


再运行`FlinkDataStreamReadFromPaimonForDeduplicate`负责读取数据。  
 此时可以看到控制台输出如下结果:



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

±—±-------------------------------±------------+
| op | name | age |
±—±-------------------------------±------------+
| +I | jack | 10 |


修改`FlinkDataStreamWriteToPaimonForDeduplicate`中的代码,继续执行,向Paimon表中写入第2条`+I`类型的数据。  
 注意:这两条数据的主键是相同的。



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

//手工构造一个Changelog DataStream 数据流
val dataStream = env.fromElements(
//Row.ofKind(RowKind.INSERT, “jack”, Int.box(10))//+I
//Row.ofKind(RowKind.INSERT, “jack”, Int.box(12))//+I
)(Types.ROW_NAMED(Array(“name”, “age”),Types.STRING,Types.INT))


此时可以在`FlinkDataStreamReadFromPaimonForDeduplicate`的控制台看到如下结果:



  • 1
  • 2
  • 3
  • 4
  • 5

| -U | jack | 10 |
| +U | jack | 12 |


从这可以看出来,之前的数据被删除了,新增了一条年龄为12的数据。  
 所以`deduplicate`这种表引擎只会保留相同主键最新的数据。


###### (2)Partial Update


如果我们在Paimon中创建主键表时指定`merge-engine`的值为`partial-update`,那么就可以实现局部更新数据字段的效果。


举个例子:使用多个 Flink流任务去更新同一张表,每个流任务只更新一张表的部分列,最终实现一行完整数据的更新。对于需要构建宽表的业务场景,使用`partial-update`是非常合适的,并且构建宽表的操作也比较简单。



> 
> 注意:这里所说的多个Flink 流任务并不是指多个Flink Job并发写同一张Paimon表,这样比较麻烦。目前推荐的是将多个Flink流任务 `UNION ALL` 起来,最终启动一个Flink Job 向Paimon表中写入数据。
> 
> 
> 


还有一点需要注意的是:`partial-update`这种表引擎不支持流读,需要结合`Lookup`或者`full-compaction`变更日志生产者一起使用才可以支持流读。


同时由于`partial-update`不能接收和处理`DELETE`消息,为了避免接收到DELETE消息报错,需要在建表语句中配置`partial-update.ignore-delete= true`表示忽略 DELETE消息。


下面我们来具体演示一下:


核心思路是这样的,准备模拟产生3条+I类型的数据,数据内容大致是这样的。



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

<jack, 10, 175, null>
<jack, null, null, beijing>
<jack, 11, null, null>


将这3条数据写入到Paimon主键表之后,会得到什么结果呢?  
 结果是这样的:`<jack, 11, 175, beijing>`


为什么呢?因为`null`字段不会覆盖更新字段的值。


创建package:`tech.xuwei.paimon.mergeengine.partialupdate`  
 创建object:`FlinkDataStreamWriteToPaimonForPartialupdate`


这个Object负责向Paimon表中模拟写入数据。  
 代码如下:



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

package tech.xuwei.paimon.mergeengine.partialupdate

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{DataTypes, Schema}
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.types.{Row, RowKind}

/**

  • 使用Flink DataStream API向Paimon表中写入数据

  • Created by xuwei
    */
    object FlinkDataStreamWriteToPaimonForPartialupdate {
    def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(env)

    //手工构造一个Changelog DataStream 数据流
    val dataStream = env.fromElements(
    Row.ofKind(RowKind.INSERT, “jack”, Int.box(10),Int.box(175),null)//+I
    //Row.ofKind(RowKind.INSERT, “jack”, null,null,“beijing”)//+I
    //Row.ofKind(RowKind.INSERT, “jack”, Int.box(11),null,null)//+I
    )(Types.ROW_NAMED(Array(“name”, “age”, “height”, “city”),Types.STRING,Types.INT,Types.INT,Types.STRING))

    //将DataStream转换为Table
    val schema = Schema.newBuilder()
    .column(“name”, DataTypes.STRING().notNull())//主键非空
    .column(“age”, DataTypes.INT())
    .column(“height”, DataTypes.INT())
    .column(“city”, DataTypes.STRING())
    .primaryKey(“name”)//指定主键
    .build()
    val table = tEnv.fromChangelogStream(dataStream,schema,ChangelogMode.all())

    //创建Paimon类型的Catalog
    tEnv.executeSql(
    “”"
    |CREATE CATALOG paimon_catalog WITH (
    | ‘type’=‘paimon’,
    | ‘warehouse’=‘hdfs://bigdata01:9000/paimon’
    |)
    |“”".stripMargin)
    tEnv.executeSql(“USE CATALOG paimon_catalog”)

    //注册临时表
    tEnv.createTemporaryView(“t1”,table)

    //创建Paimon类型的表
    tEnv.executeSql(
    “”"
    |-- 注意:这里的表名使用反引号进行转义,否则会导致SQL DDL语句解析失败。
    |CREATE TABLE IF NOT EXISTS merge_engine_partialupdate (
    | name STRING,
    | age INT,
    | height INT,
    | city STRING,
    | PRIMARY KEY (name) NOT ENFORCED
    |) WITH (
    | ‘merge-engine’ = ‘partial-update’,
    | ‘partial-update.ignore-delete’ = ‘true’
    |)
    |“”".stripMargin)

    //向Paimon表中写入数据
    tEnv.executeSql(
    “”"
    |INSERT INTO merge_engine_partialupdate
    |SELECT name,age,height,city FROM t1
    |“”".stripMargin)
    }

}


注意:在执行代码的时候通过修改`env.fromElements(...)`中的注释来实现实时产生多条`+I`类型数据的效果。


接下来创建Object:`FlinkDataStreamReadFromPaimonForPartialupdate`


这个Object负责从Paimon表中实时读取数据。


代码如下:



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

package tech.xuwei.paimon.mergeengine.partialupdate

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**

  • 使用Flink DataStream API从Paimon表中读取数据
  • Created by xuwei
    */
    object FlinkDataStreamReadFromPaimonForPartialupdate {
    def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    //指定WebUI界面的访问端口,默认就是8081
    conf.setString(RestOptions.BIND_PORT,“8081”)
    //为了便于在本地通过页面观察任务执行情况,所以开启本地WebUI功能
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)

img
img
img

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!

由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新

需要这份系统化资料的朋友,可以戳这里获取

tEnv.createTemporaryView("t1",table)

//创建Paimon类型的表
tEnv.executeSql(
  """
    |-- 注意:这里的表名使用反引号进行转义,否则会导致SQL DDL语句解析失败。
    |CREATE TABLE IF NOT EXISTS `merge_engine_partialupdate` (
    |    name STRING,
    |    age INT,
    |    height INT,
    |    city STRING,
    |    PRIMARY KEY (name) NOT ENFORCED
    |) WITH (
    |    'merge-engine' = 'partial-update',
    |    'partial-update.ignore-delete' = 'true'
    |)
    |""".stripMargin)

//向Paimon表中写入数据
tEnv.executeSql(
  """
    |INSERT INTO `merge_engine_partialupdate`
    |SELECT name,age,height,city FROM t1
    |""".stripMargin)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

}

}


注意:在执行代码的时候通过修改`env.fromElements(...)`中的注释来实现实时产生多条`+I`类型数据的效果。


接下来创建Object:`FlinkDataStreamReadFromPaimonForPartialupdate`


这个Object负责从Paimon表中实时读取数据。


代码如下:



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

package tech.xuwei.paimon.mergeengine.partialupdate

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**

  • 使用Flink DataStream API从Paimon表中读取数据
  • Created by xuwei
    */
    object FlinkDataStreamReadFromPaimonForPartialupdate {
    def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    //指定WebUI界面的访问端口,默认就是8081
    conf.setString(RestOptions.BIND_PORT,“8081”)
    //为了便于在本地通过页面观察任务执行情况,所以开启本地WebUI功能
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)

[外链图片转存中…(img-q4Z5QBg3-1714892636498)]
[外链图片转存中…(img-eWiiTc0t-1714892636498)]
[外链图片转存中…(img-SyEJ3LHq-1714892636498)]

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!

由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新

需要这份系统化资料的朋友,可以戳这里获取

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/木道寻08/article/detail/946821
推荐阅读
相关标签
  

闽ICP备14008679号