赞
踩
使用Spring-RocketMQ
时,只需要引入rocketmq-spring-boot-starter
包,并且定义以下消费者,就可以很简单的实现消息消费
@Component
@RocketMQMessageListener(topic = "first-topic", consumerGroup = "my-producer-group", selectorExpression = "tag1")
public class RocketMQConsumer implements RocketMQListener<String>{
@Override
public void onMessage(String message) {
System.out.println(message);
}
可以看到只需要添加@RocketMQMessageListener
注解,并实现RocketMQListener
接口就可以完成消息的接收、处理逻辑
@RocketMQMessageListener
实现原理可以看到在ListenerContainerConfiguration
中获取了所有加了RocketMQMessageListener
注解的bean
ListenerContainerConfiguration#afterSingletonsInstantiated
// ListenerContainerConfiguration implements SmartInitializingSingleton, so
// afterSingletonsInstantiated is triggered once all beans have been instantiated.
@Override
public void afterSingletonsInstantiated() {
// Collect every bean annotated with @RocketMQMessageListener, dropping entries
// whose bean name is reported as a scoped target by ScopedProxyUtils.
Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class)
.entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
// Call registerContainer(beanName, bean) for each remaining entry.
beans.forEach(this::registerContainer);
}
private void registerContainer(String beanName, Object bean) { ...... //拿到RocketMQMessageListener注解 RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class); //获取注解上定义的consumerGroup String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup()); //获取注解上定义的topic String topic = this.environment.resolvePlaceholders(annotation.topic()); //定义beanName String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),counter.incrementAndGet()); GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext; //注册bean 调用createRocketMQListenerContainer初始化一些属性 genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class, () -> createRocketMQListenerContainer(containerBeanName, bean, annotation)); DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName, DefaultRocketMQListenerContainer.class); if (!container.isRunning()) { try { //调用start方法 container.start(); } catch (Exception e) { log.error("Started container failed. {}", container, e); throw new RuntimeException(e); } } log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName); }
createRocketMQListenerContainer
里面就是初始化了DefaultRocketMQListenerContainer
这个对象,并且设置了一些消费相关的属性,比如nameServer
、topic
、tags
、consumerGroup
消费者组,rocketMQListener
我们定义的消费监听者等
可以看到这里面并没有定义具体的消费者实例
//DefaultRocketMQListenerContainer定义 实现了InitializingBean接口,在bean初始化的时候会调用afterPropertiesSet方法
public class DefaultRocketMQListenerContainer implements InitializingBean,
RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware {
// Initializes the push consumer, then stores the values returned by
// getMessageType() and getMethodParameter() on this container.
@Override
public void afterPropertiesSet() throws Exception {
// As the method name suggests, this initializes the MQ consumer instance.
initRocketMQPushConsumer();
this.messageType = getMessageType();
this.methodParameter = getMethodParameter();
log.debug("RocketMQ messageType: {}", messageType);
}
private void initRocketMQPushConsumer() throws MQClientException { ...... if (Objects.nonNull(rpcHook)) { //初始化DefaultMQPushConsumer对象 consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(), enableMsgTrace, this.applicationContext.getEnvironment(). resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic())); consumer.setVipChannelEnabled(false); consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, consumerGroup)); } else { log.debug("Access-key or secret-key not configure in " + this + "."); consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace, this.applicationContext.getEnvironment(). resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic())); } //消息模式 广播还是集群 switch (messageModel) { case BROADCASTING: consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING); break; case CLUSTERING: consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING); break; default: throw new IllegalArgumentException("Property 'messageModel' was wrong."); } //筛选方式 TAG和SQL92 switch (selectorType) { case TAG: consumer.subscribe(topic, selectorExpression); break; case SQL92: consumer.subscribe(topic, MessageSelector.bySql(selectorExpression)); break; default: throw new IllegalArgumentException("Property 'selectorType' was wrong."); } //消费模式 顺序和并发 switch (consumeMode) { case ORDERLY: consumer.setMessageListener(new DefaultMessageListenerOrderly()); break; case CONCURRENTLY: consumer.setMessageListener(new DefaultMessageListenerConcurrently()); break; default: throw new IllegalArgumentException("Property 'consumeMode' was wrong."); } }
回到上面registerContainer
,最后拿到DefaultRocketMQListenerContainer
的bean,调用start
方法
DefaultRocketMQListenerContainer#start @Override public void start() { if (this.isRunning()) { throw new IllegalStateException("container already running. " + this.toString()); } try { //当前consumer就是上面分析的DefaultMQPushConsumer consumer.start(); } catch (MQClientException e) { throw new IllegalStateException("Failed to start RocketMQ push consumer", e); } this.setRunning(true); log.info("running container: {}", this.toString()); }
DefaultMQPushConsumer#start
// Starts this push consumer: sets the consumer group to its namespace-wrapped form,
// starts defaultMQPushConsumerImpl, then starts the trace dispatcher when one is set.
@Override
public void start() throws MQClientException {
setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
// The call is delegated to defaultMQPushConsumerImpl.start().
this.defaultMQPushConsumerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
// A trace dispatcher start failure is only logged; it is not rethrown.
log.warn("trace dispatcher start failed ", e);
}
}
}
defaultMQPushConsumerImpl
是什么时候初始化的呢
上面说到初始化DefaultMQPushConsumer
对象时,点进去构造方法
/**
 * Stores the group, namespace and allocation strategy, creates the
 * DefaultMQPushConsumerImpl, and optionally registers a message trace hook.
 *
 * @param namespace                    namespace stored on this consumer
 * @param consumerGroup                consumer group stored on this consumer
 * @param rpcHook                      hook passed to the implementation and the trace dispatcher
 * @param allocateMessageQueueStrategy queue allocation strategy stored on this consumer
 * @param enableMsgTrace               when true, a trace dispatcher and trace hook are set up
 * @param customizedTraceTopic         topic passed to the trace dispatcher
 */
public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
    AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace,
    final String customizedTraceTopic) {
    this.consumerGroup = consumerGroup;
    this.namespace = namespace;
    this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
    // defaultMQPushConsumerImpl is created right here in the constructor. Judging by the
    // name it is the implementation of DefaultMQPushConsumer, but the relationship is
    // composition rather than interface implementation.
    defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
    if (enableMsgTrace) {
        try {
            AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
            dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
            traceDispatcher = dispatcher;
            this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
                new ConsumeMessageTraceHookImpl(traceDispatcher));
        } catch (Throwable e) {
            // Trace setup failures are logged only; construction continues.
            log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
        }
    }
}
接着看DefaultMQPushConsumerImpl
的start
方法
DefaultMQPushConsumerImpl#start public synchronized void start() throws MQClientException { switch (this.serviceState) { case CREATE_JUST: //顺序消息 ConsumeMessageOrderlyService处理 if (this.getMessageListenerInner() instanceof MessageListenerOrderly) { this.consumeOrderly = true; this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); //并发消息 ConsumeMessageConcurrentlyService处理 } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) { this.consumeOrderly = false; this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); } this.consumeMessageService.start(); //调用start方法 mQClientFactory.start(); }
MQClientInstance#start public void start() throws MQClientException { synchronized (this) { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; // If not specified,looking address from name server if (null == this.clientConfig.getNamesrvAddr()) { this.mQClientAPIImpl.fetchNameServerAddr(); } // Start request-response channel this.mQClientAPIImpl.start(); // Start various schedule tasks this.startScheduledTask(); // Start pull service 可以看到这里开启拉取消息 this.pullMessageService.start(); // Start rebalance service this.rebalanceService.start(); // Start push service this.defaultMQProducer.getDefaultMQProducerImpl().start(false); log.info("the client factory [{}] start OK", this.clientId); this.serviceState = ServiceState.RUNNING; break; case START_FAILED: throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null); default: break; } } }
ServiceThread#start
// Starts the service thread; returns without doing anything if it was already started.
public void start() {
log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
// compareAndSet(false, true) fails when started is already true, so only one call proceeds.
if (!started.compareAndSet(false, true)) {
return;
}
stopped = false;
// Create a Thread object; passing this means this object is itself the Runnable.
this.thread = new Thread(this, getServiceName());
this.thread.setDaemon(isDaemon);
// Start the thread.
this.thread.start();
}
// ServiceThread implements Runnable and is abstract, so look at its implementing classes.
public abstract class ServiceThread implements Runnable {
}
可以看到PullMessageService
和我们找的有关,找到它的run
方法
PullMessageService#run @Override public void run() { log.info(this.getServiceName() + " service started"); //通过while循环拉取消息 while (!this.isStopped()) { try { //消息存入LinkedBlockingQueue中,通过take方法阻塞获取 PullRequest pullRequest = this.pullRequestQueue.take(); //调用pullMessage处理消息 this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } log.info(this.getServiceName() + " service end"); }
// Passes the pull request to the consumer selected for its consumer group;
// logs a warning and drops the request when no consumer is found.
private void pullMessage(final PullRequest pullRequest) {
// Select a consumer instance by the request's consumer group.
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
// Cast to DefaultMQPushConsumerImpl, the implementation class discussed earlier in the article.
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
// Call its pullMessage method to continue processing.
impl.pullMessage(pullRequest);
} else {
log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
}
}
public void pullMessage(final PullRequest pullRequest) { ...... PullCallback pullCallback = new PullCallback() { @Override public void onSuccess(PullResult pullResult) { //调用consumeMessageService的submitConsumeRequest方法 //consumeMessageService上面提到过,包含顺序和并发消费 DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume); ...... } @Override public void onException(Throwable e) { if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("execute the pull request exception", e); } DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); } }; }
ConsumeMessageOrderlyService#submitConsumeRequest
// Submits a consume task for the given process queue and message queue when
// dispathToConsume is true; otherwise does nothing. msgs is not used in the code shown here.
@Override
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispathToConsume) {
if (dispathToConsume) {
// ConsumeRequest is a Runnable.
ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
// Submit it to the thread pool for processing.
this.consumeExecutor.submit(consumeRequest);
}
}
来看ConsumeRequest
的run
方法
ConsumeMessageOrderlyService.ConsumeRequest#run
// Excerpt of ConsumeRequest's run method; the line of dots marks code omitted by the article.
@Override
public void run() {
......
// The core step: the messages are consumed here by calling the message listener.
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
}
DefaultRocketMQListenerContainer.DefaultMessageListenerOrderly#consumeMessage @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { for (MessageExt messageExt : msgs) { log.debug("received msg: {}", messageExt); try { long now = System.currentTimeMillis(); //处理消息 handleMessage(messageExt); long costTime = System.currentTimeMillis() - now; log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime); } catch (Exception e) { log.warn("consume message failed. messageExt:{}", messageExt, e); context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } } return ConsumeOrderlyStatus.SUCCESS; }
DefaultRocketMQListenerContainer#handleMessage private void handleMessage( MessageExt messageExt) throws MQClientException, RemotingException, InterruptedException { if (rocketMQListener != null) { //可以看到最终调用到onMessage方法,也就是开头我们实现的接口中的onMessage方法 rocketMQListener.onMessage(doConvertMessage(messageExt)); } else if (rocketMQReplyListener != null) { Object replyContent = rocketMQReplyListener.onMessage(doConvertMessage(messageExt)); Message<?> message = MessageBuilder.withPayload(replyContent).build(); org.apache.rocketmq.common.message.Message replyMessage = MessageUtil.createReplyMessage(messageExt, convertToBytes(message)); consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(replyMessage, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { if (sendResult.getSendStatus() != SendStatus.SEND_OK) { log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus()); } else { log.info("Consumer replies message success."); } } @Override public void onException(Throwable e) { log.error("Consumer replies message failed. error: {}", e.getLocalizedMessage()); } }); } }
至此整个流程也就通了
@RocketMQMessageListener
相当于定义一个消费者,topic、consumerGroup、selectorExpression、consumeMode、messageModel
定义了消费者的一些属性
实现RocketMQListener
接口来处理具体消费逻辑
每个消费者初始化了一个DefaultRocketMQListenerContainer
对象,该对象中包含消费实例和消费者的属性
服务启动的时候开启一个线程轮询队列中的拉取请求(PullRequest),如果没有就一直阻塞,拉取到消息后,最终会调用自己实现的onMessage
方法
万字长文深度剖析 RocketMQ 设计原理
浅谈如何解决RocketMQ消息堆积的问题
RocketMQ的顺序消息(顺序消费)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。