当前位置:   article > 正文

Canal监听mysql日志数据(增、删、改操作)windows版_canal监听binlog记录日志

canal监听binlog记录日志

Canal是阿里巴巴开源的数据库binlog日志解析工具,其可以监听MySQL、PostgreSQL、TiDB、Oracle数据库等数据库的binlog,并将变更实时地推送给指定的消费端。

以下为监听mysql的binlog日志流程:

一、打开mysql的binlog权限:

1、打开方式:

找到mysql的my.ini文件(或者my.cnf文件)找到 [mysqld] 部分,并添加如下配置:

log-bin = mysql-bin

server-id = 1

binlog_format = raw

binlog-do-db=canal_text(要监听的库名,如果不配置,默认监听所有库)

2、查看是否开启成功:

show variables like '%log_bin%';

3、给canal链接数据库权限:

ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'root';

刷新:

FLUSH PRIVILEGES;

二、下载canal(windows版):

1、修改conf /canal.properties配置文件,注册地址为本机IP

2、修改conf/example/instance.properties配置文件,配置数据库信息

3、启动服务:Windows下使用startup.bat

4、启动成功:

三、java依赖和客户端代码:

1、配置java和maven环境后加入依赖包(java环境和maven环境配置可以上网搜,有详细教程):
  1. <parent>
  2.    <artifactId>spring-boot-parent</artifactId>
  3.    <groupId>org.springframework.boot</groupId>
  4.    <version>2.1.4.RELEASE</version>
  5. </parent>
  6. <dependencies>
  7.    <!--canal依赖-->
  8.    <dependency>
  9.        <groupId>com.alibaba.otter</groupId>
  10.        <artifactId>canal.client</artifactId>
  11.        <version>1.1.5</version>
  12.    </dependency>
  13.    <!-- Message、CanalEntry.Entry等来自此安装包 -->
  14.    <dependency>
  15.        <groupId>com.alibaba.otter</groupId>
  16.        <artifactId>canal.protocol</artifactId>
  17.        <version>1.1.5</version>
  18.    </dependency>
  19.    <dependency>
  20.        <groupId>org.slf4j</groupId>
  21.        <artifactId>jcl-over-slf4j</artifactId>
  22.        <version>1.5.6</version>
  23.    </dependency>
  24.    <dependency>
  25.        <groupId>org.eclipse.aether</groupId>
  26.        <artifactId>aether-impl</artifactId>
  27.        <version>1.0.0.v20140518</version>
  28.    </dependency>
  29.    <dependency>
  30.        <groupId>org.eclipse.jetty</groupId>
  31.        <artifactId>jetty-server</artifactId>
  32.        <version>9.4.46.v20220331</version>
  33.    </dependency>
  34.    <dependency>
  35.        <groupId>org.projectlombok</groupId>
  36.        <artifactId>lombok</artifactId>
  37.    </dependency>
  38.    <dependency>
  39.        <groupId>mysql</groupId>
  40.        <artifactId>mysql-connector-java</artifactId>
  41.        <version>8.0.15</version>
  42.    </dependency>
  43. </dependencies>
2、编写客户端和写入记录表操作:

客户端代码:

  1. package org.example;
  2. import com.alibaba.otter.canal.client.CanalConnector;
  3. import com.alibaba.otter.canal.client.CanalConnectors;
  4. import com.alibaba.otter.canal.protocol.CanalEntry;
  5. import com.alibaba.otter.canal.protocol.Message;
  6. import java.net.InetSocketAddress;
  7. import java.util.ArrayList;
  8. import java.util.List;
  9. public class Monitor {
  10.     //static List<String> str_list = new ArrayList<>();
  11.     static String operate;
  12.     public static void main(String[] args) throws InterruptedException {
  13.     CanalConnector connector = CanalConnectors.newSingleConnector(
  14.             new InetSocketAddress("192.168.26.114",11111), "example", "admin", "4ACFE3202A5FF5CF467898FC58AAB1D615029441");//之前配置的canal的注册地址和端口号,默认端口号为11111
  15.     int batchSize = 1000;
  16.     try {
  17.         connector.connect();
  18.         connector.subscribe(".*\\..*");
  19.         connector.rollback();
  20.         while (true) {
  21.             Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
  22.             long batchId = message.getId();
  23.             int size = message.getEntries().size();
  24.             if (batchId == -1 || size == 0) {//间隔一秒
  25.                 // 没有变化,等一秒钟再去拉取数据
  26.                 Thread.sleep(1000);
  27.             } else {
  28.                 printEntry(message.getEntries());
  29.             }
  30.             connector.ack(batchId); // 提交确认
  31.             // connector.rollback(batchId); // 处理失败, 回滚数据
  32.         }
  33.     } finally {
  34.         connector.disconnect();
  35.     }
  36. }
  37.     private static void printEntry(List<CanalEntry.Entry> entrys) {
  38.         for (CanalEntry.Entry entry : entrys) {
  39.             if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
  40.                 continue;
  41.             }
  42.             CanalEntry.RowChange rowChage = null;
  43.             try {
  44.                 rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
  45.             } catch (Exception e) {
  46.                 throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
  47.                         e);
  48.             }
  49.             CanalEntry.EventType eventType = rowChage.getEventType();
  50.             operate = eventType.toString();//获取操作方式(增删改)
  51.             System.out.println(String.format("================> \nbinlog[%s:%s]\ndatabase_name: %s\ntable_name: %s\neventType : %s",
  52.                     entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
  53.                     entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
  54.                     eventType));//java客户端输出信息
  55.             for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {//判断增删改
  56.                 if (eventType == CanalEntry.EventType.DELETE) {
  57.                     printColumn(rowData.getBeforeColumnsList());
  58.                 } else if (eventType == CanalEntry.EventType.INSERT) {
  59.                     printColumn(rowData.getAfterColumnsList());
  60.                 } else {//修改会产生两条信息,一条修改前,一条修改后
  61.                     System.out.println("-------> before");
  62.                     operate = eventType.toString()+"-"+"before";//修改前的记录为UPDATE-before
  63.                     printColumn(rowData.getBeforeColumnsList());
  64.                     operate = eventType.toString()+"-"+"after";//修改后的记录为UPDATE-after
  65.                     System.out.println("-------> after");
  66.                     printColumn(rowData.getAfterColumnsList());
  67.                 }
  68.             }
  69.         }
  70.     }
  71.     private static void printColumn(List<CanalEntry.Column> columns) {//修改内容函数,获取修改内容,并写入新的记录表
  72.         List<String> str_list = new ArrayList<>();//定义新的集合,用来记录修改内容数据
  73.         String name,age,sal;
  74.         for (CanalEntry.Column column : columns) {
  75.             str_list.add(column.getValue());
  76.             System.out.println(column.getName() + " : " + column.getValue() );
  77.          }
  78.         DDL_Mysql a = new DDL_Mysql();//定义插入类对象
  79.         name = str_list.get(0);//将修改的name字段数据赋值给name
  80.         age = str_list.get(1);//将修改的age字段数据赋值给age
  81.         sal = str_list.get(2);//将修改的sal字段数据赋值给sal
  82.         a.insert(name,age,sal,operate);//调用插入函数,将mysql日志的数据的内容和操作方式插入记录表中
  83.     }
  84. }
3、编写插入记录表代码:
  1. package org.example;
  2. import java.sql.Connection;
  3. import java.sql.DriverManager;
  4. import java.sql.ResultSet;
  5. import java.sql.SQLException;
  6. import java.sql.Statement;
  7. import java.sql.PreparedStatement;
  8. public class DDL_Mysql {
  9.     String driver = "com.mysql.jdbc.Driver";
  10.     /*
  11.      * URL指向要访问的数据库名(我用的是数据库名为test)
  12.      *本地用127.0.0.1
  13.      */
  14.     String url = "jdbc:mysql://127.0.0.1:3306/test?useSSL=false&serverTimezone=GMT";
  15.     String user = "root";      //MySQL配置时的用户名
  16.     String password = "root";   //MySQL配置时的密码
  17.     //throw则是指抛出的一个具体的异常类型。
  18.     /*
  19.      * 连接数据库
  20.      */
  21.     public  Connection getConn()
  22.     {
  23.         Connection con = null;
  24.         try
  25.         {  加载驱动程序
  26.             Class.forName(driver);
  27.         }
  28.         catch (ClassNotFoundException e)
  29.         {
  30.             e.printStackTrace();
  31.         }
  32.         try
  33.         {
  34.             con = DriverManager.getConnection(url,user,password);//注意是三个参数
  35.         }
  36.         catch (SQLException e)
  37.         {
  38.             e.printStackTrace();
  39.         }
  40.         return con;
  41.     }
  42.    
  43.     /*
  44.      * 插入操作
  45.      */
  46.     public int insert(String name,String age,String sal,String operate)
  47.     {
  48.         Connection con = getConn();
  49.         String sql = "insert into test (name, age, sal,operate) values (?, ?, ?, ?)";
  50.         try {
  51.             //用来执行SQL语句
  52.             PreparedStatement pst = con.prepareStatement(sql);
  53.             pst.setString(1,name);
  54.             pst.setString(2, age);
  55.             pst.setString(3, sal);
  56.             pst.setString(4, operate);
  57.             i = pst.executeUpdate();
  58.         }
  59.         catch (Exception e) {
  60.             e.printStackTrace();
  61.         }
  62.         System.out.println(i);
  63.         return i; //返回影响的行数,1为执行成功
  64.     }
  65. }

四、测试:

  测试中使用两张表,一张是canal_text库中的test表(操作表),一张是test库中的test表(记录表)

操作表:

记录表:

当我对操作表添加一条数据时:

操作表中加入一条信息:

同时,记录库中也有一条操作信息,包括操作方式:

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/587394
推荐阅读
相关标签
  

闽ICP备14008679号