当前位置:   article > 正文

SpringBoot MySQL BinLog 监听数据变化(多库多表)

SpringBoot MySQL BinLog 监听数据变化(多库多表)

开始

1:引入mysql-binlog-connector-java.jar

  1. <!-- binlog -->
  2. <dependency>
  3. <groupId>com.zendesk</groupId>
  4. <artifactId>mysql-binlog-connector-java</artifactId>
  5. <version>0.27.1</version>
  6. </dependency>
  7. <!-- guava -->
  8. <dependency>
  9. <groupId>com.google.guava</groupId>
  10. <artifactId>guava</artifactId>
  11. <version>31.1-jre</version>
  12. </dependency>

2:配置文件

  1. #用户必须要有权限
  2. binlog:
  3. # 服务器地址
  4. host: localhost
  5. port: 3306
  6. username: root
  7. password: 123456
  8. # 监听数据库与表,隔开,格式[库.表,,,]
  9. dbTable: 库.表,库1.表1,库1.表2
  10. serverId: 1

:1:mysql8.0之后binlog是默认开启的

        2:需要一个mysql查看binlog的权限用户

获取配置文件参数 BinLogConfig

  1. /**
  2. * @Description binlog配置
  3. * @Author WangKun
  4. * @Date 2024/8/8 15:01
  5. * @Version
  6. */
  7. @Data
  8. @Component
  9. public class BinLogConfig {
  10. /**
  11. * mysql服务地址
  12. **/
  13. @Value("${binlog.host}")
  14. private String host;
  15. /**
  16. * mysql数据库端口号
  17. **/
  18. @Value("${binlog.port}")
  19. private int port;
  20. /**
  21. * 查看BinLog权限用户名
  22. **/
  23. @Value("${binlog.username}")
  24. private String username;
  25. /**
  26. * 查看BinLog权限密码
  27. **/
  28. @Value("${binlog.password}")
  29. private String password;
  30. /**
  31. * 库表
  32. **/
  33. @Value("${binlog.dbTable}")
  34. private String dbTable;
  35. /**
  36. * 服务标识
  37. **/
  38. @Value("${binlog.serverId}")
  39. private Integer serverId;
  40. /**
  41. * 获取所有库表,并转化
  42. **/
  43. private List<String> tables;
  44. public List<String> getTables() {
  45. if (StringUtils.hasText(dbTable)){
  46. tables = Arrays.asList(dbTable.split(BinLogUtils.COMMA));
  47. }
  48. return tables;
  49. }
  50. }

BinLog与字段类型实体对象

  1. /**
  2. * @Description Binlog实体对象
  3. * @Author WangKun
  4. * @Date 2024/8/8 16:56
  5. * @Version
  6. */
  7. @Data
  8. public class BinLog implements Serializable {
  9. /**
  10. * 库表
  11. **/
  12. private String dbTable;
  13. /**
  14. * 事件类型
  15. **/
  16. private EventType eventType;
  17. /**
  18. * 存储字段-之前的值
  19. **/
  20. private Map<String, Serializable> before;
  21. /**
  22. * 存储字段-之后的值
  23. **/
  24. private Map<String, Serializable> after;
  25. /**
  26. * 存储字段--类型
  27. **/
  28. private Map<String, Field> fields;
  29. }
  1. /**
  2. * @Description 字段
  3. * @Author WangKun
  4. * @Date 2024/8/8 16:33
  5. * @Version
  6. */
  7. @AllArgsConstructor
  8. @Data
  9. public class Field implements Serializable {
  10. /**
  11. * 数据库
  12. **/
  13. public String schema;
  14. /**
  15. * 表
  16. **/
  17. public String table;
  18. /**
  19. * 列索引位置
  20. **/
  21. public int inx;
  22. /**
  23. * 列名
  24. **/
  25. public String colName;
  26. /**
  27. * 类型
  28. **/
  29. public String dataType;
  30. }

BinLog事件类型枚举(新增,修改,删除)

  1. /**
  2. * @Description BinLog事件枚举
  3. * @Author WangKun
  4. * @Date 2024/8/19 15:23
  5. * @Version
  6. */
  7. @Getter
  8. @AllArgsConstructor
  9. public enum BinLogEventEnum {
  10. WRITE("WRITE"),UPDATE("UPDATE"),DELETE("DELETE");
  11. /**
  12. * 获取key
  13. **/
  14. private final String key;
  15. }

BinLog工具与BinLog数据操作工具

  1. /**
  2. * @Description Binlog工具
  3. * @Author WangKun
  4. * @Date 2024/8/8 17:09
  5. * @Version
  6. */
  7. @Slf4j
  8. public class BinLogUtils {
  9. /**
  10. * 逗号
  11. **/
  12. public final static String COMMA = ",";
  13. /**
  14. * 点
  15. **/
  16. public final static String POINT = ".";
  17. /**
  18. * 双斜线
  19. **/
  20. public final static String D_SLASH = "\\";
  21. public static final long QUEUE_SLEEP = 1000;
  22. /**
  23. * @param db
  24. * @param table
  25. * @Description 拼接DB与Table
  26. * @Throws
  27. * @Return java.lang.String
  28. * @Date 2024-08-12 16:09:10
  29. * @Author WangKun
  30. **/
  31. public static String getDbTable(String db, String table) {
  32. return db + "-" + table;
  33. }
  34. }
  1. /**
  2. * @Description BinLog数据工具
  3. * @Author WangKun
  4. * @Date 2024/8/12 16:40
  5. * @Version
  6. */
  7. @Slf4j
  8. public class BinLogDataUtils {
  9. /**
  10. * @param db
  11. * @param table
  12. * @Description 获取columns集合
  13. * @Throws
  14. * @Return java.util.Map<java.lang.String, com.harmonywisdom.binlog.entity.Field>
  15. * @Date 2024-08-12 16:10:08
  16. * @Author WangKun
  17. **/
  18. public static Map<String, Field> getColumnsMap(String db, String table) {
  19. PreparedStatement ps = null;
  20. ResultSet rs = null;
  21. Connection connection = null;
  22. try {
  23. //获取数据源
  24. DataSource dataSource = SpringUtil.getBean(DataSource.class);
  25. connection = dataSource.getConnection();
  26. // 执行sql获取表数据
  27. String preSql = "SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, DATA_TYPE, ORDINAL_POSITION FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = ? and TABLE_NAME = ?";
  28. ps = connection.prepareStatement(preSql);
  29. ps.setString(1, db);
  30. ps.setString(2, table);
  31. rs = ps.executeQuery();
  32. Map<String, Field> map = new HashMap<>(rs.getRow());
  33. while (rs.next()) {
  34. String column = rs.getString("COLUMN_NAME");
  35. int idx = rs.getInt("ORDINAL_POSITION");
  36. if (column != null && idx >= 1) {
  37. // sql的位置从1开始
  38. map.put(column, new Field(rs.getString("TABLE_SCHEMA"), rs.getString("TABLE_NAME"), idx - 1, column, rs.getString("DATA_TYPE")));
  39. }
  40. }
  41. ps.close();
  42. rs.close();
  43. connection.close();
  44. return map;
  45. } catch (SQLException e) {
  46. log.error("加载BinLog监控配置库.表字段错误, db_table={}.{} ", db, table, e);
  47. } finally {
  48. try {
  49. if (ps != null) {
  50. ps.close();
  51. }
  52. if (rs != null) {
  53. rs.close();
  54. }
  55. if (connection != null) {
  56. connection.close();
  57. }
  58. } catch (SQLException e) {
  59. log.error("加载BinLog监控配置库.表字段错误关闭连接失败, db_table={}.{} ", db, table, e);
  60. }
  61. }
  62. return null;
  63. }
  64. /**
  65. * @param row
  66. * @param dbTable
  67. * @param columMap
  68. * @param eventType
  69. * @Description 新增或删除操作数据格式化
  70. * @Throws
  71. * @Return com.harmonywisdom.binlog.entity.BinLog
  72. * @Date 2024-08-12 16:53:07
  73. * @Author WangKun
  74. **/
  75. private static BinLog insertOrDeletedColum(Serializable[] row, String dbTable, Map<String, Field> columMap, EventType eventType) {
  76. if (null == row || null == columMap || row.length != columMap.size()) {
  77. return null;
  78. }
  79. // 初始化Item
  80. BinLog item = new BinLog();
  81. item.setEventType(eventType);
  82. item.setFields(columMap);
  83. Map<String, Serializable> beOrAf = new HashMap<>();
  84. columMap.forEach((key, colum) -> {
  85. Serializable serializable = row[colum.inx];
  86. if (serializable instanceof byte[]) {
  87. beOrAf.put(key, new String((byte[]) serializable));
  88. } else {
  89. beOrAf.put(key, serializable);
  90. }
  91. });
  92. // 写操作放after,删操作放before
  93. if (isWrite(eventType)) {
  94. item.setAfter(beOrAf);
  95. }
  96. if (isDelete(eventType)) {
  97. item.setBefore(beOrAf);
  98. }
  99. return item;
  100. }
  101. /**
  102. * @param mapEntry
  103. * @param columMap
  104. * @param eventType
  105. * @Description 更新操作数据格式化
  106. * @Throws
  107. * @Return com.harmonywisdom.binlog.entity.BinLog
  108. * @Date 2024-08-12 16:52:46
  109. * @Author WangKun
  110. **/
  111. private static BinLog updateColum(Map.Entry<Serializable[], Serializable[]> mapEntry, Map<String, Field> columMap, EventType eventType) {
  112. if (null == mapEntry || null == columMap) {
  113. return null;
  114. }
  115. BinLog item = new BinLog();
  116. item.setEventType(eventType);
  117. item.setFields(columMap);
  118. Map<String, Serializable> be = new HashMap<>();
  119. Map<String, Serializable> af = new HashMap<>();
  120. columMap.forEach((key, colum) -> {
  121. Serializable serializableKey = mapEntry.getKey()[colum.inx];
  122. Serializable serializableValue = mapEntry.getValue()[colum.inx];
  123. if (serializableKey instanceof byte[]) {
  124. be.put(key, new String((byte[]) serializableKey));
  125. } else {
  126. be.put(key, serializableKey);
  127. }
  128. if (serializableValue instanceof byte[]) {
  129. af.put(key, new String((byte[]) serializableValue));
  130. } else {
  131. af.put(key, serializableValue);
  132. }
  133. });
  134. item.setBefore(be);
  135. item.setAfter(af);
  136. return item;
  137. }
  138. /**
  139. * @param data
  140. * @param dbTableIdCols
  141. * @param dbTableCols
  142. * @param eventType
  143. * @param queue
  144. * @Description 更新数据
  145. * @Throws
  146. * @Return void
  147. * @Date 2024-08-14 17:35:49
  148. * @Author WangKun
  149. **/
  150. public static void updateData(UpdateRowsEventData data, Map<Long, String> dbTableIdCols, Map<String, Map<String, Field>> dbTableCols, EventType eventType, BlockingQueue<BinLog> queue) {
  151. for (Map.Entry<Serializable[], Serializable[]> row : data.getRows()) {
  152. if (dbTableIdCols.containsKey(data.getTableId())) {
  153. String dbTable = dbTableIdCols.get(data.getTableId());
  154. BinLog item = updateColum(row, dbTableCols.get(dbTable), eventType);
  155. item.setDbTable(dbTable);
  156. try {
  157. queue.put(item);
  158. } catch (InterruptedException e) {
  159. log.error("BinLog 更新数据添加阻塞队列异常:{}", e.getMessage(), e);
  160. }
  161. }
  162. }
  163. }
  164. /**
  165. * @param eventType
  166. * @param rows
  167. * @param tableId
  168. * @param dbTableIdCols
  169. * @param dbTableCols
  170. * @param queue
  171. * @Description 新增与删除数据
  172. * @Throws
  173. * @Return void
  174. * @Date 2024-08-13 17:30:30
  175. * @Author WangKun
  176. **/
  177. public static void insertOrDeletedData(EventType eventType, List<Serializable[]> rows, long tableId, Map<Long, String> dbTableIdCols, Map<String, Map<String, Field>> dbTableCols, BlockingQueue<BinLog> queue) {
  178. for (Serializable[] row : rows) {
  179. if (dbTableIdCols.containsKey(tableId)) {
  180. String dbTable = dbTableIdCols.get(tableId);
  181. BinLog item = insertOrDeletedColum(row, dbTable, dbTableCols.get(dbTable), eventType);
  182. item.setDbTable(dbTable);
  183. try {
  184. queue.put(item);
  185. } catch (InterruptedException e) {
  186. log.error("BinLog 新增或者删除数据添加阻塞队列异常:{}", e.getMessage(), e);
  187. }
  188. }
  189. }
  190. }
  191. }

BinLog监听

  1. /**
  2. * @Description 监听(@FunctionalInterface确保该接口只有以一个抽象方法)
  3. * @Author WangKun
  4. * @Date 2024/8/8 17:31
  5. * @Version
  6. */
  7. @FunctionalInterface
  8. public interface BinLogListener {
  9. void onEvent(BinLog binLog);
  10. }
  1. /**
  2. * @Description MySQL监听
  3. * @Author WangKun
  4. * @Date 2024/8/8 17:32
  5. * @Version
  6. */
  7. @Slf4j
  8. public class MySQLBinLogListener implements BinaryLogClient.EventListener {
  9. /**
  10. * BinLog连接信息
  11. **/
  12. private final BinaryLogClient client;
  13. /**
  14. * 阻塞队列,存放信息
  15. **/
  16. private final BlockingQueue<BinLog> queue;
  17. /**
  18. * 线程池
  19. **/
  20. private final ExecutorService executorService;
  21. /**
  22. * 存放每张数据表对应的listener器,允许将多个值存储在单个键下(每张表一个监听器)
  23. **/
  24. private final Multimap<String, BinLogListener> listeners;
  25. /**
  26. * 存放监控所有库表结构
  27. **/
  28. private final Map<String, Map<String, Field>> dbTableCols;
  29. /**
  30. * 存放改变的库表结构
  31. **/
  32. private final Map<Long, String> dbTableIdCols;
  33. /**
  34. * @param conf
  35. * @Description 监听器初始化配置
  36. * @Throws
  37. * @Return
  38. * @Date 2024-08-13 16:53:18
  39. * @Author WangKun
  40. **/
  41. public MySQLBinLogListener(BinLogConfig conf) {
  42. BinaryLogClient client = new BinaryLogClient(conf.getHost(), conf.getPort(), conf.getUsername(), conf.getPassword());
  43. EventDeserializer eventDeserializer = new EventDeserializer();
  44. // 序列化
  45. eventDeserializer.setCompatibilityMode(
  46. EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
  47. EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
  48. );
  49. client.setEventDeserializer(eventDeserializer);
  50. client.setServerId(conf.getServerId());
  51. this.client = client;
  52. this.queue = new ArrayBlockingQueue<>(ThreadPoolConfig.queueCapacity);
  53. this.listeners = ArrayListMultimap.create();
  54. this.dbTableCols = new ConcurrentHashMap<>();
  55. this.dbTableIdCols = new ConcurrentHashMap<>();
  56. // 开启线程池
  57. this.executorService = ThreadPoolUtils.create().setPrefixName("Binlog-Listener-Thread").setCorePoolSize(6).build();
  58. }
  59. /**
  60. * @param event
  61. * @Description 监听处理, 只支持MySQL中BinLog的ROW模式的
  62. * @Throws
  63. * @Return void
  64. * @Date 2024-08-13 16:54:01
  65. * @Author WangKun
  66. **/
  67. @Override
  68. public void onEvent(Event event) {
  69. EventType eventType = event.getHeader().getEventType();
  70. // 装配库表结构
  71. if (eventType == EventType.TABLE_MAP) {
  72. TableMapEventData tableData = event.getData();
  73. String dbTable = BinLogUtils.getDbTable(tableData.getDatabase(), tableData.getTable());
  74. if (dbTableCols.containsKey(dbTable)) {
  75. dbTableIdCols.put(tableData.getTableId(), dbTable);
  76. }
  77. }
  78. //新增数据
  79. if (EventType.isWrite(eventType)) {
  80. WriteRowsEventData data = event.getData();
  81. BinLogDataUtils.insertOrDeletedData(eventType, data.getRows(), data.getTableId(), dbTableIdCols, dbTableCols, queue);
  82. } else if (EventType.isUpdate(eventType)) {
  83. // 更新数据
  84. UpdateRowsEventData data = event.getData();
  85. BinLogDataUtils.updateData(data, dbTableIdCols, dbTableCols, eventType, queue);
  86. } else if (EventType.isDelete(eventType)) {
  87. // 删除数据
  88. DeleteRowsEventData data = event.getData();
  89. BinLogDataUtils.insertOrDeletedData(eventType, data.getRows(), data.getTableId(), dbTableIdCols, dbTableCols, queue);
  90. }
  91. }
  92. /**
  93. * @param db
  94. * @param table
  95. * @param listener
  96. * @Description 注册监听
  97. * @Throws
  98. * @Return void
  99. * @Date 2024-08-13 17:32:44
  100. * @Author WangKun
  101. **/
  102. public void registerListener(String db, String table, BinLogListener listener) {
  103. String dbTable = BinLogUtils.getDbTable(db, table);
  104. // 连接获取字段集合
  105. Map<String, Field> cols = BinLogDataUtils.getColumnsMap(db, table);
  106. // 保存字段信息
  107. dbTableCols.put(dbTable, cols);
  108. // 保存当前注册的listener
  109. listeners.put(dbTable, listener);
  110. }
  111. /**
  112. * @param
  113. * @Description 开启异步多线程消费
  114. * @Throws
  115. * @Return void
  116. * @Date 2024-08-13 18:02:48
  117. * @Author WangKun
  118. **/
  119. @Async
  120. public void openThreadConsumeBinLog(){
  121. client.registerEventListener(this);
  122. for (int i = 0; i < ThreadPoolConfig.corePoolSize*ThreadPoolConfig.CPU_NUMS; i++) {
  123. executorService.execute(() -> {
  124. // 轮询监控
  125. while (true) {
  126. if (!queue.isEmpty()) {
  127. try {
  128. BinLog binLogQueue = queue.take();
  129. listeners.get(binLogQueue.getDbTable()).forEach(binLogListener -> binLogListener.onEvent(binLogQueue));
  130. } catch (InterruptedException e) {
  131. log.error("BinLog多线程消费异常:{}", e.getMessage(), e);
  132. }
  133. }
  134. }
  135. });
  136. }
  137. try {
  138. //连接(不设置时间将会使用主线程)
  139. client.connect(BinLogUtils.QUEUE_SLEEP);
  140. } catch (Exception e) {
  141. log.error("BinLog多线程连接消费异常:{}", e.getMessage(), e);
  142. }
  143. }
  144. }
  1. /**
  2. * @Description 初始化Binlog监听
  3. * @Author WangKun
  4. * @Date 2024/8/9 10:36
  5. * @Version
  6. */
  7. @Slf4j
  8. @RequiredArgsConstructor
  9. @Component
  10. @Order(value = 1)
  11. public class BinLogInitListener implements CommandLineRunner {
  12. /**
  13. * 资源注入
  14. **/
  15. private final BinLogConfig config;
  16. /**
  17. * @param args
  18. * @Description 初始化
  19. * @Throws
  20. * @Return void
  21. * @Date 2024-08-13 14:07:49
  22. * @Author WangKun
  23. **/
  24. @Override
  25. public void run(String... args) throws Exception {
  26. try {
  27. // 初始化监听器
  28. MySQLBinLogListener mySqlBinLogListener = new MySQLBinLogListener(config);
  29. this.getListMap().forEach((db, tables) -> {
  30. tables.forEach(table -> {
  31. mySqlBinLogListener.registerListener(db, table, info -> {
  32. if(info.getEventType().name().contains(BinLogEventEnum.UPDATE.getKey())){
  33. log.info("库.表: {}, 修改之前:{}" ,db+"."+table,info.getBefore().toString());
  34. log.info("库.表: {}, 修改之后:{}" ,db+"."+table,info.getAfter().toString());
  35. }
  36. if(info.getEventType().name().contains(BinLogEventEnum.WRITE.getKey())){
  37. log.info("库.表: {}, 新增: {}" ,db+"."+table,info.getAfter().toString());
  38. }
  39. if(info.getEventType().name().contains(BinLogEventEnum.DELETE.getKey())){
  40. log.info("库.表: {}, 删除: {}" ,db+"."+table,info.getBefore().toString());
  41. }
  42. });
  43. });
  44. });
  45. // 开启多线程消费
  46. mySqlBinLogListener.openThreadConsumeBinLog();
  47. } catch (Exception e) {
  48. log.error("BinLog初始化监听异常:{}", e.getMessage(), e);
  49. }
  50. }
  51. /**
  52. * @param
  53. * @Description 初始化监听库表
  54. * @Throws
  55. * @Return java.util.Map<java.lang.String, java.util.List < java.lang.String>>
  56. * @Date 2024-08-12 16:19:32
  57. * @Author WangKun
  58. **/
  59. private Map<String, List<String>> getListMap() {
  60. Map<String, List<String>> map = new ConcurrentHashMap<>();
  61. try {
  62. for (String key : config.getTables()) {
  63. // 正则转义,要加双斜线
  64. String[] split = key.split(BinLogUtils.D_SLASH + BinLogUtils.POINT);
  65. if (split.length != 2) {
  66. log.error("BinLog配置同步,类型错误 [库名.表名]。请正确配置:{}", key);
  67. throw new Exception("BinLog配置同步,类型错误 [库名.表名]。请正确配置:" + key);
  68. }
  69. map.computeIfAbsent(split[0], k -> new ArrayList<>()).add(split[1]);
  70. }
  71. return map;
  72. } catch (Exception e) {
  73. log.error("BinLog配置同步,类型错误 [库名.表名]。请正确配置:{}", e.getMessage(), e);
  74. }
  75. return map;
  76. }
  77. }

目录结构

启动IDEA,在控制台出现以下信息,成功

2024-08-19 17:40:47.129  INFO 493984 --- [       blc-localhost:3306] c.g.shyiko.mysql.binlog.BinaryLogClient  : Connected to localhost:3306 at log.000004/7294671 (sid:1, cid:800)

效果:

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/天景科技苑/article/detail/1011755
推荐阅读
相关标签
  

闽ICP备14008679号