当前位置:   article > 正文

RabbitMQ如何保证消息不丢失_rabbitmq怎么保障消息不丢失

rabbitmq怎么保障消息不丢失

目录

前言

消息消费的步骤

消息丢失的原因

确保消息不丢失的方案

生产者可靠性投递

消息持久化

消费者可靠性投递 

SpringBoot 提供的消息重试


前言

MQTT协议中,给出了三种传递消息时能够提供的服务质量标准,这三种服务质量从低到高依次是:

  • At most once:至多一次。消息在传递时,最多会被送达一次。也就是说,没什么消息可靠性保证,允许丢消息。
  • At least once:至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消息,但是允许有少量重复消息出现。
  • Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重复,这个是最高的等级 这个服务质量标准不仅适用于MQTT,对所有的消息队列都是适用的。现在常用的绝大部分消息队列提供的服务质量都是 At least once,包括RocketMQ、RabbitMQ和Kafka都是这样。也就是说,消息队列很难保证消息不重复。

At least once+幂等消费=Exactly once

消息消费的步骤:
  1. 生产端发送消息到RabbitMQ;
  2. RabbitMQ发送消息到消费端;
  3. 消费端消费这条消息;
消息丢失的原因
  1. 生产者没有成功发送到MQ;
  2. 消息发送到MQ后,MQ宕机导致内存的消息数据丢失;
  3. 消费者获取到消息,但是消费者还没来得及处理就宕机了,但此时MQ中消息已经删除,消费者重启后不能再消费之前的消息;

确保消息不丢失的方案
  1. 生产者发送MQ后,MQ给生产者确认收到;
  2. MQ收到消息后进行消息持久化;
  3. 消费者收到消息,处理完毕后,手动进行ack确认;
  4. MQ收到消费者ack确认后,删除持久化的消息;

 生产者可靠性投递

1.事务消息机制:

  1. // 设置channel开启事务
  2. rabbitTemplate.setChannelTransacted(true);
  3. @Bean
  4. public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory)
  5. {
  6. return new RabbitTransactionManager(connectionFactory);
  7. }
  8. @Transactional(rollbackFor = Exception.class,transactionManager = "rabbitTransactionManager")
  9. public void publishMessage(String message) throws Exception {
  10. rabbitTemplate.setMandatory(true);
  11. rabbitTemplate.convertAndSend("javatrip",message);
  12. }

事务消息机制由于会严重降低性能,所以一般不采用这种方法,因为这是同步操作,一条消息发送之后会使发送端阻塞,以等待RabbitMQ-Server的回应,之后才能继续发送下一条消息,生产者生产消息的吞吐量和性能都会大大降低。

2.confirm消息确认机制(推荐)

顾名思义,就是生产端投递的消息一旦投递到RabbitMQ后,RabbitMQ就会发送一个确认消息(ack)给生产端,让生产端知道我已经收到消息了,如果rabbitmq没能处理该消息,会发送一个Nack给生产者,生产端可以重新发送消息。

# 开启发送确认
spring.rabbitmq.publisher-confirm-type=correlated
# 开启发送失败回退
spring.rabbitmq.publisher-returns=true         

  1. @Configuration
  2. @Slf4j
  3. public class RabbitMQConfig {
  4. @Autowired
  5. private RabbitTemplate rabbitTemplate;
  6. @PostConstruct
  7. public void enableConfirmCallback() {
  8. //confirm 监听,当消息成功发到交换机 ack = true,没有发送到交换机 ack = false
  9. //correlationData 可在发送时指定消息唯一 id
  10. rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
  11. if(!ack){
  12. //记录日志、发送邮件通知、落库定时任务扫描重发
  13. }
  14. });
  15. //当消息成功发送到交换机没有路由到队列触发此监听
  16. rabbitTemplate.setReturnsCallback(returned -> {
  17. //记录日志、发送邮件通知、落库定时任务扫描重发
  18. });
  19. }
  20. }

实际在这两个监听里面去做重发并不是很多,因为成本太高了,首先 RabbitMQ 本身丢失的可能性就非常低,其次如果这里需要落库再用定时任务扫描重发还要开发一堆代码,分布式定时任务…再其次定时任务扫描肯定会增加消息延迟,不是很有必要。真实业务场景是记录一下日志就行了,方便问题回溯,顺便发个邮件给相关人员,如果真的极其罕见的是生产者弄丢消息,那么开发往数据库补数据就行了。

消息持久化

RabbitMQ收到消息后将这个消息暂时存在了内存中,如果RabbitMQ挂了,那重启后数据就丢失了,所以相关的数据应该持久化到硬盘中,需要给exchange、queue和message都进行持久化。

这个持久化配置可以和confirm机制配合使用,可以在消息持久化磁盘后,再给生产者发送一个ACK信号,这样如果消息持久化磁盘之前,rabbitmq阵亡了,那么生产者收不到ack信号,生产者会自动重发。 这样即使mq挂了,重启后也能恢复数据。

  1. @Bean
  2. public Queue TestQueue() {
  3. // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
  4. // exclusive:该队列是否只供一个消费者进行消费是否进行消息共享,true可以多个消费者消费,false:只能-一个消费者消费
  5. // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
  6. return new Queue("test",true,true,false);
  7. }

在Spring Boot中消息默认就是持久化的。

消费者可靠性投递 

RabbitMQ的自动ack机制工作流程如下:

  • 1. 消费者接收到消息并开始处理。
  • 2. 一旦消息被成功处理且没有发生异常,RabbitMQ会立即将消息从队列中删除。
  • 3. 如果消费者在处理消息期间发生了异常,消息将会被重新放回队列中以便后续重新处理。

这种机制确保了当消费者成功处理消息时,消息会被立即删除,从而避免重复消费。 

 ACK机制改为手动

RabbitMQ的手动ack机制工作流程如下:
1. 消费者接收到消息并开始处理。
2. 消费者在处理完消息后,显式发送acknowledgement(ACK)给RabbitMQ来确认消息已经被处理。
3. 一旦RabbitMQ接收到ACK,它会将消息从队列中删除。
4. 如果消费者在处理消息期间发生了异常,可以选择不发送ACK,这样消息会被重新放回队列中以便后续重新处理。

这种机制允许消费者控制何时确认消息的处理完成,从而确保消息在被处理后才会被删除。

  1. #开启手动ACK,消费消息的时候,就必须发送ack确认,不然消息永远还在队列中
  2. spring.rabbitmq.listener.simple.acknowledge-mode=manual

在这里插入图片描述
在这里插入图片描述
这里要小心! basicNack 方法的第三个参数代表是否重回队列,通常代码的报错并不会因为重试就能解决,所以可能这种情况:继续被消费,继续报错,重回队列,继续被消费…死循环。
一定要有重发消息次数的限制,或者干脆不入队,发送到Redis进行下记录也行。

SpringBoot 提供的消息重试

SpringBoot 给我们提供了一种重试机制,当消费者执行的业务方法报错时会重试执行消费者业务方法。

启用 SpringBoot 提供的重试机制

  1. spring.rabbitmq.listener.simple.retry.enabled=true
  2. # 重试次数
  3. spring.rabbitmq.listener.simple.max-attempts=3
  4. # 重试时间间隔
  5. spring.rabbitmq.listener.simpleinitial-interval: 3000

消费者代码:

  1. @RabbitListener(queues = "queue")
  2. public void listen(String object, Message message, Channel channel) throws IOException {
  3. try {
  4. /**
  5. * 执行业务代码...
  6. * */
  7. int i = 1 / 0; //故意报错测试
  8. } catch (Exception e) {
  9. log.error("签收失败", e);
  10. /**
  11. * 记录日志、发送邮件、保存消息到数据库,落库之前判断如果消息已经落库就不保存
  12. * */
  13. throw new RuntimeException("消息消费失败");
  14. }
  15. }

注意一定要手动 throw 一个异常,因为 SpringBoot 触发重试是根据方法中发生未捕捉的异常来决定的。值得注意的是这个重试是 SpringBoot 提供的,重新执行消费者方法,而不是让 RabbitMQ 重新推送消息。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/694382
推荐阅读
相关标签
  

闽ICP备14008679号