赞
踩
在并发场景下,MySQL和Redis之间的数据不一致性可能成为一个突出问题。这种不一致性可能由网络延迟、并发写入冲突以及异常情况处理等因素引起,导致MySQL和Redis中的数据在某些时间点不同步或出现不一致的情况。数据一致性问题的级别可以分为三种:
强一致性: 强一致性解决方案在高并发场景下实现过于苛刻,本案例暂不讨论。
弱一致性: 一致性的解决方案可以使用“先写MySQL,再删除Redis”策略,这种方案在极限条件下有不一致的可能性,但结合需求和技术实现可以综合评判。弱一致性的应用场景如:社交平台点赞功能,用户可以实时看到点赞的更新,尽管MySQL和Redis可能存在短暂的数据不一致。
最终一致性: 采用“先写MySQL,通过MySQL的Binlog特性,异步写入Redis”。这种方案一般适用于库存、金融等业务场景,但是需要建立相关失败重试、告警、补偿机制,以及容灾措施。
在本案例中,弱一致性采用 Cache Aside 方案,最终一致性采用阿里巴巴开源组件 canal 实现。
综上所述,Cache Aside方案适用于读取频率较高、对数据实时性要求不高的场景,通过合理地使用缓存来提高系统性能和扩展性,并通过维护数据的一致性来避免数据不一致的问题。
基于Cache Aside实现点赞功能。
实体类信息
public class Like {
private String postId;
private int likeCount;
// 构造函数、getter和setter方法
}
逻辑层
@Service public class LikeService { private final LikeRepository likeRepository; private final RedisUtils redisUtils; public LikeService(LikeRepository likeRepository, RedisUtils redisUtils) { this.likeRepository = likeRepository; this.redisUtils = redisUtils; } public Like getLikeInfo(String postId) { String cacheKey = "like:" + postId; // 从缓存中获取点赞信息 Like like = (Like) redisUtils.get(cacheKey); // 如果缓存中不存在,则从持久层(数据库)获取 if (like == null) { like = likeRepository.findByPostId(postId); // 如果数据库中存在数据,则保存到缓存中 if (like != null) { redisUtils.set(cacheKey, like); } } // 如果点赞信息为空,则初始化为0 if (like == null) { like = new Like(postId, 0); } return like; } public void addLike(String postId) { String cacheKey = "like:" + postId; // 在持久层(数据库)新增点赞信息 Like like = likeRepository.findByPostId(postId); if (like == null) { like = new Like(postId, 1); } else { like.setLikeCount(like.getLikeCount() + 1); } likeRepository.save(like); // 更新缓存中的数据 redisUtils.set(cacheKey, like); } }
引用canal官方说明:
canal [kə’næl] ,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
基于日志增量订阅和消费的业务包括
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
需要的开发环境:
特别说明:canal只支持JDK 8和JDK 11,如果您在本地物理机安装,请切换JDK默认版本。笔者更建议您使用Docker安装开发环境,由于canal安装后需要修改的配置较多,可以通过Docker-Compose安装。
那么,麻烦ChatGPT写一个Docker-Compose文件吧:
version: '2.4' services: mysql: image: mysql:8.0 container_name: mysql restart: false environment: MYSQL_ROOT_PASSWORD: root ports: - "33060:3306" volumes: - ./mysql-data:/var/lib/mysql canal: image: canal/canal-server:v1.1.5 container_name: canal restart: false ports: - "11111:11111" - "11112:11112" depends_on: - mysql environment: - canal.destinations=example - canal.instance.mysql.slaveId=1234 - canal.instance.master.address=mysql:3306 - canal.instance.dbUsername=root - canal.instance.dbPassword=root - canal.instance.connectionCharset=UTF-8 - canal.instance.tsdb.enable=false - canal.instance.gtidon=false - canal.instance.filter.regex=.* - canal.instance.filter.black.regex=mysql\.slave_.* redis: image: redis:latest restart: always ports: - 6379:6379 volumes: - ./redis_data:/data
将文件命名为:docker-compose.yml,开始安装。
docker-compose up -d
本案例使用balance余额表来演示,数据库表设计如下:
CREATE TABLE `balance` (
`id` varchar(50) NOT NULL COMMENT '主键',
`account` varchar(50) NOT NULL COMMENT '账户',
`amount` decimal(10,2) NOT NULL COMMENT '金额',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci
COMMENT='余额表';
开发环境根据您的实际需要选择即可。
环境启动后,进入编码阶段。
@Component public class BalanceRedisProcessorService implements EntryHandler<Balance>, Runnable { private final Logger logger = LoggerFactory.getLogger(BalanceRedisProcessorService.class); private final RedisUtils redisUtils; private final CanalConfig canalConfig; private final Executor executor; private final RocketMQProducer rocketMQProducer; @Value("${canal.server.open}") private boolean open; /** * 重试次数 */ private final static int MAX_RETRY_COUNT = 3; @Autowired public BalanceRedisProcessorService(RedisUtils redisUtils, CanalConfig canalConfig, @Qualifier("ownThreadPoolExecutor") Executor executor, RocketMQProducer rocketMqProducer) { this.redisUtils = redisUtils; this.canalConfig = canalConfig; this.executor = executor; this.rocketMQProducer = rocketMqProducer; } @PostConstruct public void init() { Map<String, String> mainMdcContext = Maps.newHashMap(); mainMdcContext.put("canal-thread", "balance-redis-processor-service"); MDC.setContextMap(mainMdcContext); executor.execute(this); logger.info("MySQL-Balance数据自动同步到Redis:线程已经启动"); } @Override public void run() { CanalConnector canalConnector = canalConfig.canalConnector(); canalConnector.connect(); // 回滚到未进行ack的地方 canalConnector.rollback(); try { while (open) { // 获取数据 每次获取一百条改变数据 Message message = canalConnector.getWithoutAck(100); //获取这条消息的id long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { TimeUnit.SECONDS.sleep(1); continue; } // 处理数据 for (CanalEntry.Entry entry : message.getEntries()) { if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) { continue; } CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); CanalEntry.EventType eventType = rowChange.getEventType(); List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); boolean syncRedisDataFlag = eventType == CanalEntry.EventType.UPDATE || eventType == CanalEntry.EventType.INSERT || eventType == CanalEntry.EventType.DELETE; if (!syncRedisDataFlag) { continue; } for (CanalEntry.RowData rowData : rowDatasList) { List<CanalEntry.Column> columns = rowData.getAfterColumnsList(); String tableName = entry.getHeader().getTableName(); // 判断是否是 Balance 表的 amount 字段变更 if (!"balance".equals(tableName)) { return; } StringBuilder redisKey = new StringBuilder("balance:"); handleCanalChangeColumns(columns, redisKey); } } // 确认消费完成这条消息 canalConnector.ack(message.getId()); } } catch (Exception e) { logger.error("canal-数据同步异常"); //运行时异常,服务监控告警,需要开发介入排查 throw new RuntimeException(e); } finally { // 关闭连接 canalConnector.disconnect(); } } /** * 开始处理canal获取到的变更列到Redis * * @param columns 列 * @param redisKey Redis中数据存储的Key * @throws InterruptedException 异常 */ private void handleCanalChangeColumns(List<CanalEntry.Column> columns, StringBuilder redisKey) throws Exception { String changeInfo = null; for (CanalEntry.Column column : columns) { logger.info("Balance changed in 'balance' dataInfo: {}", column); if ("id".equals(column.getName())) { String changeId = column.getValue(); logger.info("当前变更id为:{}", changeId); redisKey.append(changeId); } if ("amount".equals(column.getName())) { String changeValue = column.getValue(); boolean success = false; logger.info(changeValue); for (int retryCount = 0; retryCount < MAX_RETRY_COUNT; retryCount++) { try { redisUtils.set(redisKey.toString(), changeValue); success = true; logger.info("消费成功"); return; } catch (Exception ex) { logger.error("存入Redis失败,进行重试:{}", ex.getMessage()); // 等待一段时间后进行重试 TimeUnit.SECONDS.sleep(1); } changeInfo = redisKey.append(":").append(changeValue).toString(); } //发送告警消息 if (!success) { rocketMQProducer.sendMessage("DefaultCluster", changeInfo); } } } } }
使用接口调用或者手动改库的方式,制造数据变更,查看日志打印情况:
Redis数据:
消费失败情况测试:
完成。
我已将canal实现数据同步代码开源,请自行下载领取,笔者不介意您宝贵的Star,如果能帮到您,十分荣幸。
同时,如果您对笔者其他文章感兴趣,可以扫一扫关注笔者的公众号:种颗代码技术树
公众号文章更新更及时,以及一些程序员周边相关更新。
感谢您阅读到这里,不胜感激。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。