当前位置:   article > 正文

MQ如何确保消息的不丢失?_mq如何保证消息不丢失

mq如何保证消息不丢失

MQ如何确保消息的不丢失?

面试官心理剖析

当面试官问到关于如何确保消息不丢失的问题时,他们可能正在评估面试者对消息队列(MQ)的理解、故障处理机制以及他们如何设计健壮的系统来避免数据丢失。以下是可能的心理剖析:

  1. 基础理解:面试官想要确认面试者是否理解MQ的基本概念,如生产者、消费者、队列、消息持久性等。
  2. 故障处理:他们想要了解面试者是否有处理MQ可能遇到的故障的经验或策略。例如,当MQ服务器宕机、网络中断或磁盘空间不足时,如何确保消息不会丢失。
  3. 系统健壮性:面试官可能希望了解面试者如何确保整个系统的健壮性,特别是在面对失败时。他们可能希望听到关于冗余、备份、负载均衡和故障转移等策略的讨论。
  4. 经验:如果面试官了解到面试者曾经处理过与MQ相关的项目,他们可能会询问具体是如何确保消息不丢失的。这有助于评估面试者的实际经验和解决问题的能力。
  5. 长期策略:他们也可能想知道面试者是否有长期策略来防止消息丢失,例如,是否进行定期的测试和审查,或者是否有监控和警报系统来及时发现问题。
  6. 思维逻辑:面试官可能还会通过这个问题来评估面试者的逻辑思维和解决问题的能力。他们可能希望看到面试者能够有条不紊地分析问题,并提出有效的解决方案。

总的来说,面试官问这个问题是为了了解面试者对于确保消息不丢失的深入理解和实际经验,以及他们如何设计和维护一个健壮的、能够处理各种故障的系统。

MQ实现策略(以RabbitMQ为例)

消息持久化

RabbitMQ 默认将消息存储在内存中,但可以通过配置来实现消息的持久化。这包括将 Exchange、Queue 和 Message 都设置为持久化。当设置了持久化后,即使 RabbitMQ 节点重启或发生崩溃,消息也不会丢失,而是会重新加载到内存中。

  • Exchange 持久化:在声明 Exchange 时,将 durable 参数设置为 true。
  • Queue 持久化:在声明 Queue 时,同样将 durable 参数设置为 true。
  • Message 持久化:在发送消息时,设置消息的 deliveryMode 属性为 2,表示消息是持久的。

示例如下:

<dependency>  
    <groupId>com.rabbitmq</groupId>  
    <artifactId>amqp-client</artifactId>  
    <version>5.13.1</version> <!-- 请检查并使用最新版本 -->  
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

然后,可以使用以下Java代码来创建一个持久的Exchange、Queue,并发送一个持久的消息:

import com.rabbitmq.client.AMQP;  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
  
import java.io.IOException;  
import java.util.HashMap;  
import java.util.Map;  
  
public class RabbitMQPersistentMessageExample {  
  
    private final static String QUEUE_NAME = "persistent_queue";  
    private final static String EXCHANGE_NAME = "persistent_exchange";  
  
    public static void main(String[] argv) throws IOException, TimeoutException {  
        // 创建连接工厂  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("localhost"); // 设置RabbitMQ服务器地址  
        try (Connection connection = factory.newConnection();  
             Channel channel = connection.createChannel()) {  
              
            // 声明一个持久的direct exchange  
            Map<String, Object> args = new HashMap<>();  
            args.put("x-durable", true);  
            channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, args);  
  
            // 声明一个持久的queue  
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);  
  
            // 将queue绑定到exchange,并指定一个routing key  
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routing_key");  
  
            // 发送一个持久的消息  
            String message = "Hello World!";  
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()  
                    .deliveryMode(2) // 设置消息为持久化  
                    .build();  
            channel.basicPublish(EXCHANGE_NAME, "routing_key", properties, message.getBytes("UTF-8"));  
            System.out.println(" [x] Sent '" + message + "'");  
        }  
    }  
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

在这个例子中,做了以下几件事情:

  1. 创建了一个ConnectionFactory实例,并设置了RabbitMQ服务器的地址。
  2. 使用ConnectionFactory创建了一个Connection和Channel。
  3. 通过channel.exchangeDeclare方法声明了一个持久的Exchange,并设置了x-durable参数为true。
  4. 通过channel.queueDeclare方法声明了一个持久的Queue,将durable参数设置为true。
  5. 使用channel.queueBind方法将Queue绑定到Exchange,并指定了一个routing key。
  6. 创建一个AMQP.BasicProperties对象,设置deliveryMode为2,表示消息是持久的。
  7. 使用channel.basicPublish方法发送了一个持久的消息。

确保RabbitMQ服务器正在运行,并且的应用程序有权限连接到服务器和指定的Exchange/Queue。运行这段代码后,即使RabbitMQ服务器重启,之前声明的持久化Exchange和Queue以及发送的消息也不会丢失。

ACK 确认机制

RabbitMQ 支持消息确认机制,即消费者在处理完消息后会向 RabbitMQ 发送一个确认消息(ACK)。RabbitMQ 只有在收到消费者的确认消息后,才会将该消息从队列中删除。如果消费者在处理消息时发生异常或宕机,RabbitMQ 会认为该消息未被正确处理,因此会重新将该消息发送到其他消费者进行处理,从而确保消息不会丢失。 下面是一个使用Java的RabbitMQ客户端库来演示消息确认机制的例子。这个例子使用了手动确认模式(manual acknowledgment mode),这意味着消费者需要显式地发送确认信号。 首先,确保的项目中已经添加了RabbitMQ的Java客户端依赖。如果使用Maven,可以在pom.xml文件中添加以下依赖:

<dependency>  
    <groupId>com.rabbitmq</groupId>  
    <artifactId>amqp-client</artifactId>  
    <version>5.13.1</version> <!-- 请检查并使用最新版本 -->  
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

然后,可以使用以下Java代码来创建一个消费者,并演示手动确认消息的过程:

import com.rabbitmq.client.*;  
import java.io.IOException;  
import java.util.concurrent.TimeoutException;  
  
public class RabbitMQConsumerWithAck {  
  
    private final static String QUEUE_NAME = "task_queue";  
  
    public static void main(String[] argv) throws IOException, TimeoutException {  
        // 创建连接工厂  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("localhost"); // 设置RabbitMQ服务器地址  
        try (Connection connection = factory.newConnection();  
             Channel channel = connection.createChannel()) {  
              
            // 声明一个队列  
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);  
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  
  
            // 创建消费者  
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {  
                String message = new String(delivery.getBody(), "UTF-8");  
                System.out.println(" [x] Received '" + message + "'");  
  
                // 模拟处理消息的时间  
                try {  
                    Thread.sleep(2000);  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
  
                // 发送确认信号  
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  
                System.out.println(" [x] Done");  
            };  
  
            // 开始消费消息,并指定手动确认模式  
            boolean autoAck = false;  
            channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });  
        }  
    }  
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

在这个例子中,做了以下几件事情:

  1. 创建了一个ConnectionFactory实例,并设置了RabbitMQ服务器的地址。
  2. 使用ConnectionFactory创建了一个Connection和Channel。
  3. 声明了一个队列,用于接收消息。
  4. 定义了一个DeliverCallback,这是当消费者从队列中接收到消息时会调用的回调函数。
  5. 在回调函数中,处理消息(在这个例子中只是简单地将消息打印到控制台,并模拟处理时间),然后发送一个确认信号(channel.basicAck)给RabbitMQ,表示该消息已经被成功处理。
  6. 在basicConsume方法中,将autoAck参数设置为false,表示将使用手动确认模式。这意味着RabbitMQ不会自动确认消息,而是等待显式地发送确认信号。

运行这个消费者程序后,它将开始从队列中接收消息,并在处理完每条消息后发送一个确认信号。如果消费者在处理消息时崩溃或抛出异常,那么RabbitMQ将不会收到确认信号,它会认为该消息没有被成功处理,并会将该消息重新放入队列中,等待其他消费者重新尝试处理它。

死信队列:

在RabbitMQ中,死信队列(Dead-Letter-Exchange,简称DLX)是一个特殊的队列,当消息在队列中变成死信(dead message)后,会被重新路由到这个死信队列中。消息变成死信的条件有以下几种:

  1. 消息被拒绝(basic.reject 或 basic.nack),并且设置requeue=false。
  2. 消息在队列中的TTL(Time-To-Live)过期。
  3. 队列达到最大长度,消息无法入队。

下面是一个使用Java的RabbitMQ客户端库来演示死信队列的例子: 首先,需要配置RabbitMQ来启用死信队列。这通常通过交换机、队列和绑定来实现。

java复制代码



import com.rabbitmq.client.*;  
  
import java.io.IOException;  
import java.util.HashMap;  
import java.util.Map;  
  
public class RabbitMQDeadLetterExample {  
  
    private static final String EXCHANGE_NAME = "dlx_exchange";  
    private static final String QUEUE_NAME = "dlx_queue";  
    private static final String DEAD_LETTER_EXCHANGE_NAME = "dead_letter_exchange";  
    private static final String DEAD_LETTER_QUEUE_NAME = "dead_letter_queue";  
    private static final String ROUTING_KEY = "dlx.key";  
  
    public static void main(String[] argv) throws IOException, TimeoutException {  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("localhost");  
        try (Connection connection = factory.newConnection();  
             Channel channel = connection.createChannel()) {  
  
            // 创建交换机  
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);  
  
            // 创建死信交换机  
            channel.exchangeDeclare(DEAD_LETTER_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);  
  
            // 创建队列,并设置死信交换机  
            Map<String, Object> args = new HashMap<>();  
            args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);  
            channel.queueDeclare(QUEUE_NAME, false, false, false, args);  
  
            // 绑定队列和交换机  
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);  
  
            // 创建死信队列  
            channel.queueDeclare(DEAD_LETTER_QUEUE_NAME, false, false, false, null);  
  
            // 绑定死信队列和死信交换机  
            channel.queueBind(DEAD_LETTER_QUEUE_NAME, DEAD_LETTER_EXCHANGE_NAME, ROUTING_KEY);  
  
            // 发送消息到交换机  
            String message = "Hello, this is a dead letter message!";  
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes("UTF-8"));  
            System.out.println(" [x] Sent '" + message + "'");  
  
            // 接收死信队列中的消息  
            DeliverCallback deadLetterCallback = (consumerTag, delivery) -> {  
                String deadLetterMessage = new String(delivery.getBody(), "UTF-8");  
                System.out.println(" [x] Received dead letter message: '" + deadLetterMessage + "'");  
            };  
  
            boolean autoAck = true; // 根据实际情况,可能希望设置为false以进行手动确认  
            channel.basicConsume(DEAD_LETTER_QUEUE_NAME, autoAck, deadLetterCallback, consumerTag -> { });  
        }  
    }  
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60

在这个例子中,做了以下几件事情:

  1. 创建了一个普通交换机EXCHANGE_NAME和一个队列QUEUE_NAME,并设置队列的x-dead-letter-exchange参数为死信交换机DEAD_LETTER_EXCHANGE_NAME。
  2. 创建了一个死信交换机DEAD_LETTER_EXCHANGE_NAME和一个死信队列DEAD_LETTER_QUEUE_NAME,并将死信队列绑定到死信交换机上。
  3. 发送了一条消息到普通交换机,路由键为dlx.key,该消息会路由到队列QUEUE_NAME。
  4. 由于队列QUEUE_NAME设置了死信交换机,如果这条消息被拒绝(requeue=false)或者过期,它将被路由到死信交换机DEAD_LETTER_EXCHANGE_NAME,进而被路由到死信队列DEAD_LETTER_QUEUE_NAME。
  5. 设置了一个消费者来接收死信队列中的消息,并打印出来。

要模拟消息变成死信的情况,可以:

  • 在消费者中拒绝消息,并设置requeue=false。
  • 或者给队列设置TTL,

消息重试机制

在某些情况下,消费者可能会因为某些原因暂时无法处理消息,但稍后可能能够恢复处理。为了避免这种情况下的消息丢失,RabbitMQ 支持消息重试机制。当消费者处理消息失败时,可以将该消息重新发送到队列中,稍后再次尝试处理。 在Java中,实现消息重试机制通常涉及到处理可能失败的操作,并在操作失败时提供重试逻辑。这种机制在多种场景下都很有用,例如网络请求、数据库操作或消息队列处理。 下面是一个简单的例子,展示了如何在Java中实现一个消息重试机制: 假设有一个发送消息到消息队列的方法,这个方法可能会因为各种原因(如网络中断、服务不可用等)而失败。可以使用一个简单的循环来实现重试逻辑:

java复制代码



public class MessageSender {  
  
    private static final int MAX_RETRIES = 5; // 最大重试次数  
    private static final long RETRY_DELAY_MS = 1000; // 每次重试之间的延迟(毫秒)  
  
    public void sendMessage(String message) {  
        int retries = 0;  
        while (retries < MAX_RETRIES) {  
            try {  
                // 尝试发送消息  
                sendToMessageQueue(message);  
                // 如果发送成功,则退出循环  
                break;  
            } catch (Exception e) {  
                // 发送失败,处理异常  
                System.err.println("Failed to send message: " + e.getMessage());  
                retries++;  
                  
                // 如果重试次数达到最大值,则抛出异常  
                if (retries == MAX_RETRIES) {  
                    throw new RuntimeException("Failed to send message after " + MAX_RETRIES + " retries.", e);  
                }  
                  
                // 等待一段时间后重试  
                try {  
                    Thread.sleep(RETRY_DELAY_MS);  
                } catch (InterruptedException ie) {  
                    Thread.currentThread().interrupt();  
                    throw new RuntimeException("Thread was interrupted during retry delay.", ie);  
                }  
            }  
        }  
    }  
  
    private void sendToMessageQueue(String message) throws Exception {  
        // 这里是发送消息到消息队列的实际代码  
        // 可能会抛出异常,如网络异常、服务不可用等  
        System.out.println("Sending message: " + message);  
        // 模拟可能的失败情况  
        if (Math.random() < 0.1) {  
            throw new RuntimeException("Simulated exception for retry.");  
        }  
    }  
  
    public static void main(String[] args) {  
        MessageSender sender = new MessageSender();  
        try {  
            sender.sendMessage("Hello, this is a test message.");  
        } catch (Exception e) {  
            System.err.println("Could not send message due to exception: " + e.getMessage());  
        }  
    }  
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57

在这个例子中,sendMessage 方法尝试发送消息到消息队列。如果发送操作失败(在这个例子中,通过抛出异常来模拟失败情况),该方法会捕获异常,并在达到最大重试次数之前等待一段时间后再次尝试。如果所有重试都失败了,它会抛出一个运行时异常。 在实际应用中,可能需要根据具体的业务逻辑来调整重试策略,例如使用指数退避策略来逐步增加重试之间的延迟,或者根据异常类型来决定是否重试。此外,如果消息发送是异步的,可能需要使用CompletableFuture、RxJava或其他异步处理框架来管理重试逻辑。

镜像队列

在 RabbitMQ 的集群环境中,可以通过设置镜像队列来确保消息的不丢失。镜像队列会在多个节点上创建副本,当主节点发生故障时,其他节点可以接管处理消息,从而保证消息的高可用性。

综上所述,RabbitMQ 通过消息持久化、ACK 确认机制、死信队列、消息重试机制和镜像队列等多种机制来确保消息的不丢失。开发者可以根据具体的需求和场景选择合适的机制来保障消息的可靠性。

面试话术

RabbitMQ针对消息传递过程中可能发生问题的各个地方,给出了针对性的解决方案:

  • RabbitMQ提供了publisher confirm机制

    - 生产者发送消息后,可以编写ConfirmCallback函数
    - 消息成功到达交换机后,RabbitMQ会调用ConfirmCallback通知消息的发送者,返回ACK
    - 消息如果未到达交换机,RabbitMQ也会调用ConfirmCallback通知消息的发送者,返回NACK
    - 消息超时未发送成功也会抛出异常
    
    • 1
    • 2
    • 3
    • 4

消息到达交换机后,如果未能到达队列,也会导致消息丢失:

  • RabbitMQ提供了publisher return机制
- 生产者可以定义ReturnCallback函数
- 消息到达交换机,未到达队列,RabbitMQ会调用ReturnCallback通知发送者,告知失败原因
  • 1
  • 2

消息到达队列后,MQ宕机也可能导致丢失消息:

  • RabbitMQ提供了持久化功能,集群的主从备份功能
- 消息持久化,RabbitMQ会将交换机、队列、消息持久化到磁盘,宕机重启可以恢复消息
- 镜像集群,仲裁队列,都可以提供主从备份功能,主节点宕机,从节点会自动切换为主,数据依然在
  • 1
  • 2

消息投递给消费者后,如果消费者处理不当,也可能导致消息丢失

  • SpringAMQP基于RabbitMQ提供了消费者确认机制、消费者重试机制,消费者失败处理策略:

    - 消费者的确认机制: 
       - 消费者处理消息成功,未出现异常时,Spring返回ACK给RabbitMQ,消息才被移除
       - 消费者处理消息失败,抛出异常,宕机,Spring返回NACK或者不返回结果,消息不被异常
    - 消费者重试机制: 
       - 默认情况下,消费者处理失败时,消息会再次回到MQ队列,然后投递给其它消费者。Spring提供的消费者重试机制,则是在处理失败后不返回NACK,而是直接在消费者本地重试。多次重试都失败后,则按照消费者失败处理策略来处理消息。避免了消息频繁入队带来的额外压力。
    - 消费者失败策略: 
       - 当消费者多次本地重试失败时,消息默认会丢弃。
       - Spring提供了Republish策略,在多次重试都失败,耗尽重试次
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Guff_9hys/article/detail/986102
推荐阅读
相关标签
  

闽ICP备14008679号