赞
踩
@RabbitListener
注解,对应的方法便可以消费MQ的消息?
核心思想:
读取注解配置时机:创建bean对象,执行BeanPostProcessor
的postProcessAfterInitialization
方法时,将bean对象及其方法的注解配置读取到缓存中。
//bean执行BeanPostProcessor的方法
public Object applyBeanPostProcessorsAfterInitialization(Object existingBean, String beanName) throws BeansException {
Object result = existingBean;
for (BeanPostProcessor processor: getBeanPostProcessors()) {
Object current = processor.postProcessAfterInitialization(result, beanName);
if (current == null) {
return result;
}
result = current;
}
return result;
}
会被org.springframework.amqp.rabbit.annotation.RabbitListenerAnnotationBeanPostProcessor
处理,解析注解配置。
RabbitListenerAnnotationBeanPostProcessor
类被@EnableRabbit
注解加入到本项目的Spring容器中,所以若想MQ的注解生效,项目启动类需要使用@EnableRabbit
注解。
注:
RabbitListenerAnnotationBeanPostProcessor
处理后并不会生成代理对象,这个处理器仅仅是解析注解。
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException { //获取到目标对象(作为Map缓存的key) Class < ?>targetClass = AopUtils.getTargetClass(bean); //在Map中若取不到,那么执行buildMetadata()方法生成。 final TypeMetadata metadata = this.typeCache.computeIfAbsent(targetClass, this: :buildMetadata); //第一层是获取到ListenerMethod对象(见下文) for (ListenerMethod lm: metadata.listenerMethods) { //一个方法上可能有多个注解,于是循环多个注解配置。 for (RabbitListener rabbitListener: lm.annotations) { processAmqpListener(rabbitListener, lm.method, bean, beanName); } } if (metadata.handlerMethods.length > 0) { processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName); } return bean; }
private TypeMetadata buildMetadata(Class <?>targetClass) { //读取目标类的注解 Collection <RabbitListener> classLevelListeners = findListenerAnnotations(targetClass); final boolean hasClassLevelListeners = classLevelListeners.size() > 0; final List <ListenerMethod> methods = new ArrayList <>(); final List <Method> multiMethods = new ArrayList <>(); //遍历该类上满足USER_DECLARED_METHODS条件的方法(用户定义的方法) ReflectionUtils.doWithMethods(targetClass, method - >{ //解析方法上的注解 Collection <RabbitListener> listenerAnnotations = findListenerAnnotations(method); if (listenerAnnotations.size() > 0) { //放入到List集合中 methods.add(new ListenerMethod(method, listenerAnnotations.toArray(new RabbitListener[listenerAnnotations.size()]))); } //若是类上有@RabbitListener注解,那么取解析@RabbitHandler注解 if (hasClassLevelListeners) { RabbitHandler rabbitHandler = AnnotationUtils.findAnnotation(method, RabbitHandler.class); if (rabbitHandler != null) { multiMethods.add(method); } } }, ReflectionUtils.USER_DECLARED_METHODS); if (methods.isEmpty() && multiMethods.isEmpty()) { return TypeMetadata.EMPTY; } //返回对象 return new TypeMetadata(methods.toArray(new ListenerMethod[methods.size()]), multiMethods.toArray(new Method[multiMethods.size()]), classLevelListeners.toArray(new RabbitListener[classLevelListeners.size()]));
而TypeMetadata实际上是多个集合的对象
private static class TypeMetadata {
//方法上带有@RabbitListener
final ListenerMethod[] listenerMethods; // NOSONAR
//方法上带有@RabbitHandler
final Method[] handlerMethods; // NOSONAR
//类上带有@RabbitListener
final RabbitListener[] classAnnotations; // NOSONAR
static final TypeMetadata EMPTY = new TypeMetadata();
....
}
而ListenerMethod对象就是保存了Method对象和上面的注解配置。
private static class ListenerMethod {
final Method method; // NOSONAR
final RabbitListener[] annotations; // NOSONAR
...
}
MethodRabbitListenerEndpoint
保存了方法信息和注解配置信息的对象(可以看做临时对象)
protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
//检查是否是JDK代理,若是JDK代理是否实现接口。
Method methodToUse = checkProxy(method, bean);
//创建端点对象
MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
//填充方法对象
endpoint.setMethod(methodToUse);
//填充端点对象
processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
}
注意,registrar
在属性上new RabbitListenerEndpointRegistrar()
创建的。
protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean, Object adminTarget, String beanName) {
endpoint.setBean(bean);
...//填充配置的set方法
resolveAdmin(endpoint, rabbitListener, adminTarget);
RabbitListenerContainerFactory < ?>factory = resolveContainerFactory(rabbitListener, adminTarget, beanName);
//端点信息进行注册
this.registrar.registerEndpoint(endpoint, factory);
}
对应的类RabbitListenerEndpointRegistrar
public void registerEndpoint(RabbitListenerEndpoint endpoint, @Nullable RabbitListenerContainerFactory < ?>factory) { Assert.notNull(endpoint, "Endpoint must be set"); Assert.hasText(endpoint.getId(), "Endpoint id must be set"); Assert.state(!this.startImmediately || this.endpointRegistry != null, "No registry available"); // Factory may be null, we defer the resolution right before actually creating the container AmqpListenerEndpointDescriptor descriptor = new AmqpListenerEndpointDescriptor(endpoint, factory); synchronized(this.endpointDescriptors) { //是否注册的时候立即启动? if (this.startImmediately) { // Register and start immediately this.endpointRegistry.registerListenerContainer(descriptor.endpoint, // NOSONAR never null resolveContainerFactory(descriptor), true); } else { //不是立即启动,那么放入到List中 this.endpointDescriptors.add(descriptor); } } }
注意:RabbitListenerEndpointRegistrar
实现了InitializingBean
接口,在bean创建中会执行回调方法afterPropertiesSet()
。SpringBoot2.x基础篇—Bean的生命周期方法(与容器耦合)
上面说到,注册时因为不是立即启动,将descriptor
存放到了List中,而是回调方法中统一进行处理。
@Override public void afterPropertiesSet() { registerAllEndpoints(); } //注册所有端点 protected void registerAllEndpoints() { Assert.state(this.endpointRegistry != null, "No registry available"); synchronized(this.endpointDescriptors) { for (AmqpListenerEndpointDescriptor descriptor: this.endpointDescriptors) { //创建监听容器 this.endpointRegistry.registerListenerContainer( // NOSONAR never null descriptor.endpoint, resolveContainerFactory(descriptor)); } this.startImmediately = true; // trigger immediate startup } }
对象信息:RabbitListenerEndpointRegistry
public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory < ?>factory) {
//立即启动为false
registerListenerContainer(endpoint, factory, false);
}
public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory < ?>factory, boolean startImmediately) { Assert.notNull(endpoint, "Endpoint must not be null"); Assert.notNull(factory, "Factory must not be null"); String id = endpoint.getId(); Assert.hasText(id, "Endpoint id must not be empty"); synchronized(this.listenerContainers) { Assert.state(!this.listenerContainers.containsKey(id), "Another endpoint is already registered with id '" + id + "'"); //核心方法。创建容器 MessageListenerContainer container = createListenerContainer(endpoint, factory); //核心操作:将容器放入到List中 this.listenerContainers.put(id, container); ...//根据groupId分组,不关心 if (startImmediately) { //默认false,不关心 startIfNecessary(container); } } }
上面说到MethodRabbitListenerEndpoint
可以看做临时对象(临时存储对象信息和注解配置信息)。目的就是创建监听对象,注意监听对象最终也是放入了list对象中。
RabbitMQ使用默认的
SimpleRabbitListenerContainerFactory
监听工厂。
AbstractRabbitListenerContainerFactory实现的方法:
将endpoint的配置信息存入SimpleMessageListenerContainer
对象中。
public C createListenerContainer(RabbitListenerEndpoint endpoint) { C instance = createContainerInstance(); JavaUtils javaUtils = JavaUtils.INSTANCE.acceptIfNotNull(this.connectionFactory, instance: :setConnectionFactory).acceptIfNotNull(this.errorHandler, instance: :setErrorHandler); if (this.messageConverter != null) { if (endpoint != null) { endpoint.setMessageConverter(this.messageConverter); if (endpoint.getMessageConverter() == null) { instance.setMessageConverter(this.messageConverter); } } else { instance.setMessageConverter(this.messageConverter); } } javaUtils.acceptIfNotNull(this.acknowledgeMode, instance: :setAcknowledgeMode).acceptIfNotNull(this.channelTransacted, instance: :setChannelTransacted).acceptIfNotNull(this.applicationContext, instance: :setApplicationContext).acceptIfNotNull(this.taskExecutor, instance: :setTaskExecutor).acceptIfNotNull(this.transactionManager, instance: :setTransactionManager).acceptIfNotNull(this.prefetchCount, instance: :setPrefetchCount).acceptIfNotNull(this.defaultRequeueRejected, instance: :setDefaultRequeueRejected).acceptIfNotNull(this.adviceChain, instance: :setAdviceChain).acceptIfNotNull(this.recoveryBackOff, instance: :setRecoveryBackOff).acceptIfNotNull(this.mismatchedQueuesFatal, instance: :setMismatchedQueuesFatal).acceptIfNotNull(this.missingQueuesFatal, instance: :setMissingQueuesFatal).acceptIfNotNull(this.consumerTagStrategy, instance: :setConsumerTagStrategy).acceptIfNotNull(this.idleEventInterval, instance: :setIdleEventInterval).acceptIfNotNull(this.failedDeclarationRetryInterval, instance: :setFailedDeclarationRetryInterval).acceptIfNotNull(this.applicationEventPublisher, instance: :setApplicationEventPublisher).acceptIfNotNull(this.autoStartup, instance: :setAutoStartup).acceptIfNotNull(this.phase, instance: :setPhase).acceptIfNotNull(this.afterReceivePostProcessors, instance: :setAfterReceivePostProcessors); if (endpoint != null) { if (endpoint.getAutoStartup() != null) { instance.setAutoStartup(endpoint.getAutoStartup()); } instance.setListenerId(endpoint.getId()); //绑定队列信息 endpoint.setupListenerContainer(instance); } if (instance.getMessageListener() instanceof AbstractAdaptableMessageListener) { AbstractAdaptableMessageListener messageListener = (AbstractAdaptableMessageListener) instance.getMessageListener(); javaUtils.acceptIfNotNull(this.beforeSendReplyPostProcessors, messageListener: :setBeforeSendReplyPostProcessors).acceptIfNotNull(this.retryTemplate, messageListener: :setRetryTemplate).acceptIfCondition(this.retryTemplate != null && this.recoveryCallback != null, this.recoveryCallback, messageListener: :setRecoveryCallback); } //个性化处理。 initializeContainer(instance, endpoint); if (this.containerConfigurer != null) { this.containerConfigurer.accept(instance); } return instance; }
方法上每一个@RabbitListener
注解都会创建一个SimpleMessageListenerContainer
容器,并放入到List集合中。
RabbitListenerEndpointRegistry对象的结构图:
注意RabbitListenerEndpointRegistry
接口实现了Lifecycle
类,即Spring容器初始化完毕,会执行start()方法。
SpringBoot2.x基础篇—Bean的生命周期方法(实现Lifecycle接口)
执行RabbitListenerEndpointRegistry#start()
方法,实际上是遍历所有的监听容器对象,执行监听容器的start()方法开启监听。
@Override
public void start() {
//bean创建完毕后,遍历存储ListenerContainer的集合,并且开启监听容器
for (MessageListenerContainer listenerContainer: getListenerContainers()) {
startIfNecessary(listenerContainer);
}
}
private void startIfNecessary(MessageListenerContainer listenerContainer) {
if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
listenerContainer.start();
}
}
监听容器的start()方法:
//对应源码:org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer#start
@Override public void start() {
....
try {
logger.debug("Starting Rabbit listener container.");
configureAdminIfNeeded();
checkMismatchedQueues();
//子类实现,开启监听容器
doStart();
} catch(Exception ex) {
throw convertRabbitAccessException(ex);
}
}
子类开启监听容器:
消费者线程一旦开启启动,那么便会一直去监听消息,并且去处理消息。
//org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#doStart @Override protected void doStart() { checkListenerContainerAware(); super.doStart(); synchronized(this.consumersMonitor) { if (this.consumers != null) { throw new IllegalStateException("A stopped container should not have consumers"); } //根据配置的concurrentConsumers参数,创建消费者并存储到Set中 int newConsumers = initializeConsumers(); ... Set <AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer> (); //根据配置的concurrentConsumers创建消费者线程 for (BlockingQueueConsumer consumer: this.consumers) { //创建消费者线程 AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer); processors.add(processor); //使用线程池去执行消费者线程 getTaskExecutor().execute(processor); if (getApplicationEventPublisher() != null) { getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer)); } } //等待消费者线程执行成功 waitForConsumersToStart(processors); } }
@RabbitListener生效的核心流程是:读取@RabbitListener
配置,创建SimpleMessageListenerContainer
对象。并且调用SimpleMessageListenerContainer
对象的start()方法,创建消费者线程并且启动。
AbstractMessageListenerContainer
接口实现了Lifecycle
接口,将其放入到Spring容器后,会执行生命周期的回调方法,即自动执行start()方法,开启队列监听。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。