赞
踩
文章最后附带rabbitmq生产环境的一些参数设置参考。
实现消息可靠性投递有两种方法,一个是开启事务,一个是使用确认机制,具体参考下图讲解。
通过上图可知消息投递失败将会发生在三个地方,生产者到交换机,交换机到队列,队列到消费者。
所以为了保证消息的可靠性,需要开启消息确认机制(confirmCallback、returnCallback)以及消费端手动确认模式(手动ack)或者消费者重试机制。
spring:
rabbitmq:
host: 192.168.25.131
port: 5672
virtual-host: /
#开启发送端确认,mq服务是否收到消息,收到消息触发confirmCallback方法回调
publisher-confirm-type: correlated
#开启发送端消息抵达队列的确认,消息是否从【交换机/其他】到达指定队列,触发returnCallback进行回调,详情看配置文件描述
publisher-returns: true
#只要抵达队列,以异步发送优先回调我们这个return confirm【跟publisher-returns是一组都设置成true】
template:
mandatory: true
username: admin
password: 123456
参数讲解:
ConfirmCallback:生产者—broker;
ReturnCallback:交换机—队列;
public void initRabbitTemplate(){
//设置确认回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* @param correlationData 当前消息的唯一关联数据(这个是消息的唯一id)
* @param ack 消息是否成功收到,只要消息抵达Broker服务器,ack就等于true,不管消息之后的状态是否成功
* @param cause 失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
/**
* 1、做好消息确认机制(publisher、consumer【手动ack】)
* 2、每一个发送的消息都在数据库做好记录。定期将失败的消息再次发送一遍
*/
if (!ack){
System.out.println("confirm...correlationData["+correlationData+"]==>ack["+ack+"]==>cause["+cause+"]");
}
}
});
//设置消息抵达队列的确认回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* 只要消息没有投递给指定的队列,就触发这个失败回调
* @param message 投递失败的消息详细信息
* @param replyCode 回复的状态码
* @param replyText 回复的文本内容
* @param exchange 当时这个消息发给哪个交换机
* @param routingKey 当时这个消息用的哪个路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
//报错误了。修改数据库当前消息的状态-》错误。
System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
}
});
}
配置完之消息投递失败就会进入对应回调方法,根据业务需求,可在方法里进行补偿策略。
这里直接测试,不再进行项目搭建准备的工作,队列交换机的创建可参考个人主页里的其他文章。
准备一个交换机和一个队列
@Bean("order_event_exchange") //直接注入名字,后边绑定队列和交换机直接使用注解更方便
public Exchange oderEventExchange(){
//durable:是否持久化 autoDelete:是否自动删除
DirectExchange directExchange = new DirectExchange("order_event_exchange",true, false);
return directExchange;
}
@Bean("order_release_queue")
public Queue orderReleaseQueue() {
Queue queue = new Queue("order_release_queue", true, false, false);
return queue;
}
@Bean
public Binding orderReleaseOrderBinging(@Qualifier("order_release_queue") Queue queue,
@Qualifier("order_event_exchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("order.release.order").noargs();
}
分别测试两个阶段:生产者-broker、交换机-队列。
@PostMapping("/sendOrder")
public String sentOrder(@RequestBody Order order){
//CorrelationData:给消息设置一个唯一id
rabbitTemplate.convertAndSend("aaaaa","order.release.order", order,new CorrelationData(order.getId()));
return "ok";
}
测试结果
因为找不到对应的交换机,所以投递消息到broker失败,触发消息确认回调方法。
@PostMapping("/sendOrder")
public String sentOrder(@RequestBody Order order){
rabbitTemplate.convertAndSend("order_event_exchange","222222", order,new CorrelationData(order.getId()));
return "ok";
}
测试结果,收到返回消息
消息消费确认主要会用到三个方法:
basicAck()、basicNack()/basicReject();
basicAck是确认消息收到,另两个方法是拒绝消息。
basicNack()和basicReject()方法的区别就是basicNack可以批量拒绝消息,比如:由于某些原因造成队列里有其他未确认消息,使用basicNack设置是否批量拒绝参数为true,就会把本次及之前的所有未确认消息全部拒绝。
具体看代码,我们可以在消费端这样写:
public void handleStockLockedRelease(SkuUnlockInfoTo skuUnlockInfoTo, Message message, Channel channel) throws IOException {
try {
orderService.unLockStock(skuUnlockInfoTo);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
e.printStackTrace();
//是否是被重试的消息
if (message.getMessageProperties().isRedelivered()) {
/*
* @param deliveryTag:消息的唯一标签,按消息入队顺序生成,比如:1、2、3
* @param multiple:是否开启消息批量拒绝
* @param requeue:是否将消息重新放入队列
*/
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
//TODO 防止异常死循环,最好放在数据库或记录一下,用定时任务定时重试
} else {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
}
利用try…catch即可针对异常来做出处理方法。如果想更精细一些,那么我们就可以利用消费者重试机制了,它可以设置重试次数,每次重试的间隔时间等。
1、重试机制是基于本地的,跟rabbitmq队列没有任何关系,并不会将消息反复投递到队列,然后重复消费。
2、消费者端的代码中不能使用try{}catch(){}捕获异常,否则异常无法抛出,也就无法触发重试机制,非得捕获已知异常,也要在catch中重新抛出未知异常来触发重试机制。
3、消息重试次数用完之后,消息就需要被处理,这里rabbit提供了3个恢复器。
各恢复器含义:
RejectAndDontRequeueRecoverer:拒绝并且不会将消息重新发回队列;
RepublishMessageRecoverer:重新发布消息(自定义将消息转发到指定的其他交换机、队列);
ImmediateRequeueMessageRecoverer:立即重新返回原队列。
常用 RepublishMessageRecoverer;
默认 RejectAndDontRequeueRecoverer。
4、除了可以使用mq提供的恢复器来实现消息的自定义处理,也可以利用死信队列来达到目的。
接下来,我们分别利用mq恢复器和死信队列来实现消息的转发。
spring:
rabbitmq:
host: 192.168.25.131
port: 5672
virtual-host: /
#开启发送端确认,mq服务是否收到消息,收到消息触发confirmCallback方法回调
publisher-confirm-type: correlated
#开启发送端消息抵达队列的确认,消息是否从【交换机/其他】到达指定队列,触发returnCallback进行回调,详情看配置文件描述
publisher-returns: true
#只要抵达队列,以异步发送优先回调我们这个return confirm【跟publisher-returns是一组都设置成true】
template:
mandatory: true
listener:
simple:
# 消息确认模式 manual:手动ack auto:自动
acknowledge-mode: auto
# 消费者消息重试机制配置
retry:
enabled: true # 开启消费者失败重试
initial-interval: 3000 # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
username: admin
password: 123456
需要再多定义一个交换机和队列,专门用来接收处理超出重试机制次数的消息,通过MessageRecoverer的实现类RepublishMessageRecoverer来设置。
在上文正常的交换机(order_event_exchange)和队列(order_release_queue)的基础下,再多创建一套
@Bean("error_event_exchange")
public DirectExchange errorMessageExchange(){
DirectExchange directExchange = new DirectExchange("error_event_exchange",true, false);
return directExchange;
}
@Bean("error_queue")
public Queue errorQueue(){
Queue queue = new Queue("error_queue", true, false, false);
return queue;
}
@Bean
public Binding errorBinding(@Qualifier("error_queue") Queue queue,
@Qualifier("error_event_exchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("error").noargs();
}
/**
* 设置消息重试机制完成之后,异常依然存在,消息应该到达哪里。
*/
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error_event_exchange", "error");
}
@Service
@RabbitListener(queues = {"order_release_queue"})
public class OrderCloseListener {
@RabbitHandler
public void listener(Order order) {
System.out.println("收到order信息");
System.out.println(order);
//模拟异常
int a = 1 / 0;
}
}
@Service
@RabbitListener(queues = {"error_queue"})
public class OrderErrorListener {
@RabbitHandler
public void listener(Order order) throws IOException {
System.out.println("收到error-order信息");
System.out.println(order);
}
}
死信队列就是普通的队列,换了一种叫法而已,当然,也要清楚什么情况下,消息才会被转发到死信队列:
1、过期的消息;2、超出队列长度的消息;3、被拒绝且不会被重新入队的消息。
这里,我们就要利用死信的第三个特点来实现消息自动转发效果,上文我们说过,消息重试机制的默认恢复器是RejectAndDontRequeueRecoverer,而这个恢复器的特点就是将消息拒绝并删除掉(不会重新入队)。所以,与其说用死信队列来实现,倒不如说用消息重试机制的默认恢复器+死信队列来实现消息的转发。
首先注释掉前边配置的RepublishMessageRecoverer,否则会覆盖默认恢复器,另外,给正常队列order_release_queue设置死信交换机和队列,当然,这里的死信交换机和队列就是我们之前创建的error系列。
完整代码
@Configuration
public class MyMQConfig {
/**
*功能描述: 创建交换机
* @author zhouwenjie
* @date 2022/5/6 0:36
* @param
* @return org.springframework.amqp.core.Exchange
*/
@Bean("order_event_exchange") //直接注入名字,后边绑定队列和交换机直接使用注解更方便
public Exchange oderEventExchange(){
//durable:是否持久化 autoDelete:是否自动删除
DirectExchange directExchange = new DirectExchange("order_event_exchange",true, false);
return directExchange;
}
/**
* 功能描述: 【订单相关】声明正常收取队列
*
* @param
* @return org.springframework.amqp.core.Queue
* @author zhouwenjie
*/
@Bean("order_release_queue")
public Queue orderReleaseQueue() {
Map<String, Object> arguments = new HashMap<>();
//配置队列的死信应该发送给哪个交换机
arguments.put("x-dead-letter-exchange", "error_event_exchange");
//发送给交换机使用的路由key
arguments.put("x-dead-letter-routing-key", "error");
Queue queue = new Queue("order_release_queue", true, false, false,arguments);
return queue;
}
@Bean
public Binding orderReleaseOrderBinging(@Qualifier("order_release_queue") Queue queue,
@Qualifier("order_event_exchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("order.release.order").noargs();
}
@Bean("error_event_exchange")
public DirectExchange errorMessageExchange(){
DirectExchange directExchange = new DirectExchange("error_event_exchange",true, false);
return directExchange;
}
@Bean("error_queue")
public Queue errorQueue(){
Queue queue = new Queue("error_queue", true, false, false);
return queue;
}
@Bean
public Binding errorBinding(@Qualifier("error_queue") Queue queue,
@Qualifier("error_event_exchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("error").noargs();
}
}
基于之前的监听代码不动,接口也一样,另外,注意一下,重启项目之前一定要先删除原来的正常队列,让程序自动基于现在的配置重建,否则会报异常,如下图即可。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。