赞
踩
安装mysql和redis教程跳过,如未安装,自行百度
GitHub下载地址:https://github.com/alibaba/canal/releases/tag/canal-1.1.7
配置文件路径\canal.deployer-1.1.7\conf\example\instance.properties
修改连接MySQL数据库的相关配置(主要是 canal.instance.master.address、canal.instance.dbUsername、canal.instance.dbPassword 三项)
执行下面的SQL查看binlog是否已开启:结果中 log_bin 的 Value 为 ON 代表已开启
show variables like '%log_bin%';
如果未开启,需要修改mysql的my.ini配置
在 [mysqld] 段下添加以下配置以开启binlog日志(Canal 要求使用 ROW 格式):log-bin=mysql-bin、binlog-format=ROW、server-id=1
添加配置后,重启mysql服务
重新输入sql查看是否开启
show variables like '%log_bin%';
-- Create the dedicated MySQL account that the Canal server logs in with.
CREATE USER canal IDENTIFIED BY 'canal';
-- Canal acts as a replication client: it only needs to read table data and the binlog.
-- ALL PRIVILEGES is deliberately not granted, so this account cannot modify data or schema.
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
双击\canal.deployer-1.1.7\bin\startup.bat
如果双击出现闪退,
则需要修改startup.bat启动命令
删除 -XX:PermSize=128m 这段参数即可(较新版本的 JDK 已移除永久代,不再支持该参数,会导致 JVM 启动失败)
截图中已经删除
添加pom依赖
<!-- Canal client library used by the sample code to connect to the Canal server. -->
<!-- NOTE(review): this pins client 1.1.0 while the server installed in this guide is 1.1.7; confirm the two are compatible or align the versions. -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
简单代码实现
package com.ljt; import com.alibaba.fastjson.JSONObject; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import lombok.extern.slf4j.Slf4j; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.test.context.junit4.SpringRunner; import java.net.InetSocketAddress; import java.util.List; /** * @author ljt * @date 2024/1/23 * @description */ @SpringBootTest @RunWith(SpringRunner.class) @Slf4j public class CanalClientTest { @Autowired private RedisTemplate<String, Object> redisTemplate; @Test public void test1() { //创建链接 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1" , 11111), "example", "", ""); //创建链接 connector.connect(); int batchSize = 1000; while (true) { //订阅数据库 // connector.subscribe(""); Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据 List<CanalEntry.Entry> entries = message.getEntries(); if (entries.size() <= 0) { log.info("当前未监测到数据修改!"); try { Thread.sleep(5000); } catch (InterruptedException e) { throw new RuntimeException(e); } } else { for (CanalEntry.Entry entry : entries) { if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) { continue; } ByteString storeValue = entry.getStoreValue(); String tableName = entry.getHeader().getTableName(); CanalEntry.RowChange rowChange = null; try { rowChange = CanalEntry.RowChange.parseFrom(storeValue); } catch (InvalidProtocolBufferException e) { throw new RuntimeException(e); } CanalEntry.EventType eventType = 
rowChange.getEventType(); List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), tableName, eventType)); for (CanalEntry.RowData rowData : rowDatasList) { if (eventType == CanalEntry.EventType.DELETE) { redisDel(rowData.getAfterColumnsList(), tableName); } else if (eventType == CanalEntry.EventType.INSERT) { redisSet(rowData.getAfterColumnsList(), tableName); } else { System.out.println("-------> before"); printColumn(rowData.getBeforeColumnsList()); redisSet(rowData.getAfterColumnsList(), tableName); } } } } } } private void redisDel(List<CanalEntry.Column> columns, String tableName) { JSONObject jsonObject = new JSONObject(); for (CanalEntry.Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); jsonObject.put(column.getName(), column.getValue()); } if (columns.size() > 0) { redisTemplate.delete(tableName + ":" + columns.get(0).getValue()); } } private void redisSet(List<CanalEntry.Column> columns, String tableName) { JSONObject jsonObject = new JSONObject(); for (CanalEntry.Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); jsonObject.put(column.getName(), column.getValue()); } if (columns.size() > 0) { redisTemplate.opsForValue().set(tableName + ":" + columns.get(0).getValue(), jsonObject.toJSONString()); } } private void printColumn(List<CanalEntry.Column> columns) { for (CanalEntry.Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。