赞
踩
Canal是阿里巴巴开源的数据库binlog日志解析工具,其可以监听MySQL、PostgreSQL、TiDB、Oracle数据库等数据库的binlog,并将变更实时地推送给指定的消费端。
以下为监听mysql的binlog日志流程:
找到mysql的my.ini文件(或者my.cnf文件)找到 [mysqld] 部分,并添加如下配置:
log-bin = mysql-bin
server-id = 1
binlog_format = raw
binlog-do-db=canal_text(要监听的库名,如果不配置,默认监听所有库)
show variables like '%log_bin%';
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'root';
刷新:
FLUSH PRIVILEGES;
- <parent>
-
- <artifactId>spring-boot-parent</artifactId>
-
- <groupId>org.springframework.boot</groupId>
-
- <version>2.1.4.RELEASE</version>
-
- </parent>
-
- <dependencies>
-
- <!--canal依赖-->
-
- <dependency>
-
- <groupId>com.alibaba.otter</groupId>
-
- <artifactId>canal.client</artifactId>
-
- <version>1.1.5</version>
-
- </dependency>
-
- <!-- Message、CanalEntry.Entry等来自此安装包 -->
-
- <dependency>
-
- <groupId>com.alibaba.otter</groupId>
-
- <artifactId>canal.protocol</artifactId>
-
- <version>1.1.5</version>
-
- </dependency>
-
- <dependency>
-
- <groupId>org.slf4j</groupId>
-
- <artifactId>jcl-over-slf4j</artifactId>
-
- <version>1.5.6</version>
-
- </dependency>
-
- <dependency>
-
- <groupId>org.eclipse.aether</groupId>
-
- <artifactId>aether-impl</artifactId>
-
- <version>1.0.0.v20140518</version>
-
- </dependency>
-
- <dependency>
-
- <groupId>org.eclipse.jetty</groupId>
-
- <artifactId>jetty-server</artifactId>
-
- <version>9.4.46.v20220331</version>
-
- </dependency>
-
- <dependency>
-
- <groupId>org.projectlombok</groupId>
-
- <artifactId>lombok</artifactId>
-
- </dependency>
-
- <dependency>
-
- <groupId>mysql</groupId>
-
- <artifactId>mysql-connector-java</artifactId>
-
- <version>8.0.15</version>
-
- </dependency>
-
- </dependencies>
客户端代码:
- package org.example;
-
- import com.alibaba.otter.canal.client.CanalConnector;
-
- import com.alibaba.otter.canal.client.CanalConnectors;
-
- import com.alibaba.otter.canal.protocol.CanalEntry;
-
- import com.alibaba.otter.canal.protocol.Message;
-
- import java.net.InetSocketAddress;
-
- import java.util.ArrayList;
-
- import java.util.List;
-
- public class Monitor {
-
- //static List<String> str_list = new ArrayList<>();
-
- static String operate;
-
- public static void main(String[] args) throws InterruptedException {
-
- CanalConnector connector = CanalConnectors.newSingleConnector(
-
- new InetSocketAddress("192.168.26.114",11111), "example", "admin", "4ACFE3202A5FF5CF467898FC58AAB1D615029441");//之前配置的canal的注册地址和端口号,默认端口号为11111
-
- int batchSize = 1000;
-
- try {
-
- connector.connect();
-
- connector.subscribe(".*\\..*");
-
- connector.rollback();
-
- while (true) {
-
- Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
-
- long batchId = message.getId();
-
- int size = message.getEntries().size();
-
- if (batchId == -1 || size == 0) {//间隔一秒
-
- // 没有变化,等一秒钟再去拉取数据
-
- Thread.sleep(1000);
-
- } else {
-
- printEntry(message.getEntries());
-
- }
-
- connector.ack(batchId); // 提交确认
-
- // connector.rollback(batchId); // 处理失败, 回滚数据
-
- }
-
- } finally {
-
- connector.disconnect();
-
- }
-
- }
-
-
- private static void printEntry(List<CanalEntry.Entry> entrys) {
-
- for (CanalEntry.Entry entry : entrys) {
-
- if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
-
- continue;
-
- }
-
-
- CanalEntry.RowChange rowChage = null;
-
- try {
-
- rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
-
- } catch (Exception e) {
-
- throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
-
- e);
-
- }
-
- CanalEntry.EventType eventType = rowChage.getEventType();
-
- operate = eventType.toString();//获取操作方式(增删改)
-
- System.out.println(String.format("================> \nbinlog[%s:%s]\ndatabase_name: %s\ntable_name: %s\neventType : %s",
-
- entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
-
- entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
-
- eventType));//java客户端输出信息
-
- for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {//判断增删改
-
- if (eventType == CanalEntry.EventType.DELETE) {
-
- printColumn(rowData.getBeforeColumnsList());
-
- } else if (eventType == CanalEntry.EventType.INSERT) {
-
-
- printColumn(rowData.getAfterColumnsList());
-
- } else {//修改会产生两条信息,一条修改前,一条修改后
-
- System.out.println("-------> before");
-
- operate = eventType.toString()+"-"+"before";//修改前的记录为UPDATE-before
-
- printColumn(rowData.getBeforeColumnsList());
-
- operate = eventType.toString()+"-"+"after";//修改后的记录为UPDATE-after
-
- System.out.println("-------> after");
-
- printColumn(rowData.getAfterColumnsList());
-
- }
-
- }
-
- }
-
- }
-
- private static void printColumn(List<CanalEntry.Column> columns) {//修改内容函数,获取修改内容,并写入新的记录表
-
- List<String> str_list = new ArrayList<>();//定义新的集合,用来记录修改内容数据
-
- String name,age,sal;
-
- for (CanalEntry.Column column : columns) {
-
- str_list.add(column.getValue());
-
- System.out.println(column.getName() + " : " + column.getValue() );
-
- }
-
- DDL_Mysql a = new DDL_Mysql();//定义插入类对象
-
- name = str_list.get(0);//将修改的name字段数据赋值给name
-
- age = str_list.get(1);//将修改的age字段数据赋值给age
-
- sal = str_list.get(2);//将修改的sal字段数据赋值给sal
-
- a.insert(name,age,sal,operate);//调用插入函数,将mysql日志的数据的内容和操作方式插入记录表中
-
- }
-
- }
- package org.example;
-
- import java.sql.Connection;
-
- import java.sql.DriverManager;
-
- import java.sql.ResultSet;
-
- import java.sql.SQLException;
-
- import java.sql.Statement;
-
- import java.sql.PreparedStatement;
-
- public class DDL_Mysql {
-
- String driver = "com.mysql.jdbc.Driver";
-
- /*
- * URL指向要访问的数据库名(我用的是数据库名为test)
- *本地用127.0.0.1
- */
-
- String url = "jdbc:mysql://127.0.0.1:3306/test?useSSL=false&serverTimezone=GMT";
-
- String user = "root"; //MySQL配置时的用户名
-
- String password = "root"; //MySQL配置时的密码
-
- //throw则是指抛出的一个具体的异常类型。
-
-
- /*
- * 连接数据库
- */
-
-
- public Connection getConn()
-
- {
-
- Connection con = null;
-
- try
-
- { 加载驱动程序
-
- Class.forName(driver);
-
- }
-
- catch (ClassNotFoundException e)
-
- {
-
- e.printStackTrace();
-
- }
-
- try
-
- {
-
- con = DriverManager.getConnection(url,user,password);//注意是三个参数
-
- }
-
- catch (SQLException e)
-
- {
-
- e.printStackTrace();
-
- }
-
- return con;
-
- }
-
-
-
- /*
- * 插入操作
- */
-
-
-
- public int insert(String name,String age,String sal,String operate)
-
- {
-
- Connection con = getConn();
-
- String sql = "insert into test (name, age, sal,operate) values (?, ?, ?, ?)";
-
- try {
-
- //用来执行SQL语句
-
- PreparedStatement pst = con.prepareStatement(sql);
-
- pst.setString(1,name);
-
- pst.setString(2, age);
-
- pst.setString(3, sal);
-
- pst.setString(4, operate);
-
- i = pst.executeUpdate();
-
- }
-
- catch (Exception e) {
-
- e.printStackTrace();
-
- }
-
- System.out.println(i);
-
- return i; //返回影响的行数,1为执行成功
-
- }
-
- }
测试中使用两张表,一张是canal_text库中的test表(操作表),一张是test库中的test表(记录表)
操作表:
记录表:
当我对操作表添加一条数据时:
操作表中加入一条信息:
同时,记录库中也有一条操作信息,包括操作方式:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。