
Integrating Debezium into Spring Boot for Incremental PostgreSQL Synchronization


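1. Introduction to Debezium and PostgreSQL

Debezium

Debezium is an open-source project that provides a low-latency streaming platform for change data capture (CDC). You install and configure Debezium to monitor your databases, and your applications can then consume every row-level change made to them. Only committed changes are visible, so your application does not have to worry about transactions or about changes that are rolled back. Debezium provides a single, unified model for all database change events, so your application does not have to deal with the intricacies of each database management system. In addition, because Debezium records the history of data changes in durable, replicated logs, your application can be stopped and restarted at any time without missing the events that occurred while it was down, which ensures that all events are processed correctly and completely.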
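The restart guarantee described above rests on two things: an ordered, durable record of changes and a committed position (offset) within it. The dependency-free sketch below illustrates the idea with hypothetical names (`ResumableLogSketch`, `OffsetStore`); it is not Debezium's API. With the embedded engine used later in this article, the offset is kept by the `offset.storage` backend and flushed every `offset.flush.interval.ms`, which is why delivery is at-least-once: events handled after the last flush may be delivered again after a crash.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of offset-based resumption; not part of Debezium.
final class ResumableLogSketch {

    // Stands in for durable offset storage (e.g. an offset file); survives "restarts".
    static final class OffsetStore {
        private int committed = 0;

        int committed() { return committed; }

        void commit(int offset) { committed = offset; }
    }

    // One "run" of a consumer: starts at the committed offset, handles at most
    // maxEvents events, and commits the new position after each handled event.
    static List<String> runOnce(List<String> changeLog, OffsetStore store, int maxEvents) {
        List<String> handled = new ArrayList<>();
        int position = store.committed();
        while (position < changeLog.size() && handled.size() < maxEvents) {
            handled.add(changeLog.get(position)); // "handle" the event
            position++;
            store.commit(position);               // commit only after handling
        }
        return handled;
    }

    public static void main(String[] args) {
        List<String> changeLog = Arrays.asList("insert id=1", "update id=1", "delete id=1");
        OffsetStore store = new OffsetStore();
        System.out.println(runOnce(changeLog, store, 2));  // first run stops after two events
        System.out.println(runOnce(changeLog, store, 10)); // "restarted" run resumes at the third
    }
}
```

Because the offset lives outside the consumer, a new run picks up exactly where the previous one committed instead of starting over.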

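Introduction to PostgreSQL

PostgreSQL is a powerful open-source database system. Through more than 15 years of active development and continuous improvement, it has earned a strong reputation for reliability, stability and data consistency. PostgreSQL runs on all major operating systems, including Linux, Unix (AIX, BSD, HP-UX, SGI IRIX, Mac OS X, Solaris and Tru64) and Windows. It is a fully transactional database that supports foreign keys, joins, views, triggers and stored procedures (which can be written in several languages). It supports most of the data types defined by the SQL:2008 standard, including integer, numeric, boolean, byte, character, date, interval and time types, and it can also store binary large objects such as images, audio and video. PostgreSQL offers native programming interfaces for many languages, such as C/C++, Java, .Net, Perl, Python, Ruby and Tcl, as well as ODBC, and comes with extensive documentation.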

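2. Setting Up a PostgreSQL Test Environment

1. Installation

Step 1: Pull the PostgreSQL 10.6 image:
docker pull postgres:10.6
Step 2: Create and start the PostgreSQL container.

Here we map port 5432 of the container to port 30028 on the host, set the password of the default postgres account to postgres, and load the pgoutput plugin into the PostgreSQL instance (pgoutput ships with PostgreSQL 10 and later and is loaded on demand by logical decoding, so the shared_preload_libraries setting is not strictly required):

docker run -d -p 30028:5432 --name postgres-10.6 -e POSTGRES_PASSWORD=postgres postgres:10.6 -c 'shared_preload_libraries=pgoutput'
Step 3: Check that the container was created:
docker ps | grep postgres-10.6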
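Besides `docker ps`, you can check from the Java side that the mapped host port accepts connections. The following is a minimal sketch, assuming the container runs on the same machine (`localhost`) with the 30028 mapping used above; `PortCheckSketch` is a hypothetical name. A successful TCP connect only shows that something is listening on the port; it does not log in or prove that the listener is PostgreSQL.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical reachability check for the host port mapped with -p 30028:5432.
final class PortCheckSketch {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out or host unknown
        }
    }

    public static void main(String[] args) {
        System.out.println("localhost:30028 reachable: " + isReachable("localhost", 30028, 2000));
    }
}
```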

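2. Configuration

Step 1: Open a shell in the PostgreSQL container:
docker exec -it postgres-10.6 bash
Step 2: Edit the postgresql.conf configuration file:
vi /var/lib/postgresql/data/postgresql.conf

Set the following values:

# Set the WAL level to logical (possible values: minimal, replica, logical)
wal_level = logical
# Maximum number of replication slots (default 10); each Debezium connector or Flink CDC source occupies one slot
max_replication_slots = 20
# Maximum number of WAL sender processes (default 10); set it to the same value as max_replication_slots
max_wal_senders = 20
# Terminate replication connections that have been inactive for longer than this (default 60s, 0 disables); a somewhat larger value is advisable
wal_sender_timeout = 180s
Step 3: Restart the container:
docker restart postgres-10.6

Connect to the database and run the following query; if it returns logical, the change took effect:

SHOW wal_level;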
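If you want to check an edited postgresql.conf before restarting, the settings above can also be validated offline. The sketch below is hypothetical (`WalConfigCheckSketch` is not part of any library): it handles only simple `key = value` lines, strips `#` comments (so it would mis-handle a `#` inside a quoted value), and assumes the PostgreSQL 10 default of 10 for `max_replication_slots` and `max_wal_senders` when a setting is absent. `SHOW wal_level;` on the running server remains the authoritative check.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical offline checker for the postgresql.conf settings above; not a PostgreSQL tool.
final class WalConfigCheckSketch {

    // Parses simple "key = value" lines; '#' starts a comment, and a later line
    // overrides an earlier one for the same key, as in postgresql.conf.
    static Map<String, String> parse(String confText) {
        Map<String, String> settings = new HashMap<>();
        for (String rawLine : confText.split("\\R")) {
            int hash = rawLine.indexOf('#');
            String line = (hash >= 0 ? rawLine.substring(0, hash) : rawLine).trim();
            int eq = line.indexOf('=');
            if (eq < 0) {
                continue; // blank, comment-only or unsupported line
            }
            String key = line.substring(0, eq).trim();
            String value = line.substring(eq + 1).trim().replace("'", "");
            settings.put(key, value);
        }
        return settings;
    }

    // requiredSlots: how many replication slots (one per connector) you plan to use.
    // Missing slot/sender settings fall back to the assumed PostgreSQL 10 default of 10.
    static boolean readyForLogicalReplication(Map<String, String> settings, int requiredSlots) {
        boolean logical = "logical".equals(settings.get("wal_level"));
        int slots = Integer.parseInt(settings.getOrDefault("max_replication_slots", "10"));
        int senders = Integer.parseInt(settings.getOrDefault("max_wal_senders", "10"));
        return logical && slots >= requiredSlots && senders >= requiredSlots;
    }

    public static void main(String[] args) {
        String conf = "wal_level = logical\nmax_replication_slots = 20\nmax_wal_senders = 20\n";
        System.out.println(readyForLogicalReplication(parse(conf), 1));
    }
}
```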

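3. Create a User and Grant Privileges

Log in to PostgreSQL with the account set when the container was created (postgres/postgres).

First create the database and the table:

-- Create the database test_db

CREATE DATABASE test_db;

-- Connect to the newly created database test_db

\c test_db

-- Create the t_user table

CREATE TABLE "public"."t_user" (
  "id" int8 NOT NULL,
  "name" varchar(255),
  "age" int2,
  PRIMARY KEY ("id")
);
Create a new user and grant it privileges:

-- Create a new user

CREATE USER test1 WITH PASSWORD 'test123';

-- Grant the user the replication (streaming) privilege

ALTER ROLE test1 REPLICATION;

-- Allow the user to connect to test_db

GRANT CONNECT ON DATABASE test_db TO test1;

-- Grant the user privileges on all existing tables in schema public (Debezium needs at least SELECT to snapshot them; ALL PRIVILEGES is broader than that)

GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO test1;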

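4. Publish Tables

-- (Optional, not recommended) Update the system catalog directly so that publications cover all tables. This is unnecessary: CREATE PUBLICATION ... FOR ALL TABLES below already sets puballtables = true, and on a fresh database this UPDATE matches no rows

update pg_publication set puballtables=true where pubname is not null;

-- Publish all tables (dbz_publication is also the default value of Debezium's publication.name setting)

CREATE PUBLICATION dbz_publication FOR ALL TABLES;

-- Check which tables are published

select * from pg_publication_tables;

-- Set the replica identity to FULL so that UPDATE and DELETE events on t_user carry the complete previous row (the "before" image). By default a table uses REPLICA IDENTITY DEFAULT, which records only the old primary key values, so without this statement the before image of updates and deletes would be incomplete

ALTER TABLE t_user REPLICA IDENTITY FULL;

-- Check the replica identity: f (full) means the setting took effect; other possible values are d (default, primary key), n (nothing) and i (index)

select relreplident from pg_class where relname='t_user';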
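The `relreplident` codes come from the `pg_class` system catalog. The small hypothetical helper below (`ReplicaIdentitySketch` is not a PostgreSQL or Debezium API) maps each code to what PostgreSQL records as the old row for UPDATE and DELETE, which bounds how complete Debezium's `before` field can be.

```java
// Hypothetical helper for interpreting pg_class.relreplident; not a PostgreSQL or Debezium API.
final class ReplicaIdentitySketch {

    // What PostgreSQL records as the old row for UPDATE/DELETE under each replica identity.
    static String describe(char relreplident) {
        switch (relreplident) {
            case 'd': return "DEFAULT: old values of the primary key columns, if any";
            case 'n': return "NOTHING: no old values";
            case 'f': return "FULL: old values of all columns";
            case 'i': return "INDEX: old values of the columns in the selected index";
            default: throw new IllegalArgumentException("Unknown relreplident: " + relreplident);
        }
    }

    // True only for FULL, the setting under which the "before" image of
    // updates and deletes can contain every column of the old row.
    static boolean beforeImageHasAllColumns(char relreplident) {
        describe(relreplident); // rejects unknown codes
        return relreplident == 'f';
    }

    public static void main(String[] args) {
        System.out.println(describe('f'));
    }
}
```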

3. Code Project

Goal: read the incremental changes to t_user from the PostgreSQL database.

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springboot-demo</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>postgre</artifactId>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <debezium.version>1.9.4.Final</debezium.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!-- debezium -->
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-api</artifactId>
            <version>${debezium.version}</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-embedded</artifactId>
            <version>${debezium.version}</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-connector-postgres</artifactId>
            <version>${debezium.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.12.0</version>
        </dependency>
    </dependencies>
</project>

DebeziumConnectorConfig.java

package com.et.postgres.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;

import java.io.File;
import java.io.IOException;

@Configuration
public class DebeziumConnectorConfig {

    @Bean
    public io.debezium.config.Configuration customerConnector(Environment env) throws IOException {
        // A new temp file is created on every start, so offsets recorded by a previous run are not reused;
        // use a fixed path if the application must resume where it stopped
        File offsetStorageTempFile = File.createTempFile("offsets_", ".dat");
        return io.debezium.config.Configuration.create()
                .with("name", "customer_postgres_connector")
                .with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
                // Offsets are stored in a local file, so no Kafka broker is needed
                .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
                .with("offset.storage.file.filename", offsetStorageTempFile.getAbsolutePath())
                .with("offset.flush.interval.ms", "60000")
                .with("database.hostname", env.getProperty("customer.datasource.host"))
                .with("database.port", env.getProperty("customer.datasource.port")) // defaults to 5432
                .with("database.user", env.getProperty("customer.datasource.username"))
                .with("database.password", env.getProperty("customer.datasource.password"))
                .with("database.dbname", env.getProperty("customer.datasource.database"))
                // database.server.id and database.history are MySQL-oriented settings that the PostgreSQL connector does not use
                .with("database.server.id", "10181")
                // Logical server name; used as the prefix of the topic name (e.g. customer-postgres-db-server.public.t_user)
                .with("database.server.name", "customer-postgres-db-server")
                .with("database.history", "io.debezium.relational.history.MemoryDatabaseHistory")
                .with("table.include.list", "public.t_user")
                // Only name and age appear in the row images; the record key still contains the primary key id
                .with("column.include.list", "public.t_user.name,public.t_user.age")
                .with("publication.autocreate.mode", "filtered")
                .with("plugin.name", "pgoutput")
                .with("slot.name", "dbz_customerdb_listener")
                .build();
    }
}
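Because `File.createTempFile` produces a new file on every start, the next run does not see the offsets recorded by the previous one. One way around this, sketched below under assumptions, is to keep the offset file at a fixed path. The sketch builds a plain `java.util.Properties` from the `customer.datasource.*` values so it runs without Debezium on the classpath; the class name `ConnectorPropertiesSketch`, the `offsetDir` parameter and the file name `customer_postgres_offsets.dat` are hypothetical, and the directory is assumed to exist. It leaves out `database.server.id` and `database.history`, which the PostgreSQL connector does not use.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical, dependency-free variant of customerConnector() with a persistent offset file.
final class ConnectorPropertiesSketch {

    private static String required(Map<String, String> source, String key) {
        String value = source.get(key);
        if (value == null) {
            throw new IllegalArgumentException("Missing property: " + key);
        }
        return value;
    }

    static Properties build(Map<String, String> datasource, Path offsetDir) {
        Properties props = new Properties();
        props.setProperty("name", "customer_postgres_connector");
        props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        // Fixed location (an assumption): the same file is reused on every start.
        props.setProperty("offset.storage.file.filename",
                offsetDir.resolve("customer_postgres_offsets.dat").toAbsolutePath().toString());
        props.setProperty("offset.flush.interval.ms", "60000");
        props.setProperty("database.hostname", required(datasource, "customer.datasource.host"));
        props.setProperty("database.port", datasource.getOrDefault("customer.datasource.port", "5432"));
        props.setProperty("database.user", required(datasource, "customer.datasource.username"));
        props.setProperty("database.password", required(datasource, "customer.datasource.password"));
        props.setProperty("database.dbname", required(datasource, "customer.datasource.database"));
        props.setProperty("database.server.name", "customer-postgres-db-server");
        props.setProperty("table.include.list", "public.t_user");
        props.setProperty("column.include.list", "public.t_user.name,public.t_user.age");
        props.setProperty("publication.autocreate.mode", "filtered");
        props.setProperty("plugin.name", "pgoutput");
        props.setProperty("slot.name", "dbz_customerdb_listener");
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> datasource = new HashMap<>();
        datasource.put("customer.datasource.host", "localhost");
        datasource.put("customer.datasource.port", "30028");
        datasource.put("customer.datasource.database", "test_db");
        datasource.put("customer.datasource.username", "test1");
        datasource.put("customer.datasource.password", "test123");
        Properties props = build(datasource, Paths.get("data", "debezium"));
        System.out.println(props.getProperty("offset.storage.file.filename"));
    }
}
```

The returned object could be passed to `DebeziumEngine.create(...).using(props)` in place of `customerConnectorConfiguration.asProperties()`, since the builder also accepts a `Properties` instance.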

DebeziumListener.java

package com.et.postgres.listener;

import io.debezium.config.Configuration;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
@Component
public class DebeziumListener {

    // The engine is a Runnable that keeps streaming until closed, so it runs on its own thread
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine;

    public DebeziumListener(Configuration customerConnectorConfiguration) {
        this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
                .using(customerConnectorConfiguration.asProperties())
                .notifying(this::handleChangeEvent)
                .build();
    }

    private void handleChangeEvent(RecordChangeEvent<SourceRecord> sourceRecordRecordChangeEvent) {
        SourceRecord sourceRecord = sourceRecordRecordChangeEvent.record();
        log.info("Key = {}, Value = {}", sourceRecord.key(), sourceRecord.value());
        // The value is the Debezium envelope Struct (before, after, source, op, ts_ms)
        Struct sourceRecordChangeValue = (Struct) sourceRecord.value();
        log.info("SourceRecordChangeValue = '{}'", sourceRecordRecordChangeEvent);
        // To enable the block below, import io.debezium.data.Envelope.Operation,
        // the static constants OPERATION, BEFORE and AFTER from io.debezium.data.Envelope.FieldName,
        // org.apache.kafka.connect.data.Field, org.apache.commons.lang3.tuple.Pair,
        // java.util.Map and the static java.util.stream.Collectors.toMap.
        // if (sourceRecordChangeValue != null) {
        //     Operation operation = Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));
        //     // Operation.READ events are emitted by the initial snapshot when the application starts;
        //     // only inserts, updates and deletes are handled here
        //     if (operation != Operation.READ) {
        //         String record = operation == Operation.DELETE ? BEFORE : AFTER; // deletes carry the old row in "before"
        //         Struct struct = (Struct) sourceRecordChangeValue.get(record);
        //         Map<String, Object> payload = struct.schema().fields().stream()
        //                 .map(Field::name)
        //                 .filter(fieldName -> struct.get(fieldName) != null)
        //                 .map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
        //                 .collect(toMap(Pair::getKey, Pair::getValue));
        //         // this.customerService.replicateData(payload, operation);
        //         log.info("Updated Data: {} with Operation: {}", payload, operation.name());
        //     }
        // }
    }

    @PostConstruct
    private void start() {
        this.executor.execute(debeziumEngine);
    }

    @PreDestroy
    private void stop() throws IOException {
        if (Objects.nonNull(this.debeziumEngine)) {
            this.debeziumEngine.close();
        }
        // Release the engine thread once the engine has been asked to stop
        this.executor.shutdown();
    }
}
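The commented-out block in `handleChangeEvent` skips snapshot `READ` events, picks `before` for deletes and `after` otherwise, and drops null columns. The sketch below reproduces that logic without Debezium on the classpath by modelling the event value as nested `Map`s instead of Kafka Connect `Struct`s; `PayloadExtractorSketch` and its local `Operation` enum are hypothetical stand-ins that mirror the op codes `r`, `c`, `u` and `d` carried in the `op` field (for example `op=c` in the test output).

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, dependency-free model of the commented-out payload extraction.
// Event values are nested Maps here instead of Kafka Connect Structs.
final class PayloadExtractorSketch {

    // Mirrors the op codes in the "op" field of Debezium change events.
    enum Operation {
        READ("r"), CREATE("c"), UPDATE("u"), DELETE("d");

        private final String code;

        Operation(String code) {
            this.code = code;
        }

        static Operation forCode(String code) {
            for (Operation op : values()) {
                if (op.code.equals(code)) {
                    return op;
                }
            }
            throw new IllegalArgumentException("Unknown operation code: " + code);
        }
    }

    // Returns the non-null columns of the relevant row image; snapshot READ
    // events yield an empty map because the original listener skips them.
    static Map<String, Object> extractPayload(Map<String, Object> value) {
        Operation operation = Operation.forCode((String) value.get("op"));
        if (operation == Operation.READ) {
            return Collections.emptyMap();
        }
        // Deletes carry the old row in "before"; inserts and updates carry the new row in "after".
        String imageField = operation == Operation.DELETE ? "before" : "after";
        @SuppressWarnings("unchecked")
        Map<String, Object> row = (Map<String, Object>) value.get(imageField);
        Map<String, Object> payload = new LinkedHashMap<>();
        if (row != null) {
            row.forEach((column, columnValue) -> {
                if (columnValue != null) {
                    payload.put(column, columnValue);
                }
            });
        }
        return payload;
    }

    public static void main(String[] args) {
        Map<String, Object> after = new LinkedHashMap<>();
        after.put("name", "harries");
        after.put("age", 18);
        Map<String, Object> value = new HashMap<>();
        value.put("op", "c");
        value.put("after", after);
        System.out.println(extractPayload(value));
    }
}
```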

DemoApplication.java

package com.et.postgres;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class DemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}

application.properties

customer.datasource.host=localhost
customer.datasource.port=30028
customer.datasource.database=test_db
customer.datasource.username=test1
customer.datasource.password=test123
logging.level.root=INFO
logging.level.io.debezium=INFO
logging.level.com.et.postgres=DEBUG

The listings above show only the key code; the complete code is in the repository below.

Code repository

  • https://github.com/Harries/springboot-demo

4. Test

  • Start the Spring Boot application

  • Insert some data into the table:

INSERT INTO public.t_user(id, "name", age) VALUES(1, 'harries', 18);

  • Watch the console output:

2024-04-07 14:22:01.621 INFO 29260 --- [pool-1-thread-1] i.d.connector.common.BaseSourceTask : 1 records sent during previous 00:00:42.015, last recorded offset: {transaction_id=null, lsn_proc=23559864, lsn=23559864, txId=575, ts_usec=1712470921044339}
2024-04-07 14:22:01.622 INFO 29260 --- [pool-1-thread-1] c.et.postgres.listener.DebeziumListener : Key = Struct{id=1}, Value = Struct{after=Struct{name=harries,age=18},source=Struct{version=1.9.4.Final,connector=postgresql,name=customer-postgres-db-server,ts_ms=1712470921044,db=test_db,sequence=[null,"23559864"],schema=public,table=t_user,txId=575,lsn=23559864},op=c,ts_ms=1712470921607}
2024-04-07 14:22:01.622 INFO 29260 --- [pool-1-thread-1] c.et.postgres.listener.DebeziumListener : SourceRecordChangeValue = 'EmbeddedEngineChangeEvent [key=null, value=SourceRecord{sourcePartition={server=customer-postgres-db-server}, sourceOffset={transaction_id=null, lsn_proc=23559864, lsn=23559864, txId=575, ts_usec=1712470921044339}} ConnectRecord{topic='customer-postgres-db-server.public.t_user', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{customer_postgres_db_server.public.t_user.Key:STRUCT}, value=Struct{after=Struct{name=harries,age=18},source=Struct{version=1.9.4.Final,connector=postgresql,name=customer-postgres-db-server,ts_ms=1712470921044,db=test_db,sequence=[null,"23559864"],schema=public,table=t_user,txId=575,lsn=23559864},op=c,ts_ms=1712470921607}, valueSchema=Schema{customer_postgres_db_server.public.t_user.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}, sourceRecord=SourceRecord{sourcePartition={server=customer-postgres-db-server}, sourceOffset={transaction_id=null, lsn_proc=23559864, lsn=23559864, txId=575, ts_usec=1712470921044339}} ConnectRecord{topic='customer-postgres-db-server.public.t_user', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{customer_postgres_db_server.public.t_user.Key:STRUCT}, value=Struct{after=Struct{name=harries,age=18},source=Struct{version=1.9.4.Final,connector=postgresql,name=customer-postgres-db-server,ts_ms=1712470921044,db=test_db,sequence=[null,"23559864"],schema=public,table=t_user,txId=575,lsn=23559864},op=c,ts_ms=1712470921607}, valueSchema=Schema{customer_postgres_db_server.public.t_user.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
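The offset in the log (`lsn=23559864`) is the log sequence number as a plain 64-bit integer, whereas PostgreSQL displays LSNs in the `pg_lsn` text form: the high and low 32 bits in hexadecimal separated by `/`, as in the `confirmed_flush_lsn` column of `pg_replication_slots`. The hypothetical helper below (`LsnFormatSketch` is not a Debezium API) converts between the two so the connector's recorded position can be compared with what the server reports for the slot; by this conversion 23559864 corresponds to `0/1677EB8`.

```java
// Hypothetical helper converting between a numeric LSN and PostgreSQL's pg_lsn text form.
final class LsnFormatSketch {

    // High 32 bits and low 32 bits in upper-case hex, separated by '/'.
    static String toPgLsn(long lsn) {
        return String.format("%X/%X", lsn >>> 32, lsn & 0xFFFFFFFFL);
    }

    static long fromPgLsn(String text) {
        String[] parts = text.split("/");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Not an LSN: " + text);
        }
        return (Long.parseLong(parts[0], 16) << 32) | Long.parseLong(parts[1], 16);
    }

    public static void main(String[] args) {
        System.out.println(toPgLsn(23559864L)); // lsn value from the log output
    }
}
```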

5. References

  • https://github.com/davidarchanjo/spring-boot-debezium-postgres

  • http://www.liuhaihua.cn/archives/710398.html

  • https://debezium.io/documentation/reference/0.10/features.html
