当前位置:   article > 正文

使用Canal进行数据同步(附代码-SpringBoot+Canal)_spring配置canal

spring配置canal

一、简介

canal主要用途是基于MySQL数据库增量日志解析,提供增量数据订阅和消费。
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

可以理解为一个增量数据同步工具

在这里插入图片描述
基于日志增量订阅和消费的业务包括

  1. 数据库镜像
  2. 数据库实时备份
  3. 索引构建和实时维护(拆分异构索引、倒排索引等)
  4. 业务cache刷新
  5. 带业务逻辑的增量数据处理

二、工作原理

mysql主从复制
在这里插入图片描述

  1. MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)。
  2. MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log) 。
  3. MySQL slave 重放 relay log中事件,将数据变更反映它自己的数据。

canal工作原理:

  1. canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump协议。
  2. MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal ) 。
  3. canal 解析 binary log 对象(原始为 byte 流)。

三、开启mysqlBinlog日志(必要条件)

什么是Binlog日志

Binlog日志记录了所有DDL和DML语句,以事件的形式记录,同时还会记录所花费的时间,是mysql日志中最重要的日志。开启Binlog日志后对性能的损耗可以忽略不计,而且Binlog日志是事务安全型。

Binlog日志有两个重要的使用场景:

数据同步以及数据恢复

  1. 数据同步:在Master节点上开启mysqlBinlog日志,对DDL和DML语句进行记录,将Binlog二进制文件发送给Slaves节点,进行数据同步。
  2. 数据恢复:使用MysqlBinlog工具恢复Mysql数据。

binlog的分类设置:

MySQL的binlog的格式有三种,分别是STATEMENT、MIXED、ROW。

statement [ 语句级 ]row [ 行级 ]mixed [ 综合语句级和行级 ]
相对row模式节省空间,但是可能产生不一致性binlog会记录每次操作后每行记录的变化。​statement的升级版,一定程度上解决了因一些情况而造成的statement模式不一致问题
节省空间保持数据的绝对一致性。因为不管sql是什么,引用了什么函数,他只记录执行后的效果。​节省空间,同时兼顾了一定的一致性。
有可能造成数据不一致(语句执行时间为异步,如果用binlog日志进行恢复,由于执行时间不同可能产生的数据就不同)占用较大空间。​还有些极个别情况依旧会造成不一致,另外statement和mixed对于需要对binlog的监控的情况都不方便。

开启binlog日志需要修改my.cnf文件。

在这里插入图片描述

需配置以下内容:

[mysqld]
server-id=1 #id(只要不和下文中Canal的id相同即可)
log-bin=mysql-bin
binlog_format=row #以行级开启日志
binlog-do-db=******** #需要监听的数据库
  • 1
  • 2
  • 3
  • 4
  • 5

四、安装canal

docker pull canal/canal-server
#以11111端口启动(如果是阿里云ECS服务器,记得在安全组对11111端口进行放行。)
docker run -d -p 11111:11111 --name canal canal/canal-server
  • 1
  • 2
  • 3

注意canal.properties文件中的端口号(文件在docker容器的/home/admin/canal-server/conf目录下)。

  1. 注意检查第一个红框部门的端口号是否正确。
  2. 可以在第二个红框位置对同步策略进行更换,可以选择tcp,kafka,RocketMq,这里使用的是tcp。
    在这里插入图片描述

接下来修改instance.properties文件(文件在docker容器中/home/admin/canal-server/conf/example目录下)。

  1. 打码部分更改位自己的数据库地址以及端口号。
  2. 红框部分为数据库账号密码。

在这里插入图片描述

五、在代码中整合canal(SpringBoot+canal)

1、pom文件引入

	<!--canal-->
     <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <!--与自己的canal版本一致-->
        <version>1.1.3</version>
     </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

2、项目配置canal

canal-monitor-mysql:
  hostname: 部署canal的服务器地址
  port: 11111
  • 1
  • 2
  • 3

3、代码集成

@Slf4j
@Component
public class CanalCommandLineRunner implements CommandLineRunner {

    @Override
    public void run(String... args) throws Exception {
       //在canal部署的conf/canal.properties ip和端口信息
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("你的canal地址", 你的canal端口号),
                "example",
                "",
                "");
        try {
            //打开连接
            connector.connect();
            //订阅数据库表,全部表q
            connector.subscribe(".*\\..*");
            //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
            connector.rollback();
            while (true) {
                // 获取指定数量的数据
                Message message = connector.getWithoutAck(1);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId > 0 && size != 0) {
                    handleDataChange(message.getEntries());
                }
                // 提交确认
                connector.ack(batchId);
            }

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
            //防止频繁访问数据库链接: 线程睡眠 10秒
            try {
                Thread.sleep(10 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private void handleDataChange(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            // 只解析mysql事务的操作,其他的不解析
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            //RowChange对象,包含了一行数据变化的所有特征
            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry, e);
            }
            CanalEntry.EventType eventType = rowChange.getEventType();
            // 获取当前操作所属的数据库
            String dbName = entry.getHeader().getSchemaName();
            // 获取当前操作所属的表
            String tableName = entry.getHeader().getTableName();

            // 事务提交时间
            long timestamp = entry.getHeader().getExecuteTime();

            log.info("Canal监测到更新:【{}】库的【{}】表", dbName, tableName);

            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                dataDetails(rowData.getBeforeColumnsList(), rowData.getAfterColumnsList(), dbName, eventType);
                log.info("-------------------------------------------------------------");
            }
        }
    }

    /**
     * 解析具体一条Binlog消息的数据
     * @param tableName 当前操作所属表名称
     * @param eventType 当前操作类型(新增、修改、删除)
     */
    private static void dataDetails(List<CanalEntry.Column> beforeColumns,
                                    List<CanalEntry.Column> afterColumns,
                                    String tableName,
                                    CanalEntry.EventType eventType) {

        JSONObject beforeData = new JSONObject();

        for (CanalEntry.Column column : beforeColumns) {
            beforeData.put(column.getName(), column.getValue());
        }
        JSONObject afterData = new JSONObject();

        for (CanalEntry.Column column : afterColumns) {
            afterData.put(column.getName(), column.getValue());
        }

        System.out.println("tableName = " + tableName +
                ",eventType = " + eventType +
                ",beforeData = " + beforeData +
                ",afterData = " + afterData);

        SecurityUser securityUser = JSONObject.toJavaObject(afterData, SecurityUser.class);

        System.out.println("securityUser = " + securityUser);

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/801342
推荐阅读
相关标签
  

闽ICP备14008679号