赞
踩
RabbitMQ的一大特色就是其自身保证消息的可靠性,那么RabbitMQ是如何保证消息的可靠性呢?
RabbitMQ的消息默认存放在内存上面,如果不特别声明设置,消息不会持久化保存到硬盘上面的,如果节点重启或者意外crash掉,消息就会丢失。
所以就要对消息进行持久化处理,持久化的条件(缺一不可):
完成了上述操作,当服务重启时可以保证交换器、队列、队列中的消息被还原至重启之前的状态。但是这样并不能保障服务运行时消息不丢失,例如:autoAck=true消费者接受消息之后还没正确完成处理就抛出异常或者消费者的服务器者直接crash了,这样也算数据丢失;即使正确消息已经被正确处理,但是后序代码抛出异常,使用Spring进行管理的话消费端业务逻辑会进行回滚,这也造成了实际意义上的消息丢失。为了确保这种情况下的数据不丢失,RabbitMQ支持消息确认——ACK。
什么是ACK消息确认机制?
ACK确认机制就是消费者收到消息并处理完成后要通知服务端,服务端才把消息从队列中删除:
消费者如何通知RabbitMQ消息成功消费?
如何进行手动消息确认?
在全局配置文件中设置:
spring.rabbitmq.listener.simple.acknowledge-mode=manual
或者在配置类中设置:
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory =new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); //开启手动 ack
return factory;
}
然后运行一个Springboot应用,service方法如下:
@RabbitListener(queues = "nynu.news")
public void msgAndHeader(Message msg){
System.out.println(msg.getBody());
System.out.println(msg.getMessageProperties());
}
运行一个测试方法,发送一条消息到nynu.news队列中去:
@Test
// 点对点单播测试
void directDemo(){
rabbitTemplate.convertAndSend("exchange.direct","nynu.news",new Shoes(2,"李宁","全城7"));
}
运行测试方法之后,service方法接收到消息并在控制台打印出来,但是由于没有手动确认方法:
可以看到,队列中没有准备好的可以被接收的消息,但是有一个未确认的消息。这时候停止应用来模拟抛出异常或者消费者服务器crash:
没有被确认的消息重新放回了消息队列中。
在service方法中添加确认消息语句:
@RabbitListener(queues = "nynu.news")
public void msgAndHeader(Message msg, Shoes shoes, @Header(AmqpHeaders.DELIVERY_TAG) long tag, Channel channel){
System.out.println(shoes);
System.out.println(msg.getBody());
System.out.println(msg.getMessageProperties());
try {
channel.basicAck(tag,false); //确认消息
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
需要注意的 basicAck 方法需要传递两个参数:
Channel类会提供了很多方法:
注意:如果忘记了ACK,那么后果很严重。当Consumer退出时,Message会一直重新分发。然后RabbitMQ会占用越来越多的内存,由于RabbitMQ会长时间运行,因此这个“内存泄漏“是致命的。
解决方法:
使用try-catch块捕获消费者中的异常。
设置重试次数(在全局配置文件添加如下配置):
#开启重试
spring.rabbitmq.listener.simple.retry.enabled=true
#最大重试次数
spring.rabbitmq.listener.simple.retry. max-attempts=5
上述的应答方式主要都是消费者告诉消息队列已获取到消息并处理完毕。其实当生产者发布消息到RabbitMQ中,生产者需要知道是否真的已经发送到RabbitMQ中,需要RabbitMQ告诉生产者消息队列已收到消息:
Confirm机制:生产者将信道设置为confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所匹配的队列之后,RabbitMQ就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,RabbitMQr回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外RabbitMQ也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理。
Confirm模式最大的好处就在于它是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条basic.nack来代替basic.ack的消息,在这个情形下,basic.nack中各域值的含义与basic.ack中相应各域含义是相同的,同时requeue域的值应该被忽略。通过nack一条或多条消息, Broker表明自身无法对相应消息完成处理,并拒绝为这些消息的处理负责。在这种情况下,client可以选择将消息re-publish。
channel.confirmSelect():将当前Channel设置为Confirm模式。
channel.waitForConfirms():发一个或一批消息,等待确认返回一个boolean值(true发送成功,false发送失败),如果出错了会返回本次发送的所有消息。
客户端实现生产者confirm有三种编程方式:
普通Confirm模式:
channel.confirmSelect();
String message = "Hello RabbitMQ:";
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, (message + i).getBytes("UTF-8"));
boolean isPublished = channel.waitForConfirms();
每发送一条消息后,调用waitForConfirms()方法,等待服务器端Confirm。实际上是一种串行Confirm了,每publish一条消息之后就等待服务端Confirm,如果服务端返回false或者超时时间内未返回,客户端进行消息重传。
批量Confirm模式,:
channel.confirmSelect();
String message = "Hello RabbitMQ:";
for (int i = 0; i < 5; i++) {
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, (message + i).getBytes("UTF-8"));
}
boolean isAllPublished = channel.waitForConfirms();
发送一批消息之后,调用waitForConfirms()方法,等待服务端Confirm,但是服务器并不是对每一条消息都进行ack,而是批量处理,如果使用wireshark等软件抓包之后可以发现在某些basic.ack数据报文中multiple的值为true,这与前面我们讲解的一致,为true时将确定所有比指定的delivery-tag参数都小的消息都得到了确认。这种批量确认的模式极大的提高了Confirm效率,但是如果一旦出现Confirm返回false或者超时的情况,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息,如果这种情况频繁发生的话,效率也会不升反降。
异步Confirm模式:提供一个回调方法,服务端Confirm了一条或者多条消息后Client端会回调这个方法。
先看一下waitForConfirms方法的源码:
public boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException { if (this.nextPublishSeqNo == 0L) { throw new IllegalStateException("Confirms not selected"); } else { long startTime = System.currentTimeMillis(); synchronized(this.unconfirmedSet) { while(this.getCloseReason() == null) { if (this.unconfirmedSet.isEmpty()) { boolean aux = this.onlyAcksReceived; this.onlyAcksReceived = true; return aux; } if (timeout == 0L) { this.unconfirmedSet.wait(); } else { long elapsed = System.currentTimeMillis() - startTime; if (timeout <= elapsed) { throw new TimeoutException(); } this.unconfirmedSet.wait(timeout - elapsed); } } throw (ShutdownSignalException)Utility.fixStackTrace(this.getCloseReason()); } } }
在waitForConfirms()方法内部维护了一个同步块代码,而unconfirmedSet就是存储delivery-tag标识的。该方法为每一个Channel维护一个unconfirmedSet的消息序号集合,每publish一条数据,集合中元素加1,每回调一次ack方法,unconfirmedSet就删掉相应的一条(multiple=false)或多条(multiple=true)记录。从程序运行效率上看,这个unconfirmedSet集合最好采用有序集合SortedSet存储结构。而且waitForConfirmsOrDie()方法内部其实就是调用了waitForConfirms()方法。
事务机制:通过AMQP事务机制实现,这也是AMQP协议层面提供的解决方案。
channel.txSelect():用于将当前Channel设置为Transaction模式。
channel.txCommit():用于提交事务。
channel.txRollback():用于回滚事务。
String message = "Hello RabbitMQ:";
try {
channel.txSelect();
for (int i = 0; i < 5; i++) {
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, (message + i).getBytes("UTF-8"));
}
channel.txCommit();
} catch (Exception e) {
channel.txRollback();
}
注意:事务机制是非常非常非常消耗性能的,最好使用Confirm机制,Confirm机制相比事务机制性能上要好很多。
每个队列只能设置为一种模式confirm或者transaction,不能混用否则会抛异常。
RabbitMQ常用的三种部署模式:
为什么设置镜像模式集群,因为队列的内容仅仅存在某一个节点上面,不会存在所有节点上面,所有节点仅仅存放消息结构和元数据。下面自己画了一张图介绍普通集群丢失消息情况:
如果想解决上面途中问题,保证消息不丢失,需要采用HA 镜像模式队列。
下面介绍下三种HA策略模式:
命令处理HA策略模版:rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]
但是:HA 镜像队列有一个很大的缺点就是: 系统的吞吐量会有所下降
为什么还要消息补偿机制呢?难道消息还会丢失,没错,系统是在一个复杂的环境,不要想的太简单了,虽然以上的三种方案,基本可以保证消息的高可用不丢失的问题,
但是作为有追求的程序员来讲,要绝对保证我的系统的稳定性,有一种危机意识。
比如:持久化的消息,保存到硬盘过程中,当前队列节点挂了,存储节点硬盘又坏了,消息丢了,怎么办?
产线网络环境太复杂,所以不知数太多,消息补偿机制需要建立在消息要写入DB日志,发送日志,接受日志,两者的状态必须记录。
然后根据DB日志记录check 消息发送消费是否成功,不成功,进行消息补偿措施,重新发送消息处理。
菜鸟博主只是主要学习了前两种方法。后两种作为拔高,待羽翼丰满之后在做深入学习,先搬运flyrock的博客中的内容作为记录了解混个“耳熟”。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。