赞
踩
RabbitMQ相信大家都非常熟悉了,今天咱们来聊聊怎么保证RabbitMQ的可靠性。
那什么时候会出现问题呢?
第一种是生产端出现的问题。我们向队列中发送消息的时候,消息不一定可以发送到MQ中,这个时候如果我们不做任何处理,这样消息丢失了。
第二种则是RabbitMQ出现的问题。也就是说现在生产端的成功将消息发送到了RabbitMQ,但由于MQ并没有做持久化,这样宕机重启之后消息可能就丢失了。
第三种则是消费端的问题。消费端处理消息时如果出现异常,默认的解决方式是在重复消费多次,当次数超过阈值时直接删除消息,这也导致消息丢失。
接下来咱们就看看怎么应对以上三种问题。
这里我们需要清楚发送的一个大体流程。
生产端发送消息到MQ之后,会收到一个结果,这个结果有ack和nack两种。
其中ack代表消息成功到达了交换机,但并不意味者消息到达了队列。不过ack的情况下消息未送达队列,会有相应的错误信息提醒。
nack就代表消息并未送达交换机。
那么,怎么才能知道消息发送情况呢?
可以设置callback来获取消息发送结果。
局部callback设置如下
@GetMapping("testmq") public Result testmq(){ String orderId = String.valueOf(UUID.randomUUID()); String messageData = "下订单!"; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String,Object> map=new HashMap<>(); map.put("orderId",orderId); map.put("messageData",messageData); map.put("createTime",createTime); // 设置发送的callback CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); correlationData.getFuture().addCallback(result -> { // 判断结果 if (result.isAck()) { log.info("发送成功"); } else { log.error("消息未达到交换机,发送失败"); } }, ex -> { log.error("出现异常,发送失败"); }); rabbitTemplate.convertAndSend(RabbitMQConfig.NORMALEXCHANGE, RabbitMQConfig.TESTROUTING, map, message -> { message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); return message; }, correlationData); return Result.succ("ok"); }
消息发送成功
交换机名称有误
队列路由出错
虽然没有错误,但给了我们warning。
这里就比较简单了,那就是做下持久化就可以了
首先是交换机,队列和消息的持久化
交换机
@Bean
DirectExchange normalExchange() {
/**
* durable 是否持久化
* autoDelete 没有queue绑定时是否自动删除
*/
return new DirectExchange(NORMALEXCHANGE, true, false);
}
队列
@Bean
public Queue cleanDQueue() {
return QueueBuilder.durable(CLEANQUEUE)
.build();
}
消息的持久化
rabbitTemplate.convertAndSend(RabbitMQConfig.NORMALEXCHANGE, RabbitMQConfig.TESTROUTING, map, message -> {
// 设置消息持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
}, correlationData);
消费端出现错误时,会进行重试,当重试次数超过阈值之后有三种解决方案,如下
这里我们以RepublishMessageRecoverer为例做下演示。
首先需要声明消费消息失败后传递的交换机和队列
@Bean DirectExchange normalExchange() { /** * durable 是否持久化 * autoDelete 没有queue绑定时是否自动删除 */ return new DirectExchange(NORMALEXCHANGE, true, false); } // 用于处理消费失败消息的队列 @Bean public Queue republishQueue() { return QueueBuilder.durable(REPULISHQUEUE) .build(); } // 绑定失败消费消息队列 @Bean Binding bindingRepublish() { return BindingBuilder.bind(republishQueue()).to(normalExchange()).with(REPULISHROUTING); }
然后配置下RepublishMessageRecoverer策略,随便找个config注入下bean就可以。
// 设置RepublishMessageRecoverer,消费失败的消息转移到另一队列中,交给管理员手动处理
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {
/**
* NORMALEXCHANGE 接收消费失败消息的交换机
* REPULISHROUTING 接收消费失败消息的路由key
*/
return new RepublishMessageRecoverer(rabbitTemplate, NORMALEXCHANGE, REPULISHROUTING);
}
咱们看看如果消费出错会咋样
我们可以看到被消费的队列中信息被删除了。
然后我们设置的转入队列中的消息数加一,这时候我们可以接收下该队列中的信息,存储到数据库中,方便维护人员手动进行处理。
从生产端、RabbitMQ以及消费端三方面介绍了一下怎么保证RabbitMQ的可靠性,另外还有关于死信队列和延迟队列的内容在这篇博客中,大家有兴趣可以看一下。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。