赞
踩
前言
1.仓库地址
https://gitee.com/JiuLongBingShi/spring-rabbit-king.git
2.maven依赖
- <dependency>
- <groupId>com.king.springboot</groupId>
- <artifactId>king-rabbit</artifactId>
- <version>2.1.0</version>
- </dependency>
一、知识点
1、RabbitListenerEndpoint :
Endpoint为终端,像电脑、手机都是终端,他们都可以接受外部信息并响应,如手机来短信了就有提示。这里也用了终端的概念,例如:被@RabbitListener注解修饰方法也有终端的特点 可以接受外部信息并响应。
- /*
- * Copyright 2002-2018 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.springframework.amqp.rabbit.listener;
-
- import org.springframework.amqp.support.converter.MessageConverter;
- import org.springframework.lang.Nullable;
-
- /**
- * Model for a Rabbit listener endpoint. Can be used against a
- * {@link org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer
- * RabbitListenerConfigurer} to register endpoints programmatically.
- *
- * @author Stephane Nicoll
- * @author Gary Russell
- * @since 1.4
- */
- public interface RabbitListenerEndpoint {
-
- /**
- * @return the id of this endpoint. The id can be further qualified
- * when the endpoint is resolved against its actual listener
- * container.
- * @see RabbitListenerContainerFactory#createListenerContainer
- */
- String getId();
-
- /**
- * @return the group of this endpoint or null if not in a group.
- * @since 1.5
- */
- String getGroup();
-
- /**
- * @return the concurrency of this endpoint.
- * @since 2.0
- */
- String getConcurrency();
-
- /**
- * Override of the default autoStartup property.
- * @return the autoStartup.
- * @since 2.0
- */
- Boolean getAutoStartup();
-
- /**
- * Setup the specified message listener container with the model
- * defined by this endpoint.
- * <p>This endpoint must provide the requested missing option(s) of
- * the specified container to make it usable. Usually, this is about
- * setting the {@code queues} and the {@code messageListener} to
- * use but an implementation may override any default setting that
- * was already set.
- * @param listenerContainer the listener container to configure
- */
- void setupListenerContainer(MessageListenerContainer listenerContainer);
-
- /**
- * The preferred way for a container factory to pass a message converter
- * to the endpoint's adapter.
- * @param converter the converter.
- * @since 2.0.8
- */
- default void setMessageConverter(MessageConverter converter) {
- // NOSONAR
- }
- /**
- * Used by the container factory to check if this endpoint supports the
- * preferred way for a container factory to pass a message converter
- * to the endpoint's adapter. If null is returned, the factory will
- * fall back to the legacy method of passing the converter via the
- * container.
- * @return the converter.
- * @since 2.0.8
- */
- @Nullable
- default MessageConverter getMessageConverter() {
- return null;
- }
-
- }
这里的终端有很多种,刚才说到 『被@RabbitListener注解修饰方法也是终端』 就是 MethodRabbitListenerEnpoint 方法终端。
接口方法中 void setupListenerContainer(MessageListenerContainer listenerContainer) 方法,中的MessageListenerContainer就是用来接收并处理消息的。
在抽象类中AbstractRabbitListenerEndpoint对setupListenerContainer方法进了实现:
- @Override
- public void setupListenerContainer(MessageListenerContainer listenerContainer) {
- AbstractMessageListenerContainer container = (AbstractMessageListenerContainer) listenerContainer;
-
- boolean queuesEmpty = getQueues().isEmpty();
- boolean queueNamesEmpty = getQueueNames().isEmpty();
- if (!queuesEmpty && !queueNamesEmpty) {
- throw new IllegalStateException("Queues or queue names must be provided but not both for " + this);
- }
- if (queuesEmpty) {
- Collection<String> names = getQueueNames();
- container.setQueueNames(names.toArray(new String[names.size()]));
- }
- else {
- Collection<Queue> instances = getQueues();
- container.setQueues(instances.toArray(new Queue[instances.size()]));
- }
-
- container.setExclusive(isExclusive());
- if (getPriority() != null) {
- Map<String, Object> args = new HashMap<String, Object>();
- args.put("x-priority", getPriority());
- container.setConsumerArguments(args);
- }
-
- if (getAdmin() != null) {
- container.setAmqpAdmin(getAdmin());
- }
- //入口
- setupMessageListener(listenerContainer);
- }
- private void setupMessageListener(MessageListenerContainer container) {
- //
- //MessageListenerContainer为容器,容器是要装东西的,这里显然是装MessageListener。
- //createMessageListener(container)为抽象方法,用于子类覆盖。
- MessageListener messageListener = createMessageListener(container);
- Assert.state(messageListener != null, () -> "Endpoint [" + this + "] must provide a non null message listener");
- container.setupMessageListener(messageListener);
- }
子类MethodRabbitListenerEndpoint中覆盖了createMessageListener。
- @Override
- protected MessagingMessageListenerAdapter createMessageListener(MessageListenerContainer container) {
- Assert.state(this.messageHandlerMethodFactory != null,
- "Could not create message listener - MessageHandlerMethodFactory not set");
- MessagingMessageListenerAdapter messageListener = createMessageListenerInstance();
- messageListener.setHandlerAdapter(configureListenerAdapter(messageListener));
- String replyToAddress = getDefaultReplyToAddress();
- if (replyToAddress != null) {
- messageListener.setResponseAddress(replyToAddress);
- }
- MessageConverter messageConverter = getMessageConverter();
- if (messageConverter == null) {
- // fall back to the legacy converter holder in the container
- messageConverter = container.getMessageConverter();
- }
- if (messageConverter != null) {
- messageListener.setMessageConverter(messageConverter);
- }
- if (getBeanResolver() != null) {
- messageListener.setBeanResolver(getBeanResolver());
- }
- return messageListener;
- }
2、RabbitListenerContainerFactory
RabbitListenerContainerFactory为:rabbit 监听器容器工厂。既然为 监听器容器工厂 那一定是生产监听器容器。
监听器容器工厂:
- /*
- * Copyright 2002-2018 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.springframework.amqp.rabbit.listener;
-
- import org.springframework.lang.Nullable;
-
- /**
- * Factory of {@link MessageListenerContainer}s.
- * @param <C> the container type.
- * @author Stephane Nicoll
- * @author Gary Russell
- * @since 1.4
- * @see RabbitListenerEndpoint
- */
- @FunctionalInterface
- public interface RabbitListenerContainerFactory<C extends MessageListenerContainer> {
-
- /**
- * Create a {@link MessageListenerContainer} for the given
- * {@link RabbitListenerEndpoint}.
- * @param endpoint the endpoint to configure.
- * @return the created container.
- */
- C createListenerContainer(@Nullable RabbitListenerEndpoint endpoint);
-
- /**
- * Create a {@link MessageListenerContainer} with no
- * {@link org.springframework.amqp.core.MessageListener} or queues; the listener must
- * be added later before the container is started.
- * @return the created container.
- * @since 2.1.
- */
- default C createListenerContainer() {
- return createListenerContainer(null);
- }
-
- }
监听器容器:
- /*
- * Copyright 2014-2019 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.springframework.amqp.rabbit.listener;
-
- import org.springframework.amqp.core.MessageListener;
- import org.springframework.amqp.support.converter.MessageConverter;
- import org.springframework.context.SmartLifecycle;
-
- /**
- * Internal abstraction used by the framework representing a message
- * listener container. Not meant to be implemented externally.
- *
- * @author Stephane Nicoll
- * @author Gary Russell
- * @since 1.4
- */
- public interface MessageListenerContainer extends SmartLifecycle {
-
- /**
- * Setup the message listener to use. Throws an {@link IllegalArgumentException}
- * if that message listener type is not supported.
- * @param messageListener the {@code object} to wrapped to the {@code MessageListener}.
- */
- void setupMessageListener(MessageListener messageListener);
-
- /**
- * @return the {@link MessageConverter} that can be used to
- * convert {@link org.springframework.amqp.core.Message}, if any.
- * @deprecated - this converter is not used by the container; it was only
- * used to configure the converter for a {@code @RabbitListener} adapter.
- * That is now handled differently. If you are manually creating a listener
- * container, the converter must be configured in a listener adapter (if
- * present).
- */
- @Deprecated
- MessageConverter getMessageConverter();
-
- /**
- * Do not check for missing or mismatched queues during startup. Used for lazily
- * loaded message listener containers to avoid a deadlock when starting such
- * containers. Applications lazily loading containers should verify the queue
- * configuration before loading the container bean.
- * @since 2.1.5
- */
- default void lazyLoad() {
- // no-op
- }
-
- }
监听器容器工厂:
在其抽象子类AbstractRabbitListenerContainerFactory中对 createListenerContainer 方法已做了实现。
我常用的是其子类SimpleRabbitListenerContainerFactory
- public class SimpleRabbitListenerContainerFactory
- extends AbstractRabbitListenerContainerFactory<SimpleMessageListenerContainer> {
- //...
- }
SimpleRabbitListenerContainerFactory工厂生产的是SimpleMessageListenerContainer
- public abstract class AbstractRabbitListenerContainerFactory<C extends AbstractMessageListenerContainer>
- implements RabbitListenerContainerFactory<C>, ApplicationContextAware, ApplicationEventPublisherAware {
- //...
-
-
- //在使用SimpleRabbitListenerContainerFactory时,下面的 C 就是 SimpleMessageListenerContainer
-
- @SuppressWarnings("deprecation")
- @Override
- public C createListenerContainer(RabbitListenerEndpoint endpoint) {
- //创建SimpleMessageListenerContainer
-
- C instance = createContainerInstance();
-
- JavaUtils javaUtils = JavaUtils.INSTANCE.acceptIfNotNull(this.connectionFactory, instance::setConnectionFactory)
- .acceptIfNotNull(this.errorHandler, instance::setErrorHandler);
- if (this.messageConverter != null) {
- if (endpoint != null) {
- endpoint.setMessageConverter(this.messageConverter);
- if (endpoint.getMessageConverter() == null) {
- instance.setMessageConverter(this.messageConverter);
- }
- }
- else {
- instance.setMessageConverter(this.messageConverter);
- }
- }
- javaUtils
- .acceptIfNotNull(this.acknowledgeMode, instance::setAcknowledgeMode)
- .acceptIfNotNull(this.channelTransacted, instance::setChannelTransacted)
- .acceptIfNotNull(this.applicationContext, instance::setApplicationContext)
- .acceptIfNotNull(this.taskExecutor, instance::setTaskExecutor)
- .acceptIfNotNull(this.transactionManager, instance::setTransactionManager)
- .acceptIfNotNull(this.prefetchCount, instance::setPrefetchCount)
- .acceptIfNotNull(this.defaultRequeueRejected, instance::setDefaultRequeueRejected)
- .acceptIfNotNull(this.adviceChain, instance::setAdviceChain)
- .acceptIfNotNull(this.recoveryBackOff, instance::setRecoveryBackOff)
- .acceptIfNotNull(this.mismatchedQueuesFatal, instance::setMismatchedQueuesFatal)
- .acceptIfNotNull(this.missingQueuesFatal, instance::setMissingQueuesFatal)
- .acceptIfNotNull(this.consumerTagStrategy, instance::setConsumerTagStrategy)
- .acceptIfNotNull(this.idleEventInterval, instance::setIdleEventInterval)
- .acceptIfNotNull(this.failedDeclarationRetryInterval, instance::setFailedDeclarationRetryInterval)
- .acceptIfNotNull(this.applicationEventPublisher, instance::setApplicationEventPublisher)
- .acceptIfNotNull(this.autoStartup, instance::setAutoStartup)
- .acceptIfNotNull(this.phase, instance::setPhase)
- .acceptIfNotNull(this.afterReceivePostProcessors, instance::setAfterReceivePostProcessors);
- if (endpoint != null) {
- if (endpoint.getAutoStartup() != null) {
- instance.setAutoStartup(endpoint.getAutoStartup());
- }
- instance.setListenerId(endpoint.getId());
-
- //setupListenerContainer方法参数为 MessageListenerContainer类型,
- //SimpleMessageListenerContainer是MessageListenerContainer的子类。
- endpoint.setupListenerContainer(instance);
- }
- if (instance.getMessageListener() instanceof AbstractAdaptableMessageListener) {
- AbstractAdaptableMessageListener messageListener = (AbstractAdaptableMessageListener) instance
- .getMessageListener();
- javaUtils
- .acceptIfNotNull(this.beforeSendReplyPostProcessors, messageListener::setBeforeSendReplyPostProcessors)
- .acceptIfNotNull(this.retryTemplate, messageListener::setRetryTemplate)
- .acceptIfCondition(this.retryTemplate != null && this.recoveryCallback != null, this.recoveryCallback,
- messageListener::setRecoveryCallback)
- .acceptIfNotNull(this.defaultRequeueRejected, messageListener::setDefaultRequeueRejected);
- }
- initializeContainer(instance, endpoint);
-
- if (this.containerConfigurer != null) {
- this.containerConfigurer.accept(instance);
- }
-
- return instance;
- }
- }
二、开始
1、HandlerMethod
在被@RabbitListener标注的方法。该方法会被封装成HanderMethod。
- package org.springframework.messaging.handler;
-
- import java.lang.annotation.Annotation;
- import java.lang.reflect.Method;
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.springframework.beans.factory.BeanFactory;
- import org.springframework.core.BridgeMethodResolver;
- import org.springframework.core.GenericTypeResolver;
- import org.springframework.core.MethodParameter;
- import org.springframework.core.annotation.AnnotatedElementUtils;
- import org.springframework.core.annotation.SynthesizingMethodParameter;
- import org.springframework.lang.Nullable;
- import org.springframework.util.Assert;
- import org.springframework.util.ClassUtils;
-
- public class HandlerMethod {
- public static final Log defaultLogger = LogFactory.getLog(HandlerMethod.class);
- protected Log logger;
- private final Object bean;
- @Nullable
- private final BeanFactory beanFactory;
- private final Class<?> beanType;
- private final Method method;
- private final Method bridgedMethod;
- private final MethodParameter[] parameters;
- @Nullable
- private HandlerMethod resolvedFromHandlerMethod;
-
- public HandlerMethod(Object bean, Method method) {
- this.logger = defaultLogger;
- Assert.notNull(bean, "Bean is required");
- Assert.notNull(method, "Method is required");
- this.bean = bean;
- this.beanFactory = null;
- this.beanType = ClassUtils.getUserClass(bean);
- this.method = method;
- this.bridgedMethod = BridgeMethodResolver.findBridgedMethod(method);
- this.parameters = this.initMethodParameters();
- }
- }
我们常用其子类InvocableHandlerMethod。该类提供一个Object invoke(Message<?> message, Object... providedArgs) 方法,
可以通过该invoke方法调用handlerMethod 中的method:method.invoke(bean,args)
HandlerAdapter称为hander适配器,该适配器中有InvocableHandlerMethod、DelegatingInvocableHandler这两个hander方法。注:InvocableHandlerMethod上文刚说过。
-
- public class HandlerAdapter {
-
- private final InvocableHandlerMethod invokerHandlerMethod;
-
- private final DelegatingInvocableHandler delegatingHandler;
-
- public HandlerAdapter(InvocableHandlerMethod invokerHandlerMethod) {
- this.invokerHandlerMethod = invokerHandlerMethod;
- this.delegatingHandler = null;
- }
-
- public HandlerAdapter(DelegatingInvocableHandler delegatingHandler) {
- this.invokerHandlerMethod = null;
- this.delegatingHandler = delegatingHandler;
- }
-
- public InvocationResult invoke(Message<?> message, Object... providedArgs) throws Exception { // NOSONAR
- if (this.invokerHandlerMethod != null) {
- // //InvocableHandlerMethod不为null,就调用invokerHandlerMethod.invoke方法。
- return new InvocationResult(this.invokerHandlerMethod.invoke(message, providedArgs),
- null, this.invokerHandlerMethod.getMethod().getGenericReturnType());
- }
- else if (this.delegatingHandler.hasDefaultHandler()) {
- // Needed to avoid returning raw Message which matches Object
- Object[] args = new Object[providedArgs.length + 1];
- args[0] = message.getPayload();
- System.arraycopy(providedArgs, 0, args, 1, providedArgs.length);
- return this.delegatingHandler.invoke(message, args);
- }
- else {
- return this.delegatingHandler.invoke(message, providedArgs);
- }
- }
- //...
- }
这里可以简单的理解调用HandlerAdapter.invoke方法可以间接调用@RabbitListener修饰的方法。
2、ChannelAwareMessageListener
消费端接收到mq的消息会调用ChannelAwareMessageListener 接口中的onMessage方法。
- /*
- * Copyright 2002-2018 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.springframework.amqp.rabbit.listener.api;
-
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.core.MessageListener;
-
- import com.rabbitmq.client.Channel;
-
- /**
- * A message listener that is aware of the Channel on which the message was received.
- *
- * @author Mark Pollack
- * @author Gary Russell
- */
- @FunctionalInterface
- public interface ChannelAwareMessageListener extends MessageListener {
-
- /**
- * 回调函数处理接受的消息
- *
- * Callback for processing a received Rabbit message.
- * <p>Implementors are supposed to process the given Message,
- * typically sending reply messages through the given Session.
- * @param message the received AMQP message (never <code>null</code>)
- * @param channel the underlying Rabbit Channel (never <code>null</code>)
- * @throws Exception Any.
- */
- void onMessage(Message message, Channel channel) throws Exception; // NOSONAR
-
- @Override
- default void onMessage(Message message) {
- throw new IllegalStateException("Should never be called for a ChannelAwareMessageListener");
- }
-
- }
下面要说的是其子类:MessagingMessageListenerAdapter
- public class MessagingMessageListenerAdapter extends AbstractAdaptableMessageListener {
-
- private HandlerAdapter handlerAdapter;
-
- private final MessagingMessageConverterAdapter messagingMessageConverter;
-
- private final boolean returnExceptions;
-
- private final RabbitListenerErrorHandler errorHandler;
-
- public MessagingMessageListenerAdapter() {
- this(null, null);
- }
-
- public MessagingMessageListenerAdapter(Object bean, Method method) {
- this(bean, method, false, null);
- }
-
- public MessagingMessageListenerAdapter(Object bean, Method method, boolean returnExceptions,
- RabbitListenerErrorHandler errorHandler) {
- this.messagingMessageConverter = new MessagingMessageConverterAdapter(bean, method);
- this.returnExceptions = returnExceptions;
- this.errorHandler = errorHandler;
- }
-
- @Override
- public void onMessage(org.springframework.amqp.core.Message amqpMessage, Channel channel) throws Exception { // NOSONAR
-
- //ChannelAwareMessageListener.onMessage()方法中的message是org.springframework.amqp.core.Message类型。
- //这里转换成org.springframework.messaging.Message类型。
- Message<?> message = toMessagingMessage(amqpMessage);
- if (logger.isDebugEnabled()) {
- logger.debug("Processing [" + message + "]");
- }
- InvocationResult result = null;
- try {
- //入口
- result = invokeHandler(amqpMessage, channel, message);
- if (result.getReturnValue() != null) {
- handleResult(result, amqpMessage, channel, message);
- }
- else {
- logger.trace("No result object given - no result to handle");
- }
- }
- catch (ListenerExecutionFailedException e) {
- if (this.errorHandler != null) {
- try {
- message = MessageBuilder.fromMessage(message)
- .setHeader(AmqpHeaders.CHANNEL, channel)
- .build();
- Object errorResult = this.errorHandler.handleError(amqpMessage, message, e);
- if (errorResult != null) {
- handleResult(this.handlerAdapter.getInvocationResultFor(errorResult, message.getPayload()),
- amqpMessage, channel, message);
- }
- else {
- logger.trace("Error handler returned no result");
- }
- }
- catch (Exception ex) {
- returnOrThrow(amqpMessage, channel, message, ex, ex);
- }
- }
- else {
- returnOrThrow(amqpMessage, channel, message, e.getCause(), e);
- }
- }
- }
-
- /**
- * Invoke the handler, wrapping any exception to a {@link ListenerExecutionFailedException}
- * with a dedicated error message.
- * @param amqpMessage the raw message.
- * @param channel the channel.
- * @param message the messaging message.
- * @return the result of invoking the handler.
- */
- private InvocationResult invokeHandler(org.springframework.amqp.core.Message amqpMessage, Channel channel,
- Message<?> message) {
- try {
- return this.handlerAdapter.invoke(message, amqpMessage, channel);
- }
- catch (MessagingException ex) {
- throw new ListenerExecutionFailedException(createMessagingErrorMessage("Listener method could not " +
- "be invoked with the incoming message", message.getPayload()), ex, amqpMessage);
- }
- catch (Exception ex) {
- throw new ListenerExecutionFailedException("Listener method '" +
- this.handlerAdapter.getMethodAsString(message.getPayload()) + "' threw exception", ex, amqpMessage);
- }
- }
- //...
-
- }
onMessage->invokehandler->this.handlerMethod.invoke 。this.handlerMethod为 HandlerAdapter类型。上面刚说完:调用HandlerAdapter.invoke方法可以间接调用@RabbitListener修饰的方法。
现在我们就已经梳洗清楚从onMessage到调用被@RabbitListener修饰的方法的整个流程
3、SimpleMessageListenerContainer
SimpleMessageListenerContainer 中有一个内部类AsyncMessageProcessingConsumer :
- //当终端接受到信息时,一个线程会执行该run方法。
- private final class AsyncMessageProcessingConsumer implements Runnable {
-
-
- private static final int ABORT_EVENT_WAIT_SECONDS = 5;
-
- private final BlockingQueueConsumer consumer;
-
- private final CountDownLatch start;
-
- private volatile FatalListenerStartupException startupException;
-
- private int consecutiveIdles;
-
- private int consecutiveMessages;
-
-
- AsyncMessageProcessingConsumer(BlockingQueueConsumer consumer) {
- this.consumer = consumer;
- this.start = new CountDownLatch(1);
- }
-
-
- @Override
- public void run() { // NOSONAR - line count
- if (!isActive()) {
- return;
- }
-
- boolean aborted = false;
-
- this.consumer.setLocallyTransacted(isChannelLocallyTransacted());
-
- String routingLookupKey = getRoutingLookupKey();
- if (routingLookupKey != null) {
- SimpleResourceHolder.bind(getRoutingConnectionFactory(), routingLookupKey); // NOSONAR both never null
- }
-
- if (this.consumer.getQueueCount() < 1) {
- if (logger.isDebugEnabled()) {
- logger.debug("Consumer stopping; no queues for " + this.consumer);
- }
- SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer);
- if (getApplicationEventPublisher() != null) {
- getApplicationEventPublisher().publishEvent(
- new AsyncConsumerStoppedEvent(SimpleMessageListenerContainer.this, this.consumer));
- }
- this.start.countDown();
- return;
- }
-
- try {
- initialize();
- while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
- //这个 while 可以无限的循环,
- mainLoop();
- }
- }catch (InterruptedException e) {
- }
- //...
- }
-
- private void mainLoop() throws Exception { // NOSONAR Exception
- try {
- boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
- if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
- checkAdjust(receivedOk);
- }
- long idleEventInterval = getIdleEventInterval();
- if (idleEventInterval > 0) {
- if (receivedOk) {
- updateLastReceive();
- }
- else {
- long now = System.currentTimeMillis();
- long lastAlertAt = SimpleMessageListenerContainer.this.lastNoMessageAlert.get();
- long lastReceive = getLastReceive();
- if (now > lastReceive + idleEventInterval
- && now > lastAlertAt + idleEventInterval
- && SimpleMessageListenerContainer.this.lastNoMessageAlert
- .compareAndSet(lastAlertAt, now)) {
- publishIdleContainerEvent(now - lastReceive);
- }
- }
- }
- }
- catch (ListenerExecutionFailedException ex) {
- // Continue to process, otherwise re-throw
- if (ex.getCause() instanceof NoSuchMethodException) {
- throw new FatalListenerExecutionException("Invalid listener", ex);
- }
- }
- catch (AmqpRejectAndDontRequeueException rejectEx) {
- /*
- * These will normally be wrapped by an LEFE if thrown by the
- * listener, but we will also honor it if thrown by an
- * error handler.
- */
- }
- }
- }
- }
run方法中调用了receiveAndExecute方法,方法名直译:接受并执行。
-
- package org.springframework.amqp.rabbit.listener;
-
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.Collection;
- import java.util.HashSet;
- import java.util.Iterator;
- import java.util.List;
- import java.util.Set;
- import java.util.concurrent.BlockingQueue;
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.TimeoutException;
- import java.util.concurrent.atomic.AtomicLong;
- import java.util.concurrent.atomic.AtomicReference;
-
- import org.springframework.amqp.AmqpAuthenticationException;
- import org.springframework.amqp.AmqpConnectException;
- import org.springframework.amqp.AmqpException;
- import org.springframework.amqp.AmqpIOException;
- import org.springframework.amqp.AmqpIllegalStateException;
- import org.springframework.amqp.AmqpRejectAndDontRequeueException;
- import org.springframework.amqp.ImmediateAcknowledgeAmqpException;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.core.Queue;
- import org.springframework.amqp.rabbit.connection.ConnectionFactory;
- import org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils;
- import org.springframework.amqp.rabbit.connection.ConsumerChannelRegistry;
- import org.springframework.amqp.rabbit.connection.RabbitResourceHolder;
- import org.springframework.amqp.rabbit.connection.RabbitUtils;
- import org.springframework.amqp.rabbit.connection.SimpleResourceHolder;
- import org.springframework.amqp.rabbit.listener.exception.FatalListenerExecutionException;
- import org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException;
- import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;
- import org.springframework.amqp.rabbit.support.ConsumerCancelledException;
- import org.springframework.amqp.rabbit.support.ListenerContainerAware;
- import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator;
- import org.springframework.jmx.export.annotation.ManagedMetric;
- import org.springframework.jmx.support.MetricType;
- import org.springframework.transaction.PlatformTransactionManager;
- import org.springframework.transaction.support.TransactionSynchronizationManager;
- import org.springframework.transaction.support.TransactionTemplate;
- import org.springframework.util.Assert;
- import org.springframework.util.backoff.BackOffExecution;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.PossibleAuthenticationFailureException;
- import com.rabbitmq.client.ShutdownSignalException;
-
- /**
- * @author Mark Pollack
- * @author Mark Fisher
- * @author Dave Syer
- * @author Gary Russell
- * @author Artem Bilan
- * @author Alex Panchenko
- *
- * @since 1.0
- */
- public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer {
-
-
- //接受并执行
- private boolean receiveAndExecute(final BlockingQueueConsumer consumer) throws Throwable {
-
- //do接受并执行
- return doReceiveAndExecute(consumer);
- }
- //do接受并执行
- private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Throwable {
-
- Channel channel = consumer.getChannel();
- for (int i = 0; i < this.txSize; i++) {//txSize为一次事务接受的消息个数
- //读取消息,这里阻塞的,但是有一个超时时间。
- Message message = consumer.nextMessage(this.receiveTimeout);
- if (message == null) {//阻塞超时
- break;
- }
- try {
- executeListener(channel, message);//消息接收已完成,现在开始处理消息。
- }
- catch (Exception e) {}
- }
- return consumer.commitIfNecessary(isChannelLocallyTransacted());
- }
-
-
- //处理消息开始。该方法在其父类中
- protected void executeListener(Channel channel, Message messageIn) throws Exception {
- try {
- Message message = messageIn;
- if (……) {
- //批处理信息,这个不研究
- }else {
- invokeListener(channel, message);
- }
- }catch (Exception ex) {}
- }
- //在其父类中
- protected void invokeListener(Channel channel, Message message) throws Exception {
- //这里this.proxy.invokeListener最终会调用actualInvokeListener方法。
- this.proxy.invokeListener(channel, message);
- }
- //在其父类中
- protected void actualInvokeListener(Channel channel, Message message) throws Exception {
- Object listener = getMessageListener();
- if (listener instanceof ChannelAwareMessageListener) {
- doInvokeListener((ChannelAwareMessageListener) listener, channel, message);
- }
- else if (listener instanceof MessageListener) {
- //……
- doInvokeListener((MessageListener) listener, message)
- }else{
- //……
- }
-
- }
-
- protected void doInvokeListener(ChannelAwareMessageListener listener, Channel channel, Message message)
- throws Exception {
- Channel channelToUse = channel;
- try {
- listener.onMessage(message, channelToUse);
- }
- catch (Exception e) {
- throw wrapToListenerExecutionFailedExceptionIfNeeded(e, message);
- }
-
- }
- }
在doInvokeListener方中最终调用了onMessage方法。
到这我知道了SimpleMessageListenerContainer内部类AsyncMessageProcessingConsumer被放入线程池中运行,是谁把AsyncMessageProcessingConsumer放入线程池中的呢?
4、RabbitListenerAnnotationBeanPostProcessor
RabbitListenerAnnotationBeanPostProcessor 听名得意:@RabbitListener注解的后置处理器。
- public class RabbitListenerAnnotationBeanPostProcessor implements BeanPostProcessor,SmartInitializingSingleton {
-
- //对接口BeanPostProcessor的实现。
- @Override
- public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
- Class<?> targetClass = AopUtils.getTargetClass(bean);
- final TypeMetadata metadata = this.typeCache.computeIfAbsent(targetClass, this::buildMetadata);
- //获取被@RabbitListener修饰的方法。
- for (ListenerMethod lm : metadata.listenerMethods) {
- //获取方法上的@RabbitListener注解。
- for (RabbitListener rabbitListener : lm.annotations) {
- //处理Amqp监听:
- //有这三个参数就以实现监听队列并调用方法。参数:rabbitListener为我们的方法上@RabbitListener注解。参数method和bean可以能过反射的方式调用我们的方法。
- processAmqpListener(rabbitListener, lm.method, bean, beanName);
- }
- }
- return bean;
- }
-
- //处理Amqp监听
- protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
- Method methodToUse = checkProxy(method, bean);
-
- //Endpoint为终端,像电脑、手机都是终端,他们都可以接受外部信息并响应,如手机来短信了就有提示。
- //这里也用了终端的概念,被@RabbitListener注解修饰方法也有终端的特点可以接受外部信息并响应。
- //MethodRabbitListenerEndpoint名子也很形象,叫方法监听终端。
- MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
- endpoint.setMethod(methodToUse);//终端接收到信息时,会调用 methodToUse
- processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
- }
- //注册员
- private final RabbitListenerEndpointRegistrar registrar = new RabbitListenerEndpointRegistrar();
-
- protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,
- Object adminTarget, String beanName) {
- //下面是为终端注入一些属性
- endpoint.setBean(bean);
- endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
- endpoint.setId(getEndpointId(rabbitListener));
- endpoint.setQueueNames(resolveQueues(rabbitListener));
- endpoint.setConcurrency(resolveExpressionAsStringOrInteger(rabbitListener.concurrency(), "concurrency"));
- endpoint.setBeanFactory(this.beanFactory);
- endpoint.setReturnExceptions(resolveExpressionAsBoolean(rabbitListener.returnExceptions()));
-
- ……
-
- RabbitListenerContainerFactory<?> factory = null;
- String containerFactoryBeanName = resolve(rabbitListener.containerFactory());
- if (StringUtils.hasText(containerFactoryBeanName)) {
- Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
- try {
- factory = this.beanFactory.getBean(containerFactoryBeanName, RabbitListenerContainerFactory.class);
- }
- catch (NoSuchBeanDefinitionException ex) {
- throw new BeanInitializationException("Could not register rabbit listener endpoint on [" +
- adminTarget + "] for bean " + beanName + ", no " + RabbitListenerContainerFactory.class.getSimpleName() + " with id '" +
- containerFactoryBeanName + "' was found in the application context", ex);
- }
- }
- //终端准备完成后,还要将终端进行注册。
- //注册员将终端注册到注册处
- this.registrar.registerEndpoint(endpoint, factory);
- }
-
-
-
-
-
- //对SmartInitializingSingleton的实现,该方法在bean的后置处理BeanPostProcessor之后调用。
- @Override
- public void afterSingletonsInstantiated() {
- this.registrar.setBeanFactory(this.beanFactory);
-
- if (this.containerFactoryBeanName != null) {
- //this.containerFactoryBeanName为“rabbitListenerContainerFactory”
- this.registrar.setContainerFactoryBeanName(this.containerFactoryBeanName);
- }
-
- // Actually register all listeners
- this.registrar.afterPropertiesSet();
-
- }
-
- }
RabbitListenerEndpointRegistrar :rabbit 监听器注册员
- //终端注册员
- public class RabbitListenerEndpointRegistrar implements InitializingBean{
-
- private RabbitListenerEndpointRegistry endpointRegistry;//终端注册处
- //终端描述器集合
- private final List<AmqpListenerEndpointDescriptor> endpointDescriptors =
- new ArrayList<AmqpListenerEndpointDescriptor>();
-
- //注册员将终端注册到注册处
- public void registerEndpoint(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory) {
-
- //将 endpoint和 factory 封装成 终端描述器
- AmqpListenerEndpointDescriptor descriptor = new AmqpListenerEndpointDescriptor(endpoint, factory);
- synchronized (this.endpointDescriptors) {
- if (this.startImmediately) { //马下注册
- //将 终端注册到 终端注册处
- this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
- resolveContainerFactory(descriptor), true);
- }
- else {//afterPropertiesSet()方法中统一注册
- //放入终端描述器集合
- this.endpointDescriptors.add(descriptor);
- }
- }
- }
-
- //InitializingBean接口的实现
- @Override
- public void afterPropertiesSet() {
- //统一注册
- registerAllEndpoints();
- }
- //统一注册
- protected void registerAllEndpoints() {
- synchronized (this.endpointDescriptors) {
- for (AmqpListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
- this.endpointRegistry.registerListenerContainer(
- descriptor.endpoint, resolveContainerFactory(descriptor));
- }
- }
- }
-
-
-
-
-
- }
RabbitListenerEndpointRegistry:注册处
-
-
-
-
- public class RabbitListenerEndpointRegistry implements SmartLifecycle{
-
- private final Map<String, MessageListenerContainer> listenerContainers =
- new ConcurrentHashMap<String, MessageListenerContainer>();
- //注册终端
- public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory,
- boolean startImmediately) {
-
- String id = endpoint.getId();
- synchronized (this.listenerContainers) {
- //创建 listenerContainer
- MessageListenerContainer container = createListenerContainer(endpoint, factory);
- this.listenerContainers.put(id, container);
- ……
-
- if (startImmediately) {
- startIfNecessary(container);
- }
- }
- }
-
- protected MessageListenerContainer createListenerContainer(RabbitListenerEndpoint endpoint,
- RabbitListenerContainerFactory<?> factory) {
- //调用RabbitListener容器工厂的createListenerContainer方法获取RabbitListener容器
- MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);
-
- return listenerContainer;
- }
-
- //start 是对 smartLifecycle 接口的实现,由 spring 容器调用
- //上面的的代码中已经创建了listenerContainer 并放入了listenerContainers集合中,现在要将集合中的 listenerContainer放入线程池中。
- @Override
- public void start() {
- //遍历listenerContainers集合
- for (MessageListenerContainer listenerContainer : getListenerContainers()) {
- startIfNecessary(listenerContainer);
- }
- }
- //调用集合中的listenerContainer的 start 方法。
- private void startIfNecessary(MessageListenerContainer listenerContainer) {
- if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
- listenerContainer.start();
- }
- }
-
- }
分析了RabbitListenerAnnotationBeanPostProcessor:@RabbitListener注解的后置处理器 。从读取被@RabbitListener修饰的方法 => 创建 endpoint =>创建MessageListenerContainer并放入集合中=>遍历集合中的MessageListenerContainer并调用 start方法。
MessageListenerContainer中的 start 方法做了什么?
其子类 SimpleMessageListenerContainer是我们学习的重点
- public interface MessageListenerContainer extends SmartLifecycle {
-
- /**
- * Setup the message listener to use. Throws an {@link IllegalArgumentException}
- * if that message listener type is not supported.
- */
- void setupMessageListener(Object messageListener);
-
- /**
- * @return the {@link MessageConverter} that can be used to
- * convert {@link org.springframework.amqp.core.Message}, if any.
- */
- MessageConverter getMessageConverter();
-
- }
start 方法在其抽象子类 AbstractMessageListenerContainer 中实现了:
//MessageListenerContainer 同样也继承了 SmartLifecycle接口,SmartLifecycle中的方法是由 Spring 容器调用的,这里我们手动调用了 start 方法,这意味这 start 会被调用两次。
- @Override
- public void start() {
- if (isRunning()) {//第二调用时会直接 return。
- return;
- }
-
- try {
- doStart();
- }
- catch (Exception ex) {
- }
- }
doStart 方法子类会覆盖:
在子类:SimpleMessageListenerContainer中:
- @Override
- protected void doStart() throws Exception {
- ……
- super.doStart();
- synchronized (this.consumersMonitor) {
-
- int newConsumers = initializeConsumers();
-
- Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
- for (BlockingQueueConsumer consumer : this.consumers) {
- //AsyncMessageProcessingConsumer是SimpleMessageListenerContainer的内部类
- AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
- processors.add(processor);
- //放入线程池
- getTaskExecutor().execute(processor);
-
- }
-
- }
- }
RabbitListenerAnnotationBeanPostProcessor调用SimpleMessageListenerContainer是AsyncMessageProcessingConsumer放入线程池中的。
参考链接:https://blog.csdn.net/hong10086/article/details/93672166
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。