当前位置:   article > 正文

使用canal同步mysql数据库信息到RabbitMQ_下载canal从gitee

下载canal从gitee

1、canal简介

canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。

github:https://github.com/alibaba/canal/

gitee:https://gitee.com/mirrors/canal

2、工作原理

MySQL主备复制原理

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

3、环境准备

3.1 首先安装好mysql及rabbitmq

mysql比较简单,这里不介绍,rabbitmq安装及使用可以参考:

RabbitMQ安装及简单使用

3.2 mysql需开启binlog

查看是否开启binlog

SHOW VARIABLES LIKE '%log_bin%'

如果log_bin的值为OFF是未开启,为ON是已开启。

未开启的话可以修改/etc/my.cnf 开启binlog

  1. [mysqld]
  2. log-bin=mysql-bin
  3. binlog-format=ROW
  4. server_id=1

配置好后重启mysql。

创建用于同步的mysql账号:

  1. create user canal@'%' IDENTIFIED by 'canal';
  2. GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
  3. FLUSH PRIVILEGES;

3.3 rabbitmq配置

在virtualHost:/  下新增Exchanges: canal.topic

 新增队列:test.queue,  绑定canal.topic, RoutingKey:test.routingKey

3.4 canal下载及配置

https://github.com/alibaba/canal/releases/tag/canal-1.1.5

 

目前最新版本为1.1.5,支持将数据发送至rabbitmq。

目录如下:

修改conf目录下的canal.properties 

  1. canal.serverMode = rabbitMQ
  2. rabbitmq.host = 127.0.0.1
  3. rabbitmq.virtual.host = /
  4. # rabbitmq中新建的 Exchange
  5. rabbitmq.exchange = canal.topic
  6. rabbitmq.username = guest
  7. rabbitmq.password = guest

修改conf/example下的instance.properties

  1. canal.instance.master.address=127.0.0.1:3306
  2. # mysql中配置的用于同步的canal用户
  3. canal.instance.dbUsername=canal
  4. canal.instance.dbPassword=canal
  5. # rabbitmq中配置的 绑定的 routingkey
  6. canal.mq.topic=test.routingKey

修改完成后,启动bin下的startup.bat(windows下,linux下启动startup.sh),查看logs/canal下canal.log, 如下内容说明启动成功:

  1. 2021-07-10 18:30:18.088 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
  2. 2021-07-10 18:30:18.147 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
  3. 2021-07-10 18:30:18.320 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
  4. 2021-07-10 18:30:18.899 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.0.104(192.168.0.104):11111]
  5. 2021-07-10 18:30:20.723 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......

至此,环境准备完成。

4、数据同步测试

4.1 新建数据库

新建测试数据库:canal

可以看到rabbitmq,test.queue队列中收到数据:

{
    "data": null,
    "database": "`canal`",
    "es": 1625913686000,
    "id": 6,
    "isDdl": true,
    "mysqlType": null,
    "old": null,
    "pkNames": null,
    "sql": "CREATE DATABASE `canal` CHARACTER SET utf8",
    "sqlType": null,
    "table": "",
    "ts": 1625913686928,
    "type": "QUERY"
}

4.2 新增表

新建测试表:tb_user

  1. CREATE TABLE `tb_user` (
  2. `id` int(11) NOT NULL AUTO_INCREMENT,
  3. `name` varchar(30) NOT NULL DEFAULT '',
  4. `address` varchar(200) DEFAULT NULL,
  5. `email` varchar(100) DEFAULT NULL,
  6. PRIMARY KEY (`id`)
  7. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

mq中收到数据:

{
    "data": null,
    "database": "canal",
    "es": 1625914138000,
    "id": 7,
    "isDdl": true,
    "mysqlType": null,
    "old": null,
    "pkNames": null,
    "sql": "CREATE TABLE `tb_user` (\r\n`id`  int(11) NOT NULL AUTO_INCREMENT ,\r\n`name`  varchar(30) NOT NULL DEFAULT '' ,\r\n`address`  varchar(200) NULL ,\r\n`email`  varchar(100) NULL ,\r\nPRIMARY KEY (`id`)\r\n)",
    "sqlType": null,
    "table": "tb_user",
    "ts": 1625914138420,
    "type": "CREATE"
}

4.3 新增数据记录

在表中插入一条数据:

INSERT INTO `canal`.`tb_user` (`id`, `name`, `address`, `email`) VALUES ('1', 'jason', '安徽合肥', 'jason@qq.com');

{
    "data": [
        {
            "id": "1",
            "name": "jason",
            "address": "安徽合肥",
            "email": "jason@qq.com"
        }
    ],
    "database": "canal",
    "es": 1625914305000,
    "id": 8,
    "isDdl": false,
    "mysqlType": {
        "id": "int(11)",
        "name": "varchar(30)",
        "address": "varchar(200)",
        "email": "varchar(100)"
    },
    "old": null,
    "pkNames": [
        "id"
    ],
    "sql": "",
    "sqlType": {
        "id": 4,
        "name": 12,
        "address": 12,
        "email": 12
    },
    "table": "tb_user",
    "ts": 1625914305197,
    "type": "INSERT"
}

通过 "type": "INSERT" 及data可以获取到新增的数据

4.4 更新数据

更新 jason 的 address 信息

{
    "data": [
        {
            "id": "1",
            "name": "jason",
            "address": "安徽合肥高新区",
            "email": "jason@qq.com"
        }
    ],
    "database": "canal",
    "es": 1625914494000,
    "id": 9,
    "isDdl": false,
    "mysqlType": {
        "id": "int(11)",
        "name": "varchar(30)",
        "address": "varchar(200)",
        "email": "varchar(100)"
    },
    "old": [
        {
            "address": "安徽合肥"
        }
    ],
    "pkNames": [
        "id"
    ],
    "sql": "",
    "sqlType": {
        "id": 4,
        "name": 12,
        "address": 12,
        "email": 12
    },
    "table": "tb_user",
    "ts": 1625914494755,
    "type": "UPDATE"
}

通过 "type": "UPDATE",data及old中的信息 可以获取到更新的数据

4.5 删除数据

{
    "data": [
        {
            "id": "1",
            "name": "jason",
            "address": "安徽合肥高新区",
            "email": "jason@qq.com"
        }
    ],
    "database": "canal",
    "es": 1625914783000,
    "id": 10,
    "isDdl": false,
    "mysqlType": {
        "id": "int(11)",
        "name": "varchar(30)",
        "address": "varchar(200)",
        "email": "varchar(100)"
    },
    "old": null,
    "pkNames": [
        "id"
    ],
    "sql": "",
    "sqlType": {
        "id": 4,
        "name": 12,
        "address": 12,
        "email": 12
    },
    "table": "tb_user",
    "ts": 1625914783109,
    "type": "DELETE"
}

通过 "type": "DELETE",data 就可以判断删除的数据

4.6 新增字段

新增remark字段

{
    "data": null,
    "database": "canal",
    "es": 1625914881000,
    "id": 11,
    "isDdl": true,
    "mysqlType": null,
    "old": null,
    "pkNames": null,
    "sql": "ALTER TABLE `tb_user`\r\nADD COLUMN `remark`  varchar(100) NULL AFTER `email`",
    "sqlType": null,
    "table": "tb_user",
    "ts": 1625914881179,
    "type": "ALTER"
}

通过  "type": "ALTER" 及 sql即可知道 新增了remark列。

4.7 删除字段

删除email字段

{
    "data": null,
    "database": "canal",
    "es": 1625915072000,
    "id": 12,
    "isDdl": true,
    "mysqlType": null,
    "old": null,
    "pkNames": null,
    "sql": "ALTER TABLE `tb_user`\r\nDROP COLUMN `email`",
    "sqlType": null,
    "table": "tb_user",
    "ts": 1625915072617,
    "type": "ALTER"
}

通过 "type": "ALTER",sql即可知道删除了email列。

5、总结

mysql中变更的数据都可以通过canal发送到rabbitmq,只要监听rabbitmq队列,就可以获取到变更的数据,进行业务处理,可以写入redis,刷新缓存,也可以同步到elasticsearch等等。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/445438
推荐阅读
相关标签
  

闽ICP备14008679号