赞
踩
在使用rocketMQ集群模式消费的时候(通过springboot整合的依赖操作),发现在一台服务器启动多个消费者实例后,消息进行了重复消费,但是在不同的服务器部署消费者实例后,消息能正常消费。
rocketMQ的消费模式有两种:
依赖版本
<properties>
<rocketmq-spring-boot-starter-version>2.0.3</rocketmq-spring-boot-starter-version>
</properties>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq-spring-boot-starter-version}</version>
</dependency>
消费者:
package cn.lpf.mq; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; import java.util.concurrent.atomic.AtomicInteger; /** * @auth lipf * @date 2021/1/16 16:27 */ @Slf4j @Component //,messageModel = MessageModel.CLUSTERING //,consumeMode = ConsumeMode.ORDERLY @RocketMQMessageListener(consumerGroup = "huanlv-group", topic = "miniapp_service_topic") public class HotelServiceConsumer implements RocketMQListener<String> { private AtomicInteger count=new AtomicInteger(0); @Override public void onMessage(String message) { log.info("HotelServiceConsumer-->Receive message:{}",message); long start = System.currentTimeMillis(); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } long end = System.currentTimeMillis(); log.info("HotelServiceConsumer-->handler message:{} ms,{} s",(end-start),(end-start)/1000); } }
观察console客户端,如下:
通过使用rocket客户端消费是没问题的:
/** * 消息的接受者 */ public class BaseConsumer { public static void main(String[] args) throws Exception { //1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("huanlv-group"); //2.指定Nameserver地址 consumer.setNamesrvAddr("172.16.100.11:9876"); //3.订阅主题Topic和Tag consumer.subscribe("miniapp_service_topic", "*"); //设定消费模式:负载均衡|广播模式 consumer.setMessageModel(MessageModel.CLUSTERING); // consumer.setMessageModel(MessageModel.BROADCASTING); //4.设置回调函数,处理消息 consumer.registerMessageListener(new MessageListenerConcurrently() { //接受消息内容 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //5.启动消费者consumer consumer.start(); } }
所以这个时候怀疑是springboot整合rockemq的版本有问题,把版本有2.0.3换成了2.2.0问题解决掉,客户端注入ClientId规则发现变化,消息正常消费:
<properties>
<rocketmq-spring-boot-starter-version>2.2.0</rocketmq-spring-boot-starter-version>
</properties>
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。