当前位置:   article > 正文

日常记录-SpringBoot整合RabbitMQ第四节(消费者确认)_boot rabbitmq 配置消费者

boot rabbitmq 配置消费者

一、auto模式(自动ACK)

RabbitMQ默认是auto模式,当监听消费者方法正常执行完毕后,由Spring自动向RabbitMQ返回ack确认;如果出现异常,就给RabbitMQ返回nack消费失败。

application.yml配置RabbitMQ消费者ACK应答模式

spring:
  rabbitmq:
    listener:
      simple:
        # none(无应答模式) auto(自动应答模式) manual(手动应答模式)
        acknowledge-mode: auto
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

二、none模式(无ACK)

RabbitMQ认为所有消息都会被成功消费,所以RabbitMQ投递消息后会立即删除消息

application.yml配置RabbitMQ消费者ACK应答模式

spring:
  rabbitmq:
    listener:
      simple:
        # none(无应答模式) auto(自动应答模式) manual(手动应答模式)
        acknowledge-mode: none
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

三、manual模式(手动ACK)

开发人员在处理完业务后,调用RabbitMQ封装好的API,向RabbitMQ返回ack确认消费成功或者消费失败

application.yml配置RabbitMQ消费者ACK应答模式

spring:
  rabbitmq:
    listener:
      simple:
        # none(无应答模式) auto(自动应答模式) manual(手动应答模式)
        acknowledge-mode: manual
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

四、消费者失败重试

可以利用Spring本身自动重试的机制,当消费者出现异常后,在消费者内部进行本地重试;而不是让消息立刻重新回到队列,然后让RabbitMQ重新投递,会导致CPU飙升。(这样会导致无限循环 -> 消息一旦出现异常会不断投放回到队列,再重新发送给消费者)。

application.yml配置
在这里插入图片描述

定义队列和交换机

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RetryQueueConfig {

    //定义direct类型交换机
    @Bean
    public DirectExchange retryExchange() {
        return ExchangeBuilder.directExchange("retry.exchange").build();
    }


    //定义持久化队列
    @Bean
    public Queue retryQueue() {
        return new Queue("retry.queue",true,false,false);
    }

    @Bean
    public Binding retryQueueBinding(Queue retryQueue, DirectExchange retryExchange) {
        return BindingBuilder.bind(retryQueue).to(retryExchange).with("retry");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

模拟生产者

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class RetryQueueTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //模拟生产者
    @Test
    public void test(){
        rabbitTemplate.convertAndSend("retry.exchange","retry","模拟消息异常");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

控制台输出
重试三次后就把消息删除了
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

重试失败后的恢复策略

在刚刚的本地重试中,在达到最大次数后,消息会被丢弃,这是Spring内部机制决定的。

但是,其实在重试多次消费仍然失败后,SpringAMQP提供了MessageRecoverer接口,定义了不同的恢复策略可以用来进一步处理消息:

RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢失消息。是默认的处理策略

ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

实际开发中,比较优雅的一个方案是RepublishMessageRecoverer,将失败消息重新投递到一个专门用于存储异常消息的队列中,等待后续人工处理。

RepublishMessageRecoverer策略代码

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;



@Configuration
public class RepublishMessageRecovererConfig {

    /*
     * 消息消费失败后的恢复策略:使用RepublishMessageRecoverer策略:重试次数耗尽后,将失败消息投递到指定的交换机
     */
    @Bean
    public MessageRecoverer republishMsgRecoverer(RabbitTemplate rabbitTemplate) {
        return new RepublishMessageRecoverer(rabbitTemplate, "error.exchange", "error");
    }


    //定义Topic类型交换机
    @Bean
    public TopicExchange errorExchange() {
        return ExchangeBuilder.topicExchange("error.exchange").build();
    }

    //定义队列
    @Bean
    public Queue errorQueue() {
        return QueueBuilder.durable("error.queue").build();
    }

    //队列和交换机绑定
    @Bean
    public Binding errorQueueBinding(TopicExchange errorExchange, Queue errorQueue) {
        return BindingBuilder.bind(errorQueue).to(errorExchange).with("error.#");
    }


}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

在这里插入图片描述
这样就实现了异常消息重试耗尽后,就会投递到指定的异常队列中去,等待人工处理了

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/386652
推荐阅读
相关标签
  

闽ICP备14008679号