当前位置:   article > 正文

RabbitMQ消息一致性及失败重试_rabbitmq失败消息处理策略

rabbitmq失败消息处理策略

RabbitMQ消息的一致性保证,通常从MQ配置、生产者和消费者3个角度:

  1. 生产者:采用confirm消息确认机制及return监听回调机制,确保消息发送成功

  2. MQ:将交换机、队列及消息设置持久化,保证rabbitmq宕机后消息不丢失

  3. 消费者:手动确认接收消息方式,消息处理失败拒收或重回队列

生产者开启消息确认及return监听回调配置

  1. spring:
  2. rabbitmq:
  3. host: ip
  4. port: 5672
  5. username: guest
  6. password: guest
  7. ##消息发送确认回调
  8. publisher-confirms: true
  9. ##采用confirm以及return机制发送返回监听回调
  10. publisher-confirm-type: correlated
  11. ##Return机制确保消息从交换机发送到指定的队列
  12. publisher-returns: true

MQ配置

  1. /**
  2. * RabbitMQ配置信息,绑定交换器、队列、路由键设置
  3. *
  4. * <p>
  5. * 如果我们希望即使在RabbitMQ服务重启的情况下,也不会丢失消息,可以将交换机、队列、消息都进行持久化,这样可以保证绝大部分情况下消息不会丢失。
  6. * 但还是会有小概率发生消息丢失的情况(比如RabbitMQ服务器已经接收到生产者的消息,但还没来得及持久化该消息时RabbitMQ服务器就断电了),
  7. * 如果我们需要对这种小概率事件也要管理起来,那么我们要用到事务。(transaction/confirm机制)
  8. *
  9. * <p>
  10. * 说明:
  11. * 1. 队列持久化:需要在声明队列的时候设置durable=true,如果只对队列进行持久化,那么mq重启之后队列里面的消息不会保存
  12. * 如果需要队列里面的消息也保存下来,那么还需要对消息进行持久化;
  13. * <p>
  14. * 2. 消息持久化:设置消息的deliveryMode = 2,消费者重启之后还能够继续消费持久化之后的消息;
  15. * 使用convertAndSend方式发送消息,消息默认就是持久化的,下面是源码:
  16. * new MessageProperties() --> DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT --> deliveryMode = 2;
  17. * <p>
  18. * 3.重启mq: CMD命令行下执行 net stop RabbitMQ && net start RabbitMQ
  19. */
  20. @Component
  21. public class RabbitMQConfig {
  22. private static final String DURABLE_QUEUE_NAME = "durable_queue_name";
  23. private static final String DURABLE_EXCHANGE_NAME = "durable_exchange_name";
  24. private static final String ROUTING_KEY = "user.#";
  25. private static final String QUEUE_NAME = "not_durable_queue_name";
  26. private static final String EXCHANGE_NAME = "not_durable_exchange_name";
  27. @Bean
  28. public Queue durableQueue() {
  29. // public Queue(String name) {
  30. // this(name, true, false, false);
  31. // }
  32. // public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete)
  33. //不指定durable的话默认好像也是true
  34. //public Queue(String name, boolean durable)
  35. //durable:是否将队列持久化 true表示需要持久化 false表示不需要持久化
  36. return new Queue(DURABLE_QUEUE_NAME, true);
  37. }
  38. @Bean
  39. public TopicExchange durableExchange() {
  40. // public AbstractExchange(String name) {
  41. // this(name, true, false);
  42. // }
  43. // public AbstractExchange(String name, boolean durable, boolean autoDelete) {
  44. // this(name, durable, autoDelete, (Map)null);
  45. // }
  46. //声明交换机的时候默认也是持久化的
  47. return new TopicExchange(DURABLE_EXCHANGE_NAME);
  48. }
  49. @Bean
  50. public Binding durableBinding() {
  51. //如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定
  52. return BindingBuilder.bind(durableQueue()).to(durableExchange()).with(ROUTING_KEY);
  53. }
  54. @Bean
  55. public Queue queue() {
  56. //public Queue(String name, boolean durable)
  57. //durable:是否将队列持久化 true表示需要持久化 false表示不需要持久化
  58. return new Queue(QUEUE_NAME, false);
  59. }
  60. @Bean
  61. public TopicExchange exchange() {
  62. return new TopicExchange(EXCHANGE_NAME,true,false);
  63. }
  64. @Bean
  65. public Binding binding() {
  66. return BindingBuilder.bind(queue()).to(exchange()).with(ROUTING_KEY);
  67. }
  68. }

消费者手动确认消息配置

  1. listener:
  2. type: simple
  3. simple:
  4. #手动接收消息方式,# none:关闭ack;manual:手动ack;auto:自动ack
  5. acknowledge-mode: manual

失败消息处理策略

一般消息失败后需要重试,我们在yml中配置

  1. listener: # 开启消费者确认其机制
  2. simple:
  3. prefetch: 1 #消费者每次只能获取一条消息,处理完才能获取下一条(可实现能者多劳)
  4. acknowledge-mode: manual # none:关闭ack;manual:手动ack;auto:自动ack
  5. retry:
  6. enabled: true #开启消费者失败重试
  7. initial-interval: 1000ms #初始的失败等待时长为1
  8. multiplier: 1 #下次失败的等待时长倍数,下次等待时长 = multiplier * last-interval
  9. max-attempts: 3 #最大重试次数
  10. stateless: true #true无状态;false有状态。如果业务中包含事务,这里改为false

    在开启重试模式后,重试次数耗尽后消息消费仍然失败,则需要通过MessageRecoverer接口来处理,它包含三种不同的实现:

     1、RejectAndDontRequeueRecoverer:重试耗尽后,默认方式:直接丢弃消息,不符合多数业务需求

    2、ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队,容易造成队列死循环消费,不推荐

     3、RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机,人工再处理,推荐使用

以下介绍第三种处理方式:

  • 定义接收失败消息的交换机、队列及其绑定关系:
  1. /**
  2. * 接收错误消费的日志
  3. */
  4. @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "errorQueue"),
  5. exchange = @Exchange(name = "errorExchange", type = ExchangeTypes.DIRECT, ignoreDeclarationExceptions = "true"),
  6. key = "errorRouting"
  7. ))
  8. public void receiveErrorMessage(String message) {
  9. log.info("消费者收到发送错误的消息: " + message);
  10. }
  • 定义RepublishMessageRecoverer
  1. /**
  2. * 定义错误消息接收
  3. */
  4. @Configuration
  5. @Slf4j
  6. public class ErrorConfig {
  7. @Bean
  8. public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
  9. log.debug("加载RepublishMessageRecoverer");
  10. return new RepublishMessageRecoverer(rabbitTemplate,"errorExchange","errorRouting");
  11. }
  12. }

这个就开启了消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理。 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/996801
推荐阅读
相关标签
  

闽ICP备14008679号