当前位置:   article > 正文

Rabbit MQ如何保证消息不丢失_rabbitmq如何保证消息不丢

rabbitmq如何保证消息不丢

RabbitMQ模型

上面的图是官网中关于一条消息发送的整个流程,消息会经历下面几个流程:

  1. 生产者将消息发送到Exchange
  2.  Exchange根据Routing Key路由到Queue
  3. 消费者订阅Queue,从Queue中获取数据消费

可能发生消息丢失的情况

通过上面的RabbitMQ发送消息的模型我们可以知道在下面几个过程中消息可能会丢失:

  • 生产者将消息发送到Exchange时丢失。例如在发送过程中因为网络原因发送失败,亦或者是因为发送到了一个不存在的Exchange。
  • 路由失败。这种情况就是消息已经发送到Exchange了,但是Exchange将消息根据Routing Key路由到对应的Queue时失败,例如这个Exchange根本就没有绑定Queue等等。
  • 客户端在处理消息时失败。客户端已经获取了消息,但是在处理消息过程中出现异常,没有对异常做处理,导致消息丢失了。

上面这几种情况都是消息在向不同的模块传递时失败导致消息丢失了,如果上面的情况都能解决就真的消息不会丢失了吗?然而结果并非如此,如果RabbitMQ服务宕机了,如果这些消息没有被持久化,等RabbitMQ服务重启之后,这些没有持久化的消息也将丢失。

分析了这么多的情况可能会导致消息丢失,下面将根据各种情况对应的分析来解决。

生产者发送消息到Exchange失败

对于网络原因导致消息发送到Exchange失败这个我们很好感知,我们只需要对发送异常做处理即可。排除这个原因,默认情况下生产者将消息发送到Exchange是不会返回任何信息给生产者,至于消息是不是真的到了服务端作为生产者根本无从可知。

对于这个问题RabbitMQ中有两种方式可以用来解决问题:

  • 通过事务机制实现
  • 通过发送方确认机制实现

事务

RabbitMQ中我们可以使用channel.txSelect开启事务,使用channel.txCommit和channel.txRollback分别用来提交事务和回滚事务。与数据库的事务有稍许不同,数据库每次都需要打开事务,且最后与之对应的有commit或者rollback,而RabbitMQ中channel中的事务只需要开启一次,可以多次commit或者rollback。

  1. //channel开启事务
  2. channel.txSelect();
  3. //发送3条消息
  4. String msgTemplate = "测试事务消息内容[%d]";
  5. channel.basicPublish("tx.exchange", "tx", new AMQP.BasicProperties(), String.format(msgTemplate,1).getBytes(StandardCharsets.UTF_8));
  6. channel.basicPublish("tx.exchange", "tx", new AMQP.BasicProperties(), String.format(msgTemplate,2).getBytes(StandardCharsets.UTF_8));
  7. channel.basicPublish("tx.exchange", "tx", new AMQP.BasicProperties(), String.format(msgTemplate,3).getBytes(StandardCharsets.UTF_8));
  8. //消息回滚
  9. channel.txRollback();
  10. //成功提交
  11. channel.basicPublish("tx.exchange", "tx", new AMQP.BasicProperties(), String.format(msgTemplate,4).getBytes(StandardCharsets.UTF_8));
  12. channel.txCommit();

上面的方法中一共发送了4次消息,前三次发送后最后调用了txRollback,这将导致前三条消息回滚而没有发送成功。而第四次发送之后调用commit,最后在RabbitMQ中只会有一条消息。
虽然事务可以保证消息一定被提交到服务器,而且在客户端编码方面足够简单。但是它也不是那么完美,在性能方面事务会带来较大的性能影响。如果对性能要求不是特别高的采用事务方式也是可以的,如果有性能方面的要求,可以使用Channel的确认机制。

confirm机制

 

confirm机制是为了解决事务性能问题的一种方案,我们可以通过使用channel.confirmSelect方法开启confirm模式,需要注意的是confirm机制与事务是不能共存的,简单的说就是开启事务就无法使用confirm,开启confirm就无法使用事务。当开启confirm之后,每次发送消息时都会生成一个唯一的ID,如果消息投递成功RabbitMQ就会给客户端发送一个ACK确认,通过唯一ID我们就知道哪个消息发送成功了。与事务方式不同的是事务需要每次发送完成之后commit或者rollback,这会导致不能继续发送必须等待RabbitMQ的响应。而confirm它的发送与ACK是不冲突的,发送之后不需要等待ACK完成之后才能进行,这样大大发送消息的效率。

  1. //创建Exchange
  2. channel.exchangeDeclare("confirm.exchange", BuiltinExchangeType.DIRECT, true, false, new HashMap<>());
  3. //创建Queue
  4. channel.queueDeclare("confirm.queue", true, false, false, new HashMap<>());
  5. //绑定路由
  6. channel.queueBind("confirm.queue", "confirm.exchange", "confirm");
  7. channel.confirmSelect();
  8. channel.addConfirmListener(new ConfirmListener() {
  9. @Override
  10. public void handleAck(long deliveryTag, boolean multiple) throws IOException {
  11. log.info("ack : deliveryTag = {},multiple = {}", deliveryTag, multiple);
  12. }
  13. @Override
  14. public void handleNack(long deliveryTag, boolean multiple) throws IOException {
  15. log.error("nack : deliveryTag = {},multiple = {}", deliveryTag, multiple);
  16. }
  17. });
  18. String msgTemplate = "测试消息[%d]";
  19. for (int i = 0; i < 10; i++) {
  20. channel.basicPublish("confirm.exchange", "confirm", new AMQP.BasicProperties(), String.format(msgTemplate, i).getBytes(StandardCharsets.UTF_8));
  21. }

上面代码运行后打印的日志信息如下:

14:10:03.537 [AMQP Connection 127.0.0.1:5672] INFO com.buydeem.share.rabbit.confirm.ConfirmDemo - ack : deliveryTag = 8,multiple = true
14:10:03.541 [AMQP Connection 127.0.0.1:5672] INFO com.buydeem.share.rabbit.confirm.ConfirmDemo - ack : deliveryTag = 10,multiple = true

最后打印了两条信息,可能你运行的时候打印的不是两条。从结果可以看出deliveryTag分别为8和10,同时两条日志的multiple都为true。这代表了第一条编号为8或者小于8的消息都已经confirm,这里面的multiple代表是意思是是不是confirm了多条。

Exchange路由到队列失败

在生产者将消息推送到RabbitMQ时,我们可以通过事务或者confirm模式来保证消息不会丢失。但是这两种措施只能保证消息到达Exchange,如果我们的消息无法根据RoutingKey到达对应的Queue中,那么我们的消息最后就会丢失。

对于这种情况,RabbitMQ中在发送消息时提供了mandatory参数。如果mandatory为true时,Exchange根据自身的类型和RoutingKey无法找到对应的Queue,它将不会丢掉该消息,而是会将消息返回给生产者。

  1. //创建Exchange
  2. channel.exchangeDeclare("mandatory.exchange", BuiltinExchangeType.DIRECT, true, false, new HashMap<>());
  3. //创建Queue
  4. channel.queueDeclare("mandatory.queue", true, false, false, new HashMap<>());
  5. //绑定路由
  6. channel.queueBind("mandatory.queue", "mandatory.exchange", "mandatory");
  7. channel.addReturnListener(new ReturnListener() {
  8. @Override
  9. public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
  10. log.error("replyCode = {},replyText ={},exchange={},routingKey={},body={}",replyCode,replyText,exchange,routingKey,new String(body));
  11. }
  12. });
  13. //设置mandatory = true
  14. //void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
  15. channel.basicPublish("mandatory.exchange", "mandatory-1",true, new AMQP.BasicProperties(), "测试mandatory的消息".getBytes(StandardCharsets.UTF_8));

在我们调用BasicPublish方法的时候,我们设置了mandatory为true,同时还给channel设置了ReturnListener用来监听路由到队列失败的消息。最后该程序运行结果如下:

14:16:36.197 [AMQP Connection 127.0.0.1:5672] ERROR com.buydeem.share.rabbit.mandatory.MandatoryDemo - replyCode = 312,replyText =NO_ROUTE,exchange=mandatory.exchange,routingKey=mandatory-1,body=测试mandatory的消息

 从打印结果我们可以看出,这条消息确实没有路由到对应的队列,通过该设置当消息无法路由到对应的队列时,他将会返回给消费者而不是被丢弃,让消费知道消息已经被退回了。

消息未持久化服务重启消息丢失

上面这两点我们是站在生成者的角度来考量的,通过将Channel设置成confirm或者事务模式,并且在发送消息时设置mandatory为true来保证消息不丢失。但是这样就真的不会丢失了吗?虽然消息已经推送到RabbitMQ中,但是如果我们没有将消息持久化,服务器重启之后那么我们的消息还是会丢掉。对于持久化这一点,我们不仅仅是需要将消息持久化,同时我们还要把Exchange和Queue都持久化。在RabbitMQ中,我们可以通过将durable的值设置为true来保证持久化。

消费者获取消息后处理消息失败

通过上面两个点我们保证了从生产者到RabbitMQ消息不会丢失,现在到了消费者消费消息了。在消费者处理业务时,可能由于我们业务代码的异常导致消息没有被正常处理完,但是消息已经从RabbitMQ中的队列移除了,这样我们的消息就丢失了。

在生产者发送消息到RabbitMQ时我们可以通过ack来确认消息是否到达了服务端,与之类似的是,消费者在消费消息时同样提供手动ack模式。默认情况下,消费者从队列中获取消息后会自动ack,我们可以通过手动ack来保证消费者主动的控制ack行为,这样我们可以避免业务异常导致消息丢失的情况。

  1. DeliverCallback deliverCallback = new DeliverCallback() {
  2. @Override
  3. public void handle(String consumerTag, Delivery message) throws IOException {
  4. try {
  5. byte[] body = message.getBody();
  6. String messageContent = new String(body, StandardCharsets.UTF_8);
  7. if("error".equals(messageContent)){
  8. throw new RuntimeException("业务异常");
  9. }
  10. log.info("收到的消息内容:{}",messageContent);
  11. channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
  12. }catch (Exception e){
  13. log.info("消费消息失败!重回队列!");
  14. channel.basicNack(message.getEnvelope().getDeliveryTag(),false,true);
  15. }
  16. }
  17. };
  18. CancelCallback cancelCallback = new CancelCallback() {
  19. @Override
  20. public void handle(String consumerTag) throws IOException {
  21. log.info("取消订阅:{}",consumerTag);
  22. }
  23. };
  24. channel.basicConsume("confirm.queue",false,deliverCallback,cancelCallback);

上面的代码我们通过手动ack来控制消息是否被成功消费,当收到的消息内容为error时,我们手动抛出异常。在异常处理中将requeue设置成true,这将使该消息重新回到队列。运行代码可以看到,该消息一直在重复的打印出来。

总结

通过上面三点,我们知道了如何保证消息不丢失。其实这个过程很自然,首先是将消息推送到服务器时我们要保证消息的确是到了服务器。然后就是存在服务器中的消息要保证持久化,这样能解决服务器重启导致的内存中的消息不会被丢失。最后就是消费者在消费消息时,我们通过手动ack来告诉服务器是不是应该将消息移除队列。

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/597447
推荐阅读
相关标签
  

闽ICP备14008679号