赞
踩
canal是阿里巴巴的来源项目。我们可以通过配置binlog实现数据库监控,得到数据库表或者数据的更新信息。
参考我的文档前先去官网看下,可能已经支持更高版本的MySQL了
https://github.com/alibaba/canal
https://github.com/alibaba/canal/releases
ps. 目前内部版本已经支持mysql和oracle部分版本的日志解析,当前的canal开源版本支持5.7及以下的版本(阿里内部mysql 5.7.13, 5.6.10, mysql 5.5.18和5.1.40/48)
所以需要安装mysql5.7以下的版本稳妥些
修改对应位置:
添加:
- #添加这一行就ok
- log-bin=mysql-bin
- #选择row模式
- binlog-format=ROW
- #配置mysql replaction需要定义,不能和canal的slaveId重复
- server_id=1
- character-set-server=utf8
- collation-server=utf8_general_ci
添加:
- [mysql]
- default-character-set = utf8
- [mysql.server]
- default-character-set = utf8
- [mysqld_safe]
- default-character-set = utf8
- [client]
- default-character-set = utf8
- .6\bin>sc delete mysql
- \bin>net stop mysql
- \bin>mysqld --install mysql --defaults-file="C:\Program Files\MySQL\MySQL Server 5.6\my.ini"
show variables like'log_%';
On:表示已开启
官网是%,%是对所有非本地主机授权,不包括localhost。由于我们是在windows本机上做,所以需要配置为localhost.
这两个新添的配置可以注解调,还不太明白具体的用处,可能是testdb 的 账号密码
- canal.instance.tsdb.dbUsername=canal
- canal.instance.tsdb.dbPassword=canal
注:这里的slaveId=1234不能和my.ini的一样
如果没有报错那就是启动成功了
Pom或者gradle。主要依赖:
- compile group: 'org.jetbrains', name: 'annotations', version: '13.0'
- compile group: 'com.alibaba.otter', name: 'canal.client', version: '1.0.25'
- package com.shao.demo.canalclient;
- import java.net.InetSocketAddress;
- import java.util.List;
- import com.alibaba.otter.canal.client.CanalConnector;
- import com.alibaba.otter.canal.common.utils.AddressUtils;
- import com.alibaba.otter.canal.protocol.Message;
- import com.alibaba.otter.canal.protocol.CanalEntry.Column;
- import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
- import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
- import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
- import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
- import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
- import com.alibaba.otter.canal.client.*;
- import org.jetbrains.annotations.NotNull;
- /**
- * @author zhiqi.shao
- * @Date 2018/6/4 18:29
- */
- public class ClientSample {
-
- public static void main(String args[]) {
- // 创建链接
- CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
- 11111), "example", "", "");
- int batchSize = 1000;
- int emptyCount = 0;
- try {
- connector.connect();
- connector.subscribe(".*\\..*");
- connector.rollback();
- int totalEmtryCount = 1200;
- while (emptyCount < totalEmtryCount) {
- Message message = connector.getWithoutAck(batchSize);
- // 获取指定数量的数据
- long batchId = message.getId();
- int size = message.getEntries().size();
- if (batchId == -1 || size == 0) {
- emptyCount++;
- System.out.println("empty count : " + emptyCount);
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- } else {
- emptyCount = 0;
- // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
- printEntry(message.getEntries());
- }
- connector.ack(batchId); // 提交确认 // connector.rollback(batchId); // 处理失败, 回滚数据 }
- System.out.println("empty too many times, exit");
- } finally {
- connector.disconnect();
- }
- }
-
- private static void printEntry(@NotNull List<Entry> entrys) {
- for (Entry entry : entrys) {
- if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
- continue;
- }
-
- RowChange rowChage = null;
- try {
- rowChage = RowChange.parseFrom(entry.getStoreValue());
- } catch (Exception e) {
- throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
- e);
- }
-
- EventType eventType = rowChage.getEventType();
- System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
- entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
- entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
- eventType));
-
- for (RowData rowData : rowChage.getRowDatasList()) {
- if (eventType == EventType.DELETE) {
- printColumn(rowData.getBeforeColumnsList());
- } else if (eventType == EventType.INSERT) {
- printColumn(rowData.getAfterColumnsList());
- } else {
- System.out.println("-------> before");
- printColumn(rowData.getBeforeColumnsList());
- System.out.println("-------> after");
- printColumn(rowData.getAfterColumnsList());
- }
- }
- }
- }
-
- private static void printColumn(@NotNull List<Column> columns) {
- for (Column column : columns) {
- System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
- }
- }
- }

- use canal_test;
- CREATE TABLE user (
- uid INT(4) PRIMARY KEY NOT NULL AUTO_INCREMENT,
- name VARCHAR(10) NOT NULL
- );
-
- insert into user (name) values('shaoshao');
rowData.getAfterColumnsList()
canalConnector.subscribe("canal_test\..");//客户端只消费canal_test库的数据变化
subscribe(filter)方法;有的话,filter需要和instance.properties的canal.instance.filter.regex一致,否则subscribe的filter会覆盖instance的配置,如果subscribe的filter是.\..*,那么相当于你消费了所有的更新数据。
canal server: 为了减少对mysql dump的请求,不同server上的instance要求同一时间只能有一个处于running,其他的处于standby状态.
canal client: 为了保证有序性,一份instance同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序。
原理相对比较简单:
当mysql变动时,两个client都能获取到变动
Standby:备库
这里总结了一下Canal的一些点,仅供参考:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。