当前位置:   article > 正文

【RabbitMQ笔记08】消息队列RabbitMQ之防止消息丢失的三种方式(生产者消息确认、消费者消息确认、消息持久化)_消息队列如何保证消息不丢失

消息队列如何保证消息不丢失

这篇文章,主要介绍消息队列RabbitMQ之防止消息丢失的三种方式(生产者消息确认、消费者消息确认、消息持久化)。

目录

一、防止消息丢失

1.1、消息确认机制(生产者)

(1)生产者丢失消息

(2)生产者消息确认机制

1.2、消息确认机制(消费者)

(1)消费者丢失消息

(2)消费者消息确认机制

1.3、消息持久化(RabbitMQ)

(1)RabbitMQ丢失消息

(2)消息持久化机制


一、防止消息丢失

RabbitMQ消息队列,在使用的时候,可能会存在消息丢失的情况,所谓的消息丢失就是生产者发送的消息没办法被消费者正确的消费,消息队列中导致消息丢失的地方有三个,分别是:

  • 第一种情况:生产者发送的消息没有正确的发送到RabbitMQ里面,导致发送的消息丢失。
  • 第二种情况:消费者从RabbitMQ消费消息时候,消费失败,但是RabbitMQ认为消费成功,从而删除了消息。
  • 第三种情况:RabbitMQ中保存的消息还没有被消费者消费,此时RabbitMQ服务宕机,导致内存中的消息丢失。

1.1、消息确认机制(生产者)

(1)生产者丢失消息

生产者丢失消息,是指:当生产者发送消息给RabbitMQ的时候,此时消息发送失败了,并且生产者又没有重新发送这一条消息,所以这个时候,生产者这一条失败的消息就丢失了。

既然是生产者发送消息失败导致这一条消息丢失的,那么我们在处理这个丢失消息问题的时候,就可以这样做:当生产者消息发送失败之后,可以让生产者再次发送这一条消息,这里就有一个问题啦,那就是生产者怎么知道消息有没有发送成功???

RabbitMQ给我们提供了一个机制,即:发布确认机制,大致思想是:当生产者将消息发送到RabbitMQ之后,并且RabbitMQ正确接收到消息并将其放入Queue队列里面时,RabbitMQ会返回一个ACK标识给生产者,生产者接收到ACK标识就可以认为消息发送成功啦;如果消息接收失败,RabbitMQ会返回一个NACK标识,表示接收失败。

(2)生产者消息确认机制

生产者消息确认机制,上一篇文章已经介绍了(【RabbitMQ笔记07】消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式),这里就不再重复。

1.2、消息确认机制(消费者)

(1)消费者丢失消息

如果生产者已经将消息正确的发送到RabbitMQ里面了,消费者从Queue队列里面获取消息消费时候,如果消费失败,那么此时就会导致这一条消息丢失,这是因为,默认情况下,RabbitMQ将消息分发给消费者之后,消费者接收到消息时候,就会返回一个ACK标识给消息队列RabbitMQ,此时RabbitMQ就会将这一条消息从Queue队列里面删除,但是这种情况下,消费者是否正确将这条消息消费了,RabbitMQ是不知道的,所以这就有可能导致丢失。

如何解决消费者丢失消息???

  • 既然丢失消息是因为消费者消费失败,并且RabbitMQ把消息删除了,那么我们就可以开启手动确认的方式来告诉RabbitMQ,消费者是否正确的消费消息,是否可以将消息从Queue队列里面删除了。

(2)消费者消息确认机制

  • 消费者进行消息确认,需要关闭自动确认,将【basicConsume()】方法的第二个参数设置为【false】。
  • 消息成功消费之后,主动调用【basicAck()】方法,返回ACK标识给RabbitMQ。
  1. package com.rabbitmq.demo.dropmsg;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. /**
  5. * @version 1.0.0
  6. * @Date: 2023/2/25 16:30
  7. * @Copyright (C) ZhuYouBin
  8. * @Description: 消息消费者
  9. */
  10. public class Consumer {
  11. public static void main(String[] args) {
  12. // 1、创建连接工厂
  13. ConnectionFactory factory = new ConnectionFactory();
  14. // 2、设置连接的 RabbitMQ 服务地址
  15. factory.setHost("127.0.0.1"); // 默认就是本机
  16. factory.setPort(5672); // 默认就是 5672 端口
  17. // 3、获取连接
  18. Connection connection = null; // 连接
  19. Channel channel = null; // 通道
  20. try {
  21. connection = factory.newConnection();
  22. // 4、获取通道
  23. channel = connection.createChannel();
  24. // 5、声明 Exchange,如果不存在,则会创建
  25. String exchangeName = "exchange_dropmsg_2023";
  26. channel.exchangeDeclare(exchangeName, "direct");
  27. // 6、指定需要操作的消息队列,如果队列不存在,则会创建
  28. String queueName = "queue_dropmsg_2023";
  29. channel.queueDeclare(queueName, false, false, false, null);
  30. // 7、绑定 Exchange 和 Queue, 接收 routingKey = "info" 的消息
  31. channel.queueBind(queueName, exchangeName, "key_2023");
  32. // 8、消费消息
  33. Channel finalChannel = channel;
  34. DeliverCallback callback = new DeliverCallback() {
  35. public void handle(String s, Delivery delivery) throws IOException {
  36. // 接收消息
  37. System.out.println("这是接收的消息:" + new String(delivery.getBody()));
  38. // TODO 消费者正确消费消息之后,主动返回 ACK 标识
  39. finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);
  40. }
  41. };
  42. // TODO 这第二个参数修改为 false,表示消费者需要手动发送 ACK 标识给 RabbitMQ(默认是true)
  43. channel.basicConsume(queueName, false, callback, i->{});
  44. } catch (Exception e) {
  45. e.printStackTrace();
  46. }
  47. }
  48. }

1.3、消息持久化(RabbitMQ)

(1)RabbitMQ丢失消息

上面介绍了两种丢失消息的情况,分别是生产者和消费者丢失消息,还有一种丢失消息的情况,那就是RabbitMQ消息队列将消息丢失了。假设,现在存在这一种情况,生产者已经正确将消息发送到RabbitMQ里面,正准备将消息发送给消费者的时候,此时RabbitMQ服务宕机了,导致RabbitMQ中的消息丢失了(默认情况下,RabbitMQ是将消息保存在内存中的),由于内存中的数据断电即失,所以这就导致消息丢失情况。

如何解决RabbitMQ出现的消息丢失问题呢???

  • 既然RabbitMQ是将消息保存在内存中的,那么为了避免消息丢失,可以将内存中的消息保存到磁盘文件里面,这样即使RabbitMQ宕机了,重新启动的时候也可以从磁盘文件里面读取消息到内存里面。

(2)消息持久化机制

  • 在调用【queueDeclare()】方法,创建Queue队列的时候,设置第二个参数等于【true】,表示消息允许持久化。
  • 生产者调用【basicPublish()】方法发送消息的时候,设置消息属性等于【MessageProperties.PERSISTENT_TEXT_PLAIN】,表示文本持久化。
  1. // 第二个参数设置为true,表示开启持久化消息
  2. channel.queueDeclare("Queue队列名称", true, false, false, null);
  3. // 生产者发送消息时候,设置消息属性是文本持久化
  4. channel.basicPublish("Exchange交换机名称", "Queue队列名称", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

到此,RabbitMQ消息队列防止消息丢失的三种方式介绍完啦。

综上,这篇文章结束了,主要介绍消息队列RabbitMQ之防止消息丢失的三种方式(生产者消息确认、消费者消息确认、消息持久化)。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/597453
推荐阅读
相关标签
  

闽ICP备14008679号