赞
踩
消息幂等性,其实就是保证一个消息不管因为什么原因都只会被消费一次,正常情况下,消费者消费完消息之后,会发送一个ack给生产者,但是中途有可能因为网络等原因导致生产者未收到确认信息,导致一条消息被重复消费。
如何避免重复消费的问题
1.使用一个唯一的id。消费之前根据id去判断消息是否被消费过,未被消费则正常使用消息。
2.使用redis只能有一个key的特性。
我们这里只介绍第一种方法,SpringAMQP中给出了这种问题的解决方案。
关于配置问题和入门使用SpringAMQP,大家可以看我之前写的文章。
在这里我们直接展示生产者和消费者代码,看一下具体如何通过代码实现。
生产者代码:
- @SpringBootTest
- class DemoMqSendApplicationTests {
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Test
- void contextLoads() {
- //声明交换机
- String exchange="simple.fanout";
- //amqp自带的配置,可以设置唯一id
- MessageProperties properties = new MessageProperties();
- //在这里我们使用uuid作为唯一id来进行幂等性判断
- properties.setMessageId(UUID.randomUUID().toString());
- properties.setContentType("text/plain");
- properties.setContentEncoding("utf-8");
- //将配置和要发送的消息保存一下
- Message message = new Message("hello".getBytes(), properties);
- //进行消息的发送
- rabbitTemplate.convertAndSend(exchange,"", message);
-
-
- }
-
- }

监听者代码:
- @Component
- @Slf4j
- public class Listener {
-
- @RabbitListener(queues = "simple.queue")
- public void listener(Message message, Channel channel) {
- //获取到发送的信息
- byte[] body = message.getBody();
- String msg = new String(body);
- //获取到对应的配置
- MessageProperties properties = message.getMessageProperties();
- //获取到唯一的业务id
- String messageId = properties.getMessageId();
- /**
- * 在这里进行业务判断,如果数据库中有这个id则不进行操作,否则进行操作,保证幂等性
- */
-
- log.info("listener: {}", msg);
- log.info("listener: {}", message.getMessageProperties().getMessageId());
-
- }
-
- }

listener: hello listenerID: 4b40e96e-2c39-4880-bea9-25d673a57a9a
根据两条日志看到唯一的id就是4b40e96e-2c39-4880-bea9-25d673a57a9a,发送的消息就是hello
拿到id之后,在消费消息前,先去数据库查询这条消息是否存在消费记录,没有就执行insert操作,如果有就代表已经被消费了,则不进行处理。
消息从生产端到消费端消费要经过3个步骤:
生产端发送消息到RabbitMQ;
RabbitMQ发送消息到消费端;
消费端消费这条消息;
在这里不对12进行详细的讲解,发生最多的也就是消费端接收消息丢失。如果想知道12具体操作,以后可以具体介绍。
- @RabbitListener(queues = "simple.queue2")
- public void listener2(Message message, Channel channel) {
- try {
- byte[] body = message.getBody();
- //拿到接收到的消息
- String msg = new String(body, StandardCharsets.UTF_8);
- //获取配置文件
- MessageProperties properties = message.getMessageProperties();
- //获取到唯一标识符
- long tag = properties.getDeliveryTag();
- //消息确认机制
- channel.basicAck(tag, false);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }

在这里的手动确认机制我们使用了channel.basicAck来实现,里面有两个参数,分别是一个long类型的唯一标识符和布尔类型的参数。第一个参数对应的就是配置里面的DeliveryTag,每条消息的标识符都是唯一的。布尔类型参数代表的是当为true时就是确认多条消息,false的话就是只确认这一条消息,一般我们都是false。
还需要再配置文件中加上simple:acknowledge-mode: manual就是开启手动确认。
- listener:
- simple:
- acknowledge-mode: manual
最后一个问题就是如何进行限流,为什么要限流?防止数据量突然特别大导致服务崩溃。
使用channel.basicQos(int prefetchSize, int prefetchCount, boolean global)来实现
参数介绍:
prefetchSize:表示消息的大小,0的话不限制大小
prefetchCount:表示消息的数量
global:true表示该通道下的所有消费者都适用这个策略,而false表示只有当前这一个消费者适用这个策略。
代码实现:
- @RabbitListener(queues = "simple.queue2")
- public void listener2(Message message, Channel channel) {
- try {
- //开启限流
- channel.basicQos(0,1,false);
- byte[] body = message.getBody();
- //拿到接收到的消息
- String msg = new String(body, StandardCharsets.UTF_8);
- //获取配置文件
- MessageProperties properties = message.getMessageProperties();
- //获取到唯一标识符
- long tag = properties.getDeliveryTag();
- //消息确认机制
- channel.basicAck(tag, false);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }

这里的限流表示不限制消息大小,但是限制消息数量,,一次只能发送一条消息。
为什么要限制流量和大小呢?
限流肯定都可以理解,突然的大流量会导致服务崩溃,如果流量不是很多,但是数据量特别大,比如一个人一天吃一个馒头,今天还是吃一个,但是有10公斤重,也会导致服务崩溃。
注意:限流和手动确认机制最好一起使用。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。