当前位置:   article > 正文

rocketmq-spring-boot-starter导致的多消费者实例重复消费问题_springboot rocketmq 重复消费

springboot rocketmq 重复消费

问题描述

在使用rocketMQ集群模式消费的时候(通过springboot整合的依赖操作),发现在一台服务器启动多个消费者实例后,消息进行了重复消费,但是在不同的服务器部署消费者实例后,消息能正常消费。

背景知识

rocketMQ的消费模式有两种:

  1. 负载均衡模式 消费者采用负载均衡方式消费消息,多个消费者(服务启动多个,本地多个springboot加载类启动)共同消费队列消息,每个消费者处理的消息不同
  2. 广播模式 消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的

本地代码

依赖版本

    <properties>
        <rocketmq-spring-boot-starter-version>2.0.3</rocketmq-spring-boot-starter-version>
    </properties>
  • 1
  • 2
  • 3
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>${rocketmq-spring-boot-starter-version}</version>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

消费者:

package cn.lpf.mq;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * @auth lipf
 * @date 2021/1/16 16:27
 */
@Slf4j
@Component
//,messageModel = MessageModel.CLUSTERING
//,consumeMode = ConsumeMode.ORDERLY
@RocketMQMessageListener(consumerGroup =  "huanlv-group", topic = "miniapp_service_topic")
public class HotelServiceConsumer implements RocketMQListener<String> {

    private AtomicInteger count=new AtomicInteger(0);
    @Override
    public void onMessage(String message) {
        log.info("HotelServiceConsumer-->Receive message:{}",message);
        long start = System.currentTimeMillis();
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long end = System.currentTimeMillis();
        log.info("HotelServiceConsumer-->handler message:{} ms,{} s",(end-start),(end-start)/1000);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

消息积压并重复消息

观察console客户端,如下:
在这里插入图片描述

原因分析

通过使用rocket客户端消费是没问题的:

/**
 * 消息的接受者
 */
public class BaseConsumer {

    public static void main(String[] args) throws Exception {
        //1.创建消费者Consumer,制定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("huanlv-group");
        //2.指定Nameserver地址
        consumer.setNamesrvAddr("172.16.100.11:9876");
        //3.订阅主题Topic和Tag
        consumer.subscribe("miniapp_service_topic", "*");

        //设定消费模式:负载均衡|广播模式
        consumer.setMessageModel(MessageModel.CLUSTERING);
//        consumer.setMessageModel(MessageModel.BROADCASTING);

        //4.设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            //接受消息内容
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5.启动消费者consumer
        consumer.start();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

所以这个时候怀疑是springboot整合rockemq的版本有问题,把版本有2.0.3换成了2.2.0问题解决掉,客户端注入ClientId规则发现变化,消息正常消费:

<properties>
        <rocketmq-spring-boot-starter-version>2.2.0</rocketmq-spring-boot-starter-version>
         </properties>
  • 1
  • 2
  • 3

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/550808
推荐阅读
相关标签
  

闽ICP备14008679号