赞
踩
消息丢失:下图是消息从生产者发送到消费者接收的关系图。通过图片可以看出,消息在生产者、MQ、消费者这三个环节都有可能丢失。
配置
package com.qiangesoft.rabbitmq.producer; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 消息发送配置 * * @author qiangesoft * @date 2024-05-08 */ @Configuration public class MessageConfig { public static final String EXCHANGE = "simple.exchange"; public static final String QUEUE = "simple.queue"; public static final String ROUTING_KEY = "simple"; @Bean public DirectExchange simpleExchange() { return ExchangeBuilder .directExchange(EXCHANGE) // 持久化交换机 .durable(true) .build(); } @Bean public Queue simpleQueue() { return QueueBuilder // 持久化队列 .durable(QUEUE) // 避免消息堆积、懒加载 .lazy() .build(); } @Bean public Binding simpleBinding(Queue simpleQueue, DirectExchange simpleExchange) { return BindingBuilder.bind(simpleQueue).to(simpleExchange).with(ROUTING_KEY); } }
背景:生产者发送消息时,出现了网络故障,导致与MQ的连接中断。
解决方案:配置连接超时时间、重试机制。
spring:
  rabbitmq:
    # Timeout for establishing the connection to the MQ
    connection-timeout: 1s
    template:
      # Retry mechanism used when the connection fails
      retry:
        enabled: true
        # Initial wait time after a failure
        initial-interval: 1000ms
        # Multiplier applied to the wait time after each failure:
        # next wait = previous wait * multiplier
        multiplier: 1
        # Maximum number of attempts
        max-attempts: 3
背景:生产者虽然把消息发了出去,但消息可能没有到达交换机(例如交换机不存在),或者到达交换机后没有路由到任何队列(例如路由键不匹配),而生产者对此并不知情。
解决方案:配置Publisher Confirm机制、Publisher Return机制。
spring:
  rabbitmq:
    # Enable the publisher confirm mechanism and select the confirm type,
    # so the producer learns whether the message reached the exchange
    publisher-confirm-type: correlated
    # Enable the publisher return mechanism,
    # so the producer learns when a message did not reach a queue
    publisher-returns: true
package com.qiangesoft.rabbitmq.producer; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageBuilder; import org.springframework.amqp.core.MessageDeliveryMode; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.util.concurrent.ListenableFutureCallback; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.nio.charset.StandardCharsets; import java.util.UUID; /** * 生产者 * * @author qiangesoft * @date 2024-05-08 */ @Slf4j @RequestMapping("/producer") @RestController public class ProducerController { @Autowired public RabbitTemplate rabbitTemplate; @GetMapping("/send") public void send(String content) { CorrelationData correlation = getCorrelationData(); Message message = MessageBuilder .withBody(content.getBytes(StandardCharsets.UTF_8)) .setMessageId(UUID.randomUUID().toString()) // 消息持久化 .setDeliveryMode(MessageDeliveryMode.PERSISTENT) .build(); // 正常发送 rabbitTemplate.convertAndSend(MessageConfig.EXCHANGE, MessageConfig.ROUTING_KEY, message, correlation); } private static CorrelationData getCorrelationData() { // 异步回调返回回执,开启publisher confirm机制【确保消息到达交换机】 CorrelationData correlation = new CorrelationData(); correlation.getFuture().addCallback(new ListenableFutureCallback<>() { @Override public void onFailure(Throwable ex) { log.error("消息发送异常,ID:{},原因:{}", correlation.getId(), ex.getMessage()); } @Override public void onSuccess(CorrelationData.Confirm result) { log.info("触发【publisher confirm】机制"); if (result.isAck()) { log.info("消息发送成功到达交换机,ID:{}", correlation.getId()); } else { // 消息发送失败 log.error("消息发送失败未到达交换机,ID:{},原因:{}", correlation.getId(), result.getReason()); } } }); return correlation; 
} }
package com.qiangesoft.rabbitmq.producer; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.Configuration; /** * 消息路由失败回退配置 * * @author qiangesoft * @date 2024-05-08 */ @Slf4j @Configuration public class ReturnsCallbackConfig implements ApplicationContextAware { @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); // 消息路由失败退回,设置ReturnsCallback【消息到达交换机没有达到队列】 rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returned) { log.info("触发【publisher return】机制"); log.error("消息投递失败未到达队列,应答码:{},原因:{},交换机:{},路由键:{},消息:{}", returned.getReplyCode(), returned.getReplyText(), returned.getExchange(), returned.getRoutingKey(), returned.getMessage()); } }); } }
/**
 * Declares the direct exchange named by {@code EXCHANGE}.
 *
 * @return the exchange definition, marked durable
 */
@Bean
public DirectExchange simpleExchange() {
    ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange(EXCHANGE);
    // Durable exchange
    exchangeBuilder.durable(true);
    return exchangeBuilder.build();
}
/**
 * Declares the queue named by {@code QUEUE}.
 *
 * @return the queue definition, durable and lazy
 */
@Bean
public Queue simpleQueue() {
    // Durable queue
    QueueBuilder queueBuilder = QueueBuilder.durable(QUEUE);
    // Lazy mode, chosen by the author to cope with message backlog
    queueBuilder.lazy();
    return queueBuilder.build();
}
// Encode the payload as UTF-8 and generate a random message id up front.
byte[] payloadBytes = content.getBytes(StandardCharsets.UTF_8);
String generatedMessageId = UUID.randomUUID().toString();
Message message = MessageBuilder
        .withBody(payloadBytes)
        .setMessageId(generatedMessageId)
        // Persistent delivery mode
        .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
        .build();
// Send
rabbitTemplate.convertAndSend(MessageConfig.EXCHANGE, MessageConfig.ROUTING_KEY, message, correlation);
spring:
  rabbitmq:
    listener:
      simple:
        # Automatic ack
        acknowledge-mode: auto
spring:
  rabbitmq:
    listener:
      simple:
        # Consumer-side retry on failure
        retry:
          enabled: true
          # Initial wait time after a failure: 1 second
          initial-interval: 1000ms
          # Multiplier applied to the wait time: next wait = multiplier * last-interval
          multiplier: 1
          # Maximum number of attempts
          max-attempts: 3
          # true = stateless; false = stateful. Set to false if the business logic uses transactions
          stateless: true
package com.qiangesoft.rabbitmq.consumer; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.retry.MessageRecoverer; import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 消息消费失败配置 * ps:配置处理失败消息的交换机和队列 * * @author qiangesoft * @date 2024-05-08 */ @Configuration public class ErrorMessageConfig { public static final String EXCHANGE = "error.exchange"; public static final String QUEUE = "error.queue"; public static final String ROUTING_KEY = "error"; @Bean public DirectExchange errorMessageExchange() { return new DirectExchange(EXCHANGE); } @Bean public Queue errorQueue() { return new Queue(QUEUE, true); } @Bean public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange) { return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with(ROUTING_KEY); } @Bean public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) { return new RepublishMessageRecoverer(rabbitTemplate, EXCHANGE, ROUTING_KEY); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。