当前位置:   article > 正文

RabbitMQ之生产者可靠性_mq在使用时生产者有必要实现确认代码吗

mq在使用时生产者有必要实现确认代码吗

1.RabbitMQ生产者可靠性

主要由生产者重连和生产者确认两种手段解决

2.生产者重连

有的时候由于网络波动,可能会出现客户端连接RabbitMQ失败的情况。通过配置我们可以开启连接失败后的重连机制

# Spring配置信息
spring:
  # Rabbitmq配置
  rabbitmq:
    # 设置RabbitMQ连接超时时间
    connection-timeout: 3s
    template:
      retry:
        # 开启超时重试机制
        enabled: true
        # 失败后的初始等待时间
        initial-interval: 1000ms
        # 失败后下次的等待时长倍数,下次等待时间 = initial-interval * multiplier
        multiplier: 1
        # 最大重试次数
        max-attempts: 3
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

注:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能。如果对于业务性能有要求,建议禁用重试机制,如果一定要使用,需合理配置等待时长和重试次数,也可以考虑使用异步线程来执行发送消息的代码

3.生产者确认

RabbitMQ提供了Publisher Confirm和Publisher Return两种确认机制。开启确认机制后,在RabbitMQ成功收到消息后会返回确认消息给生产者
注:Publisher Return是专门用来返回路由失败的机制
(1)RabbitMQ的返回结果情况种类

  • 消息投递到了RabbitMQ,但是路由失败。此时会通过Publisher Return返回路由异常原因,然后返回ACK,告知投递成功(造成原因:routingKey匹配失败或交换机的配置有问题)
  • 临时消息(非持久化)投递到了RabbitMQ,并且成功放入消息队列,返回ACK,告知投递成功
  • 持久消息投递到了RabbitMQ,并且放入消息队列完成持久化,返回ACK,告知投递成功
  • 其他情况都会返回NACK,告知投递失败

(2)生产者确认代码实现

添加配置文件信息

# Spring配置信息
spring:
  # Rabbitmq配置
  rabbitmq:
    # 开启publisher confirm机制
    # none:关闭publisher confirm机制;simple:同步阻塞等待MQ的回执消息;correlated:MQ异步回调方式返回回执消息
    publisher-confirm-type: correlated
    # 开启publisher return机制
    publisher-returns: true
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

编写Publisher Return回调函数(注:每个RabbitTemplate只能配置一个ReturnCallback)

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        //获取RabbitTemplate,也可使用@Autowired注解自动注入
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        //设置ReturnCallback
         rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("消息返回:{},{},{},{},{}",
                    message, replyCode, replyText, exchange, routingKey);
        });    
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

编写ConfirmCallback(注:在每一个消息发送时候单独指定)

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFutureCallback;
@Slf4j
@Configuration
public class Producer {
    @Autowired
    RabbitTemplate rabbitTemplate;
    public void sendMessage(Object message){
        // 1.创建CorrelationData对象
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // 2.给Future添加ConfirmCallback
        correlationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
            @Override
            public void onFailure(Throwable ex) {
                // Future发生异常时的处理逻辑(发生在Spring内部处理),基本不会触发
                log.error("handle message ack fail",ex);
            }

            @Override
            public void onSuccess(CorrelationData.Confirm result) {
                // Future接收到回执的处理逻辑,参数中的result就是回执内容
                if(result.isAck()){ // result.isAck(),boolean类型,true代表收到ack,false代表收到nack
                    log.debug("发送消息成功,收到 ack!");
                }else{// result.getReason(),String类型,返回nack时的异常原因
                    log.error("发送消息失败,收到 nack,reason : {}" , result.getReason());
                }
            }
        });
        // 发送消息
        rabbitTemplate.convertAndSend("交换机名称", "routingKey值", message,correlationData);
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

4.如何处理生产者的确认消息

(1)生产者确认需要额外的网络和系统资源开销,尽量不要使用
(2)如果一定要使用,无需开启Publisher-Return机制,因为一般路由失败是自己的业务问题
(3)对于nack消息可以有限次数重试,依然失败则记录异常信息

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/951376
推荐阅读
相关标签
  

闽ICP备14008679号