赞
踩
canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL。
基于日志增量订阅和消费的业务包括
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。
github:https://github.com/alibaba/canal/
gitee:https://gitee.com/mirrors/canal
MySQL主备复制原理
canal 工作原理
mysql比较简单,这里不介绍,rabbitmq安装及使用可以参考:
查看是否开启binlog
SHOW VARIABLES LIKE '%log_bin%'
如果log_bin的值为OFF是未开启,为ON是已开启。
未开启的话可以修改/etc/my.cnf 开启binlog
- [mysqld]
- log-bin=mysql-bin
- binlog-format=ROW
- server_id=1
配置好后重启mysql。
创建用于同步的mysql账号:
- create user canal@'%' IDENTIFIED by 'canal';
- GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
- FLUSH PRIVILEGES;
在virtualHost:/ 下新增Exchanges: canal.topic
新增队列:test.queue, 绑定canal.topic, RoutingKey:test.routingKey
https://github.com/alibaba/canal/releases/tag/canal-1.1.5
目前最新版本为1.1.5,支持将数据发送至rabbitmq。
目录如下:
修改conf目录下的canal.properties
- canal.serverMode = rabbitMQ
-
- rabbitmq.host = 127.0.0.1
- rabbitmq.virtual.host = /
- # rabbitmq中新建的 Exchange
- rabbitmq.exchange = canal.topic
- rabbitmq.username = guest
- rabbitmq.password = guest
修改conf/example下的instance.properties
- canal.instance.master.address=127.0.0.1:3306
-
- # mysql中配置的用于同步的canal用户
- canal.instance.dbUsername=canal
- canal.instance.dbPassword=canal
-
- # rabbitmq中配置的 绑定的 routingkey
- canal.mq.topic=test.routingKey
修改完成后,启动bin下的startup.bat(windows下,linux下启动startup.sh),查看logs/canal下canal.log, 如下内容说明启动成功:
- 2021-07-10 18:30:18.088 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
- 2021-07-10 18:30:18.147 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
- 2021-07-10 18:30:18.320 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
- 2021-07-10 18:30:18.899 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.0.104(192.168.0.104):11111]
- 2021-07-10 18:30:20.723 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
至此,环境准备完成。
新建测试数据库:canal
可以看到rabbitmq,test.queue队列中收到数据:
{
"data": null,
"database": "`canal`",
"es": 1625913686000,
"id": 6,
"isDdl": true,
"mysqlType": null,
"old": null,
"pkNames": null,
"sql": "CREATE DATABASE `canal` CHARACTER SET utf8",
"sqlType": null,
"table": "",
"ts": 1625913686928,
"type": "QUERY"
}
新建测试表:tb_user
- CREATE TABLE `tb_user` (
- `id` int(11) NOT NULL AUTO_INCREMENT,
- `name` varchar(30) NOT NULL DEFAULT '',
- `address` varchar(200) DEFAULT NULL,
- `email` varchar(100) DEFAULT NULL,
- PRIMARY KEY (`id`)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
mq中收到数据:
{
"data": null,
"database": "canal",
"es": 1625914138000,
"id": 7,
"isDdl": true,
"mysqlType": null,
"old": null,
"pkNames": null,
"sql": "CREATE TABLE `tb_user` (\r\n`id` int(11) NOT NULL AUTO_INCREMENT ,\r\n`name` varchar(30) NOT NULL DEFAULT '' ,\r\n`address` varchar(200) NULL ,\r\n`email` varchar(100) NULL ,\r\nPRIMARY KEY (`id`)\r\n)",
"sqlType": null,
"table": "tb_user",
"ts": 1625914138420,
"type": "CREATE"
}
在表中插入一条数据:
INSERT INTO `canal`.`tb_user` (`id`, `name`, `address`, `email`) VALUES ('1', 'jason', '安徽合肥', 'jason@qq.com');
{
"data": [
{
"id": "1",
"name": "jason",
"address": "安徽合肥",
"email": "jason@qq.com"
}
],
"database": "canal",
"es": 1625914305000,
"id": 8,
"isDdl": false,
"mysqlType": {
"id": "int(11)",
"name": "varchar(30)",
"address": "varchar(200)",
"email": "varchar(100)"
},
"old": null,
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": 4,
"name": 12,
"address": 12,
"email": 12
},
"table": "tb_user",
"ts": 1625914305197,
"type": "INSERT"
}
通过 "type": "INSERT" 及data可以获取到新增的数据
更新 jason 的 address 信息
{
"data": [
{
"id": "1",
"name": "jason",
"address": "安徽合肥高新区",
"email": "jason@qq.com"
}
],
"database": "canal",
"es": 1625914494000,
"id": 9,
"isDdl": false,
"mysqlType": {
"id": "int(11)",
"name": "varchar(30)",
"address": "varchar(200)",
"email": "varchar(100)"
},
"old": [
{
"address": "安徽合肥"
}
],
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": 4,
"name": 12,
"address": 12,
"email": 12
},
"table": "tb_user",
"ts": 1625914494755,
"type": "UPDATE"
}
通过 "type": "UPDATE",data及old中的信息 可以获取到更新的数据
{
"data": [
{
"id": "1",
"name": "jason",
"address": "安徽合肥高新区",
"email": "jason@qq.com"
}
],
"database": "canal",
"es": 1625914783000,
"id": 10,
"isDdl": false,
"mysqlType": {
"id": "int(11)",
"name": "varchar(30)",
"address": "varchar(200)",
"email": "varchar(100)"
},
"old": null,
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": 4,
"name": 12,
"address": 12,
"email": 12
},
"table": "tb_user",
"ts": 1625914783109,
"type": "DELETE"
}
通过 "type": "DELETE",data 就可以判断删除的数据
新增remark字段
{
"data": null,
"database": "canal",
"es": 1625914881000,
"id": 11,
"isDdl": true,
"mysqlType": null,
"old": null,
"pkNames": null,
"sql": "ALTER TABLE `tb_user`\r\nADD COLUMN `remark` varchar(100) NULL AFTER `email`",
"sqlType": null,
"table": "tb_user",
"ts": 1625914881179,
"type": "ALTER"
}
通过 "type": "ALTER" 及 sql即可知道 新增了remark列。
删除email字段
{
"data": null,
"database": "canal",
"es": 1625915072000,
"id": 12,
"isDdl": true,
"mysqlType": null,
"old": null,
"pkNames": null,
"sql": "ALTER TABLE `tb_user`\r\nDROP COLUMN `email`",
"sqlType": null,
"table": "tb_user",
"ts": 1625915072617,
"type": "ALTER"
}
通过 "type": "ALTER",sql即可知道删除了email列。
mysql中变更的数据都可以通过canal发送到rabbitmq,只要监听rabbitmq队列,就可以获取到变更的数据,进行业务处理,可以写入redis,刷新缓存,也可以同步到elasticsearch等等。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。