当前位置:   article > 正文

RabbitMQ - 如保证消息的可靠性?

RabbitMQ - 如保证消息的可靠性?

目录

一、消息可靠性

1.1、生产者消息确认(生产者角度)

1.1.1、理论

1.1.2、实践

1.2、消息持久化(消息角度)

1.2.1、理论

1.3、消费者消息确认(消费者角度)

1.3.1、理论

1.3.2、实践

1.4、失败重试机制(失败后的处理机制)

1.4.1、理论


一、消息可靠性


1.1、生产者消息确认(生产者角度)

1.1.1、理论

在生产者这边,RabbitMQ 提供了 消息确认机制 来确保生产者的消息到达队列。

具体的,生产者将消息发送给 MQ 之后,会返回一个结果给生产者,表示消息是否处理成功,具体有以下两种响应:

  1. publish-confirm 正常响应
    1. 消息成功投递到交换机,再转发到对应的队列,最后成功被消费者拿到,返回 ack.
    2. 消息未投递到交换机(比如交换机不存在,或者是交换机名字写错了),返回 nack.
  2. publish-return 异常响应
    1. 消息投递到交换机,但是没有路由到队列(比如指定的队列名字写错了),返回 ack,以及路由失败的原因.

最后生产者这边的回调接收到响应后,根据不同的 ack 执行不同的“策略”(类似于你去买书,然后拿到书以后具体要干啥,都由你决定).

Ps:确认机制发送消息时,需要给每一个消息设置一个全局唯一的 id, 以区分不同消息,避免 ack 冲突.

1.1.2、实践

a)再 publisher 微服务的 application.yml 中添加配置:

  1. spring:
  2. rabbitmq:
  3. publisher-confirm-type: correlated
  4. publisher-returns: true
  5. template:
  6. mandatory: true

配置说明:

  1. publish-confirm-type:开启publisher-confirm,这里支持两种类型,
    1. simple(不推荐,类似死等,占用资源):同步等待confirm结果,直到超时.
    2. correlated(推荐):异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback.
  2. publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback.
  3. template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息.

b)每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置:

  1. @Slf4j
  2. @Configuration
  3. public class CommonConfig implements ApplicationContextAware {
  4. @Override
  5. public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  6. // 获取RabbitTemplate
  7. RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
  8. // 设置ReturnCallback
  9. rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
  10. log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
  11. replyCode, replyText, exchange, routingKey, message.toString());
  12. });
  13. }
  14. }

Ps:ApplicationContextAware 就是 Spring 容器启动时的要执行的通知接口,通过 setApplicationContext 方法实现具体的通知.

c)生产者发送消息,指定 ID,消息  ConfirmCallback

  1. @Test
  2. public void testSendMessage2SimpleQueue() throws InterruptedException {
  3. // 消息体
  4. String message = "hello, spring amqp!";
  5. // 消息ID,需要封装到CorrelationData中
  6. CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
  7. // 添加callback
  8. correlationData.getFuture().addCallback(
  9. result -> {
  10. if(result.isAck()){
  11. // ack,消息成功
  12. log.debug("消息发送成功, ID:{}", correlationData.getId());
  13. }else{
  14. // nack,消息失败
  15. log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());
  16. }
  17. },
  18. ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
  19. );
  20. // 发送消息
  21. rabbitTemplate.convertAndSend("amq.direct", "simple", message, correlationData);
  22. }

1.2、消息持久化(消息角度)

1.2.1、理论

MQ 默认时内存存储消息,通过开启持久化功能(设置 durable = true),就可以将消息持久化到文件中,保证保证消息不丢失.

Ps:消息要持久化的前提是交换机(不一定,但最好是)和队列是持久化的.

1.2.2、实践

a)交换机持久化

  1. @Bean
  2. public DirectExchange simpleExchange(){
  3. // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
  4. return new DirectExchange("simple.direct", true, false);
  5. }

b)队列持久化

  1. @Bean
  2. public Queue simpleQueue(){
  3. // 使用QueueBuilder构建队列,durable就是持久化的
  4. return QueueBuilder.durable("simple.queue").build();
  5. }

c)消息持久化

  1. public void testDurableMessage() {
  2. //1.构造一个持久的消息
  3. Message message = MessageBuilder.withBody("hello".getBytes())
  4. .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
  5. .build();
  6. rabbitTemplate.convertAndSend("simple.queue", message);
  7. }

Ps:delivery_mode = 2 就表示消息要持久化.

1.3、消费者消息确认(消费者角度)

1.3.1、理论

在编程式的 RabbitMQ 中消费者有两种应答方式,自动应答和手动应答,而一定我们在消费消息时开启了手动应答,那么当消费消息完成后不会主动删除消息,而是需要通过客户端手动调用basicAck 方法才会进行应答(内部逻辑就是从内存和硬盘上删除消息)。

在注解式的 RabbitMQ 支持消费者确认机制,即:消费者处理消息后可以向 MQ 发送 ack 回执,MQ收到ack回执后才会删除该消息.

SpringAMQP 允许配置三种确认模式:

  • manual:手动ack,需要在消费者执行的消息代码结束后,调用api发送ack。
  • auto:自动ack,由 spring 监测消费者的执行的消费代码是否出现异常,没有异常则返回ack;抛出异常则返回nack,然后会将消息重新加入到队列,再发送给消费者,然后再次异常...,无限循环.
  • none:关闭ack,MQ 假定消费者获取消息后会成功处理,因此消息投递后立即被删除

1.3.2、实践

这里只需要配置以下 application.yml 文件,添加以下配置:

  1. spring:
  2. rabbitmq:
  3. listener:
  4. simple:
  5. prefetch: 1
  6. acknowledge-mode: none # none,关闭ack;manual,手动ack;auto:自动ack

1.4、失败重试机制(失败后的处理机制)

1.4.1、理论

刚刚讲到,消费者消费确认,SpringAMQP 提供了三种确认模式,其中 auto 这种方式,在消费者执行消费代码遇到异常时,会重新将消息加入到队列中,然后发送给消费者,再次异常,无限循环,导致 mq 的消息处理飙升,带来不必要的压力.

假设消费任务如下:

  1. @Component
  2. public class SpringRabbitListener {
  3. @RabbitListener(queues = "simple.queue")
  4. public void listenSimpleQueue(String msg) {
  5. System.out.println("消费者接收到消息:" + msg);
  6. System.out.println("开始消费!");
  7. System.out.println(1/0);
  8. System.out.println("消费完成!");
  9. }
  10. }

我们可以利用 Spring 的 retry 机制,在消费者出现异常时,利用本地重试,而不是无限制的加入到 mq 队列,只需要对消费者的配置文件进行以下配置:

  1. spring:
  2. rabbitmq:
  3. listener:
  4. simple:
  5. prefetch: 1
  6. retry:
  7. enabled: true # 开启消费者失败重试
  8. initial-interval: 1000 # 初始的失败等待时长为1
  9. multiplier: 3 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-interval
  10. max-attempts: 4 # 最大重试次数
  11. stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

在开启重试模式以后,若重试次数耗尽,并且消息依然失败,则需要有 MessageRecoverer 接口来处理,他包含三种不同的实现:

  1. RejectAndDontRequeueRecoverer(默认方式):重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  2. ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  3. RepublishMessageRecoverer(推荐方式):重试耗尽后,将失败消息投递到指定的交换机,再由交换机投递到指定的队列.
     

上述第三种方式比较推荐,如下图:

1.4.2、实践

这里就测试以下推荐方案 RepublishMessageRecoverer

a)首先要定义用来接收失败消息的交换机、队列、绑定关系,最后定义 RepublishMessageRecoverer(Bean 的方式注入,覆盖 Spring 默认的方案):

  1. @Configuration
  2. public class ErrorMessageConfig {
  3. @Bean
  4. public DirectExchange errorMessageExchange() {
  5. return new DirectExchange("error.direct");
  6. }
  7. @Bean
  8. public Queue errorQueue() {
  9. return new Queue("error.queue", true);
  10. }
  11. @Bean
  12. public Binding errorBinding() {
  13. return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
  14. }
  15. @Bean
  16. public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {
  17. return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
  18. }
  19. }

b)定义消费者执行的消费任务

  1. @Component
  2. public class SpringRabbitListener {
  3. @RabbitListener(queues = "simple.queue")
  4. public void listenSimpleQueue(String msg) {
  5. System.out.println("消费者接收到消息:" + msg);
  6. System.out.println("开始消费!");
  7. System.out.println(1/0);
  8. System.out.println("消费完成!");
  9. }
  10. }

c)启动消费者,如下:

d)查看失败队列中具体信息(异常栈信息和信息信息)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/在线问答5/article/detail/805532
推荐阅读
相关标签
  

闽ICP备14008679号