赞
踩
RabbitMQ发布确认机制确保消息从生产者成功传输到交换机和队列,提高系统可靠性。在Spring Boot项目中,通过配置publisher-confirm-type
和publisher-returns
,启用发布确认和消息返回机制。配置RabbitTemplate
的确认回调和返回回调,可以捕捉消息传输状态,处理不同传输结果。测试场景包括消息无法到达交换机、消息到达交换机但无法到达队列以及消息成功到达队列。通过合理设置和优化,可以确保高并发环境下的消息可靠传输,适用于金融支付、电商系统等对消息传输可靠性要求高的场景。
发布确认(Publisher Confirms)是RabbitMQ提供的一种机制,用于确保消息从生产者发送到RabbitMQ服务器并被成功处理。与事务机制不同,发布确认的性能开销更小,非常适合高吞吐量的场景。发布确认机制提供了两种类型的确认:
在Spring Boot项目中,通过配置文件来启用发布确认机制非常方便。以下是需要添加到application.properties
或application.yml
中的配置:
# 消息到达交换机后会回调发送者
spring.rabbitmq.publisher-confirm-type=correlated
# 消息无法路由到队列时回调发送者
spring.rabbitmq.publisher-returns=true
配置解释:
publisher-confirm-type
:设置为correlated
表示使用CorrelationData
来关联确认与发送的消息。publisher-returns
:设置为true
表示启用消息返回机制,当消息无法路由到队列时会触发回调。在Spring AMQP中,发布确认类型通过ConfirmType
枚举类来定义:
public enum ConfirmType {
SIMPLE, // 使用 RabbitTemplate#waitForConfirms() 或 waitForConfirmsOrDie()
CORRELATED, // 使用 CorrelationData 关联确认与发送的消息
NONE // 不启用发布确认
}
为了使用发布确认机制,需要配置RabbitTemplate
,包括设置确认回调和返回回调:
@Slf4j @Configuration public class RabbitTemplateConfig { @Bean public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); // 设置mandatory为true,当找不到队列时,broker会调用basic.return方法将消息返还给生产者 rabbitTemplate.setMandatory(true); // 设置确认回调 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { log.info("消息已经到达Exchange"); } else { log.info("消息没有到达Exchange"); } if (correlationData != null) { log.info("相关数据:" + correlationData); } if (cause != null) { log.info("原因:" + cause); } }); // 设置返回回调 rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { log.info("消息无法到达队列时触发"); log.info("ReturnCallback: " + "消息:" + message); log.info("ReturnCallback: " + "回应码:" + replyCode); log.info("ReturnCallback: " + "回应信息:" + replyText); log.info("ReturnCallback: " + "交换机:" + exchange); log.info("ReturnCallback: " + "路由键:" + routingKey); }); return rabbitTemplate; } }
为了测试发布确认机制,我们需要配置相应的交换机和队列:
@Slf4j @Configuration public class ConfirmConfig { @Bean public Queue confirmQueue() { return new Queue(Constant.CONFIRM_QUEUE, false); } @Bean DirectExchange confirmExchange() { DirectExchange directExchange = new DirectExchange(Constant.CONFIRM_EXCHANGE, false, false); directExchange.addArgument("alternate-exchange", Constant.CONFIRM_BACKUP_EXCHANGE); return directExchange; } @Bean Binding bindingConfirm() { return BindingBuilder.bind(confirmQueue()).to(confirmExchange()).with(Constant.CONFIRM_ROUTING_KEY); } @Bean FanoutExchange backupExchange() { return new FanoutExchange(Constant.CONFIRM_BACKUP_EXCHANGE, false, false); } @Bean public Queue backupQueue() { return new Queue(Constant.CONFIRM_BACKUP_QUEUE, false); } @Bean public Queue warningQueue() { return new Queue(Constant.CONFIRM_WARNING_QUEUE, false); } @Bean Binding bindingConfirmBackup() { return BindingBuilder.bind(backupQueue()).to(backupExchange()); } @Bean Binding bindingConfirmWarning() { return BindingBuilder.bind(warningQueue()).to(backupExchange()); } }
测试代码:
@Autowired
RabbitTemplate rabbitTemplate;
String msg = "一条用于发布确认的消息";
@GetMapping("/noExchange")
public void noExchange() {
rabbitTemplate.convertAndSend("noExchange", "noExchange", msg);
}
配置了rabbitTemplate.setMandatory(true)
,当消息无法到达交换机时会回调:
ConfirmCallback 消息没有到达Exchange
ConfirmCallback 原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'noExchange' in vhost '/', class-id=60, method-id=40)
测试代码:
@GetMapping("/toExchange")
public void toExchange() {
rabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE, "xxx.xxx.xxx", msg);
}
输出:
ConfirmCallback 消息已经到达Exchange
没有收到无法到达队列的消息,是因为配置了备份队列,消息被路由到了备份队列。
修改配置:
@Bean
DirectExchange confirmExchange() {
DirectExchange directExchange = new DirectExchange(Constant.CONFIRM_EXCHANGE, true, false);
return directExchange;
}
测试结果:
消息无法到达队列时触发
ReturnCallback: 消息:(Body:'一条用于发布确认的消息' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
ReturnCallback: 回应码:312
ReturnCallback: 回应信息:NO_ROUTE
ReturnCallback: 交换机:myConfirmExchange
ReturnCallback: 路由键:xxx.xxx.xxx
ConfirmCallback 消息已经到达Exchange
此时,ConfirmCallback
和ReturnCallback
都被调用了。
测试代码:
@GetMapping("/toQueue")
public void toQueue() {
rabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE, Constant.CONFIRM_ROUTING_KEY, msg);
}
输出:
ConfirmCallback 消息已经到达Exchange
下图展示了RabbitMQ发布确认流程:
事务机制和发布确认机制都是确保消息可靠投递的手段,但它们在实现和性能方面有明显区别:
txSelect
、txCommit
和txRollback
实现,性能开销较大,不适合高并发场景。失。
本文详细介绍了RabbitMQ消息的发布确认机制,包括配置、实现及其在不同场景下的表现。通过合理配置和使用发布确认机制,可以有效提高消息传输的可靠性,确保消息在高并发环境下的可靠投递。希望本文能够帮助读者深入理解并应用RabbitMQ的发布确认机制,提高系统的可靠性和性能。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。