赞
踩
拉取镜像,默认拉取latest,可以选择对应版本,这里使用v1.1.5版本演示
docker pull canal/canal-server:v1.1.5
查看镜像是否拉取成功
docker images
宿主机新增配置文件、日志文件目录,后面会将canal.properties及instance.properties配置出来
mkdir /home/canal/conf
mkdir /home/canal/logs
mkdir /home/canal/logs/canal
mkdir /home/canal/logs/example
拉取完成后,先启动下canal,主要是为了从里面copy出配置文件
#启动镜像
docker run -d --name canal canal/canal-server:v1.1.5
#进入容器 查看配置文件路径
docker exec -it canal bash
#找到文件位置后 exit退出容器 将容器内部文件copy到上面新建的目录中
docker cp canal:/home/admin/canal-server/conf/canal.properties /home/canal/conf
docker cp canal:/home/admin/canal-server/conf/example/instance.properties /home/canal/conf/example/
容器内配置文件位置
修改instance.properties:
第一个红框是你需要监听数据库的地址和端口;
第二个红框是你数据库的用户和密码,这个用户信息一定是要有全部权限的用户,非root用户;
第三个是匹配数据表的规则,我这里默认为全部表
修改完成后,将之前的canal容器关闭并删除后,重新起一个新的容器
#关闭容器
docker stop canal
#移除容器
docker rm canal
#启动新容器,并挂载上面配置的相关目录
docker run --name canal -p 11111:11111 -d
-v /home/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties
-v /home/canal/conf/canal.properties:/home/admin/canal-server/conf/canal.properties
-v /home/canal/logs/:/home/admin/canal-server/logs/ canal/canal-server:v1.1.5
查看是否启动成功
查看example中是否存在异常
若出现类似异常show master status
的异常,说明当前配置的canal账号存在数据库权限问题,可在sql执行工具中查看用户权限:
其中:执行上述SQL(show master status
)需要有SUPER或REPLICATION CLIENT权限
查找mysql配置文件
find / -name mysqld.cnf
进入容器编辑
vi /etc/mysql/mysql.conf.d/mysqld.cnf
修改 MySQL 配置文件 my.cnf,开启 binlog 写入功能,并配置模式为 ROW
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=66 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
重启数据库,查看配置是否生效
mysql> show variables like 'binlog_format'; +---------------+-------+ | Variable_name | Value | +---------------+-------+ | binlog_format | ROW | +---------------+-------+ 1 row in set (0.19 sec) mysql> mysql> show variables like 'log_bin'; +---------------+-------+ | Variable_name | Value | +---------------+-------+ | log_bin | ON | +---------------+-------+ 1 row in set (0.00 sec) mysql> mysql> show master status; +------------------+----------+--------------+------------------+-------------------+ | File | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set | +------------------+----------+--------------+------------------+-------------------+ | mysql-bin.000003 | 4230 | | | | +------------------+----------+--------------+------------------+-------------------+ 1 row in set (0.00 sec)
创建用户并授权
mysql> CREATE USER canal IDENTIFIED BY 'canal';
mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%%';
mysql> FLUSH PRIVILEGES;
mysql> show grants for 'canal'@'%%';
+----------------------------------------------------------------------------+
| Grants for canal@%% |
+----------------------------------------------------------------------------+
| GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO `canal`@`%%` |
+----------------------------------------------------------------------------+
1 row in set (0.00 sec)
package com.rn.test.demo01.cannal; import com.alibaba.fastjson.JSON; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.InvalidProtocolBufferException; import com.rn.test.demo01.config.CanalConfig; import com.rn.test.demo01.vo.CanalVo; import io.micrometer.common.util.StringUtils; import jakarta.annotation.Resource; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import java.net.InetSocketAddress; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; @Component public class CanalClient implements InitializingBean { @Resource CanalConfig canalConfig; public static void main(String[] args) { // 地址、端口、example默认就好(就是instance.properties文件所在文件夹名称) CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.116.128", 11111), "example", "", ""); int batchSize = 1000; int emptyCount = 0; try { connector.connect(); // connector.subscribe("management\t_stop_heating,t_recover_heating"); // 监听的库表Perl正则表达式(我这里是test库下所有表) connector.subscribe("management\\..*"); // 监听的库表Perl正则表达式 connector.rollback(); try { while (true) { //尝试从master那边拉去数据batchSize条记录,有多少取多少 Message message = connector.getWithoutAck(batchSize); long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { emptyCount++; System.out.println("empty count : " + emptyCount); try { Thread.sleep(1000); } catch (InterruptedException e) { } } else { emptyCount = 0; dataHandle(message.getEntries()); //数据处理 } connector.ack(batchId); } } catch (Exception e) { e.printStackTrace(); } // catch (InvalidProtocolBufferException e) { // connector.connect(); // e.printStackTrace(); // } } finally { connector.disconnect(); } } /** * 数据处理 * * @param entrys */ private static void dataHandle(List<CanalEntry.Entry> entrys) throws InvalidProtocolBufferException { for (CanalEntry.Entry entry : entrys) { if (CanalEntry.EntryType.ROWDATA == entry.getEntryType()) { CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); CanalEntry.EventType eventType = rowChange.getEventType(); if (eventType == CanalEntry.EventType.DELETE) { saveDeleteSql(entry); } else if (eventType == CanalEntry.EventType.UPDATE) { saveUpdateSql(entry); } else if (eventType == CanalEntry.EventType.INSERT) { saveInsertSql(entry); } } } } /** * 保存更新语句 * * @param entry */ private static void saveUpdateSql(CanalEntry.Entry entry) { try { CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); for (CanalEntry.RowData rowData : rowDatasList) { List<CanalEntry.Column> newColumnList = rowData.getAfterColumnsList(); Map<String, String> data = new HashMap<>(); newColumnList.stream().forEach(item -> { if (StringUtils.isNotEmpty(item.getValue())) { data.put(lineToHump(item.getName()), item.getValue()); } }); List<CanalEntry.Column> oldColumnList = rowData.getBeforeColumnsList(); CanalVo update = new CanalVo(entry.getHeader().getTableName(), "UPDATE", data, oldColumnList.get(0).getValue()); System.out.println("更新返回 : " + JSON.toJSONString(update)); } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } /** * 保存删除语句 * * @param entry */ private static void saveDeleteSql(CanalEntry.Entry entry) { try { CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); for (CanalEntry.RowData rowData : rowDatasList) { List<CanalEntry.Column> oldColumnList = rowData.getBeforeColumnsList(); CanalVo delete = new CanalVo(entry.getHeader().getTableName(), "DELETE", null, oldColumnList.get(0).getValue()); System.out.println("删除返回 : " + JSON.toJSONString(delete)); } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } /** * 保存插入语句 * * @param entry */ private static void saveInsertSql(CanalEntry.Entry entry) { try { CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); for (CanalEntry.RowData rowData : rowDatasList) { List<CanalEntry.Column> columnList = rowData.getAfterColumnsList(); Map<String, String> data = new HashMap<>(); columnList.stream().forEach(item -> { if (StringUtils.isNotEmpty(item.getValue())) { data.put(lineToHump(item.getName()), item.getValue()); } }); CanalVo insert = new CanalVo(entry.getHeader().getTableName(), "INSERT", data, null); System.out.println("插入返回 : " + JSON.toJSONString(insert)); } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } public static String lineToHump(String str) { str = str.toLowerCase(); Matcher matcher = Pattern.compile("_(\\w)").matcher(str); StringBuffer sb = new StringBuffer(); while (matcher.find()) { matcher.appendReplacement(sb, matcher.group(1).toUpperCase()); } matcher.appendTail(sb); return sb.toString(); } @Override public void afterPropertiesSet() throws Exception { // 地址、端口、example默认就好(就是instance.properties文件所在文件夹名称) CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalConfig.url, canalConfig.port), canalConfig.destination, "", ""); try { connector.connect(); connector.subscribe("management\\management.t_stop_heating,management.t_recover_heating"); // 监听的库表Perl正则表达式(我这里是test库下所有表) connector.rollback(); try { while (true) { //尝试从master那边拉去数据batchSize条记录,有多少取多少 Message message = connector.getWithoutAck(canalConfig.batchSize); long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { Thread.sleep(1000); //没有数据则等待1s } else { dataHandle(message.getEntries()); //数据处理 } connector.ack(batchId); } } catch (InterruptedException e) { e.printStackTrace(); } catch (InvalidProtocolBufferException e) { connector.connect(); e.printStackTrace(); } } finally { connector.disconnect(); } } }
如果存在无法同步,记得看example目录下的日志进行分析
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。