赞
踩
主要由生产者重连和生产者确认两种手段解决
有的时候由于网络波动,可能会出现客户端连接RabbitMQ失败的情况。通过配置我们可以开启连接失败后的重连机制
# Spring配置信息 spring: # Rabbitmq配置 rabbitmq: # 设置RabbitMQ连接超时时间 connection-timeout: 3s template: retry: # 开启超时重试机制 enabled: true # 失败后的初始等待时间 initial-interval: 1000ms # 失败后下次的等待时长倍数,下次等待时间 = initial-interval * multiplier multiplier: 1 # 最大重试次数 max-attempts: 3
注:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能。如果对于业务性能有要求,建议禁用重试机制,如果一定要使用,需合理配置等待时长和重试次数,也可以考虑使用异步线程来执行发送消息的代码
RabbitMQ提供了Publisher Confirm和Publisher Return两种确认机制。开启确认机制后,在RabbitMQ成功收到消息后会返回确认消息给生产者
注:Publisher Return是专门用来返回路由失败的机制
(1)RabbitMQ的返回结果情况种类
(2)生产者确认代码实现
添加配置文件信息
# Spring配置信息
spring:
# Rabbitmq配置
rabbitmq:
# 开启publisher confirm机制
# none:关闭publisher confirm机制;simple:同步阻塞等待MQ的回执消息;correlated:MQ异步回调方式返回回执消息
publisher-confirm-type: correlated
# 开启publisher return机制
publisher-returns: true
编写Publisher Return回调函数(注:每个RabbitTemplate只能配置一个ReturnCallback)
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.Configuration; @Slf4j @Configuration public class CommonConfig implements ApplicationContextAware { @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { //获取RabbitTemplate,也可使用@Autowired注解自动注入 RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); //设置ReturnCallback rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { log.info("消息返回:{},{},{},{},{}", message, replyCode, replyText, exchange, routingKey); }); } }
编写ConfirmCallback(注:在每一个消息发送时候单独指定)
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFutureCallback; @Slf4j @Configuration public class Producer { @Autowired RabbitTemplate rabbitTemplate; public void sendMessage(Object message){ // 1.创建CorrelationData对象 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); // 2.给Future添加ConfirmCallback correlationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() { @Override public void onFailure(Throwable ex) { // Future发生异常时的处理逻辑(发生在Spring内部处理),基本不会触发 log.error("handle message ack fail",ex); } @Override public void onSuccess(CorrelationData.Confirm result) { // Future接收到回执的处理逻辑,参数中的result就是回执内容 if(result.isAck()){ // result.isAck(),boolean类型,true代表收到ack,false代表收到nack log.debug("发送消息成功,收到 ack!"); }else{// result.getReason(),String类型,返回nack时的异常原因 log.error("发送消息失败,收到 nack,reason : {}" , result.getReason()); } } }); // 发送消息 rabbitTemplate.convertAndSend("交换机名称", "routingKey值", message,correlationData); } }
(1)生产者确认需要额外的网络和系统资源开销,尽量不要使用
(2)如果一定要使用,无需开启Publisher-Return机制,因为一般路由失败是自己的业务问题
(3)对于nack消息可以有限次数重试,依然失败则记录异常信息
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。