赞
踩
最近遇到一个需求,需要监听数据库中的数据变化,并及时通知后端服务做出相应的处理。本文将介绍如何使用四种方式实现监听MySQL数据库中的数据变化并通知后端服务的功能,包括:
MySQL
自带的Binlog
方式轮询方式是指定时查询MySQL数据库中的某个表,然后与上一次查询结果进行比较,从而得知是否有数据发生变化。
它通过定期查询MySQL数据库的方式来检测数据变化。我们可以在后端服务中使用定时器,在一定时间间隔内轮询MySQL数据库,从而检测数据变化并做出相应的动作。
具体实现步骤如下:
轮询方法的优点是实现简单,不需要额外的插件或配置,适用于数据变化频率较低的场景。但是存在以下几个问题:
对于高并发场景下实时性要求较高的情况不适用,轮询方法存在一定的性能问题,轮询间隔过短会增加数据库负担,轮询间隔过长又可能会错过数据变化。
这种方法是在MySQL数据库中使用触发器,触发器是一种数据库对象,在数据插入或更新时触发相应的事件,并将事件信息传递给后端服务。这种方法可以保证实时性,但是需要在MySQL数据库中添加触发器,增加了系统复杂度。此外,触发器的数量和复杂度也会对数据库性能造成影响。
这种方法的优点是可以减少对数据库的轮询次数,提高了性能,同时也可以更及时地获得数据变化的通知。但是,它的缺点是比较难以维护,容易导致性能问题,因为触发器的开销很大。
具体实现步骤如下:
CREATE TRIGGER my_trigger AFTER INSERT ON my_table
FOR EACH ROW
BEGIN
-- 在此处编写通知后端服务的代码
END;
基于触发器的方法的优点是能够实时检测数据变化,并且不需要在后端服务中进行轮询操作。但是,该方法需要在MySQL数据库中创建触发器,需要对MySQL数据库有一定的了解和操作权限,并且在高并发的场景下可能存在性能问题。
使用MySQL的binlog日志:MySQL的binlog日志记录了数据库的所有修改操作,可以通过读取binlog日志来获取数据库中的数据变化,并发送通知到后端服务。
这种方法是通过解析MySQL Binlog日志文件中的数据变更事件,识别和提取感兴趣的事件,并通知后端服务。这种方法可以保证实时性,且不需要在MySQL数据库中添加额外的触发器或表,但需要对MySQL Binlog日志文件进行解析,实现起来较为复杂。
该方式具有以下优点:
具体实现步骤如下:
1. 开启binlog:在MySQL配置文件中,将`log_bin`参数设置为ON。
2. 使用MySQL的`mysqlbinlog`命令行工具将binlog中的内容读取出来,并进行解析。
3. 解析binlog中的内容,判断数据变更的类型,如果是插入、更新或删除,则触发对应的回调函数,通知后端服务进行相应的处理。
需要注意的是,使用binlog监听MySQL数据变更需要注意以下问题:
综上所述,使用binlog实现MySQL数据变更的监听需要一定的技术功底和额外的开销,但是可以实现较为精确的数据变更监听,并且支持对数据变更进行回滚等操作。在一些对数据一致性要求较高的场景中,可以考虑使用这种方式。
Canal是阿里巴巴开源的一款基于MySQL数据库的增量数据订阅和消费组件,可以将MySQL数据库的数据变更事件以消息的形式通知到后端服务,支持多种协议和多种编程语言。且相比从MySQL Binlog中解析法来说实现起来更为简便。
Canal通过读取MySQL的binlog日志来获取数据库中数据的变化情况,提供了高吞吐、低延迟、低侵入性的数据库增量订阅&消费解决方案。
Canal的工作原理如下图所示:
Canal架构图中,MySQL的binlog日志是Canal的数据源,Canal Server是数据消费者,将解析后的数据发送给下游的消费者进行处理。Canal Client是在业务系统中部署的,用于连接Canal Server,订阅指定的数据变更事件,接收Canal Server发送的数据变更事件,并将数据变更事件转换为Java对象。
Canal的优点:
Canal的缺点:
Canal 的工作原理是模拟 MySQL 自身的复制机制。当有数据发生变化时,MySQL 会将该变化以二进制格式写入 binlog 日志中,Canal 会监听这个 binlog 日志,将其中的增量数据解析成对象,并发送给订阅者。
Canal 支持多种方式接入后端服务,包括 Kafka、RocketMQ、ActiveMQ 等,也可以通过 HTTP 接口直接获取数据。使用 Canal 监听 MySQL 数据库的数据变化,需要进行如下步骤:
首先需要从 Canal 的 GitHub 上下载 Canal 的 Server 和 Client,地址为 https://github.com/alibaba/canal/releases。
选择最新版本,然后下载 Server 和 Client 压缩包,解压缩。
在解压缩后的 Server 目录中,找到 conf 目录,编辑 instance.properties 文件,进行以下配置:
################################################# ## mysql serverId 实例名称 canal.instance.mysql.slaveId = 1234 # position info # MySQL 地址和端口号 canal.instance.master.address = 127.0.0.1:3306 canal.instance.master.journal.name = canal.instance.master.position = canal.instance.master.timestamp = #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = # username/password # MySQL 用户名和密码 canal.instance.dbUsername = canal canal.instance.dbPassword = canal canal.instance.defaultDatabaseName = test canal.instance.connectionCharset = UTF-8 # table regex # 指定需要监听的数据库和表 canal.instance.filter.regex = .*\\..* # table black regex canal.instance.filter.black.regex = #################################################
address设置为mysql的连接地址,defaultDatabaseName设置为自己要监听的库名,如test。
在mysql命令行,创建一个新用户,作为slave
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
对应配置文件里的canal用户。到此配置完毕。
执行bin目录下的startup.sh
启动后可以在logs目录下查看日志。在example目录下的example.log,如果没有报错,说明启动成功。
服务端启动完毕后,在客户端即可监听test库的变化。
新建一个java maven项目,pom.xml里添加依赖
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.0.12</version>
</dependency>
import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.common.utils.AddressUtils; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import java.net.InetSocketAddress; import java.util.List; /** * A Camel Application */ public class MainApp { /** * A main() so we can easily run these routing rules in our IDE */ public static void main(String... args) throws Exception { // 创建链接 // 设置canal server的ip和端口,端口默认为11111。 // example是和conf目录下的相对应 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "", ""); int batchSize = 1000; int emptyCount = 0; try { connector.connect(); connector.subscribe(".*\\..*"); connector.rollback(); int totalEmptyCount = 120; while (emptyCount < totalEmptyCount) { // 获取指定数量的数据 Message message = connector.getWithoutAck(batchSize); long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { emptyCount++; System.out.println("empty count : " + emptyCount); try { Thread.sleep(1000); } catch (InterruptedException e) { } } else { emptyCount = 0; // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size); printEntry(message.getEntries()); } connector.ack(batchId); // 提交确认 // connector.rollback(batchId); // 处理失败, 回滚数据 } System.out.println("empty too many times, exit"); } finally { connector.disconnect(); } } private static void printEntry(List<CanalEntry.Entry> entrys) { for (CanalEntry.Entry entry : entrys) { if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry .EntryType .TRANSACTIONEND) { continue; } CanalEntry.RowChange rowChage = null; try { rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } CanalEntry.EventType eventType = rowChage.getEventType(); System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) { if (eventType == CanalEntry.EventType.DELETE) { printColumn(rowData.getBeforeColumnsList()); } else if (eventType == CanalEntry.EventType.INSERT) { printColumn(rowData.getAfterColumnsList()); } else { System.out.println("-------> before"); printColumn(rowData.getBeforeColumnsList()); System.out.println("-------> after"); printColumn(rowData.getAfterColumnsList()); } } } } private static void printColumn(List<CanalEntry.Column> columns) { for (CanalEntry.Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); } } }
以上四种方法都可以实现对 MySQL 数据库中的数据变化进行监听和通知,不同的方法适用于不同的场景,可以根据实际情况选择最合适的方法。
轮询方式和使用触发器方式相对简单,适合对数据变化的及时性和精确性要求不高的场景;Binlog
方式和Canal工具则更加强大和灵活,适用于对数据变化的及时性和精确性要求比较高的场景。
在实际应用中,需要根据业务需求和技术实现难度进行选择。同时,为了确保系统的稳定性和数据安全,需要对监听和通知的过程进行充分的测试和安全评估。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。