赞
踩
这篇文章,主要介绍消息队列RabbitMQ之防止消息丢失的三种方式(生产者消息确认、消费者消息确认、消息持久化)。
目录
RabbitMQ消息队列,在使用的时候,可能会存在消息丢失的情况,所谓的消息丢失就是生产者发送的消息没办法被消费者正确的消费,消息队列中导致消息丢失的地方有三个,分别是:
生产者丢失消息,是指:当生产者发送消息给RabbitMQ的时候,此时消息发送失败了,并且生产者又没有重新发送这一条消息,所以这个时候,生产者这一条失败的消息就丢失了。
既然是生产者发送消息失败导致这一条消息丢失的,那么我们在处理这个丢失消息问题的时候,就可以这样做:当生产者消息发送失败之后,可以让生产者再次发送这一条消息,这里就有一个问题啦,那就是生产者怎么知道消息有没有发送成功???
RabbitMQ给我们提供了一个机制,即:发布确认机制,大致思想是:当生产者将消息发送到RabbitMQ之后,并且RabbitMQ正确接收到消息并将其放入Queue队列里面时,RabbitMQ会返回一个ACK标识给生产者,生产者接收到ACK标识就可以认为消息发送成功啦;如果消息接收失败,RabbitMQ会返回一个NACK标识,表示接收失败。
生产者消息确认机制,上一篇文章已经介绍了(【RabbitMQ笔记07】消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式),这里就不再重复。
如果生产者已经将消息正确的发送到RabbitMQ里面了,消费者从Queue队列里面获取消息消费时候,如果消费失败,那么此时就会导致这一条消息丢失,这是因为,默认情况下,RabbitMQ将消息分发给消费者之后,消费者接收到消息时候,就会返回一个ACK标识给消息队列RabbitMQ,此时RabbitMQ就会将这一条消息从Queue队列里面删除,但是这种情况下,消费者是否正确将这条消息消费了,RabbitMQ是不知道的,所以这就有可能导致丢失。
如何解决消费者丢失消息???
- package com.rabbitmq.demo.dropmsg;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
-
- /**
- * @version 1.0.0
- * @Date: 2023/2/25 16:30
- * @Copyright (C) ZhuYouBin
- * @Description: 消息消费者
- */
- public class Consumer {
- public static void main(String[] args) {
- // 1、创建连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- // 2、设置连接的 RabbitMQ 服务地址
- factory.setHost("127.0.0.1"); // 默认就是本机
- factory.setPort(5672); // 默认就是 5672 端口
- // 3、获取连接
- Connection connection = null; // 连接
- Channel channel = null; // 通道
- try {
- connection = factory.newConnection();
- // 4、获取通道
- channel = connection.createChannel();
- // 5、声明 Exchange,如果不存在,则会创建
- String exchangeName = "exchange_dropmsg_2023";
- channel.exchangeDeclare(exchangeName, "direct");
- // 6、指定需要操作的消息队列,如果队列不存在,则会创建
- String queueName = "queue_dropmsg_2023";
- channel.queueDeclare(queueName, false, false, false, null);
- // 7、绑定 Exchange 和 Queue, 接收 routingKey = "info" 的消息
- channel.queueBind(queueName, exchangeName, "key_2023");
- // 8、消费消息
- Channel finalChannel = channel;
- DeliverCallback callback = new DeliverCallback() {
- public void handle(String s, Delivery delivery) throws IOException {
- // 接收消息
- System.out.println("这是接收的消息:" + new String(delivery.getBody()));
- // TODO 消费者正确消费消息之后,主动返回 ACK 标识
- finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);
- }
- };
- // TODO 这第二个参数修改为 false,表示消费者需要手动发送 ACK 标识给 RabbitMQ(默认是true)
- channel.basicConsume(queueName, false, callback, i->{});
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
上面介绍了两种丢失消息的情况,分别是生产者和消费者丢失消息,还有一种丢失消息的情况,那就是RabbitMQ消息队列将消息丢失了。假设,现在存在这一种情况,生产者已经正确将消息发送到RabbitMQ里面,正准备将消息发送给消费者的时候,此时RabbitMQ服务宕机了,导致RabbitMQ中的消息丢失了(默认情况下,RabbitMQ是将消息保存在内存中的),由于内存中的数据断电即失,所以这就导致消息丢失情况。
如何解决RabbitMQ出现的消息丢失问题呢???
- // 第二个参数设置为true,表示开启持久化消息
- channel.queueDeclare("Queue队列名称", true, false, false, null);
-
- // 生产者发送消息时候,设置消息属性是文本持久化
- channel.basicPublish("Exchange交换机名称", "Queue队列名称", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
到此,RabbitMQ消息队列防止消息丢失的三种方式介绍完啦。
综上,这篇文章结束了,主要介绍消息队列RabbitMQ之防止消息丢失的三种方式(生产者消息确认、消费者消息确认、消息持久化)。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。