赞
踩
文件位置在mysql的安装目录下,博主的目录如下:D:\Environments\Mysql\mysql-8.0.34-winx64
打开my.ini文件,在**[mysqld]**下添加:
# Enable the binary log; binlog files are created with the base name "mysql-bin".
log-bin=mysql-bin
# Log changes in ROW format (per-row changes rather than the SQL statements).
binlog-format=ROW
# Server ID used for replication.
# NOTE(review): presumably must differ from the ID the canal instance uses - confirm.
server_id=1
首先以管理员身份打开命令行工具:按WIN+R,输入cmd,然后按Ctrl+Shift+Enter即可
关闭mysql服务
net stop mysql80
开启mysql服务
net start mysql80
在MySQL自带的名为mysql的系统数据库上新建查询,输入以下命令:
-- Remove any previous canal account. IF EXISTS keeps the script from failing
-- with error 1396 when the account has not been created yet (e.g. first run).
DROP USER IF EXISTS 'canal'@'%';
-- Create the account canal connects with; mysql_native_password is selected explicitly.
CREATE USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
-- Privileges needed to read table data and to read the binlog as a replication client.
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
# Enter the conf/example directory
cd conf/example
# Edit the instance configuration file
vi instance.properties
修改如下:
第二个红框中的ip地址即mysql所在机器的ip,博主的mysql是安装在windows上的
在windows上查看本机ip地址的方式是:按WIN+R,输入cmd打开命令行后,输入:
ipconfig
# Enter the bin directory
cd bin
# Run the startup script
sh startup.sh
@Component public class RedisCanalClientExample { public static final Integer _60SECONDS = 60; public static final String REDIS_IP_ADDR = "192.168.81.128"; private static void redisInsert(List<CanalEntry.Column> columns) { JSONObject jsonObject = new JSONObject(); for (CanalEntry.Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); jsonObject.put(column.getName(),column.getValue()); } if(columns.size() > 0) { try(Jedis jedis = RedisUtils.getJedis()) { jedis.set(columns.get(0).getValue(),jsonObject.toJSONString()); }catch (Exception e){ e.printStackTrace(); } } } private static void redisDelete(List<CanalEntry.Column> columns) { JSONObject jsonObject = new JSONObject(); for (CanalEntry.Column column : columns) { jsonObject.put(column.getName(),column.getValue()); } if(columns.size() > 0) { try(Jedis jedis = RedisUtils.getJedis()) { jedis.del(columns.get(0).getValue()); }catch (Exception e){ e.printStackTrace(); } } } private static void redisUpdate(List<CanalEntry.Column> columns) { JSONObject jsonObject = new JSONObject(); for (CanalEntry.Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); jsonObject.put(column.getName(),column.getValue()); } if(columns.size() > 0) { try(Jedis jedis = RedisUtils.getJedis()) { jedis.set(columns.get(0).getValue(),jsonObject.toJSONString()); System.out.println("---------update after: "+jedis.get(columns.get(0).getValue())); }catch (Exception e){ e.printStackTrace(); } } } public static void printEntry(List<CanalEntry.Entry> entries) { for (CanalEntry.Entry entry : entries) { if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) { continue; } CanalEntry.RowChange rowChage = null; try { //获取变更的row数据 rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new 
RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(), e); } //获取变动类型 CanalEntry.EventType eventType = rowChage.getEventType(); System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) { if (eventType == CanalEntry.EventType.INSERT) { redisInsert(rowData.getAfterColumnsList()); } else if (eventType == CanalEntry.EventType.DELETE) { redisDelete(rowData.getBeforeColumnsList()); } else {//EventType.UPDATE redisUpdate(rowData.getAfterColumnsList()); } } } } public static void main(String[] args) { System.out.println("--------------init()--------------"); // 创建链接canal服务器 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(REDIS_IP_ADDR, 11111), "example", "", ""); int batchSize = 1000; //空闲空转计数器 int emptyCount = 0; System.out.println("--------------canal init ok, 开始监听mysql变化--------------"); try { connector.connect(); connector.subscribe("redis_learn.t_user"); connector.rollback(); int totalEmptyCount = 10 * _60SECONDS; while (emptyCount < totalEmptyCount) { System.out.println("我是canal,每秒一次正在监听:" + UUID.randomUUID().toString()); //获取指定数量的数据 Message mes = connector.getWithoutAck(batchSize); long batchId = mes.getId(); int size = mes.getEntries().size(); if (batchId == -1 || size == 0) { emptyCount++; try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } else { //计数器重新置零 emptyCount = 0; printEntry(mes.getEntries()); } //提交确认 connector.ack(batchId); } System.out.println("已经监听了" + totalEmptyCount + "秒,无任何消息,请重启重试......"); } finally { connector.disconnect(); } } }
修改mysql指定的表的数据,如下图所示:
控制台输出监控日志,运行结果如下:
redis中的数据,如下图所示:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。