当前位置:   article > 正文

rabbitmq的消息发布确认机制_rabbitmq手动确认机制

rabbitmq手动确认机制

1. 消息可靠性投递
前言
在代码里面一定是先操作数据库再发送消息。避免因为数据库回滚导致的数据不一致。但是如果先操作数据,后发送消息,发送消息出了问题,那不是一样会出现业务数据的不一致?

这篇文章我们来分析 RabbitMQ 的可靠性投递,也就是在使用 RabbitMQ 实现异步通信的时候,消息丢了怎么办,消息重复消费怎么办?在 RabbitMQ 里面提供了很多保证消息可靠投递的机制,这个也是 RabbitMQ 的一个特性。

在学习RabbitMQ前,必须要明确一个问题,因为效率与可靠性是无法兼得的,如果要保证每一个环节都成功,势必会对消息的收发效率造成影响。所以如果是一些业务实时一致性要求不是特别高的场合,可以牺牲一些可靠性来换取效率。比如发送通知或者记录日志的这种场景,如果用户没有收到通知,不会造成业务影响,只要再次发送就可以了。

在我们使用 RabbitMQ 收发消息的时候,有几个主要环节:

在这里插入图片描述
1 代表消息从生产者发送到 Broker。
生产者把消息发到 Broker 之后,怎么知道自己的消息有没有被 Broker 成功接收?
2 代表消息从 Exchange 路由到 Queue
Exchange 是一个绑定列表,如果消息没有办法路由到正确的队列,会发生什么事情?应该怎么处理?
3 代表消息在 Queue 中存储
队列是一个独立运行的服务,有自己的数据库(Mnesia),它是真正用来存储消息的。如果还没有消费者来消费,那么消息要一直存储在队列里面。如果队列出了问题,消息肯定会丢失。怎么保证消息在队列稳定地存储呢?
4 代表消费者订阅 Queue 并消费消息
队列的特性是什么?FIFO。队列里面的消息是一条一条的投递的,也就是说,只有上一条消息被消费者接收以后,才能把这一条消息从数据库删掉,继续投递下一条消息。那么问题来了,Broker 怎么知道消费者已经接收了消息呢?
 

下面我们通过收到ack的方式保证消息的可靠性

1.我们进行配置开启手动确认机制

  1. server.port=9994
  2. spring.rabbitmq.port=5672
  3. spring.rabbitmq.username=guest
  4. spring.rabbitmq.password=guest
  5. # 发送方
  6. # 开启发送确认(未到达MQ服务器)
  7. spring.rabbitmq.publisher-confirms=true
  8. # 开启发送失败退回(未找到对应queue)
  9. spring.rabbitmq.publisher-returns=true
  10. #启动时自动启动容器
  11. spring.rabbitmq.listener.auto-startup=true
  12. # 消费方 开启手动ACK
  13. spring.rabbitmq.listener.direct.acknowledge-mode=manual
  14. spring.rabbitmq.listener.simple.acknowledge-mode=manual

2.创建交换机,队列和路由 

  1. @Slf4j
  2. @Configuration
  3. public class Config1 implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
  4. @Autowired
  5. private RabbitTemplate rabbitTemplate;
  6. //后置处理器,先将其他注解都加载完在加载此方法,不然会造成空指针
  7. @PostConstruct
  8. public void init(){
  9. rabbitTemplate.setConfirmCallback(this);
  10. rabbitTemplate.setReturnCallback(this);
  11. }
  12. //交换机是否收到消息的确认机制
  13. @Override
  14. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  15. if (ack){
  16. log.info("交换机收到生产者发送的消息");
  17. }else {
  18. log.error("交换机没有收到生产者发送的消息");
  19. }
  20. }
  21. //队列未收到消息会进行消息回调
  22. @Override
  23. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  24. String mes=new String(message.getBody());
  25. log.error("队列未收到消息:{}",mes);
  26. }
  27. //交换机
  28. public static final String CONFIRM_EXCHANGE="confirm.exchange";
  29. //队列
  30. public static final String CONFIRM_QUEUE="confirm.queue";
  31. public static final String CONFIRM_ROUTING_KEY="confirm.routing";
  32. @Bean
  33. public DirectExchange confirmExchange(){
  34. return new DirectExchange(CONFIRM_EXCHANGE,true,false);
  35. }
  36. @Bean
  37. public Queue confirmQueue(){
  38. return QueueBuilder.durable(CONFIRM_QUEUE).build();
  39. }
  40. @Bean
  41. public Binding confirmBinding(){
  42. return BindingBuilder.bind(confirmQueue()).to(confirmExchange()).with(CONFIRM_ROUTING_KEY);
  43. }
  44. }

3.生产者发送消息

  1. @Slf4j
  2. @RestController
  3. @RequestMapping("/confirm")
  4. public class ConfirmController {
  5. @Autowired
  6. private RabbitTemplate rabbitTemplate;
  7. @RequestMapping(value = "/send/{message}",method = RequestMethod.GET)
  8. public void sendMes(@PathVariable("message") String message){
  9. rabbitTemplate.convertAndSend(Config1.CONFIRM_EXCHANGE,Config1.CONFIRM_ROUTING_KEY,message);
  10. log.info("生产者发送的消息为:{}",message);
  11. }
  12. }

4.消费者监听消息(此时还未手动ack)

  1. @Slf4j
  2. @Component
  3. public class ConfirmConsumer {
  4. @RabbitListener(queues = Config1.CONFIRM_QUEUE)
  5. public void getMes(Message message, Channel channel){
  6. String mes=new String(message.getBody());
  7. log.info("消费者收到的消息为:{}",mes);
  8. }
  9. }

5.我们首先测试第一次发消息成功的情况如何
http://localhost:9994/confirm/send/第一次


 

 如上图所示,当我们消息发送成功的时候,交换机会触发回调函数.

5.1 接下来我们在发送消息的时候将交换机和路由key依次改错来验证一次啊回调机制

此时交换机是改成了错误的,我们重启继续发消息试验

 

这个时候就看到产生了报错信息404,表示没有这个交换机,同时交换机的回调机制响应了

5. 2  接下来把队列改错

看测试结果

 

此时根据结果我们可以看出消息是发送到了交换机,然后由交换机将消息路由到队列的时候发生了错误.因为我们将路由key改错了.

6.回退消息

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息
果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的 。那么如何
让无法被路由的消息帮我想办法处理一下?最起码通知我一声,我好自己处理啊。通过设置 mandatory
数可以在当消息传递过程中不可达目的地时将消息返回给生产者。 如下图

接下来我们修改消费者代码

如上图所示,我打了断点,因为不超过一百,所以会一直Nack,然后因为我们的参数设置了true,那么消息就会一直返回队列,一直重新发送,这样子保证消费者消息不会丢失

但是鱼与熊掌不可兼得,需要性能的话就要牺牲稳定性,不能保证消息不丢失.

需要稳定性的话,就要牺牲西能,让未被消费的消息一直发送,直到消费成功 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/385796?site
推荐阅读
相关标签
  

闽ICP备14008679号