赞
踩
消息投递:如何保证消息仅仅被消费一次
由于系统引入了消息队列,可能出现 消息在投递的过程中发生丢失,
消息需要从生产者,到消息队列,到消费者。在整个链路中都有丢失的可能。
消息的生产者一般是 业务服务器,消息队列是独立部署在单独的服务器上,两个服务之间一般通过内网进行交互,但是可能会出现网络抖动,消息因为网络错误而丢失。
发现网络超时后将消息重新发送一次,但是不可无限制的重传消息。如果不是消息队列发生故障或者到消息队列服务的网络有问题,重生 2~3 次即可。
消息重复,由于重试发送,可能会造成消息重复。
比如,在 Kafka 中,消息是存储在本地磁盘上的,但是为了减少消息存储时对磁盘的随机 I/O,一般会把消息先写入操作系统的 Page Cache中,然后再找合适的时机刷新到磁盘上。即Kafka 可以配置当达到某一时间间隔或者积累了一定的消息数量的时候再刷新,异步刷盘。
但是如果发生机器掉电或者机器异常重启,Page Cache 中还没有来得及刷盘的消息就会丢失。
根据自己的业务,判断消息的丢失容忍,确定自己使用的方式。
接收消息----》处理消息-----》更新消费进度
一定要等到消息接收和处理完成后才能更新消费进度,但是这也会造成消息重复的问题,比方说某一条消息在处理之后消费者恰好宕机了,那么因为没有更新消费进度,所以当这个消费者重启之后还会重复地消费这条消息。
在消息的生产和消费过程中都可能会产生重复,所以在这两个过程中增加消息幂等性。
Kafka 支持 producer idempotency 的特性,即生产过程的幂等性。保证消息虽然可能在生产端重复,但是在消息队列存储时只会存储一份。
它的做法是给每一个生产者一个唯一的 ID,并且为生产的每一条消息赋予一个唯一 ID,消息队列的服务端会存储 < 生产者 ID,最后一条消息 ID> 的映射。当某一个生产者产生新的消息时,消息队列服务端会比对消息 ID 是否与存储的最后一条 ID 一致,如果一致就认为是重复的消息,服务端会自动丢弃。
在通用层和业务层两个层面来进行考虑。
通用层
在消息被生产的时候使用发号器给它生成一个全局唯一的消息 ID,消息被处理之后把这个 ID 存储在数据库中,在处理下一条消息之前查库是否被消费,如果没有消费就进行消费。
// 伪代码
boolean isIDExisted = selectByID(ID); // 判断ID是否存在
if(isIDExisted) {
return; //存在则直接返回
} else {
process(message); //不存在,则处理消息
saveID(ID); //存储ID
}
业务层
使用乐观锁进行控制。
比如你的消息处理程序需要给一个人的账号加钱,那么你可以通过乐观锁的方式来解决。
update user set amount = amount + 20, version=version+1 where userId=1 and version=1;
即:我们在更新数据时给数据加了乐观锁,这样在消费第一条消息时,version 值为 1,SQL 可以执行成功,并且同时把 version 值改为了 2;在执行第二条相同的消息时,由于 version 值不再是 1,所以这条 SQL 不能执行成功,也就保证了消息的幂等性。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。