当前位置:   article > 正文

别让消息再来一次!RocketMQ的幂等性解决方案揭秘_rocketmq如何保证消息幂等性

rocketmq如何保证消息幂等性

在这里插入图片描述

知识储备

幂等性简介

幂等性是指对同一操作进行多次执行,结果与仅执行一次的效果相同。简单来说,无论执行多少次,结果都是一致的。

在消息系统中,幂等性非常重要。当涉及到消息的发送、处理和消费时,保持消息的幂等性可以确保系统在面对重复消息时不会产生副作用或不一致的结果。

具体来说,实现幂等性需要满足以下条件:

  1. 相同输入产生相同的输出:对于给定的输入,无论执行多少次,结果始终相同。

  2. 操作具有唯一标识:每个操作都应该有一个唯一的标识,用于在系统中进行跟踪和区分。

在消息处理过程中,通过使用唯一标识对消息进行识别和去重,可以避免重复处理相同的消息。这样,即使消息被重复消费或多次处理,最终的结果也与仅处理一次的情况相同。

确保消息的幂等性对于保证系统的可靠性和数据一致性非常重要。无论是在消息队列、分布式系统还是网络通信中,都需要考虑并实现相关的幂等性机制,以应对可能出现的消息重复问题。

RocketMQ消息重复消费原因

RocketMQ消息重复消费可能由以下几种原因引起:

  1. 客户端重复发送:在某些情况下,客户端可能由于网络问题或其他原因导致消息发送失败,然后再次尝试发送相同的消息。这将导致消息在 RocketMQ 中重复消费。

  2. 重试机制:RocketMQ 提供了消息重试机制,如果消息在消费过程中发生异常,消费者可能会重新拉取相同的消息进行重试。如果消费者的处理逻辑没有处理幂等性(即多次处理不会产生不一致结果),则消息可能会重复消费。

  3. 消息消费者宕机和重启:当消息消费者由于宕机、重启或其他原因停止消费一段时间后再次启动时,它可能会重新从上次的消费位置开始消费。如果消息消费者没有正确维护消费进度或消费进度存储出现问题,就可能导致消息被重复消费。

  4. 集群模式下的消息重复消费:在 RocketMQ 的集群模式下,如果多个消费者组订阅相同的主题,并且每个消费者组都独立消费消息,那么同一条消息就有可能被不同的消费者组重复消费。

避免消息重复消费措施

为了避免消息重复消费,可以采取以下措施:

1. 在发送端实现消息的幂等性

发送端应该保证消息的唯一性,可以在业务层面使用唯一的消息标识来避免重复发送相同的消息。

当在发送端实现消息的幂等性时,可以使用唯一的消息标识来确保消息的唯一性。以下是一个简单的代码案例,展示如何在Java中实现消息的幂等性:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketMQProducer {
    private static final String GROUP_NAME = "Your_Group_Name";
    private static final String NAMESRV_ADDR = "Your_Namesrv_Addr";
    private static final String TOPIC_NAME = "Your_Topic_Name";

    private static final String MESSAGE_ID_PREFIX = "MESSAGE_ID_"; // 消息ID前缀

    public static void main(String[] args) throws Exception {
        // 创建生产者实例
        DefaultMQProducer producer = new DefaultMQProducer(GROUP_NAME);

        // 设置NameServer地址
        producer.setNamesrvAddr(NAMESRV_ADDR);

        // 启动生产者
        producer.start();

        try {
            // 构建消息内容
            String messageContent = "Your_Message_Content";
            byte[] messageBody = messageContent.getBytes(RemotingHelper.DEFAULT_CHARSET);

            // 构建消息实例
            Message message = new Message(TOPIC_NAME, messageBody);

            // 设置自定义的消息ID
            String messageId = MESSAGE_ID_PREFIX + System.currentTimeMillis();
            message.setKeys(messageId);

            // 发送消息
            SendResult sendResult = producer.send(message);
            
            // 根据发送结果判断是否发送成功
            if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
                System.out.println("消息发送成功");
            } else {
                System.out.println("消息发送失败");
            }
        } finally {
            // 关闭生产者实例
            producer.shutdown();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50

在上述代码中,通过设置自定义的消息ID来实现消息的幂等性。每次发送消息时,生成一个唯一的消息ID,并将其设置到消息的Keys属性中。这样,相同的消息在重复发送时会被识别为同一条消息,从而避免重复消费。

请注意,以上代码仅为演示如何在发送端实现消息的幂等性,具体的实现方式可能因业务需求而异。在实际应用中,可以根据实际情况选择更精细的幂等性策略,例如使用数据库记录消息状态或使用分布式锁等。

2. 消费端实现消息的幂等性

消费者应该保证消费逻辑的幂等性,即多次消费不会产生副作用。可以通过在消费者端记录已经处理过的消息标识来判断是否重复消费。

在消费端实现消息的幂等性通常涉及到对消息的处理结果进行记录,以防止相同消息的重复处理。以下是一个简单的Java代码案例,展示如何在RocketMQ消费端实现消息的幂等性:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RocketMQConsumer {
    private static final String GROUP_NAME = "Your_Group_Name";
    private static final String NAMESRV_ADDR = "Your_Namesrv_Addr";
    private static final String TOPIC_NAME = "Your_Topic_Name";
    private static final Set<String> processedMessageIds = new HashSet<>(); // 记录已处理的消息ID

    public static void main(String[] args) throws Exception {
        // 创建消费者实例
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(GROUP_NAME);

        // 设置NameServer地址
        consumer.setNamesrvAddr(NAMESRV_ADDR);

        // 订阅要消费的Topic和Tag
        consumer.subscribe(TOPIC_NAME, "*");

        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    // 获取消息ID
                    String messageId = message.getMsgId();

                    // 判断消息是否已处理过
                    if (processedMessageIds.contains(messageId)) {
                        // 如果已处理过,则直接返回成功
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }

                    // 处理消息的业务逻辑
                    boolean success = processMessage(message);

                    // 根据处理结果更新记录
                    if (success) {
                        processedMessageIds.add(messageId);
                    }
                }
                
                // 返回成功消费标识
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者
        consumer.start();

        System.out.println("消费者已启动");
    }

    private static boolean processMessage(MessageExt message) {
        // 实现消息的业务处理逻辑
        // ...

        // 返回处理结果(根据实际情况进行判断)
        return true;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69

在上述代码中,通过维护一个记录已处理消息ID的Set集合(processedMessageIds)来实现消息的幂等性。每次消费消息时,首先获取消息的ID,然后判断该ID是否出现在已处理消息集合中。如果存在,则说明该消息已经被处理过,直接返回成功。如果不存在,则进行消息的业务处理逻辑,并根据处理结果更新记录。

请注意,以上代码仅为演示如何在消费端实现消息的幂等性,具体的实现方式可能因业务需求而异。在实际应用中,可以根据实际情况选择更精细的幂等性策略,例如使用数据库记录消息状态或使用分布式锁等。另外,还应该考虑消息的顺序性和并发性问题,以及如何处理消息处理失败的情况。

3. 合理设置消费进度存储策略

确保消费进度得到正确存储并能够恢复,避免消费者宕机或重启后从错误的位置开始消费。

为了避免RocketMQ消息的重复消费,可以合理设置消费进度存储策略。消费进度存储策略用于记录消费者已经消费到的消息的位置信息,以便在消费者发生重启或者异常停止后,能够从上次消费的位置继续进行消费,避免重复消费。下面是一个简单的Java代码案例,展示如何在RocketMQ中使用内存存储策略来存储消费进度:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class RocketMQConsumer {
    private static final String GROUP_NAME = "Your_Group_Name";
    private static final String NAMESRV_ADDR = "Your_Namesrv_Addr";
    private static final String TOPIC_NAME = "Your_Topic_Name";

    public static void main(String[] args) throws Exception {
        // 创建消费者实例
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(GROUP_NAME);

        // 设置NameServer地址
        consumer.setNamesrvAddr(NAMESRV_ADDR);

        // 设置消费者从上次消费的位置开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        // 设置消费进度存储类型为内存存储
        consumer.setOffsetStore(new MemoryOffsetStore());

        // 订阅要消费的Topic和Tag
        consumer.subscribe(TOPIC_NAME, "*");

        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    // 处理消息的业务逻辑
                    processMessage(message);
                }
                
                // 返回成功消费标识
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者
        consumer.start();

        System.out.println("消费者已启动");
    }

    private static void processMessage(MessageExt message) {
        // 实现消息的业务处理逻辑
        // ...
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54

在上述代码中,我们通过自定义一个内存存储策略(MemoryOffsetStore)来设置消费进度的存储类型。内存存储策略将消费进度数据保存在消费者的内存中,这样就不需要使用磁盘文件来存储消费进度。通过设置consumer.setOffsetStore(new MemoryOffsetStore())来启用该存储策略。

另外,通过consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET)设置消费者从上次消费的位置开始消费,这样在消费者重启后会根据记录的消费进度进行继续消费,避免了重复消费的问题。

请注意,上述代码中的存储策略仅仅是一种简单的示例。在实际生产环境中,可以根据需求选择适合的消费进度存储策略,例如使用数据库、文件存储或者自定义存储等,以确保消息的幂等性和顺序性。

4. 集群模式下设置不同的消费者组

如果多个消费者组订阅相同的主题,确保每个消费者组都有独立的消费进度,避免重复消费。

为了避免RocketMQ消息的重复消费,在集群模式下可以设置不同的消费者组。通过设置不同的消费者组,每个消费者组内的消费者实例会共同负责消费一部分消息,这样可以实现消息的负载均衡和并行处理,同时避免消息重复消费。下面是一个简单的Java代码案例,展示如何在RocketMQ中设置不同的消费者组:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class RocketMQConsumer {
    private static final String GROUP_NAME1 = "Consumer_Group_1";
    private static final String GROUP_NAME2 = "Consumer_Group_2";
    private static final String NAMESRV_ADDR = "Your_Namesrv_Addr";
    private static final String TOPIC_NAME = "Your_Topic_Name";

    public static void main(String[] args) throws Exception {
        // 创建消费者实例 1
        DefaultMQPushConsumer consumer1 = new DefaultMQPushConsumer(GROUP_NAME1);

        // 设置NameServer地址
        consumer1.setNamesrvAddr(NAMESRV_ADDR);

        // 设置消费者从上次消费的位置开始消费
        consumer1.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        // 订阅要消费的Topic和Tag
        consumer1.subscribe(TOPIC_NAME, "*");

        // 注册消息监听器
        consumer1.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    // 处理消息的业务逻辑
                    processMessage(message);
                }
                
                // 返回成功消费标识
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者 1
        consumer1.start();

        System.out.println("消费者 1 已启动");


        // 创建消费者实例 2
        DefaultMQPushConsumer consumer2 = new DefaultMQPushConsumer(GROUP_NAME2);

        // 设置NameServer地址
        consumer2.setNamesrvAddr(NAMESRV_ADDR);

        // 设置消费者从上次消费的位置开始消费
        consumer2.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        // 订阅要消费的Topic和Tag
        consumer2.subscribe(TOPIC_NAME, "*");

        // 注册消息监听器
        consumer2.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    // 处理消息的业务逻辑
                    processMessage(message);
                }
                
                // 返回成功消费标识
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者 2
        consumer2.start();

        System.out.println("消费者 2 已启动");
    }

    private static void processMessage(MessageExt message) {
        // 实现消息的业务处理逻辑
        // ...
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85

在上述代码中,我们创建了两个消费者实例,分别属于不同的消费者组(GROUP_NAME1GROUP_NAME2)。每个消费者实例都负责订阅相同的Topic和Tag,并注册相同的消息监听器。当有新的消息到达时,RocketMQ会将消息均匀地分发给不同的消费者组内的消费者实例进行处理。

通过设置不同的消费者组,RocketMQ能够保证在集群模式下,每个消费者组内的消费者实例只消费一部分消息,避免了重复消费的问题,并且能够实现消息的负载均衡和并行处理。在实际应用中,你可以根据需求设置更多的消费者组来进一步扩展消费能力。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/618512
推荐阅读
相关标签
  

闽ICP备14008679号