
Basic Usage of Debezium (Using MySQL as an Example)

  • Introduction to Debezium

  • Basic usage

    • Preparing MySQL

    • Writing the program

    • Testing

  • Summary


I. Introduction to Debezium

From the official website:

Debezium is a set of distributed services to capture changes in your databases so that your applications can see those changes and respond to them. Debezium records all row-level changes within each database table in a change event stream, and applications simply read these streams to see the change events in the same order in which they occurred.

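Put simply, Debezium captures every row-level data change in a database and emits the changes, in order, as a stream of events.

II. Basic Usage

The following uses MySQL as an example to introduce the basic usage of Debezium.

1. Preparing MySQL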

  1. Prepare a MySQL user with the required privileges, for example:

    CREATE USER 'dbz'@'%' IDENTIFIED BY '******';
    GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'dbz'@'%';

  2. Check whether MySQL has log-bin enabled:

    SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::" FROM information_schema.global_variables WHERE variable_name='log_bin';
    -- If the following error occurs: The 'INFORMATION_SCHEMA.GLOBAL_VARIABLES' feature is disabled...
    -- run this statement first, then execute the query above again: set global show_compatibility_56=on;
    -- On MySQL 8.0, where this table no longer exists, use instead: SHOW VARIABLES LIKE 'log_bin';

If the result is OFF, edit the MySQL configuration file along these lines:

    server-id         = 223344     # required
    log_bin           = mysql-bin  # base name of the binlog file sequence
    binlog_format     = ROW        # must be ROW
    binlog_row_image  = FULL       # must be FULL
    expire_logs_days  = 10         # adjust to your environment

  3. Prepare the database & table:

    create database inventory;
    create table inventory.a (id bigint primary key auto_increment, name varchar(32));
    insert into inventory.a values (null, 'n1'),(null, 'n2'),(null, 'n3');
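
The binlog requirements listed above can also be checked mechanically before starting Debezium. The following is a minimal sketch and not part of Debezium: the class BinlogConfigCheck and its method names are hypothetical, it only understands simple "key = value" lines with "#" comments like the sample above, and it is deliberately strict (a setting that is absent is reported even if the server default would satisfy it).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not a Debezium API): checks my.cnf-style text against
// the binlog settings listed above.
class BinlogConfigCheck {

    // Parses simple "key = value  # comment" lines into a map.
    static Map<String, String> parse(String text) {
        Map<String, String> settings = new LinkedHashMap<>();
        for (String line : text.split("\\R")) {
            int hash = line.indexOf('#');
            if (hash >= 0) {
                line = line.substring(0, hash); // drop trailing comment
            }
            int eq = line.indexOf('=');
            if (eq < 0) {
                continue; // blank line or a section header such as [mysqld]
            }
            // MySQL accepts both '-' and '_' in option names; normalize to '_'.
            String key = line.substring(0, eq).trim().replace('-', '_');
            settings.put(key, line.substring(eq + 1).trim());
        }
        return settings;
    }

    // Returns a list of problems; an empty list means the text passes the check.
    static List<String> problems(String text) {
        Map<String, String> s = parse(text);
        List<String> problems = new ArrayList<>();
        if (!s.containsKey("server_id")) {
            problems.add("server_id is missing");
        }
        if (!s.containsKey("log_bin")) {
            problems.add("log_bin is missing");
        }
        if (!"ROW".equalsIgnoreCase(s.get("binlog_format"))) {
            problems.add("binlog_format must be ROW");
        }
        if (!"FULL".equalsIgnoreCase(s.get("binlog_row_image"))) {
            problems.add("binlog_row_image must be FULL");
        }
        return problems;
    }

    public static void main(String[] args) {
        String cfg = String.join("\n",
                "server-id         = 223344",
                "log_bin           = mysql-bin",
                "binlog_format     = MIXED   # wrong on purpose",
                "binlog_row_image  = FULL");
        System.out.println(problems(cfg)); // prints [binlog_format must be ROW]
    }
}
```

A check like this only inspects the file; the SQL query shown in step 2 remains the way to confirm what the running server actually uses.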

2. Writing the Program

2.1. Project dependencies (Maven)

pom.xml

    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-api</artifactId>
        <version>${version.debezium}</version>
    </dependency>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-embedded</artifactId>
        <version>${version.debezium}</version>
    </dependency>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-connector-mysql</artifactId>
        <version>${version.debezium}</version>
    </dependency>

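At the time of writing, the latest stable Debezium release is 1.9.5.Final. The ${version.debezium} placeholder must be defined as a property in the <properties> section of the POM.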

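2.2. Prepare the database & table

(Skip this step if the database and table were already created during the MySQL preparation above.)

    create database inventory;
    create table inventory.a (id bigint primary key, name varchar(32));
    insert into inventory.a values (1, 'n1'),(2, 'n2'),(3, 'n3');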

2.3. Writing the code

    package com.greatdb.dbzdemo;

    import java.io.IOException;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    import io.debezium.engine.ChangeEvent;
    import io.debezium.engine.DebeziumEngine;
    import io.debezium.engine.format.Json;

    /**
     * @version 1.0
     * @date 2022/07/29
     */
    public class DebeziumTest {

        private static DebeziumEngine<ChangeEvent<String, String>> engine;

        public static void main(String[] args) throws Exception {
            final Properties props = new Properties();
            props.setProperty("name", "dbz-engine");
            props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");

            // offset config begin: store the processed binlog offsets in a file
            props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
            props.setProperty("offset.storage.file.filename", "/tmp/dbz/storage/mysql_offsets.dat");
            props.setProperty("offset.flush.interval.ms", "0");
            // offset config end

            props.setProperty("database.server.name", "mysql-connector");
            props.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory");
            props.setProperty("database.history.file.filename", "/tmp/dbz/storage/mysql_dbhistory.txt");

            props.setProperty("database.server.id", "122112"); // must differ from MySQL's server-id
            props.setProperty("database.hostname", "tmg");
            props.setProperty("database.port", "3306");
            props.setProperty("database.user", "dbz"); // the user created in the MySQL preparation step
            props.setProperty("database.password", "******");
            props.setProperty("database.include.list", "inventory"); // database to capture
            props.setProperty("table.include.list", "inventory.a"); // table to capture

            props.setProperty("snapshot.mode", "initial"); // full snapshot first, then incremental changes

            // Create the Debezium engine from the configuration above; events are emitted as JSON strings
            engine = DebeziumEngine.create(Json.class)
                    .using(props)
                    .notifying(record -> {
                        System.out.println(record); // print each event to the console
                    })
                    .using((success, message, error) -> {
                        if (error != null) {
                            // error callback
                            System.out.println("------------error, message:" + message + " exception:" + error);
                        }
                        closeEngine(engine);
                    })
                    .build();

            ExecutorService executor = Executors.newSingleThreadExecutor();
            executor.execute(engine);
            addShutdownHook(engine);
            awaitTermination(executor);

            System.out.println("------------main finished.");
        }

        private static void closeEngine(DebeziumEngine<ChangeEvent<String, String>> engine) {
            try {
                engine.close();
            } catch (IOException ignored) {
            }
        }

        private static void addShutdownHook(DebeziumEngine<ChangeEvent<String, String>> engine) {
            Runtime.getRuntime().addShutdownHook(new Thread(() -> closeEngine(engine)));
        }

        private static void awaitTermination(ExecutorService executor) {
            if (executor != null) {
                try {
                    executor.shutdown();
                    // Block until the engine's thread has finished
                    while (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
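
The lifecycle used by the program above (run the engine on a single-thread executor, close it from elsewhere, then wait for the executor to terminate) can be tried without a database. The following is a minimal sketch using only the JDK: StandInEngine and LifecycleSketch are hypothetical names, and StandInEngine merely imitates a long-running Runnable that is also Closeable; it contains no Debezium code.

```java
import java.io.Closeable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the engine (NOT Debezium code): run() blocks
// until close() is called, the way a long-running engine would.
class StandInEngine implements Runnable, Closeable {
    private final CountDownLatch closed = new CountDownLatch(1);

    @Override
    public void run() {
        try {
            closed.await(); // pretend to stream change events until asked to stop
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void close() {
        closed.countDown(); // idempotent: extra calls are harmless
    }
}

class LifecycleSketch {

    // Runs the engine, closes it after delayMs, and reports whether the
    // executor terminated within the timeout.
    static boolean runAndStop(StandInEngine engine, long delayMs) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(engine);
        executor.shutdown(); // accept no new tasks; the running one continues
        try {
            Thread.sleep(delayMs); // in the real program the JVM shutdown hook fires instead
            engine.close();        // unblocks run(), so the worker thread can exit
            return executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            executor.shutdownNow();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("terminated=" + runAndStop(new StandInEngine(), 100));
    }
}
```

The point of the sketch is that shutdown() alone does not stop the running task; termination only happens once close() lets run() return, which is why the program registers a shutdown hook that closes the engine.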

3. Testing

Once the program is running, the console shows output like this:

    ...(omitted)
    EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":1}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":1,"name":"n1"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005186,"snapshot":"true","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005191,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154, snapshot=true}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=1,name=n1},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005186,snapshot=true,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005191}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]
    EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":2}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"n2"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005195,"snapshot":"true","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005196,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154, snapshot=true}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=2}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=2,name=n2},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005195,snapshot=true,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005196}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]
    EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":3}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":3,"name":"n3"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005196,"snapshot":"last","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005196,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=3}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=3,name=n3},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005196,snapshot=last,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005196}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]
    ...(omitted)

As you can see, the full snapshot of the existing rows has been emitted. The key parts of each event are:

    ..."payload":{"before":null,"after":{"id":1,"name":"n1"}..."op":"r"...
    ..."payload":{"before":null,"after":{"id":2,"name":"n2"}..."op":"r"...
    ..."payload":{"before":null,"after":{"id":3,"name":"n3"}..."op":"r"...
  • Next, insert a new row:

insert into inventory.a values (4, 'n4');

Console output:

..."payload":{"before":null,"after":{"id":4,"name":"n4"}..."op":"c"...

  • Update a row:

update inventory.a set name = 'n4-upd' where id = 4;

Console output:

..."payload":{"before":{"id":4,"name":"n4"},"after":{"id":4,"name":"n4-upd"}..."op":"u"...

  • Delete a row:

delete from inventory.a where id = 1;

Console output:

..."payload":{"before":{"id":1,"name":"n1"},"after":null..."op":"d"...
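
The outputs above differ mainly in the "op" field: "r" for rows read during the snapshot, "c" for an insert, "u" for an update, and "d" for a delete. As a minimal sketch of how a consumer might branch on that field, the hypothetical class OpCodes below pulls the code out of the event's JSON text with a regular expression. The regex is an assumption made to keep the example free of dependencies; real code would more likely use a JSON library.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not a Debezium API) for reading the "op" field of an event.
class OpCodes {

    // Matches "op":"x" in the payload; the schema part only contains "field":"op",
    // which has no colon after "op" and therefore does not match.
    private static final Pattern OP = Pattern.compile("\"op\"\\s*:\\s*\"([a-z])\"");

    // Returns the op code found in the JSON text, or null if there is none.
    static String extractOp(String eventJson) {
        Matcher m = OP.matcher(eventJson);
        return m.find() ? m.group(1) : null;
    }

    // Maps the op codes seen in the outputs above to a readable description.
    static String describe(String op) {
        if (op == null) {
            return "unknown";
        }
        switch (op) {
            case "r": return "read (snapshot)";
            case "c": return "create (insert)";
            case "u": return "update";
            case "d": return "delete";
            default:  return "unknown";
        }
    }

    public static void main(String[] args) {
        String event = "{\"payload\":{\"before\":null,"
                + "\"after\":{\"id\":4,\"name\":\"n4\"},\"op\":\"c\"}}";
        String op = extractOp(event);
        System.out.println(op + " -> " + describe(op)); // prints: c -> create (insert)
    }
}
```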

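III. Summary

This article used MySQL as an example to walk through the basic flow of using Debezium from code. When common insert, update, and delete operations are performed on MySQL data, Debezium captures each row change, records the row's data before and after the change, and exposes the changes as an event stream that external consumers can read and process.

Reference: https://debezium.io/documentation/reference/1.8/index.html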
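
To make "processing the events" concrete, here is a minimal sketch of a consumer that keeps an in-memory copy of the table in step with the change events from the testing section. Everything in it is an assumption for illustration: ReplicaSketch and RowChange are hypothetical names, and RowChange is a simplified stand-in that carries only the op code, the key, and the new name rather than a real Debezium event.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical consumer (not a Debezium API): mirrors table inventory.a in memory.
class ReplicaSketch {

    // Simplified event: op code, primary key, and the row's name after the change
    // (null for deletes).
    static final class RowChange {
        final String op;
        final long id;
        final String newName;

        RowChange(String op, long id, String newName) {
            this.op = op;
            this.id = id;
            this.newName = newName;
        }
    }

    private final Map<Long, String> rows = new TreeMap<>();

    // Applies one event; events must be applied in the order they were emitted.
    void apply(RowChange change) {
        switch (change.op) {
            case "r": // snapshot read
            case "c": // insert
            case "u": // update
                rows.put(change.id, change.newName);
                break;
            case "d": // delete
                rows.remove(change.id);
                break;
            default:
                throw new IllegalArgumentException("unsupported op: " + change.op);
        }
    }

    Map<Long, String> rows() {
        return rows;
    }

    public static void main(String[] args) {
        // The same sequence as in the testing section: snapshot, insert, update, delete.
        List<RowChange> events = Arrays.asList(
                new RowChange("r", 1, "n1"),
                new RowChange("r", 2, "n2"),
                new RowChange("r", 3, "n3"),
                new RowChange("c", 4, "n4"),
                new RowChange("u", 4, "n4-upd"),
                new RowChange("d", 1, null));
        ReplicaSketch replica = new ReplicaSketch();
        for (RowChange e : events) {
            replica.apply(e);
        }
        System.out.println(replica.rows()); // prints {2=n2, 3=n3, 4=n4-upd}
    }
}
```

Because the replica only ever sees the ordered stream, replaying the same events from the beginning would rebuild the same state, which is the property that makes an ordered change stream useful for caches and search indexes.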

Enjoy GreatSQL :)

