赞
踩
开始
1:引入mysql-binlog-connector-java.jar
- <!-- binlog -->
- <dependency>
- <groupId>com.zendesk</groupId>
- <artifactId>mysql-binlog-connector-java</artifactId>
- <version>0.27.1</version>
- </dependency>
- <!-- guava -->
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>31.1-jre</version>
- </dependency>
2:配置文件
- # 用户必须具有读取binlog的权限(REPLICATION SLAVE、REPLICATION CLIENT)
- binlog:
- # 服务器地址
- host: localhost
- port: 3306
- username: root
- password: 123456
- # 监听的库表,多个之间用英文逗号隔开,格式:库.表,库.表,…
- dbTable: 库.表,库1.表1,库1.表2
- serverId: 1
注:1:mysql8.0之后binlog是默认开启的
2:需要一个具有binlog读取权限(REPLICATION SLAVE、REPLICATION CLIENT)的MySQL用户
获取配置文件参数 BinLogConfig
- /**
- * @Description binlog配置
- * @Author WangKun
- * @Date 2024/8/8 15:01
- * @Version
- */
- @Data
- @Component
- public class BinLogConfig {
-
- /**
- * mysql服务地址
- **/
- @Value("${binlog.host}")
- private String host;
-
- /**
- * mysql数据库端口号
- **/
- @Value("${binlog.port}")
- private int port;
-
- /**
- * 查看BinLog权限用户名
- **/
- @Value("${binlog.username}")
- private String username;
-
- /**
- * 查看BinLog权限密码
- **/
- @Value("${binlog.password}")
- private String password;
-
- /**
- * 库表
- **/
- @Value("${binlog.dbTable}")
- private String dbTable;
-
- /**
- * 服务标识
- **/
- @Value("${binlog.serverId}")
- private Integer serverId;
-
- /**
- * 获取所有库表,并转化
- **/
- private List<String> tables;
-
- public List<String> getTables() {
- if (StringUtils.hasText(dbTable)){
- tables = Arrays.asList(dbTable.split(BinLogUtils.COMMA));
- }
- return tables;
- }
-
- }
BinLog与字段类型实体对象
- /**
- * @Description Binlog实体对象
- * @Author WangKun
- * @Date 2024/8/8 16:56
- * @Version
- */
- @Data
- public class BinLog implements Serializable {
-
- /**
- * 库表
- **/
- private String dbTable;
- /**
- * 事件类型
- **/
- private EventType eventType;
- /**
- * 存储字段-之前的值
- **/
- private Map<String, Serializable> before;
- /**
- * 存储字段-之后的值
- **/
- private Map<String, Serializable> after;
- /**
- * 存储字段--类型
- **/
- private Map<String, Field> fields;
-
-
- }
- /**
- * @Description 字段
- * @Author WangKun
- * @Date 2024/8/8 16:33
- * @Version
- */
- @AllArgsConstructor
- @Data
- public class Field implements Serializable {
-
- /**
- * 数据库
- **/
- public String schema;
-
- /**
- * 表
- **/
- public String table;
-
- /**
- * 列索引位置
- **/
- public int inx;
- /**
- * 列名
- **/
- public String colName;
- /**
- * 类型
- **/
- public String dataType;
-
-
- }
BinLog事件类型枚举(新增,修改,删除)
- /**
- * @Description BinLog事件枚举
- * @Author WangKun
- * @Date 2024/8/19 15:23
- * @Version
- */
- @Getter
- @AllArgsConstructor
- public enum BinLogEventEnum {
-
- WRITE("WRITE"),UPDATE("UPDATE"),DELETE("DELETE");
-
- /**
- * 获取key
- **/
- private final String key;
-
- }
BinLog工具与BinLog数据操作工具
/**
 * Shared constants and small helpers for the binlog module.
 *
 * @Author WangKun
 * @Date 2024/8/8 17:09
 */
public final class BinLogUtils {

    /** Comma, the separator between configured table entries. */
    public static final String COMMA = ",";

    /** Dot, the separator between database name and table name in a configured entry. */
    public static final String POINT = ".";

    /**
     * A single backslash character (the Java literal needs two to express one).
     * Prepending it to {@link #POINT} gives the regex {@code \.}, which matches a literal dot.
     */
    public static final String D_SLASH = "\\";

    /**
     * Numeric constant of value 1000.
     * NOTE(review): passed as the timeout argument of BinaryLogClient#connect(long) elsewhere in
     * this module, so it is presumably milliseconds - confirm.
     */
    public static final long QUEUE_SLEEP = 1000;

    /** Static utility holder; not meant to be instantiated. */
    private BinLogUtils() {
    }

    /**
     * Builds the key that identifies a table inside the binlog module.
     *
     * @param db    database name
     * @param table table name
     * @return {@code db + "-" + table}
     */
    public static String getDbTable(String db, String table) {
        return db + "-" + table;
    }

}
- /**
- * @Description BinLog数据工具
- * @Author WangKun
- * @Date 2024/8/12 16:40
- * @Version
- */
- @Slf4j
- public class BinLogDataUtils {
-
-
- /**
- * @param db
- * @param table
- * @Description 获取columns集合
- * @Throws
- * @Return java.util.Map<java.lang.String, com.harmonywisdom.binlog.entity.Field>
- * @Date 2024-08-12 16:10:08
- * @Author WangKun
- **/
- public static Map<String, Field> getColumnsMap(String db, String table) {
- PreparedStatement ps = null;
- ResultSet rs = null;
- Connection connection = null;
- try {
- //获取数据源
- DataSource dataSource = SpringUtil.getBean(DataSource.class);
- connection = dataSource.getConnection();
- // 执行sql获取表数据
- String preSql = "SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, DATA_TYPE, ORDINAL_POSITION FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = ? and TABLE_NAME = ?";
- ps = connection.prepareStatement(preSql);
- ps.setString(1, db);
- ps.setString(2, table);
- rs = ps.executeQuery();
- Map<String, Field> map = new HashMap<>(rs.getRow());
- while (rs.next()) {
- String column = rs.getString("COLUMN_NAME");
- int idx = rs.getInt("ORDINAL_POSITION");
- if (column != null && idx >= 1) {
- // sql的位置从1开始
- map.put(column, new Field(rs.getString("TABLE_SCHEMA"), rs.getString("TABLE_NAME"), idx - 1, column, rs.getString("DATA_TYPE")));
- }
- }
- ps.close();
- rs.close();
- connection.close();
- return map;
- } catch (SQLException e) {
- log.error("加载BinLog监控配置库.表字段错误, db_table={}.{} ", db, table, e);
- } finally {
- try {
- if (ps != null) {
- ps.close();
- }
- if (rs != null) {
- rs.close();
- }
- if (connection != null) {
- connection.close();
- }
- } catch (SQLException e) {
- log.error("加载BinLog监控配置库.表字段错误关闭连接失败, db_table={}.{} ", db, table, e);
- }
- }
- return null;
- }
-
- /**
- * @param row
- * @param dbTable
- * @param columMap
- * @param eventType
- * @Description 新增或删除操作数据格式化
- * @Throws
- * @Return com.harmonywisdom.binlog.entity.BinLog
- * @Date 2024-08-12 16:53:07
- * @Author WangKun
- **/
- private static BinLog insertOrDeletedColum(Serializable[] row, String dbTable, Map<String, Field> columMap, EventType eventType) {
- if (null == row || null == columMap || row.length != columMap.size()) {
- return null;
- }
- // 初始化Item
- BinLog item = new BinLog();
- item.setEventType(eventType);
- item.setFields(columMap);
- Map<String, Serializable> beOrAf = new HashMap<>();
- columMap.forEach((key, colum) -> {
- Serializable serializable = row[colum.inx];
- if (serializable instanceof byte[]) {
- beOrAf.put(key, new String((byte[]) serializable));
- } else {
- beOrAf.put(key, serializable);
- }
- });
- // 写操作放after,删操作放before
- if (isWrite(eventType)) {
- item.setAfter(beOrAf);
- }
- if (isDelete(eventType)) {
- item.setBefore(beOrAf);
- }
- return item;
- }
-
- /**
- * @param mapEntry
- * @param columMap
- * @param eventType
- * @Description 更新操作数据格式化
- * @Throws
- * @Return com.harmonywisdom.binlog.entity.BinLog
- * @Date 2024-08-12 16:52:46
- * @Author WangKun
- **/
- private static BinLog updateColum(Map.Entry<Serializable[], Serializable[]> mapEntry, Map<String, Field> columMap, EventType eventType) {
- if (null == mapEntry || null == columMap) {
- return null;
- }
- BinLog item = new BinLog();
- item.setEventType(eventType);
- item.setFields(columMap);
- Map<String, Serializable> be = new HashMap<>();
- Map<String, Serializable> af = new HashMap<>();
- columMap.forEach((key, colum) -> {
- Serializable serializableKey = mapEntry.getKey()[colum.inx];
- Serializable serializableValue = mapEntry.getValue()[colum.inx];
- if (serializableKey instanceof byte[]) {
- be.put(key, new String((byte[]) serializableKey));
- } else {
- be.put(key, serializableKey);
- }
- if (serializableValue instanceof byte[]) {
- af.put(key, new String((byte[]) serializableValue));
- } else {
- af.put(key, serializableValue);
- }
- });
- item.setBefore(be);
- item.setAfter(af);
- return item;
- }
-
- /**
- * @param data
- * @param dbTableIdCols
- * @param dbTableCols
- * @param eventType
- * @param queue
- * @Description 更新数据
- * @Throws
- * @Return void
- * @Date 2024-08-14 17:35:49
- * @Author WangKun
- **/
- public static void updateData(UpdateRowsEventData data, Map<Long, String> dbTableIdCols, Map<String, Map<String, Field>> dbTableCols, EventType eventType, BlockingQueue<BinLog> queue) {
- for (Map.Entry<Serializable[], Serializable[]> row : data.getRows()) {
- if (dbTableIdCols.containsKey(data.getTableId())) {
- String dbTable = dbTableIdCols.get(data.getTableId());
- BinLog item = updateColum(row, dbTableCols.get(dbTable), eventType);
- item.setDbTable(dbTable);
- try {
- queue.put(item);
- } catch (InterruptedException e) {
- log.error("BinLog 更新数据添加阻塞队列异常:{}", e.getMessage(), e);
- }
- }
- }
- }
-
- /**
- * @param eventType
- * @param rows
- * @param tableId
- * @param dbTableIdCols
- * @param dbTableCols
- * @param queue
- * @Description 新增与删除数据
- * @Throws
- * @Return void
- * @Date 2024-08-13 17:30:30
- * @Author WangKun
- **/
- public static void insertOrDeletedData(EventType eventType, List<Serializable[]> rows, long tableId, Map<Long, String> dbTableIdCols, Map<String, Map<String, Field>> dbTableCols, BlockingQueue<BinLog> queue) {
- for (Serializable[] row : rows) {
- if (dbTableIdCols.containsKey(tableId)) {
- String dbTable = dbTableIdCols.get(tableId);
- BinLog item = insertOrDeletedColum(row, dbTable, dbTableCols.get(dbTable), eventType);
- item.setDbTable(dbTable);
- try {
- queue.put(item);
- } catch (InterruptedException e) {
- log.error("BinLog 新增或者删除数据添加阻塞队列异常:{}", e.getMessage(), e);
- }
- }
- }
- }
-
- }
BinLog监听
- /**
- * @Description 监听(@FunctionalInterface确保该接口只有以一个抽象方法)
- * @Author WangKun
- * @Date 2024/8/8 17:31
- * @Version
- */
- @FunctionalInterface
- public interface BinLogListener {
-
- void onEvent(BinLog binLog);
-
- }
- /**
- * @Description MySQL监听
- * @Author WangKun
- * @Date 2024/8/8 17:32
- * @Version
- */
- @Slf4j
- public class MySQLBinLogListener implements BinaryLogClient.EventListener {
-
-
- /**
- * BinLog连接信息
- **/
- private final BinaryLogClient client;
-
- /**
- * 阻塞队列,存放信息
- **/
- private final BlockingQueue<BinLog> queue;
-
- /**
- * 线程池
- **/
- private final ExecutorService executorService;
-
- /**
- * 存放每张数据表对应的listener器,允许将多个值存储在单个键下(每张表一个监听器)
- **/
- private final Multimap<String, BinLogListener> listeners;
-
- /**
- * 存放监控所有库表结构
- **/
- private final Map<String, Map<String, Field>> dbTableCols;
-
- /**
- * 存放改变的库表结构
- **/
- private final Map<Long, String> dbTableIdCols;
-
- /**
- * @param conf
- * @Description 监听器初始化配置
- * @Throws
- * @Return
- * @Date 2024-08-13 16:53:18
- * @Author WangKun
- **/
- public MySQLBinLogListener(BinLogConfig conf) {
- BinaryLogClient client = new BinaryLogClient(conf.getHost(), conf.getPort(), conf.getUsername(), conf.getPassword());
- EventDeserializer eventDeserializer = new EventDeserializer();
- // 序列化
- eventDeserializer.setCompatibilityMode(
- EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
- EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
- );
- client.setEventDeserializer(eventDeserializer);
- client.setServerId(conf.getServerId());
- this.client = client;
- this.queue = new ArrayBlockingQueue<>(ThreadPoolConfig.queueCapacity);
- this.listeners = ArrayListMultimap.create();
- this.dbTableCols = new ConcurrentHashMap<>();
- this.dbTableIdCols = new ConcurrentHashMap<>();
- // 开启线程池
- this.executorService = ThreadPoolUtils.create().setPrefixName("Binlog-Listener-Thread").setCorePoolSize(6).build();
- }
-
- /**
- * @param event
- * @Description 监听处理, 只支持MySQL中BinLog的ROW模式的
- * @Throws
- * @Return void
- * @Date 2024-08-13 16:54:01
- * @Author WangKun
- **/
- @Override
- public void onEvent(Event event) {
- EventType eventType = event.getHeader().getEventType();
- // 装配库表结构
- if (eventType == EventType.TABLE_MAP) {
- TableMapEventData tableData = event.getData();
- String dbTable = BinLogUtils.getDbTable(tableData.getDatabase(), tableData.getTable());
- if (dbTableCols.containsKey(dbTable)) {
- dbTableIdCols.put(tableData.getTableId(), dbTable);
- }
- }
- //新增数据
- if (EventType.isWrite(eventType)) {
- WriteRowsEventData data = event.getData();
- BinLogDataUtils.insertOrDeletedData(eventType, data.getRows(), data.getTableId(), dbTableIdCols, dbTableCols, queue);
- } else if (EventType.isUpdate(eventType)) {
- // 更新数据
- UpdateRowsEventData data = event.getData();
- BinLogDataUtils.updateData(data, dbTableIdCols, dbTableCols, eventType, queue);
- } else if (EventType.isDelete(eventType)) {
- // 删除数据
- DeleteRowsEventData data = event.getData();
- BinLogDataUtils.insertOrDeletedData(eventType, data.getRows(), data.getTableId(), dbTableIdCols, dbTableCols, queue);
- }
- }
-
- /**
- * @param db
- * @param table
- * @param listener
- * @Description 注册监听
- * @Throws
- * @Return void
- * @Date 2024-08-13 17:32:44
- * @Author WangKun
- **/
- public void registerListener(String db, String table, BinLogListener listener) {
- String dbTable = BinLogUtils.getDbTable(db, table);
- // 连接获取字段集合
- Map<String, Field> cols = BinLogDataUtils.getColumnsMap(db, table);
- // 保存字段信息
- dbTableCols.put(dbTable, cols);
- // 保存当前注册的listener
- listeners.put(dbTable, listener);
- }
-
- /**
- * @param
- * @Description 开启异步多线程消费
- * @Throws
- * @Return void
- * @Date 2024-08-13 18:02:48
- * @Author WangKun
- **/
- @Async
- public void openThreadConsumeBinLog(){
- client.registerEventListener(this);
- for (int i = 0; i < ThreadPoolConfig.corePoolSize*ThreadPoolConfig.CPU_NUMS; i++) {
- executorService.execute(() -> {
- // 轮询监控
- while (true) {
- if (!queue.isEmpty()) {
- try {
- BinLog binLogQueue = queue.take();
- listeners.get(binLogQueue.getDbTable()).forEach(binLogListener -> binLogListener.onEvent(binLogQueue));
- } catch (InterruptedException e) {
- log.error("BinLog多线程消费异常:{}", e.getMessage(), e);
- }
- }
- }
- });
- }
- try {
- //连接(不设置时间将会使用主线程)
- client.connect(BinLogUtils.QUEUE_SLEEP);
- } catch (Exception e) {
- log.error("BinLog多线程连接消费异常:{}", e.getMessage(), e);
- }
- }
- }
- /**
- * @Description 初始化Binlog监听
- * @Author WangKun
- * @Date 2024/8/9 10:36
- * @Version
- */
- @Slf4j
- @RequiredArgsConstructor
- @Component
- @Order(value = 1)
- public class BinLogInitListener implements CommandLineRunner {
-
- /**
- * 资源注入
- **/
- private final BinLogConfig config;
-
- /**
- * @param args
- * @Description 初始化
- * @Throws
- * @Return void
- * @Date 2024-08-13 14:07:49
- * @Author WangKun
- **/
- @Override
- public void run(String... args) throws Exception {
- try {
- // 初始化监听器
- MySQLBinLogListener mySqlBinLogListener = new MySQLBinLogListener(config);
- this.getListMap().forEach((db, tables) -> {
- tables.forEach(table -> {
- mySqlBinLogListener.registerListener(db, table, info -> {
- if(info.getEventType().name().contains(BinLogEventEnum.UPDATE.getKey())){
- log.info("库.表: {}, 修改之前:{}" ,db+"."+table,info.getBefore().toString());
- log.info("库.表: {}, 修改之后:{}" ,db+"."+table,info.getAfter().toString());
- }
- if(info.getEventType().name().contains(BinLogEventEnum.WRITE.getKey())){
- log.info("库.表: {}, 新增: {}" ,db+"."+table,info.getAfter().toString());
- }
- if(info.getEventType().name().contains(BinLogEventEnum.DELETE.getKey())){
- log.info("库.表: {}, 删除: {}" ,db+"."+table,info.getBefore().toString());
- }
- });
- });
- });
- // 开启多线程消费
- mySqlBinLogListener.openThreadConsumeBinLog();
- } catch (Exception e) {
- log.error("BinLog初始化监听异常:{}", e.getMessage(), e);
- }
- }
-
- /**
- * @param
- * @Description 初始化监听库表
- * @Throws
- * @Return java.util.Map<java.lang.String, java.util.List < java.lang.String>>
- * @Date 2024-08-12 16:19:32
- * @Author WangKun
- **/
- private Map<String, List<String>> getListMap() {
- Map<String, List<String>> map = new ConcurrentHashMap<>();
- try {
- for (String key : config.getTables()) {
- // 正则转义,要加双斜线
- String[] split = key.split(BinLogUtils.D_SLASH + BinLogUtils.POINT);
- if (split.length != 2) {
- log.error("BinLog配置同步,类型错误 [库名.表名]。请正确配置:{}", key);
- throw new Exception("BinLog配置同步,类型错误 [库名.表名]。请正确配置:" + key);
- }
- map.computeIfAbsent(split[0], k -> new ArrayList<>()).add(split[1]);
- }
- return map;
- } catch (Exception e) {
- log.error("BinLog配置同步,类型错误 [库名.表名]。请正确配置:{}", e.getMessage(), e);
- }
- return map;
- }
-
- }
在IDEA中启动项目,控制台出现以下信息即表示连接成功
2024-08-19 17:40:47.129 INFO 493984 --- [ blc-localhost:3306] c.g.shyiko.mysql.binlog.BinaryLogClient : Connected to localhost:3306 at log.000004/7294671 (sid:1, cid:800)
效果:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。