赞
踩
主要是用于MySQL数据库增量日志数据的订阅,消费和解析(由阿里开源的Java项目),canal是通过伪装成MySQL的slave节点来转储master节点的binlog日志的一个中间件,他拿到日志内容以后,就可以把日志的相关数据变更重放到任何地方,可以是其他的MySQL,也可以是消息队列,redis甚至是文件中.
import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.InvalidProtocolBufferException; import java.net.InetSocketAddress; import java.util.List; import java.util.concurrent.TimeUnit; public class RedisCanalClientExample { public static final int _60SECONDS = 60; public static void main(String[] args) { CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress( "127.0.0.1", 1111), "example", "", ""); int batchSize = 1000; int emptyCount = 0; System.out.println("---------程序启动,开始监听MySQL的变化: "); try { connector.connect(); //这个就是你要订阅的变化的那个库表 connector.subscribe("db_test.t_user"); connector.rollback(); int totalEmptyCount = 10 * _60SECONDS; while (emptyCount < totalEmptyCount) { //获取指定数量的数据 Message message = connector.getWithoutAck(batchSize); long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { emptyCount++; try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } else { emptyCount = 0; printEntry(message.getEntries()); System.out.println(); } //提交确认 connector.ack(batchId); //处理失败,回滚数据 //connector.rollback(batchId); } System.out.println("empty too many times,exit"); } finally { connector.disconnect(); } } private static void printEntry(List<CanalEntry.Entry> entries) { for (CanalEntry.Entry entry : entries) { if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) { continue; } CanalEntry.RowChange rowChange = null; try { rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (InvalidProtocolBufferException e) { throw new RuntimeException(e); } CanalEntry.EventType eventType = rowChange.getEventType(); System.out.printf("==========binlog[%s:%s],name[%s,%s],eventType : %s%n", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType); for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { if (eventType == CanalEntry.EventType.INSERT) { redisInsert(rowData.getAfterColumnsList()); } else if (eventType == CanalEntry.EventType.UPDATE) { redisUpdate(rowData.getAfterColumnsList()); } else { redisDelete(rowData.getAfterColumnsList()); } } } } private static void redisInsert(List<CanalEntry.Column> columns) { //实现省略,往redis插入数据 } private static void redisUpdate(List<CanalEntry.Column> columns) { //实现省略,往redis修改数据 } private static void redisDelete(List<CanalEntry.Column> columns) { //实现省略,往redis删除数据 } }
让客户稍作等待,然后趁机更新mysql和redis(特别重要级别的数据最好不要多线程)
给缓存设置过期时间,是保证最终一致性的解决方案.所有的写操作以数据库为准,对缓存操作只是尽最大的努力即可.也就是说如果数据库写入成功,缓存更新失败,那么只要到达过期时间.后面的请求自然会从数据库中读取新数据然后回填缓存,达到一致性.切记以mysql的数据库写入为准.
在高并发的情境下,这个操作是跨两个不同的系统的,就一定会可能发生数据不一致的问题,导致读到脏数据(比如某方更新失败了)
容易出现的异常问题:A线程删除了缓存,去更新mysql. B线程过来又要读取,A还在更新中,这时候有可能发生
这种方案尽量不要用
还是会出现短时间的数据不一致(可能会从缓存中读取到旧数据)
canal就是类似的思想
先删除Redis的缓存,在更新完数据库之后,再删除一次Redis的缓存(延迟删除),这时候能保证数据的最终一致性.
分布式系统只有最终一致性,很难去做到强一致性
把Redis作为只读缓存的话还好,没有一致性的问题,但是如果把Redis作为读写缓存来用.建议使用先更新数据库,再删除缓存的方案.理由如下:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。