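Debezium is an open-source project that provides a low-latency streaming platform for change data capture (CDC). You install and configure Debezium to monitor your databases, and your applications can then consume every row-level change made to them. Only committed changes are visible, so your application does not have to worry about transactions or changes that are rolled back. Debezium provides a single, unified model for all database change events, so your application does not have to deal with the intricacies of each database management system. In addition, because Debezium records the history of data changes in durable, replicated logs, your application can be stopped and restarted at any time without missing the events that occurred while it was not running, so all events are processed correctly and completely.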
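The restart guarantee described above rests on offsets: the consumer remembers how far into an ordered change log it has processed and resumes from that point. The following is a minimal, hypothetical sketch of that idea in plain Java. It is not Debezium's API; `OffsetResumeSketch`, `ChangeLog` and `OffsetTrackingConsumer` are illustrative names, and the offset is kept in memory where a real system would persist it (in the embedded engine used later in this article, that is the job of the store configured through `offset.storage`).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical illustration, not Debezium code: events are appended to an ordered log and the
 * consumer remembers the offset of the next event to read, so after a "restart" it continues
 * from there and still receives the events written while it was stopped.
 */
public class OffsetResumeSketch {

    /** Append-only change log; an event's offset is its position in the list. */
    public static class ChangeLog {
        private final List<String> events = new ArrayList<>();

        public void append(String event) {
            events.add(event);
        }

        /** Returns all events from {@code offset} (inclusive) to the end of the log. */
        public List<String> readFrom(int offset) {
            return new ArrayList<>(events.subList(offset, events.size()));
        }
    }

    /** Consumer that tracks the next offset to read; a real system would store it durably. */
    public static class OffsetTrackingConsumer {
        private int nextOffset = 0;
        private final List<String> processed = new ArrayList<>();

        public void poll(ChangeLog log) {
            for (String event : log.readFrom(nextOffset)) {
                processed.add(event);
                nextOffset++; // advance only after the event has been handled
            }
        }

        public List<String> processed() {
            return Collections.unmodifiableList(processed);
        }
    }

    public static void main(String[] args) {
        ChangeLog log = new ChangeLog();
        OffsetTrackingConsumer consumer = new OffsetTrackingConsumer();

        log.append("insert id=1");
        consumer.poll(log);          // consumer runs, then "stops"
        log.append("update id=1");   // changes keep arriving while it is down
        log.append("delete id=1");
        consumer.poll(log);          // "restart": resume from the remembered offset

        System.out.println(consumer.processed()); // prints [insert id=1, update id=1, delete id=1]
    }
}
```

Because the offset only advances after an event is handled, polling again without new events delivers nothing twice.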
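PostgreSQL is a powerful open-source database system. Through more than 15 years of active development and continuous improvement, it has earned a strong reputation for reliability, stability, and data consistency. PostgreSQL runs on all major operating systems, including Linux, Unix (AIX, BSD, HP-UX, SGI IRIX, Mac OS X, Solaris, and Tru64), and Windows. It is fully transaction-safe and fully supports foreign keys, joins, views, triggers, and stored procedures (which can be written in several languages). It supports most SQL:2008 data types, including integer, numeric, boolean, byte, character, date, interval, and time types, and it can also store large binary objects such as images, audio, and video. PostgreSQL has native programming interfaces for many languages, such as C/C++, Java, .NET, Perl, Python, Ruby, and Tcl, as well as ODBC, and it comes with extensive documentation.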
Pull the PostgreSQL 10.6 image:

```shell
docker pull postgres:10.6
```
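Here we map the container's port 5432 to port 30028 on the host, set the password of the `postgres` account to `postgres`, and load the pgoutput plugin into the PostgreSQL instance (pgoutput ships with PostgreSQL 10 and later, so preloading it is not strictly required):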
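```shell
docker run -d -p 30028:5432 --name postgres-10.6 -e POSTGRES_PASSWORD=postgres postgres:10.6 -c 'shared_preload_libraries=pgoutput'
```

Check that the container is running:

```shell
docker ps | grep postgres-10.6
```

Open a shell in the container and edit the PostgreSQL configuration file:

```shell
docker exec -it postgres-10.6 bash
vi /var/lib/postgresql/data/postgresql.conf
```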
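Set the following values:

```
# Change the WAL level to logical (possible values: minimal, replica, logical)
wal_level = logical

# Maximum number of replication slots (default 10); Flink CDC uses one slot per table by default
max_replication_slots = 20

# Maximum number of WAL sender processes (default 10); set it to the same value as max_replication_slots
max_wal_senders = 20

# Terminate replication connections that have been inactive for longer than this;
# a somewhat larger value is reasonable (default 60s, 0 disables the timeout)
wal_sender_timeout = 180s
```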
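Exit the container shell and restart the container so that the new settings take effect (changing `wal_level` requires a server restart):

```shell
docker restart postgres-10.6
```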
Connect to the database and run the following query. If it returns `logical`, the change took effect:

```sql
SHOW wal_level;
```
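The same check can be run from Java, covering the other settings changed above as well. This is a minimal sketch, assuming the PostgreSQL JDBC driver is on the classpath and that the container from this article is reachable at `localhost:30028` with `postgres`/`postgres`; `WalSettingsCheck` and its `problems` method are hypothetical names. The senders-versus-slots rule simply encodes the advice above to keep `max_wal_senders` equal to `max_replication_slots`, since every active replication connection uses one WAL sender.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: reads WAL-related settings over JDBC and flags values unsuitable for CDC. */
public class WalSettingsCheck {

    static final String[] SETTINGS = {"wal_level", "max_replication_slots", "max_wal_senders", "wal_sender_timeout"};

    /** Returns human-readable problems; an empty list means the settings look usable for logical decoding. */
    public static List<String> problems(Map<String, String> settings) {
        List<String> problems = new ArrayList<>();
        if (!"logical".equals(settings.get("wal_level"))) {
            problems.add("wal_level must be 'logical' but is " + settings.get("wal_level"));
        }
        int slots = Integer.parseInt(settings.getOrDefault("max_replication_slots", "0"));
        int senders = Integer.parseInt(settings.getOrDefault("max_wal_senders", "0"));
        if (slots < 1) {
            problems.add("max_replication_slots must be at least 1");
        }
        // Each active replication connection needs its own WAL sender process
        if (senders < slots) {
            problems.add("max_wal_senders (" + senders + ") is lower than max_replication_slots (" + slots + ")");
        }
        return problems;
    }

    public static void main(String[] args) throws SQLException {
        Map<String, String> settings = new LinkedHashMap<>();
        String url = "jdbc:postgresql://localhost:30028/postgres"; // assumed host, port and database
        try (Connection conn = DriverManager.getConnection(url, "postgres", "postgres");
             Statement stmt = conn.createStatement()) {
            for (String name : SETTINGS) {
                // SHOW does not accept bind parameters; the names come from the fixed array above
                try (ResultSet rs = stmt.executeQuery("SHOW " + name)) {
                    rs.next();
                    settings.put(name, rs.getString(1));
                }
            }
        }
        System.out.println(settings);
        List<String> found = problems(settings);
        System.out.println(found.isEmpty() ? "settings look fine for CDC" : found);
    }
}
```

Keeping the rules in a pure `problems` method separates them from the database access, so they can be checked without a running server.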
Log in to PostgreSQL with the account set up when the container was created (`postgres`/`postgres`) and run the following statements:
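```sql
-- Create the database test_db
CREATE DATABASE test_db;
-- Connect to the newly created database test_db (psql meta-command)
\c test_db
-- Create the t_user table
CREATE TABLE "public"."t_user" (
  "id" int8 NOT NULL,
  "name" varchar(255),
  "age" int2,
  PRIMARY KEY ("id")
);
-- Create a new PostgreSQL user
CREATE USER test1 WITH PASSWORD 'test123';
-- Give the user the REPLICATION attribute (required for replication connections)
ALTER ROLE test1 replication;
-- Allow the user to connect to the database
GRANT CONNECT ON DATABASE test_db to test1;
-- Grant the user privileges on all existing tables in schema public (Debezium needs at least SELECT)
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO test1;
-- Mark existing publications as covering all tables. This edits a system catalog directly; in a fresh
-- database no publication exists yet, so it affects no rows, and CREATE PUBLICATION ... FOR ALL TABLES
-- below sets puballtables = true by itself.
update pg_publication set puballtables=true where pubname is not null;
-- Publish all tables
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- Check which tables are published
select * from pg_publication_tables;
-- Make the replica identity include the old values of all columns, so that change events for t_user carry
-- the complete row as it was before an UPDATE or DELETE. Without this, t_user keeps the default replica
-- identity (DEFAULT), which records only the old primary key values, so update and delete events would
-- lack the rest of the previous row and the synchronized data could be incomplete.
ALTER TABLE t_user REPLICA IDENTITY FULL;
-- Check the replica identity: f (FULL) means the setting took effect; otherwise you will typically see
-- d (DEFAULT); n means NOTHING and i means INDEX
select relreplident from pg_class where relname='t_user';
```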
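The replica identity can also be checked from Java. This sketch assumes the PostgreSQL JDBC driver is on the classpath and connects to `test_db` at `localhost:30028` as `postgres`/`postgres`; `ReplicaIdentityCheck` is a hypothetical name. The mapping follows the `relreplident` codes documented for PostgreSQL's `pg_class` catalog.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/** Hypothetical helper that reports the replica identity of t_user in readable form. */
public class ReplicaIdentityCheck {

    /** Maps a pg_class.relreplident code to the REPLICA IDENTITY setting it represents. */
    public static String describe(char code) {
        switch (code) {
            case 'd': return "DEFAULT (old values of the primary key columns only)";
            case 'n': return "NOTHING (no old values)";
            case 'f': return "FULL (old values of all columns)";
            case 'i': return "INDEX (old values of the columns in the chosen index)";
            default:  return "unknown code: " + code;
        }
    }

    public static void main(String[] args) throws SQLException {
        String url = "jdbc:postgresql://localhost:30028/test_db"; // assumed host, port and database
        String sql = "SELECT relreplident FROM pg_class WHERE relname = ?";
        try (Connection conn = DriverManager.getConnection(url, "postgres", "postgres");
             PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, "t_user");
            try (ResultSet rs = ps.executeQuery()) {
                if (rs.next()) {
                    // relreplident is a single-character column
                    System.out.println("t_user: " + describe(rs.getString(1).charAt(0)));
                } else {
                    System.out.println("t_user not found");
                }
            }
        }
    }
}
```

After `ALTER TABLE t_user REPLICA IDENTITY FULL`, the program should report FULL; before it, a table with a primary key reports DEFAULT.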
Goal of the experiment: read the incremental changes to table t_user in the PostgreSQL database.
pom.xml:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springboot-demo</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>postgre</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <debezium.version>1.9.4.Final</debezium.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- debezium -->
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-api</artifactId>
            <version>${debezium.version}</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-embedded</artifactId>
            <version>${debezium.version}</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-connector-postgres</artifactId>
            <version>${debezium.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.12.0</version>
        </dependency>
    </dependencies>
</project>
```
DebeziumConnectorConfig.java:

```java
package com.et.postgres.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;

import java.io.File;
import java.io.IOException;

@Configuration
public class DebeziumConnectorConfig {

    @Bean
    public io.debezium.config.Configuration customerConnector(Environment env) throws IOException {
        // A new temp file is created on every start, so stored offsets do not survive a restart;
        // use a fixed path instead if the connector should resume from where it stopped.
        File offsetStorageTempFile = File.createTempFile("offsets_", ".dat");
        return io.debezium.config.Configuration.create()
                .with("name", "customer_postgres_connector")
                .with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
                // Where the engine records the position (LSN) it has processed up to
                .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
                .with("offset.storage.file.filename", offsetStorageTempFile.getAbsolutePath())
                .with("offset.flush.interval.ms", "60000")
                .with("database.hostname", env.getProperty("customer.datasource.host"))
                .with("database.port", env.getProperty("customer.datasource.port")) // defaults to 5432
                .with("database.user", env.getProperty("customer.datasource.username"))
                .with("database.password", env.getProperty("customer.datasource.password"))
                .with("database.dbname", env.getProperty("customer.datasource.database"))
                .with("database.server.id", "10181") // MySQL connector setting; not used by the PostgreSQL connector
                .with("database.server.name", "customer-postgres-db-server") // logical server name, used as the topic prefix
                .with("database.history", "io.debezium.relational.history.MemoryDatabaseHistory") // not used by the PostgreSQL connector
                .with("table.include.list", "public.t_user") // capture changes of this table only
                .with("column.include.list", "public.t_user.name,public.t_user.age") // columns kept in event values
                .with("publication.autocreate.mode", "filtered") // if no publication exists, create one for the included tables only
                .with("plugin.name", "pgoutput") // logical decoding plugin built into PostgreSQL 10+
                .with("slot.name", "dbz_customerdb_listener") // replication slot used by this connector
                .build();
    }
}
```
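`column.include.list` explains why, in the console output further down, the `after` struct contains only `name` and `age` while the record key still holds `id` (the key is built from the primary key, and the include list applies to event values). The sketch below is a simplified approximation, not Debezium's own filter code: each comma-separated entry is treated as a regular expression that must match the whole fully-qualified name `schemaName.tableName.columnName`. `ColumnFilterSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Simplified, hypothetical model of column.include.list matching. */
public class ColumnFilterSketch {

    private final List<Pattern> patterns = new ArrayList<>();

    public ColumnFilterSketch(String includeList) {
        for (String entry : includeList.split(",")) {
            String trimmed = entry.trim();
            if (!trimmed.isEmpty()) {
                patterns.add(Pattern.compile(trimmed));
            }
        }
    }

    /** True if schema.table.column matches one of the include patterns in full. */
    public boolean includes(String schema, String table, String column) {
        String fqName = schema + "." + table + "." + column;
        for (Pattern p : patterns) {
            if (p.matcher(fqName).matches()) { // matches() requires the entire name to match
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        ColumnFilterSketch filter = new ColumnFilterSketch("public.t_user.name,public.t_user.age");
        for (String column : new String[] {"id", "name", "age"}) {
            System.out.println(column + " -> " + filter.includes("public", "t_user", column));
        }
    }
}
```

With the include list from the configuration above, the program prints `id -> false`, `name -> true` and `age -> true`. Note that an unescaped `.` in an entry matches any character, which is harmless for these names.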
DebeziumListener.java:

```java
package com.et.postgres.listener;

import io.debezium.config.Configuration;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
@Component
public class DebeziumListener {

    // The embedded engine is a Runnable that blocks while streaming, so it gets its own thread
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine;

    public DebeziumListener(Configuration customerConnectorConfiguration) {
        this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
                .using(customerConnectorConfiguration.asProperties())
                .notifying(this::handleChangeEvent)
                .build();
    }

    private void handleChangeEvent(RecordChangeEvent<SourceRecord> sourceRecordRecordChangeEvent) {
        SourceRecord sourceRecord = sourceRecordRecordChangeEvent.record();
        log.info("Key = {}, Value = {}", sourceRecord.key(), sourceRecord.value());
        // Typed as Struct so that the commented-out code below can call get(...) on it
        Struct sourceRecordChangeValue = (Struct) sourceRecord.value();
        log.info("SourceRecordChangeValue = '{}'", sourceRecordRecordChangeEvent);
        // To enable the processing below, add these imports: io.debezium.data.Envelope.Operation,
        // static io.debezium.data.Envelope.FieldName.OPERATION / BEFORE / AFTER,
        // org.apache.kafka.connect.data.Field, org.apache.commons.lang3.tuple.Pair,
        // java.util.Map and static java.util.stream.Collectors.toMap.
        // if (sourceRecordChangeValue != null) {
        //     Operation operation = Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));
        //
        //     // Operation.READ events come from the initial snapshot when the application starts;
        //     // skip them and handle CREATE, UPDATE and DELETE events.
        //     if (operation != Operation.READ) {
        //         String record = operation == Operation.DELETE ? BEFORE : AFTER; // deletes carry the old row in "before"
        //
        //         Struct struct = (Struct) sourceRecordChangeValue.get(record);
        //         Map<String, Object> payload = struct.schema().fields().stream()
        //                 .map(Field::name)
        //                 .filter(fieldName -> struct.get(fieldName) != null)
        //                 .map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
        //                 .collect(toMap(Pair::getKey, Pair::getValue));
        //
        //         // this.customerService.replicateData(payload, operation);
        //         log.info("Updated Data: {} with Operation: {}", payload, operation.name());
        //     }
        // }
    }

    @PostConstruct
    private void start() {
        this.executor.execute(debeziumEngine);
    }

    @PreDestroy
    private void stop() throws IOException {
        if (Objects.nonNull(this.debeziumEngine)) {
            this.debeziumEngine.close();
        }
        // Release the engine thread once the engine has been asked to stop
        this.executor.shutdown();
    }
}
```
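The logic in the commented-out part of `handleChangeEvent` can be tried without a database. The sketch below is a plain-Java approximation that uses `Map`s in place of Kafka Connect `Struct`s (an assumption made only to keep it self-contained); the event value carries `op`, `before` and `after`, as in the console output below. `ChangePayloadSketch` and `payload` are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical, Map-based version of the payload extraction in the listener's commented-out code. */
public class ChangePayloadSketch {

    /**
     * Returns the non-null fields of the relevant row image, or null for snapshot reads ("r").
     * Deletes ("d") use "before", the row as it was; creates and updates use "after".
     */
    @SuppressWarnings("unchecked")
    public static Map<String, Object> payload(Map<String, Object> value) {
        String op = (String) value.get("op");
        if ("r".equals(op)) {
            return null; // snapshot reads are skipped, as in the listener
        }
        Map<String, Object> row = (Map<String, Object>) value.get("d".equals(op) ? "before" : "after");
        Map<String, Object> payload = new LinkedHashMap<>();
        if (row != null) {
            for (Map.Entry<String, Object> field : row.entrySet()) {
                if (field.getValue() != null) { // drop null columns
                    payload.put(field.getKey(), field.getValue());
                }
            }
        }
        return payload;
    }

    public static void main(String[] args) {
        Map<String, Object> after = new LinkedHashMap<>();
        after.put("name", "harries");
        after.put("age", 18);
        Map<String, Object> value = new LinkedHashMap<>();
        value.put("op", "c");
        value.put("after", after);
        System.out.println(payload(value)); // prints {name=harries, age=18}
    }
}
```

Reading `before` for deletes is also why `REPLICA IDENTITY FULL` matters: with the default identity, `before` would hold only the primary key.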
DemoApplication.java:

```java
package com.et.postgres;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}
```
application.properties:

```properties
customer.datasource.host=localhost
customer.datasource.port=30028
customer.datasource.database=test_db
customer.datasource.username=test1
customer.datasource.password=test123

logging.level.root=INFO
# The next two loggers match no class in this project (BinlogReader belongs to Debezium's MySQL connector)
logging.level.io.debezium.postgres.BinlogReader=INFO
logging.level.io.davidarhcanjo=DEBUG
```
The above is only the key code; see the following repository for the complete code:
https://github.com/Harries/springboot-demo
Start the Spring Boot application, then insert some data into the table:

```sql
INSERT INTO public.t_user(id, "name", age) VALUES(1, 'harries', 18);
```
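Watch the console output. The listener logs the record key (`id=1`) and the change event, whose `op=c` marks an insert: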
```
2024-04-07 14:22:01.621 INFO 29260 --- [pool-1-thread-1] i.d.connector.common.BaseSourceTask : 1 records sent during previous 00:00:42.015, last recorded offset: {transaction_id=null, lsn_proc=23559864, lsn=23559864, txId=575, ts_usec=1712470921044339}
2024-04-07 14:22:01.622 INFO 29260 --- [pool-1-thread-1] c.et.postgres.listener.DebeziumListener : Key = Struct{id=1}, Value = Struct{after=Struct{name=harries,age=18},source=Struct{version=1.9.4.Final,connector=postgresql,name=customer-postgres-db-server,ts_ms=1712470921044,db=test_db,sequence=[null,"23559864"],schema=public,table=t_user,txId=575,lsn=23559864},op=c,ts_ms=1712470921607}
2024-04-07 14:22:01.622 INFO 29260 --- [pool-1-thread-1] c.et.postgres.listener.DebeziumListener : SourceRecordChangeValue = 'EmbeddedEngineChangeEvent [key=null, value=SourceRecord{sourcePartition={server=customer-postgres-db-server}, sourceOffset={transaction_id=null, lsn_proc=23559864, lsn=23559864, txId=575, ts_usec=1712470921044339}} ConnectRecord{topic='customer-postgres-db-server.public.t_user', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{customer_postgres_db_server.public.t_user.Key:STRUCT}, value=Struct{after=Struct{name=harries,age=18},source=Struct{version=1.9.4.Final,connector=postgresql,name=customer-postgres-db-server,ts_ms=1712470921044,db=test_db,sequence=[null,"23559864"],schema=public,table=t_user,txId=575,lsn=23559864},op=c,ts_ms=1712470921607}, valueSchema=Schema{customer_postgres_db_server.public.t_user.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}, sourceRecord=SourceRecord{sourcePartition={server=customer-postgres-db-server}, sourceOffset={transaction_id=null, lsn_proc=23559864, lsn=23559864, txId=575, ts_usec=1712470921044339}} ConnectRecord{topic='customer-postgres-db-server.public.t_user', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{customer_postgres_db_server.public.t_user.Key:STRUCT}, value=Struct{after=Struct{name=harries,age=18},source=Struct{version=1.9.4.Final,connector=postgresql,name=customer-postgres-db-server,ts_ms=1712470921044,db=test_db,sequence=[null,"23559864"],schema=public,table=t_user,txId=575,lsn=23559864},op=c,ts_ms=1712470921607}, valueSchema=Schema{customer_postgres_db_server.public.t_user.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
```
References:

https://github.com/davidarchanjo/spring-boot-debezium-postgres
http://www.liuhaihua.cn/archives/710398.html
https://debezium.io/documentation/reference/0.10/features.html
Copyright © 2003-2013 www.wpsshop.cn. All rights reserved.