赞
踩
目录
注意kafka的消息没有消息ID这个概念,只维护了offset。而RocketMQ是有消息ID的概念的。
就是消息Body重复,而不管offset是否重复(因为offset默认是不对外暴露的),消息Body里例如包含业务ID等数据。
当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且Message ID也相同的消息。
消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。为了保证消息至少被消费一次,消息队列RocketMQ版的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且Message ID也相同的消息。
当消息队列RocketMQ版的Broker或客户端重启、扩容或缩容时,会触发Rebalance,此时消费者可能会收到重复消息。
无论是否有【消息ID】,都应该依据【业务ID】来实现消息幂等
1、利用数据库的唯一约束实现幂等
【仅适用于创建事件的消息。如果是修改事件的消息时,订单表就已经有ID了】
比如将订单表中的订单编号设置为唯一索引,创建订单时,根据订单编号就可以保证幂等。
2、去重表
【创建事件的消息和修改事件的消息都可以支持,但是去重表是一个不小的开销】
这个方案本质也是根据数据库的唯一性约束来实现。其实现大体思路是:首先在去重表上建唯一索引,其次操作时把业务表和去重表放在同个本地事务中,如果出现重现重复消费,数据库会抛唯一约束异常,操作就会回滚
3、利用redis的原子性
【类似1方案】
每次操作都直接set到redis里面,然后将redis数据定时同步到数据库中
4、多版本(乐观锁)控制(暂略)
此方案多用于更新的场景下。其实现的大体思路是:给业务数据增加一个版本号属性,每次更新数据前,比较当前数据的版本号是否和消息中的版本一致,如果不一致则拒绝更新数据,更新数据的同时将版本号+1
5、状态机机制(暂略)
此方案多用于更新且业务场景存在多种状态流转的场景
6、token机制(主要)
【在redis里,使用业务id+业务类型等,组成联合“索引”,可以适当加key的过期时间。消费前检查redis,消费后设置redis】
生产者发送每条数据的时候,增加一个全局唯一的id,这个id通常是业务的唯一标识,比如订单编号。在消费端消费时,则验证该id是否被消费过,如果还没消费过,则进行业务处理。处理结束后,在把该id存入redis,同时设置状态为已消费。如果已经消费过了,则不进行处理。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。