赞
踩
有的时候由于网络波动,可能会出现发送者连接MQ失败的情况,通过配置我们可以开启连接失败后的重连机制:
注意: 当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能。如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。
SpringAMOP提供了 Publisher Confirm 和 Publisher Return 两种确认机制。开启确机制认后,当发送者发送消息给MQ后,MO会返回确认结果给发送者。返回的结果有以下几种情况:
配置说明:
发送消息,指定消息ID、消息ConfirmCallback
发送成功与否都会执行 ConfirmCallback 发送失败才会执行 ReturnCallback 并且ReturnCallback只需要初始化一次就行,所以就在配置类中初始化一次,使用PostConstruct
发送者:
@Slf4j @Configuration @RequiredArgsConstructor public class MqConfig { private final RabbitTemplate rabbitTemplate; @PostConstruct public void init(){ rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returnedMessage) { log.error("监听到了消息return callback"); log.debug("exchange:{}",returnedMessage.getExchange()); log.debug("routingKey:{}",returnedMessage.getRoutingKey()); log.debug("message:{}",returnedMessage.getMessage()); log.debug("replyCode:{}",returnedMessage.getReplyCode()); log.debug("replyText:{}",returnedMessage.getReplyText()); } }); } }
@Test public void testConfirmCallback() throws InterruptedException { //0. 创建correlationData CorrelationData cd = new CorrelationData(UUID.randomUUID().toString()); cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() { @Override public void onFailure(Throwable ex) { log.error("Spring amqp 处理确认结果异常",ex); } @Override public void onSuccess(CorrelationData.Confirm result) { // 判断是否成功 if(result.isAck()){ //成功 log.debug("收到ConfirmCallback ack,消息发送成功"); }else { //失败 log.error("收到ConfirmCallback nack,消息发送失败!reason:{}",result.getReason()); } } }); //1. 交换机名 String exchangeName = "hmall.direct"; //2. 消息 String message = "hello,Spring amqp"; //3. 发送消息 rabbitTemplate.convertAndSend(exchangeName, "red", message + "red",cd); rabbitTemplate.convertAndSend(exchangeName, "yellow", message + "yellow",cd); rabbitTemplate.convertAndSend(exchangeName, "blue2", message + "blue",cd); //路由失败 Thread.sleep(2000); }
结果:
在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。这样会导致两个问题:
RabbitMQ实现数据持久化包括三个方面:
交换机持久化
队列持久化
消息持久化(如果交换机队列是持久的,但是消息是临时,那么MQ重启后消息也会丢失)
java代码:将消息转化为非持久的
@Test
public void testSendMessage() {
//1. 自定义构建消息
Message msg = MessageBuilder.withBody("hello,SpringAMQP".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT) //非持久
//.setDeliveryMode(MessageDeliveryMode.PERSISTENT) //持久
.build();
//2. 发送消息
rabbitTemplate.convertAndSend("object.queue",msg);
}
在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。但在某些特殊情况下,这会导致消息积压,比如:
一旦出现消息堆积问题,RabbitMQ的内存占用就会越来越高,直到触发内存预警上限。此时RabbitMQ会将内存消息刷到磁盘上,这个行为成为PageOut
. PageOut
会耗费一段时间,并且会阻塞队列进程。因此在这个过程中RabbitMQ不会再处理新的消息,生产者的所有请求都会被阻塞。
为了解决这个问题,从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的模式,也就是惰性队列。惰性队列的特征如下:
而在3.12版本之后,LazyQueue已经成为所有队列的默认格式。因此官方推荐升级MQ为3.12版本或者所有队列都设置为LazyQueue模式。
控制台操作: 要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可:
Java客户端操作:
消费者确实机制(Consumer Acknowledgement)是为了确认消费者是否成功处理消息。当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态:
ack:成功处理消息,RabbitMQ从队列中删除该消息
nack:消息处理失败,RabbitMO需要再次投递消息
reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
SpringAMQP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式,有三种方式:
none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活
auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack.
当业务出现异常时,根据异常判断返回不同结果:
SpringAMQP提供了消费者失败重试机制,在消费者出现异常时利用本地重试,而不是无限的requeue到mg。我们可以通过在application.yaml文件中添加配置来开启重试机制:
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:
失败消息处理策略
将失败处理策略改为RepublishMessageRecoverer:
@Configuration @RequiredArgsConstructor public class ErrorMessageConfiguration { private final RabbitTemplate rabbitTemplate; @Bean public DirectExchange errorExchange(){ return new DirectExchange("error.direct"); } @Bean public Queue errorQueue(){ return new Queue("error.queue"); } @Bean public Binding errorQueueBinding(){ return BindingBuilder.bind(errorQueue()).to(errorExchange()).with("error"); } @Bean public MessageRecoverer messageRecoverer(){ return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error"); } }
幂等是一个数学概念,用函数表达式来描述是这样的:f(x) = f(f(x))
,例如求绝对值函数。
在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。例如:
但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:
所以,我们要尽可能避免业务被重复执行。
然而在实际业务场景中,由于意外经常会出现业务被重复执行的情况,例如:
所以,我们要尽可能避免业务被重复执行。
然而在实际业务场景中,由于意外经常会出现业务被重复执行的情况,例如:
我们在用户支付成功后会发送MQ消息到交易服务,修改订单状态为已支付,就可能出现消息重复投递的情况。如果消费者不做判断,很有可能导致消息被消费多次,出现业务故障。
举例:
因此,我们必须想办法保证消息处理的幂等性。这里给出两种方案:
方案一,是给每个消息都设置一个唯一id,利用id区分是否是重复消息
SpringAMQP的MessageConverter自带了MessageID的功能,我们只要开启这个功能即可。
以Jackson的消息转换器为例:
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jjmc.setCreateMessageIds(true);
return jjmc;
}
此时接收对象可以直接用Message接收
方案二:是结合业务逻辑,基于业务本身做判断。以余额支付业务为例:
java代码多了第1、2步
public void listenPaySuccess(Long orderId){
//1.查询订单
Order order = orderService.getById(orderId);
//2. 判断订单状态 是否为未支付
if(order == null || order.getStatus() != 1){
//不做处理
return;
}
//3. 标记订单状态为已支付
orderService.markOrderPaySuccess(orderId);
}
延迟消息: 发送者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。
延迟任务: 设置在一定时间之后才执行的任务。
当一个队列中的消息满足下列情况之一时,就会成为死信(dead letter):
如果 队列 通过dead-letter-exchange属性指定了一个 交换机 ,那么该队列中的死信就会投递到这个交换机中。这个交换机 称为死信交换机(Dead LetterExchange,简称DLX)。
dlx交换机代码
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "dlx.queue", durable = "true"),
exchange = @Exchange(name = "dlx.direct", type = ExchangeTypes.DIRECT),
key = {"hi"}
))
public void listendlxQueue(String message) {
log.info("消费者监听到dlx.queue的消息:{}", message);
}
normal交换机
@Slf4j @Configuration public class NormalConfiguration { @Bean public DirectExchange normalExchange(){ return new DirectExchange("normal.direct"); } @Bean public Queue normalQueue(){ return QueueBuilder.durable("normal.queue").deadLetterExchange("dlx.direct").build(); } @Bean public Binding normalExchangeBinding(Queue normalQueue,DirectExchange normalExchange){ return BindingBuilder.bind(normalQueue).to(normalExchange).with("hi"); } }
值得一提的是,两个交换机中绑定的key要一致,比如例中都是"hi"
发送方添加了延迟时间的消息
@Test public void testSendDelayMessage() { // //1. 自定义构建消息 // Message msg = MessageBuilder.withBody("hello,SpringAMQP".getBytes(StandardCharsets.UTF_8)) // .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT) //非持久化 .setDeliveryMode(MessageDeliveryMode.PERSISTENT) //持久化 // .build(); //2. 发送消息 rabbitTemplate.convertAndSend("normal.direct", "hi", "hello", new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration("10000"); return message; } }); }
注意,自定义构建的消息是不具备将消息转化为JSON格式的,而我们之前设置了发送消息是JSON格式的,所以这里不能自定义消息。
当消息从normal.direct交换机到normal.queue队列,过了10s后,发现还没有人接收,则会将消息由dlx.direct交换机发送给dlx.queue队列,这样就真正的消费者就可以接受延迟了10s后的消息。
这个插件可以将普通交换机改造为支持延迟消息功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后再投递到队列
基于注解方式:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true",type = ExchangeTypes.DIRECT),
key = {"hi"}
))
public void listenDelayQueue(String message) {
log.info("消费者监听到dlx.queue的消息:{}", message);
}
基于@Bean方式
发送消息时需要通过消息头x-delay来设置过期时间:
@Test
public void testSendDelayMessageByPlugin() {
//发送消息
rabbitTemplate.convertAndSend("delay.direct", "hi", "hello", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(10000);
return message;
}
});
}
延迟插件更方便,只要声明一个延迟交换机、队列及其消费者即可,不用声明两套,值得注意的是发送消息中延迟函数变成了setDelay
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。