赞
踩
正在学RabbitMQ,特此记录一下,这里就不讲RabbitMQ基础了,直接进入主题。
TODO:记录下具体的springboot中具体如何配置
我们都知道,消息从生产端到消费端消费要经过大致4个步骤(AMQP协议):
这4个步骤中的每一步都有可能导致消息丢失,消息丢失不可怕,可怕的是丢失了我们还不知道,所以要有一些措施来保证系统的可靠性。这里的可靠并不是一定就100%不丢失了,磁盘损坏,机房爆炸等等都能导致数据丢失,当然这种都是极小概率发生,能做到99.999999%消息不丢失,就是可靠的了。下面来具体分析一下问题以及解决方案。
生产端可靠性投递,即生产端要确保将消息正确投递到RabbitMQ中。生产端投递的消息丢失的原因有很多,比如消息在网络传输的过程中发生网络故障消息丢失,或者消息投递到RabbitMQ时RabbitMQ挂了,那消息也可能丢失,而我们根本不知道发生了什么。针对以上情况,RabbitMQ本身提供了一些机制。
事务消息机制由于会严重降低性能,所以一般不采用这种方法,我就不介绍了,而采用另一种轻量级的解决方案——confirm消息确认机制。
confirm消息确认机制
什么是confirm消息确认机制?即发送方确认机制
(publisher confirm)顾名思义,就是生产端投递的消息一旦投递到RabbitMQ后,RabbitMQ就会发送一个确认消息给生产端,让生产端知道我已经收到消息了,否则这条消息就可能已经丢失了,需要生产端重新发送消息了。
通过下面这句代码来开启确认模式:
channel.confirmSelect();// 开启发送方确认模式
然后异步监听确认和未确认的消息:
channel.addConfirmListener(new ConfirmListener() {
//消息正确到达broker
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("已收到消息");
//做一些其他处理
}
//RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("未确认消息,标识:" + deliveryTag);
//做一些其他处理,比如消息重发等
}
});
springboot中配置
spring.rabbitmq.publisher-confirm-type=correlated
使用RabbitTemplate里需要实现内部接口ConfirmCallback
/** * @author mayj@xx.com.cn * @version v1.0 * @description MyCallBack * @date 2022/7/5 10:09 */ @Slf4j @Component public class MyCallBack implements RabbitTemplate.ConfirmCallback { @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct // 在Bean创建并完成赋值,执行初始化之前调用 public void init() { // 注入到RabbitTemplate rabbitTemplate.setConfirmCallback(this); } /** * correlationData 保存回调消息的ID及相关信息 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String id = correlationData != null ? correlationData.getId() : null; if (ack) { log.info("交换机收到ID为:{}的消息!", id); } else { log.info("交换机还未收到ID为:{}的消息!", id); } } }
这样就可以让生产端感知到消息是否投递到RabbitMQ中了,当然这样还不够,稍后我会说一下极端情况。
当消息准确到达交换机后,由于路由键错误等问题,造成无法将消息路由到所绑定的队列,而此时生产者也无法感知,那么会出现消息丢失问题。我们可以使用以下两种方式来解决:
1)回退消息
springboot中通过设置 mandatory
参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。同时需要开启rabbitmq的消息返回模式。
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
/** * @author mayj@xx.com.cn * @version v1.0 * @description MyCallBack * @date 2022/7/5 10:09 */ @Slf4j @Component public class MyCallBack implements RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct // 在Bean创建并完成赋值,执行初始化之前调用 public void init() { // 注入到RabbitTemplate rabbitTemplate.setReturnCallback(this); } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { // 可以在当消息传递过程中不可达目的地时将消息返回给生产者 log.info("消息{},被交换机{}退回,退回原因:{},路由Key:{}", new String(message.getBody()), exchange, replyText, routingKey); } }
原生api是添加ReturnListener或实现ReturnCallback
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP
.BasicProperties basicProperties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("Basic.Return返回的结果是:" + message);
}
});
2)备份交换机
备份交换器,英文名称Alternate Exchange,简称AE,或者更直白的可以称之为“备胎交换器”。生产者在发送消息的时候如果不设置mandatory参数,那么消息在未被路由的情况下将会丢失,如果设置了mandatory参数,那么需要添加ReturnListener的编程逻辑,生产者的代码将变得复杂化。如果你不想复杂化生产者的编程逻辑,又不想消息丢失,那么可以使用备份交换器,这样可以将未被路由的消息存储在RabbitMQ中,再在需要的时候去处理这些消息。 可以通过在声明交换器(调用channel.exchangeDeclare方法)的时候添加alternate-exchange参数来实现,也可以通过策略的方式实现。如果两者同时使用的话,前者的优先级更高,会覆盖掉Policy的设置。(如果同时配置mandatory参数和备份交换机,则只有备份交换机生效)
以下不包含生产者和消费者代码:
/** * @author mayj@xx.com.cn * @version v1.0 * @description ConfirmCOnfig * @date 2022/7/6 17:40 */ public class ConfirmConfig { public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange"; public static final String CONFIRM_QUEUE_NAME = "confirm.queue"; public static final String BACKUP_EXCHANGE_NAME = "backup.exchange"; public static final String BACKUP_QUEUE_NAME = "backup.queue"; public static final String WARNING_QUEUE_NAME = "warning.queue"; // 声明确认队列 @Bean("confirmQueue") public Queue confirmQueue(){ return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); } //声明确认队列绑定关系 @Bean public Binding queueBinding(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("key1"); } //声明备份 Exchange @Bean("backupExchange") public FanoutExchange backupExchange(){ return new FanoutExchange(BACKUP_EXCHANGE_NAME); } //声明确认 Exchange 交换机的备份交换机 @Bean("confirmExchange") public DirectExchange confirmExchange(){ ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME) .durable(true) //设置该交换机的备份交换机 .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME); return (DirectExchange)exchangeBuilder.build(); } // 声明警告队列 @Bean("warningQueue") public Queue warningQueue(){ return QueueBuilder.durable(WARNING_QUEUE_NAME).build(); } // 声明报警队列绑定关系 @Bean public Binding warningBinding(@Qualifier("warningQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange backupExchange){ return BindingBuilder.bind(queue).to(backupExchange); } // 声明备份队列 @Bean("backQueue") public Queue backQueue(){ return QueueBuilder.durable(BACKUP_QUEUE_NAME).build(); } // 声明备份队列绑定关系 @Bean public Binding backupBinding(@Qualifier("backQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange backupExchange){ return BindingBuilder.bind(queue).to(backupExchange); } }
当消息到达队列后,进行存放等待消费者获取,但我们都知道,RabbitMQ的实现是基于内存的,那这就会有个问题,如果RabbitMQ挂了,那重启后数据就丢失了,所以相关的数据应该持久化到磁盘中,这样就算RabbitMQ重启后也可以到硬盘中取数据恢复。那如何持久化呢?
message消息到达RabbitMQ后先是到exchange交换机中,然后路由给queue队列,最后发送给消费端。就需要给exchange、queue和message都进行持久化:
exchange持久化(exchange非必要情况可以选择不作持久化)
//第三个参数true表示这个exchange持久化
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
queue持久化:
//第二个参数true表示这个queue持久化
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
message持久化:
//第三个参数MessageProperties.PERSISTENT_TEXT_PLAIN表示这条消息持久化
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
这样,如果RabbitMQ收到消息后挂了,重启后会自行恢复消息。但是却无法避免单机故障
且无法修复(比如磁盘损毁)而引起的消息丢失,这里就需要引入镜像队列。镜像队列相当于配置了副本,绝大多数分布式的东西都有多副本的概念来确保HA。在镜像队列中,如果主节点(master)在此特殊时间内挂掉,可以自动切换到从节点(slave),这样有效的保证了高可用性,除非整个集群都挂掉。虽然这样也不能完全的保证RabbitMQ消息不丢失(比如机房被炸。。。),但是配置了镜像队列要比没有配置镜像队列的可靠性要高很多,在实际生产环境中的关键业务队列一般都会设置镜像队列。
参考文章
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。