赞
踩
在工作中经常会遇到要实时获取数据库(postgresql、mysql等)的变更数据,主要体现数据的实时性;mysql数据库有canal工具实现很简单,但是基于postgresql数据库获取实时数据就比较复杂,之前已经写过一篇获取postgresql数据库实时数据的文章,如下:
但是,之前的实现方式比较繁琐,不利于维护,所有本文整合Flink CDC通过一个比较简单的方式实现;
Flink CDC(Change Data Capture)是一个基于Apache Flink构建的开源数据变更捕获(CDC)框架。其核心功能是从各种关系型数据库(如MySQL、PostgreSQL、Oracle等)中捕获数据变更(如增删改操作),并将这些变更以流的形式提供给Flink等流处理引擎进行处理;
1)CDC(Change Data Capture):数据变更捕获的简称,用于监测并捕获数据库的变动,然后将这些变更按照发生顺序捕获,并写入到目标存储系统(如数据仓库、数据湖、消息队列等)。
2)Flink CDC:基于Flink的CDC实现,将CDC技术与Flink流处理引擎相结合,实现数据的实时捕获、处理和传输。
1)Postgres CDC(Change Data Capture)连接器是用于从PostgreSQL数据库捕获数据变更(如增删改操作)并将其以流的形式提供给数据处理引擎(如Flink)的组件;
2)PostgreSQL版本:Postgres CDC连接器通常支持PostgreSQL的多个版本,具体版本可能因连接器版本不同而有所差异。常见的支持版本包括9.6、10、11、12、13、14等;
1、安装postgresql数据库,并创建库和测试使用的表,这里不再列举详细步骤;
2、修改postgresql数据库配置,通过wal日志监听变更数据
修改postgresql.conf文件,重启服务
wal_level=logical
3、springboot关键maven依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>1.19.0</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-postgres-cdc</artifactId>
<version>3.0.1</version>
</dependency>
注:其它依赖不在列举,可以通过获取源码查看
InitAction02.java
package com.sk.proxytest.init;
import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
@Configuration
public class InitAction02 {
@PostConstruct
public void run() throws Exception {
DebeziumDeserializationSchema<String> deserializer =
new JsonDebeziumDeserializationSchema();
JdbcIncrementalSource<String> postgresIncrementalSource =
PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
.hostname("127.0.0.1")
.port(5432)
.database("postgres")
.schemaList("public")
.tableList("public.student")
.username("postgres")
.password("password")
.slotName("flink")
.decodingPluginName("pgoutput") // use pgoutput for PostgreSQL 10+
.deserializer(deserializer)
.includeSchemaChanges(true) // output the schema changes as well
.splitSize(2) // the split size of each snapshot split
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000);
env.fromSource(
postgresIncrementalSource,
WatermarkStrategy.noWatermarks(),
"PostgresParallelSource")
.setParallelism(2).addSink(new CustomSink());
//.print();
env.execute("Output Postgres Snapshot");
}
}
CustomSink.java
package com.sk.proxytest.init;
import lombok.extern.log4j.Log4j2;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@Log4j2
public class CustomSink extends RichSinkFunction<String> {
@Override
public void invoke(String value, Context context) throws Exception {
log.info("============数据发生变化:{}", value);
}
}
执行结果:
1)新增数据
2)变更数据输出
2024-07-31T00:00:15,761 INFO [debezium-reader-0] io.debezium.util.Threads$3: Creating thread debezium-postgresconnector-postgres_cdc_source-keep-alive
2024-07-31T00:00:15,761 INFO [debezium-reader-0] io.debezium.connector.postgresql.PostgresStreamingChangeEventSource: Processing messages
2024-07-31T00:00:15,762 INFO [debezium-reader-0] io.debezium.connector.postgresql.connection.WalPositionLocator: Message with LSN 'LSN{0/3588018}' arrived, switching off the filtering
2024-07-31T00:00:16,678 INFO [Sink: Unnamed (1/4)#0] com.sk.proxytest.init.CustomSink: ============数据发生变化:{"before":null,"after":{"id":8,"name":"8","age":8,"remark":"8"},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"postgres_cdc_source","ts_ms":1722355215252,"snapshot":"false","db":"postgres","sequence":"[null,\"56131608\"]","schema":"public","table":"student","txId":932,"lsn":56131608,"xmin":null},"op":"c","ts_ms":1722355216336,"transaction":null}
Postgres CDC 连接器是一个 Flink Source 连接器,它将首先读取数据库快照,然后继续读取二进制日志,即使发生故障,也会进行一次处理;
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。