赞
踩
canal主要用途是基于MySQL数据库增量日志解析,提供增量数据订阅和消费。
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
可以理解为一个增量数据同步工具
基于日志增量订阅和消费的业务包括
mysql主从复制
canal工作原理:
Binlog日志记录了所有DDL和DML语句,以事件的形式记录,同时还会记录所花费的时间,是mysql日志中最重要的日志。开启Binlog日志后对性能的损耗可以忽略不计,而且Binlog日志是事务安全型。
Binlog日志有两个重要的使用场景:
数据同步以及数据恢复
MySQL的binlog的格式有三种,分别是STATEMENT、MIXED、ROW。
statement [ 语句级 ] | row [ 行级 ] | mixed [ 综合语句级和行级 ] |
---|---|---|
相对row模式节省空间,但是可能产生不一致性 | binlog会记录每次操作后每行记录的变化。 | statement的升级版,一定程度上解决了因一些情况而造成的statement模式不一致问题 |
节省空间 | 保持数据的绝对一致性。因为不管sql是什么,引用了什么函数,他只记录执行后的效果。 | 节省空间,同时兼顾了一定的一致性。 |
有可能造成数据不一致(语句执行时间为异步,如果用binlog日志进行恢复,由于执行时间不同可能产生的数据就不同) | 占用较大空间。 | 还有些极个别情况依旧会造成不一致,另外statement和mixed对于需要对binlog的监控的情况都不方便。 |
开启binlog日志需要修改my.cnf文件。
需配置以下内容:
[mysqld]
server-id=1 #id(只要不和下文中Canal的id相同即可)
log-bin=mysql-bin
binlog_format=row #以行级开启日志
binlog-do-db=******** #需要监听的数据库
docker pull canal/canal-server
#以11111端口启动(如果是阿里云ECS服务器,记得在安全组对11111端口进行放行。)
docker run -d -p 11111:11111 --name canal canal/canal-server
注意canal.properties文件中的端口号(文件在docker容器的/home/admin/canal-server/conf目录下)。
接下来修改instance.properties文件(文件在docker容器中/home/admin/canal-server/conf/example目录下)。
1、pom文件引入
<!--canal-->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<!--与自己的canal版本一致-->
<version>1.1.3</version>
</dependency>
2、项目配置canal
canal-monitor-mysql:
hostname: 部署canal的服务器地址
port: 11111
3、代码集成
@Slf4j @Component public class CanalCommandLineRunner implements CommandLineRunner { @Override public void run(String... args) throws Exception { //在canal部署的conf/canal.properties ip和端口信息 CanalConnector connector = CanalConnectors.newSingleConnector( new InetSocketAddress("你的canal地址", 你的canal端口号), "example", "", ""); try { //打开连接 connector.connect(); //订阅数据库表,全部表q connector.subscribe(".*\\..*"); //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿 connector.rollback(); while (true) { // 获取指定数量的数据 Message message = connector.getWithoutAck(1); long batchId = message.getId(); int size = message.getEntries().size(); if (batchId > 0 && size != 0) { handleDataChange(message.getEntries()); } // 提交确认 connector.ack(batchId); } } catch (Exception e) { e.printStackTrace(); } finally { connector.disconnect(); //防止频繁访问数据库链接: 线程睡眠 10秒 try { Thread.sleep(10 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } } } private void handleDataChange(List<CanalEntry.Entry> entrys) { for (CanalEntry.Entry entry : entrys) { // 只解析mysql事务的操作,其他的不解析 if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) { continue; } //RowChange对象,包含了一行数据变化的所有特征 CanalEntry.RowChange rowChange; try { rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry, e); } CanalEntry.EventType eventType = rowChange.getEventType(); // 获取当前操作所属的数据库 String dbName = entry.getHeader().getSchemaName(); // 获取当前操作所属的表 String tableName = entry.getHeader().getTableName(); // 事务提交时间 long timestamp = entry.getHeader().getExecuteTime(); log.info("Canal监测到更新:【{}】库的【{}】表", dbName, tableName); for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { dataDetails(rowData.getBeforeColumnsList(), rowData.getAfterColumnsList(), dbName, eventType); log.info("-------------------------------------------------------------"); } } } /** * 解析具体一条Binlog消息的数据 * @param tableName 当前操作所属表名称 * @param eventType 当前操作类型(新增、修改、删除) */ private static void dataDetails(List<CanalEntry.Column> beforeColumns, List<CanalEntry.Column> afterColumns, String tableName, CanalEntry.EventType eventType) { JSONObject beforeData = new JSONObject(); for (CanalEntry.Column column : beforeColumns) { beforeData.put(column.getName(), column.getValue()); } JSONObject afterData = new JSONObject(); for (CanalEntry.Column column : afterColumns) { afterData.put(column.getName(), column.getValue()); } System.out.println("tableName = " + tableName + ",eventType = " + eventType + ",beforeData = " + beforeData + ",afterData = " + afterData); SecurityUser securityUser = JSONObject.toJavaObject(afterData, SecurityUser.class); System.out.println("securityUser = " + securityUser); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。