当前位置:   article > 正文

rabbitmq unacked消息如何处理_RabbitMQ如何保证消息的可靠投递?

spring-boot-starter-amqp unacked
adc5234a23cc666a5040506f651e16b1.png

在这里插入图片描述

Spring Boot整合RabbitMQ

github地址:
https://github.com/erlieStar/rabbitmq-examples

Spring有三种配置方式

  1. 基于XML
  2. 基于JavaConfig
  3. 基于注解

当然现在已经很少使用XML来做配置了,只介绍一下用JavaConfig和注解的配置方式

RabbitMQ整合Spring Boot,我们只需要增加对应的starter即可

    org.springframework.boot   spring-boot-starter-amqp 

基于注解

在application.yaml的配置如下

spring:  rabbitmq:    host: myhost    port: 5672    username: guest    password: guest    virtual-host: /log:  exchange: log.exchange  info:    queue: info.log.queue    binding-key: info.log.key  error:    queue: error.log.queue    binding-key: error.log.key  all:    queue: all.log.queue    binding-key: '*.log.key'

消费者代码如下

@Slf4j@Componentpublic class LogReceiverListener {    /**     * 接收info级别的日志     */    @RabbitListener(            bindings = @QueueBinding(                    value = @Queue(value = "${log.info.queue}", durable = "true"),                    exchange = @Exchange(value = "${log.exchange}"type = ExchangeTypes.TOPIC),                    key = "${log.info.binding-key}"            )    )    public void infoLog(Message message) {        String msg = new String(message.getBody());        log.info("infoLogQueue 收到的消息为: {}", msg);    }    /**     * 接收所有的日志     */    @RabbitListener(            bindings = @QueueBinding(                    value = @Queue(value = "${log.all.queue}", durable = "true"),                    exchange = @Exchange(value = "${log.exchange}"type = ExchangeTypes.TOPIC),                    key = "${log.all.binding-key}"            )    )    public void allLog(Message message) {        String msg = new String(message.getBody());        log.info("allLogQueue 收到的消息为: {}", msg);    }}

生产者如下

@RunWith(SpringRunner.class)@SpringBootTestpublic class MsgProducerTest {    @Autowired    private AmqpTemplate amqpTemplate;    @Value("${log.exchange}")    private String exchange;    @Value("${log.info.binding-key}")    private String routingKey;    @SneakyThrows    @Test    public void sendMsg() {        for (int i = 0; i 

Spring Boot针对消息ack的方式和原生api针对消息ack的方式有点不同

原生api消息ack的方式

消息的确认方式有2种

自动确认(autoAck=true)
手动确认(autoAck=false)

消费者在消费消息的时候,可以指定autoAck参数

String basicConsume(String queue, boolean autoAck, Consumer callback)

autoAck=false: RabbitMQ会等待消费者显示回复确认消息后才从内存(或者磁盘)中移出消息

autoAck=true: RabbitMQ会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正的消费了这些消息

手动确认的方法如下,有2个参数

basicAck(long deliveryTag, boolean multiple)

deliveryTag: 用来标识信道中投递的消息。RabbitMQ 推送消息给Consumer时,会附带一个deliveryTag,以便Consumer可以在消息确认时告诉RabbitMQ到底是哪条消息被确认了。
RabbitMQ保证在每个信道中,每条消息的deliveryTag从1开始递增

multiple=true: 消息id<=deliveryTag的消息,都会被确认

myltiple=false: 消息id=deliveryTag的消息,都会被确认

消息一直不确认会发生啥?

如果队列中的消息发送到消费者后,消费者不对消息进行确认,那么消息会一直留在队列中,直到确认才会删除。
如果发送到A消费者的消息一直不确认,只有等到A消费者与rabbitmq的连接中断,rabbitmq才会考虑将A消费者未确认的消息重新投递给另一个消费者

Spring Boot中针对消息ack的方式

有三种方式,定义在AcknowledgeMode枚举类中

7221a8c3feb6722c0100074b87d00b9c.png

spring boot针对消息默认的ack的方式为AUTO。

在实际场景中,我们一般都是手动ack。

application.yaml的配置改为如下

spring:  rabbitmq:    host: myhost    port: 5672    username: guest    password: guest    virtual-host: /    listener:      simple:        acknowledge-mode: manual # 手动ack,默认为auto

相应的消费者代码改为

@Slf4j@Componentpublic class LogListenerManual {    /**     * 接收info级别的日志     */    @RabbitListener(            bindings = @QueueBinding(                    value = @Queue(value = "${log.info.queue}", durable = "true"),                    exchange = @Exchange(value = "${log.exchange}"type = ExchangeTypes.TOPIC),                    key = "${log.info.binding-key}"            )    )    public void infoLog(Message message, Channel channel) throws Exception {        String msg = new String(message.getBody());        log.info("infoLogQueue 收到的消息为: {}", msg);        try {            // 这里写各种业务逻辑            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);        } catch (Exception e) {            channel.basicNack(message.getMessageProperties().getDeliveryTag(), falsefalse);        }    }}

我们上面用到的注解,作用如下

83bb680e466712d98872b5b024ad9087.png

基于JavaConfig

既然用注解这么方便,为啥还需要JavaConfig的方式呢?
JavaConfig方便自定义各种属性,比如同时配置多个virtual host等

具体代码看GitHub把

RabbitMQ如何保证消息的可靠投递

一个消息往往会经历如下几个阶段

1ed4aa7057c559f57e7daec87ff11d40.png

在这里插入图片描述


所以要保证消息的可靠投递,只需要保证这3个阶段的可靠投递即可

生产阶段

这个阶段的可靠投递主要靠ConfirmListener(发布者确认)和ReturnListener(失败通知)
前面已经介绍过了,一条消息在RabbitMQ中的流转过程为
producer -> rabbitmq broker cluster -> exchange -> queue -> consumer

ConfirmListener可以获取消息是否从producer发送到brokerReturnListener可以获取从exchange路由不到queue的消息

我用Spring Boot Starter 的api来演示一下效果

application.yaml

spring:  rabbitmq:    host: myhost    port: 5672    username: guest    password: guest    virtual-host: /    listener:      simple:        acknowledge-mode: manual # 手动ack,默认为autolog:  exchange: log.exchange  info:    queue: info.log.queue    binding-key: info.log.key

发布者确认回调

@Componentpublic class ConfirmCallback implements RabbitTemplate.ConfirmCallback {    @Autowired    private MessageSender messageSender;    @Override    public void confirm(CorrelationData correlationData, boolean ack, String cause) {        String msgId = correlationData.getId();        String msg = messageSender.dequeueUnAckMsg(msgId);        if (ack) {            System.out.println(String.format("消息 {%s} 成功发送给mq", msg));        } else {            // 可以加一些重试的逻辑            System.out.println(String.format("消息 {%s} 发送mq失败", msg));        }    }}

失败通知回调

@Componentpublic class ReturnCallback implements RabbitTemplate.ReturnCallback {    @Override    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {        String msg = new String(message.getBody());        System.out.println(String.format("消息 {%s} 不能被正确路由,routingKey为 {%s}", msg, routingKey));    }}

@Configurationpublic class RabbitMqConfig {    @Bean    public ConnectionFactory connectionFactory(            @Value("${spring.rabbitmq.host}"String host,            @Value("${spring.rabbitmq.port}") int port,            @Value("${spring.rabbitmq.username}"String username,            @Value("${spring.rabbitmq.password}"String password,            @Value("${spring.rabbitmq.virtual-host}"String vhost) {        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host);        connectionFactory.setPort(port);        connectionFactory.setUsername(username);        connectionFactory.setPassword(password);        connectionFactory.setVirtualHost(vhost);        connectionFactory.setPublisherConfirms(true);        connectionFactory.setPublisherReturns(true);        return connectionFactory;    }    @Bean    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,                                         ReturnCallback returnCallback, ConfirmCallback confirmCallback) {        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);        rabbitTemplate.setReturnCallback(returnCallback);        rabbitTemplate.setConfirmCallback(confirmCallback);        // 要想使 returnCallback 生效,必须设置为true        rabbitTemplate.setMandatory(true);        return rabbitTemplate;    }}

这里我对RabbitTemplate做了一下包装,主要就是发送的时候增加消息id,并且保存消息id和消息的对应关系,因为RabbitTemplate.ConfirmCallback只能拿到消息id,并不能拿到消息内容,所以需要我们自己保存这种映射关系。在一些可靠性要求比较高的系统中,你可以将这种映射关系存到数据库中,成功发送删除映射关系,失败则一直发送

@Componentpublic class MessageSender {    @Autowired    private RabbitTemplate rabbitTemplate;    public final Map unAckMsgQueue = new ConcurrentHashMap<>();    public void convertAndSend(String exchange, String routingKey, String message) {        String msgId = UUID.randomUUID().toString();        CorrelationData correlationData = new CorrelationData();        correlationData.setId(msgId);        rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);        unAckMsgQueue.put(msgId, message);    }    public String dequeueUnAckMsg(String msgId) {        return unAckMsgQueue.remove(msgId);    }}

测试代码为

@RunWith(SpringRunner.class)@SpringBootTestpublic class MsgProducerTest {    @Autowired    private MessageSender messageSender;    @Value("${log.exchange}")    private String exchange;    @Value("${log.info.binding-key}")    private String routingKey;    /**     * 测试失败通知     */    @SneakyThrows    @Test    public void sendErrorMsg() {        for (int i = 0; i 

先来测试失败者通知

输出为

消息 {this is error message 0} 不能被正确路由,routingKey为 {test}消息 {this is error message 0} 成功发送给mq消息 {this is error message 2} 不能被正确路由,routingKey为 {test}消息 {this is error message 2} 成功发送给mq消息 {this is error message 1} 不能被正确路由,routingKey为 {test}消息 {this is error message 1} 成功发送给mq

消息都成功发送到broker,但是并没有被路由到queue中

再来测试发布者确认

输出为

消息 {this is info message 0} 成功发送给mqinfoLogQueue 收到的消息为: {this is info message 0}infoLogQueue 收到的消息为: {this is info message 1}消息 {this is info message 1} 成功发送给mqinfoLogQueue 收到的消息为: {this is info message 2}消息 {this is info message 2} 成功发送给mq

消息都成功发送到broker,也成功被路由到queue中

存储阶段

这个阶段的高可用还真没研究过,毕竟集群都是运维搭建的,后续有时间的话会把这快的内容补充一下

消费阶段

消费阶段的可靠投递主要靠ack来保证。
前文已经介绍了原生api ack的方式和Spring Boot框架ack的方式

总而言之,在生产环境中,我们一般都是单条手动ack,消费失败后不会重新入队(因为很大概率还会再次失败),而是将消息重新投递到死信队列,方便以后排查问题

总结一下各种情况

  1. ack后消息从broker中删除
  2. nack或者reject后,分为如下2种情况
    (1) reque=true,则消息会被重新放入队列
    (2) reque=fasle,消息会被直接丢弃,如果指定了死信队列的话,会被投递到死信队列

相关阅读

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/520923
推荐阅读
相关标签
  

闽ICP备14008679号