当前位置:   article > 正文

Spring-RabbitMQ源码解读_org.springframework.amqp.rabbit.rabbitlistenerendp

org.springframework.amqp.rabbit.rabbitlistenerendpointcontainer

前言

1.仓库地址

https://gitee.com/JiuLongBingShi/spring-rabbit-king.git

2.maven依赖

  1. <dependency>
  2. <groupId>com.king.springboot</groupId>
  3. <artifactId>king-rabbit</artifactId>
  4. <version>2.1.0</version>
  5. </dependency>

一、知识点  
1、RabbitListenerEndpoint :
Endpoint为终端,像电脑、手机都是终端,他们都可以接受外部信息并响应,如手机来短信了就有提示。这里也用了终端的概念,例如:被@RabbitListener注解修饰方法也有终端的特点 可以接受外部信息并响应。

  1. /*
  2. * Copyright 2002-2018 the original author or authors.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * https://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package org.springframework.amqp.rabbit.listener;
  17. import org.springframework.amqp.support.converter.MessageConverter;
  18. import org.springframework.lang.Nullable;
  19. /**
  20. * Model for a Rabbit listener endpoint. Can be used against a
  21. * {@link org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer
  22. * RabbitListenerConfigurer} to register endpoints programmatically.
  23. *
  24. * @author Stephane Nicoll
  25. * @author Gary Russell
  26. * @since 1.4
  27. */
  28. public interface RabbitListenerEndpoint {
  29. /**
  30. * @return the id of this endpoint. The id can be further qualified
  31. * when the endpoint is resolved against its actual listener
  32. * container.
  33. * @see RabbitListenerContainerFactory#createListenerContainer
  34. */
  35. String getId();
  36. /**
  37. * @return the group of this endpoint or null if not in a group.
  38. * @since 1.5
  39. */
  40. String getGroup();
  41. /**
  42. * @return the concurrency of this endpoint.
  43. * @since 2.0
  44. */
  45. String getConcurrency();
  46. /**
  47. * Override of the default autoStartup property.
  48. * @return the autoStartup.
  49. * @since 2.0
  50. */
  51. Boolean getAutoStartup();
  52. /**
  53. * Setup the specified message listener container with the model
  54. * defined by this endpoint.
  55. * <p>This endpoint must provide the requested missing option(s) of
  56. * the specified container to make it usable. Usually, this is about
  57. * setting the {@code queues} and the {@code messageListener} to
  58. * use but an implementation may override any default setting that
  59. * was already set.
  60. * @param listenerContainer the listener container to configure
  61. */
  62. void setupListenerContainer(MessageListenerContainer listenerContainer);
  63. /**
  64. * The preferred way for a container factory to pass a message converter
  65. * to the endpoint's adapter.
  66. * @param converter the converter.
  67. * @since 2.0.8
  68. */
  69. default void setMessageConverter(MessageConverter converter) {
  70. // NOSONAR
  71. }
  72. /**
  73. * Used by the container factory to check if this endpoint supports the
  74. * preferred way for a container factory to pass a message converter
  75. * to the endpoint's adapter. If null is returned, the factory will
  76. * fall back to the legacy method of passing the converter via the
  77. * container.
  78. * @return the converter.
  79. * @since 2.0.8
  80. */
  81. @Nullable
  82. default MessageConverter getMessageConverter() {
  83. return null;
  84. }
  85. }


这里的终端有很多种,刚才说到 『被@RabbitListener注解修饰方法也是终端』 就是 MethodRabbitListenerEnpoint 方法终端。

接口方法中 void setupListenerContainer(MessageListenerContainer listenerContainer) 方法,中的MessageListenerContainer就是用来接收并处理消息的。

在抽象类中AbstractRabbitListenerEndpoint对setupListenerContainer方法进了实现:

  1. @Override
  2. public void setupListenerContainer(MessageListenerContainer listenerContainer) {
  3. AbstractMessageListenerContainer container = (AbstractMessageListenerContainer) listenerContainer;
  4. boolean queuesEmpty = getQueues().isEmpty();
  5. boolean queueNamesEmpty = getQueueNames().isEmpty();
  6. if (!queuesEmpty && !queueNamesEmpty) {
  7. throw new IllegalStateException("Queues or queue names must be provided but not both for " + this);
  8. }
  9. if (queuesEmpty) {
  10. Collection<String> names = getQueueNames();
  11. container.setQueueNames(names.toArray(new String[names.size()]));
  12. }
  13. else {
  14. Collection<Queue> instances = getQueues();
  15. container.setQueues(instances.toArray(new Queue[instances.size()]));
  16. }
  17. container.setExclusive(isExclusive());
  18. if (getPriority() != null) {
  19. Map<String, Object> args = new HashMap<String, Object>();
  20. args.put("x-priority", getPriority());
  21. container.setConsumerArguments(args);
  22. }
  23. if (getAdmin() != null) {
  24. container.setAmqpAdmin(getAdmin());
  25. }
  26. //入口
  27. setupMessageListener(listenerContainer);
  28. }
  1. private void setupMessageListener(MessageListenerContainer container) {
  2. //
  3. //MessageListenerContainer为容器,容器是要装东西的,这里显然是装MessageListener。
  4. //createMessageListener(container)为抽象方法,用于子类覆盖。
  5. MessageListener messageListener = createMessageListener(container);
  6. Assert.state(messageListener != null, () -> "Endpoint [" + this + "] must provide a non null message listener");
  7. container.setupMessageListener(messageListener);
  8. }


    


    
   
子类MethodRabbitListenerEndpoint中覆盖了createMessageListener。

  1. @Override
  2. protected MessagingMessageListenerAdapter createMessageListener(MessageListenerContainer container) {
  3. Assert.state(this.messageHandlerMethodFactory != null,
  4. "Could not create message listener - MessageHandlerMethodFactory not set");
  5. MessagingMessageListenerAdapter messageListener = createMessageListenerInstance();
  6. messageListener.setHandlerAdapter(configureListenerAdapter(messageListener));
  7. String replyToAddress = getDefaultReplyToAddress();
  8. if (replyToAddress != null) {
  9. messageListener.setResponseAddress(replyToAddress);
  10. }
  11. MessageConverter messageConverter = getMessageConverter();
  12. if (messageConverter == null) {
  13. // fall back to the legacy converter holder in the container
  14. messageConverter = container.getMessageConverter();
  15. }
  16. if (messageConverter != null) {
  17. messageListener.setMessageConverter(messageConverter);
  18. }
  19. if (getBeanResolver() != null) {
  20. messageListener.setBeanResolver(getBeanResolver());
  21. }
  22. return messageListener;
  23. }


    
    

2、RabbitListenerContainerFactory
RabbitListenerContainerFactory为:rabbit 监听器容器工厂。既然为 监听器容器工厂 那一定是生产监听器容器。

监听器容器工厂:

  1. /*
  2. * Copyright 2002-2018 the original author or authors.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * https://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package org.springframework.amqp.rabbit.listener;
  17. import org.springframework.lang.Nullable;
  18. /**
  19. * Factory of {@link MessageListenerContainer}s.
  20. * @param <C> the container type.
  21. * @author Stephane Nicoll
  22. * @author Gary Russell
  23. * @since 1.4
  24. * @see RabbitListenerEndpoint
  25. */
  26. @FunctionalInterface
  27. public interface RabbitListenerContainerFactory<C extends MessageListenerContainer> {
  28. /**
  29. * Create a {@link MessageListenerContainer} for the given
  30. * {@link RabbitListenerEndpoint}.
  31. * @param endpoint the endpoint to configure.
  32. * @return the created container.
  33. */
  34. C createListenerContainer(@Nullable RabbitListenerEndpoint endpoint);
  35. /**
  36. * Create a {@link MessageListenerContainer} with no
  37. * {@link org.springframework.amqp.core.MessageListener} or queues; the listener must
  38. * be added later before the container is started.
  39. * @return the created container.
  40. * @since 2.1.
  41. */
  42. default C createListenerContainer() {
  43. return createListenerContainer(null);
  44. }
  45. }


监听器容器: 

  1. /*
  2. * Copyright 2014-2019 the original author or authors.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * https://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package org.springframework.amqp.rabbit.listener;
  17. import org.springframework.amqp.core.MessageListener;
  18. import org.springframework.amqp.support.converter.MessageConverter;
  19. import org.springframework.context.SmartLifecycle;
  20. /**
  21. * Internal abstraction used by the framework representing a message
  22. * listener container. Not meant to be implemented externally.
  23. *
  24. * @author Stephane Nicoll
  25. * @author Gary Russell
  26. * @since 1.4
  27. */
  28. public interface MessageListenerContainer extends SmartLifecycle {
  29. /**
  30. * Setup the message listener to use. Throws an {@link IllegalArgumentException}
  31. * if that message listener type is not supported.
  32. * @param messageListener the {@code object} to wrapped to the {@code MessageListener}.
  33. */
  34. void setupMessageListener(MessageListener messageListener);
  35. /**
  36. * @return the {@link MessageConverter} that can be used to
  37. * convert {@link org.springframework.amqp.core.Message}, if any.
  38. * @deprecated - this converter is not used by the container; it was only
  39. * used to configure the converter for a {@code @RabbitListener} adapter.
  40. * That is now handled differently. If you are manually creating a listener
  41. * container, the converter must be configured in a listener adapter (if
  42. * present).
  43. */
  44. @Deprecated
  45. MessageConverter getMessageConverter();
  46. /**
  47. * Do not check for missing or mismatched queues during startup. Used for lazily
  48. * loaded message listener containers to avoid a deadlock when starting such
  49. * containers. Applications lazily loading containers should verify the queue
  50. * configuration before loading the container bean.
  51. * @since 2.1.5
  52. */
  53. default void lazyLoad() {
  54. // no-op
  55. }
  56. }


 监听器容器工厂:

 在其抽象子类AbstractRabbitListenerContainerFactory中对 createListenerContainer 方法已做了实现。

我常用的是其子类SimpleRabbitListenerContainerFactory

  1. public class SimpleRabbitListenerContainerFactory
  2. extends AbstractRabbitListenerContainerFactory<SimpleMessageListenerContainer> {
  3. //...
  4. }


SimpleRabbitListenerContainerFactory工厂生产的是SimpleMessageListenerContainer

  1. public abstract class AbstractRabbitListenerContainerFactory<C extends AbstractMessageListenerContainer>
  2. implements RabbitListenerContainerFactory<C>, ApplicationContextAware, ApplicationEventPublisherAware {
  3. //...
  4. //在使用SimpleRabbitListenerContainerFactory时,下面的 C 就是 SimpleMessageListenerContainer
  5. @SuppressWarnings("deprecation")
  6. @Override
  7. public C createListenerContainer(RabbitListenerEndpoint endpoint) {
  8. //创建SimpleMessageListenerContainer
  9.         
  10. C instance = createContainerInstance();
  11. JavaUtils javaUtils = JavaUtils.INSTANCE.acceptIfNotNull(this.connectionFactory, instance::setConnectionFactory)
  12. .acceptIfNotNull(this.errorHandler, instance::setErrorHandler);
  13. if (this.messageConverter != null) {
  14. if (endpoint != null) {
  15. endpoint.setMessageConverter(this.messageConverter);
  16. if (endpoint.getMessageConverter() == null) {
  17. instance.setMessageConverter(this.messageConverter);
  18. }
  19. }
  20. else {
  21. instance.setMessageConverter(this.messageConverter);
  22. }
  23. }
  24. javaUtils
  25. .acceptIfNotNull(this.acknowledgeMode, instance::setAcknowledgeMode)
  26. .acceptIfNotNull(this.channelTransacted, instance::setChannelTransacted)
  27. .acceptIfNotNull(this.applicationContext, instance::setApplicationContext)
  28. .acceptIfNotNull(this.taskExecutor, instance::setTaskExecutor)
  29. .acceptIfNotNull(this.transactionManager, instance::setTransactionManager)
  30. .acceptIfNotNull(this.prefetchCount, instance::setPrefetchCount)
  31. .acceptIfNotNull(this.defaultRequeueRejected, instance::setDefaultRequeueRejected)
  32. .acceptIfNotNull(this.adviceChain, instance::setAdviceChain)
  33. .acceptIfNotNull(this.recoveryBackOff, instance::setRecoveryBackOff)
  34. .acceptIfNotNull(this.mismatchedQueuesFatal, instance::setMismatchedQueuesFatal)
  35. .acceptIfNotNull(this.missingQueuesFatal, instance::setMissingQueuesFatal)
  36. .acceptIfNotNull(this.consumerTagStrategy, instance::setConsumerTagStrategy)
  37. .acceptIfNotNull(this.idleEventInterval, instance::setIdleEventInterval)
  38. .acceptIfNotNull(this.failedDeclarationRetryInterval, instance::setFailedDeclarationRetryInterval)
  39. .acceptIfNotNull(this.applicationEventPublisher, instance::setApplicationEventPublisher)
  40. .acceptIfNotNull(this.autoStartup, instance::setAutoStartup)
  41. .acceptIfNotNull(this.phase, instance::setPhase)
  42. .acceptIfNotNull(this.afterReceivePostProcessors, instance::setAfterReceivePostProcessors);
  43. if (endpoint != null) {
  44. if (endpoint.getAutoStartup() != null) {
  45. instance.setAutoStartup(endpoint.getAutoStartup());
  46. }
  47. instance.setListenerId(endpoint.getId());
  48.    
  49. //setupListenerContainer方法参数为 MessageListenerContainer类型,
  50.             //SimpleMessageListenerContainer是MessageListenerContainer的子类。
  51. endpoint.setupListenerContainer(instance);
  52. }
  53. if (instance.getMessageListener() instanceof AbstractAdaptableMessageListener) {
  54. AbstractAdaptableMessageListener messageListener = (AbstractAdaptableMessageListener) instance
  55. .getMessageListener();
  56. javaUtils
  57. .acceptIfNotNull(this.beforeSendReplyPostProcessors, messageListener::setBeforeSendReplyPostProcessors)
  58. .acceptIfNotNull(this.retryTemplate, messageListener::setRetryTemplate)
  59. .acceptIfCondition(this.retryTemplate != null && this.recoveryCallback != null, this.recoveryCallback,
  60. messageListener::setRecoveryCallback)
  61. .acceptIfNotNull(this.defaultRequeueRejected, messageListener::setDefaultRequeueRejected);
  62. }
  63. initializeContainer(instance, endpoint);
  64. if (this.containerConfigurer != null) {
  65. this.containerConfigurer.accept(instance);
  66. }
  67. return instance;
  68. }
  69. }


 

二、开始
 

1、HandlerMethod
  在被@RabbitListener标注的方法。该方法会被封装成HanderMethod。

  1. package org.springframework.messaging.handler;
  2. import java.lang.annotation.Annotation;
  3. import java.lang.reflect.Method;
  4. import org.apache.commons.logging.Log;
  5. import org.apache.commons.logging.LogFactory;
  6. import org.springframework.beans.factory.BeanFactory;
  7. import org.springframework.core.BridgeMethodResolver;
  8. import org.springframework.core.GenericTypeResolver;
  9. import org.springframework.core.MethodParameter;
  10. import org.springframework.core.annotation.AnnotatedElementUtils;
  11. import org.springframework.core.annotation.SynthesizingMethodParameter;
  12. import org.springframework.lang.Nullable;
  13. import org.springframework.util.Assert;
  14. import org.springframework.util.ClassUtils;
  15. public class HandlerMethod {
  16. public static final Log defaultLogger = LogFactory.getLog(HandlerMethod.class);
  17. protected Log logger;
  18. private final Object bean;
  19. @Nullable
  20. private final BeanFactory beanFactory;
  21. private final Class<?> beanType;
  22. private final Method method;
  23. private final Method bridgedMethod;
  24. private final MethodParameter[] parameters;
  25. @Nullable
  26. private HandlerMethod resolvedFromHandlerMethod;
  27. public HandlerMethod(Object bean, Method method) {
  28. this.logger = defaultLogger;
  29. Assert.notNull(bean, "Bean is required");
  30. Assert.notNull(method, "Method is required");
  31. this.bean = bean;
  32. this.beanFactory = null;
  33. this.beanType = ClassUtils.getUserClass(bean);
  34. this.method = method;
  35. this.bridgedMethod = BridgeMethodResolver.findBridgedMethod(method);
  36. this.parameters = this.initMethodParameters();
  37. }
  38. }


我们常用其子类InvocableHandlerMethod。该类提供一个Object invoke(Message<?> message, Object... providedArgs) 方法,
 可以通过该invoke方法调用handlerMethod 中的method:method.invoke(bean,args)

HandlerAdapter称为hander适配器,该适配器中有InvocableHandlerMethod、DelegatingInvocableHandler这两个hander方法。注:InvocableHandlerMethod上文刚说过。

  1. public class HandlerAdapter {
  2. private final InvocableHandlerMethod invokerHandlerMethod;
  3. private final DelegatingInvocableHandler delegatingHandler;
  4. public HandlerAdapter(InvocableHandlerMethod invokerHandlerMethod) {
  5. this.invokerHandlerMethod = invokerHandlerMethod;
  6. this.delegatingHandler = null;
  7. }
  8. public HandlerAdapter(DelegatingInvocableHandler delegatingHandler) {
  9. this.invokerHandlerMethod = null;
  10. this.delegatingHandler = delegatingHandler;
  11. }
  12. public InvocationResult invoke(Message<?> message, Object... providedArgs) throws Exception { // NOSONAR
  13. if (this.invokerHandlerMethod != null) {
  14. // //InvocableHandlerMethod不为null,就调用invokerHandlerMethod.invoke方法。
  15. return new InvocationResult(this.invokerHandlerMethod.invoke(message, providedArgs),
  16. null, this.invokerHandlerMethod.getMethod().getGenericReturnType());
  17. }
  18. else if (this.delegatingHandler.hasDefaultHandler()) {
  19. // Needed to avoid returning raw Message which matches Object
  20. Object[] args = new Object[providedArgs.length + 1];
  21. args[0] = message.getPayload();
  22. System.arraycopy(providedArgs, 0, args, 1, providedArgs.length);
  23. return this.delegatingHandler.invoke(message, args);
  24. }
  25. else {
  26. return this.delegatingHandler.invoke(message, providedArgs);
  27. }
  28. }
  29. //...
  30. }


这里可以简单的理解调用HandlerAdapter.invoke方法可以间接调用@RabbitListener修饰的方法。

2、ChannelAwareMessageListener 
消费端接收到mq的消息会调用ChannelAwareMessageListener 接口中的onMessage方法。

  1. /*
  2. * Copyright 2002-2018 the original author or authors.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * https://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package org.springframework.amqp.rabbit.listener.api;
  17. import org.springframework.amqp.core.Message;
  18. import org.springframework.amqp.core.MessageListener;
  19. import com.rabbitmq.client.Channel;
  20. /**
  21. * A message listener that is aware of the Channel on which the message was received.
  22. *
  23. * @author Mark Pollack
  24. * @author Gary Russell
  25. */
  26. @FunctionalInterface
  27. public interface ChannelAwareMessageListener extends MessageListener {
  28. /**
  29. * 回调函数处理接受的消息
  30. *
  31. * Callback for processing a received Rabbit message.
  32. * <p>Implementors are supposed to process the given Message,
  33. * typically sending reply messages through the given Session.
  34. * @param message the received AMQP message (never <code>null</code>)
  35. * @param channel the underlying Rabbit Channel (never <code>null</code>)
  36. * @throws Exception Any.
  37. */
  38. void onMessage(Message message, Channel channel) throws Exception; // NOSONAR
  39. @Override
  40. default void onMessage(Message message) {
  41. throw new IllegalStateException("Should never be called for a ChannelAwareMessageListener");
  42. }
  43. }


下面要说的是其子类:MessagingMessageListenerAdapter

  1. public class MessagingMessageListenerAdapter extends AbstractAdaptableMessageListener {
  2. private HandlerAdapter handlerAdapter;
  3. private final MessagingMessageConverterAdapter messagingMessageConverter;
  4. private final boolean returnExceptions;
  5. private final RabbitListenerErrorHandler errorHandler;
  6. public MessagingMessageListenerAdapter() {
  7. this(null, null);
  8. }
  9. public MessagingMessageListenerAdapter(Object bean, Method method) {
  10. this(bean, method, false, null);
  11. }
  12. public MessagingMessageListenerAdapter(Object bean, Method method, boolean returnExceptions,
  13. RabbitListenerErrorHandler errorHandler) {
  14. this.messagingMessageConverter = new MessagingMessageConverterAdapter(bean, method);
  15. this.returnExceptions = returnExceptions;
  16. this.errorHandler = errorHandler;
  17. }
  18. @Override
  19. public void onMessage(org.springframework.amqp.core.Message amqpMessage, Channel channel) throws Exception { // NOSONAR
  20. //ChannelAwareMessageListener.onMessage()方法中的message是org.springframework.amqp.core.Message类型。
  21. //这里转换成org.springframework.messaging.Message类型。
  22. Message<?> message = toMessagingMessage(amqpMessage);
  23. if (logger.isDebugEnabled()) {
  24. logger.debug("Processing [" + message + "]");
  25. }
  26. InvocationResult result = null;
  27. try {
  28. //入口
  29. result = invokeHandler(amqpMessage, channel, message);
  30. if (result.getReturnValue() != null) {
  31. handleResult(result, amqpMessage, channel, message);
  32. }
  33. else {
  34. logger.trace("No result object given - no result to handle");
  35. }
  36. }
  37. catch (ListenerExecutionFailedException e) {
  38. if (this.errorHandler != null) {
  39. try {
  40. message = MessageBuilder.fromMessage(message)
  41. .setHeader(AmqpHeaders.CHANNEL, channel)
  42. .build();
  43. Object errorResult = this.errorHandler.handleError(amqpMessage, message, e);
  44. if (errorResult != null) {
  45. handleResult(this.handlerAdapter.getInvocationResultFor(errorResult, message.getPayload()),
  46. amqpMessage, channel, message);
  47. }
  48. else {
  49. logger.trace("Error handler returned no result");
  50. }
  51. }
  52. catch (Exception ex) {
  53. returnOrThrow(amqpMessage, channel, message, ex, ex);
  54. }
  55. }
  56. else {
  57. returnOrThrow(amqpMessage, channel, message, e.getCause(), e);
  58. }
  59. }
  60. }
  61. /**
  62. * Invoke the handler, wrapping any exception to a {@link ListenerExecutionFailedException}
  63. * with a dedicated error message.
  64. * @param amqpMessage the raw message.
  65. * @param channel the channel.
  66. * @param message the messaging message.
  67. * @return the result of invoking the handler.
  68. */
  69. private InvocationResult invokeHandler(org.springframework.amqp.core.Message amqpMessage, Channel channel,
  70. Message<?> message) {
  71. try {
  72. return this.handlerAdapter.invoke(message, amqpMessage, channel);
  73. }
  74. catch (MessagingException ex) {
  75. throw new ListenerExecutionFailedException(createMessagingErrorMessage("Listener method could not " +
  76. "be invoked with the incoming message", message.getPayload()), ex, amqpMessage);
  77. }
  78. catch (Exception ex) {
  79. throw new ListenerExecutionFailedException("Listener method '" +
  80. this.handlerAdapter.getMethodAsString(message.getPayload()) + "' threw exception", ex, amqpMessage);
  81. }
  82. }
  83. //...
  84. }


onMessage->invokehandler->this.handlerMethod.invoke 。this.handlerMethod为 HandlerAdapter类型。上面刚说完:调用HandlerAdapter.invoke方法可以间接调用@RabbitListener修饰的方法。

现在我们就已经梳洗清楚从onMessage到调用被@RabbitListener修饰的方法的整个流程

3、SimpleMessageListenerContainer
 

SimpleMessageListenerContainer 中有一个内部类AsyncMessageProcessingConsumer :

  1. //当终端接受到信息时,一个线程会执行该run方法。
  2.     private final class AsyncMessageProcessingConsumer implements Runnable {
  3. private static final int ABORT_EVENT_WAIT_SECONDS = 5;
  4. private final BlockingQueueConsumer consumer;
  5. private final CountDownLatch start;
  6. private volatile FatalListenerStartupException startupException;
  7. private int consecutiveIdles;
  8. private int consecutiveMessages;
  9. AsyncMessageProcessingConsumer(BlockingQueueConsumer consumer) {
  10. this.consumer = consumer;
  11. this.start = new CountDownLatch(1);
  12. }
  13.         @Override
  14.         public void run() { // NOSONAR - line count
  15. if (!isActive()) {
  16. return;
  17. }
  18. boolean aborted = false;
  19. this.consumer.setLocallyTransacted(isChannelLocallyTransacted());
  20. String routingLookupKey = getRoutingLookupKey();
  21. if (routingLookupKey != null) {
  22. SimpleResourceHolder.bind(getRoutingConnectionFactory(), routingLookupKey); // NOSONAR both never null
  23. }
  24. if (this.consumer.getQueueCount() < 1) {
  25. if (logger.isDebugEnabled()) {
  26. logger.debug("Consumer stopping; no queues for " + this.consumer);
  27. }
  28. SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer);
  29. if (getApplicationEventPublisher() != null) {
  30. getApplicationEventPublisher().publishEvent(
  31. new AsyncConsumerStoppedEvent(SimpleMessageListenerContainer.this, this.consumer));
  32. }
  33. this.start.countDown();
  34. return;
  35. }
  36. try {
  37. initialize();
  38. while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
  39.     //这个 while 可以无限的循环,
  40. mainLoop();
  41. }
  42. }catch (InterruptedException e) {
  43. }
  44. //...
  45.         }
  46. private void mainLoop() throws Exception { // NOSONAR Exception
  47. try {
  48. boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
  49. if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
  50. checkAdjust(receivedOk);
  51. }
  52. long idleEventInterval = getIdleEventInterval();
  53. if (idleEventInterval > 0) {
  54. if (receivedOk) {
  55. updateLastReceive();
  56. }
  57. else {
  58. long now = System.currentTimeMillis();
  59. long lastAlertAt = SimpleMessageListenerContainer.this.lastNoMessageAlert.get();
  60. long lastReceive = getLastReceive();
  61. if (now > lastReceive + idleEventInterval
  62. && now > lastAlertAt + idleEventInterval
  63. && SimpleMessageListenerContainer.this.lastNoMessageAlert
  64. .compareAndSet(lastAlertAt, now)) {
  65. publishIdleContainerEvent(now - lastReceive);
  66. }
  67. }
  68. }
  69. }
  70. catch (ListenerExecutionFailedException ex) {
  71. // Continue to process, otherwise re-throw
  72. if (ex.getCause() instanceof NoSuchMethodException) {
  73. throw new FatalListenerExecutionException("Invalid listener", ex);
  74. }
  75. }
  76. catch (AmqpRejectAndDontRequeueException rejectEx) {
  77. /*
  78. * These will normally be wrapped by an LEFE if thrown by the
  79. * listener, but we will also honor it if thrown by an
  80. * error handler.
  81. */
  82. }
  83. }
  84.     }
  85. }


run方法中调用了receiveAndExecute方法,方法名直译:接受并执行。

  1. package org.springframework.amqp.rabbit.listener;
  2. import java.io.IOException;
  3. import java.util.ArrayList;
  4. import java.util.Arrays;
  5. import java.util.Collection;
  6. import java.util.HashSet;
  7. import java.util.Iterator;
  8. import java.util.List;
  9. import java.util.Set;
  10. import java.util.concurrent.BlockingQueue;
  11. import java.util.concurrent.CountDownLatch;
  12. import java.util.concurrent.LinkedBlockingQueue;
  13. import java.util.concurrent.TimeUnit;
  14. import java.util.concurrent.TimeoutException;
  15. import java.util.concurrent.atomic.AtomicLong;
  16. import java.util.concurrent.atomic.AtomicReference;
  17. import org.springframework.amqp.AmqpAuthenticationException;
  18. import org.springframework.amqp.AmqpConnectException;
  19. import org.springframework.amqp.AmqpException;
  20. import org.springframework.amqp.AmqpIOException;
  21. import org.springframework.amqp.AmqpIllegalStateException;
  22. import org.springframework.amqp.AmqpRejectAndDontRequeueException;
  23. import org.springframework.amqp.ImmediateAcknowledgeAmqpException;
  24. import org.springframework.amqp.core.Message;
  25. import org.springframework.amqp.core.Queue;
  26. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  27. import org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils;
  28. import org.springframework.amqp.rabbit.connection.ConsumerChannelRegistry;
  29. import org.springframework.amqp.rabbit.connection.RabbitResourceHolder;
  30. import org.springframework.amqp.rabbit.connection.RabbitUtils;
  31. import org.springframework.amqp.rabbit.connection.SimpleResourceHolder;
  32. import org.springframework.amqp.rabbit.listener.exception.FatalListenerExecutionException;
  33. import org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException;
  34. import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;
  35. import org.springframework.amqp.rabbit.support.ConsumerCancelledException;
  36. import org.springframework.amqp.rabbit.support.ListenerContainerAware;
  37. import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator;
  38. import org.springframework.jmx.export.annotation.ManagedMetric;
  39. import org.springframework.jmx.support.MetricType;
  40. import org.springframework.transaction.PlatformTransactionManager;
  41. import org.springframework.transaction.support.TransactionSynchronizationManager;
  42. import org.springframework.transaction.support.TransactionTemplate;
  43. import org.springframework.util.Assert;
  44. import org.springframework.util.backoff.BackOffExecution;
  45. import com.rabbitmq.client.Channel;
  46. import com.rabbitmq.client.PossibleAuthenticationFailureException;
  47. import com.rabbitmq.client.ShutdownSignalException;
  48. /**
  49. * @author Mark Pollack
  50. * @author Mark Fisher
  51. * @author Dave Syer
  52. * @author Gary Russell
  53. * @author Artem Bilan
  54. * @author Alex Panchenko
  55. *
  56. * @since 1.0
  57. */
  58. public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer {
  59.  
  60.  
  61.      //接受并执行
  62.     private boolean receiveAndExecute(final BlockingQueueConsumer consumer) throws Throwable {
  63.  
  64.        //do接受并执行
  65.         return doReceiveAndExecute(consumer);
  66.     }
  67.      //do接受并执行
  68.     private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Throwable {
  69.  
  70.         Channel channel = consumer.getChannel();
  71.         for (int i = 0; i < this.txSize; i++) {//txSize为一次事务接受的消息个数
  72.             //读取消息,这里阻塞的,但是有一个超时时间。
  73.             Message message = consumer.nextMessage(this.receiveTimeout);
  74.             if (message == null) {//阻塞超时
  75.                 break;
  76.             }
  77.             try {
  78.                 executeListener(channel, message);//消息接收已完成,现在开始处理消息。
  79.             }
  80.             catch (Exception e) {}
  81.         }
  82.         return consumer.commitIfNecessary(isChannelLocallyTransacted());
  83.     }
  84.         
  85.     
  86.     //处理消息开始。该方法在其父类中
  87.     protected void executeListener(Channel channel, Message messageIn) throws Exception {
  88.         try {
  89.             Message message = messageIn;
  90.             if (……) {
  91.                 //批处理信息,这个不研究
  92.             }else {
  93.                 invokeListener(channel, message);
  94.             }
  95.         }catch (Exception ex) {}
  96.     }
  97.     //在其父类中
  98.     protected void invokeListener(Channel channel, Message message) throws Exception {
  99.         //这里this.proxy.invokeListener最终会调用actualInvokeListener方法。
  100.         this.proxy.invokeListener(channel, message);
  101.     }
  102.     //在其父类中
  103.     protected void actualInvokeListener(Channel channel, Message message) throws Exception {
  104.         Object listener = getMessageListener();
  105.         if (listener instanceof ChannelAwareMessageListener) {
  106.             doInvokeListener((ChannelAwareMessageListener) listener, channel, message);
  107.         }
  108.         else if (listener instanceof MessageListener) {
  109.             //……
  110.             doInvokeListener((MessageListener) listener, message)
  111.         }else{
  112.             //……
  113.         }
  114.         
  115.     }    
  116.  
  117.     protected void doInvokeListener(ChannelAwareMessageListener listener, Channel channel, Message message)
  118.             throws Exception {
  119.             Channel channelToUse = channel;
  120.             try {
  121.                 listener.onMessage(message, channelToUse);
  122.             }
  123.             catch (Exception e) {
  124.                 throw wrapToListenerExecutionFailedExceptionIfNeeded(e, message);
  125.             }
  126.  
  127.     }
  128. }


在doInvokeListener方中最终调用了onMessage方法。

到这我知道了SimpleMessageListenerContainer内部类AsyncMessageProcessingConsumer被放入线程池中运行,是谁把AsyncMessageProcessingConsumer放入线程池中的呢?


4、RabbitListenerAnnotationBeanPostProcessor
RabbitListenerAnnotationBeanPostProcessor 听名得意:@RabbitListener注解的后置处理器。

  1. public class RabbitListenerAnnotationBeanPostProcessor implements BeanPostProcessor,SmartInitializingSingleton {
  2.  
  3.     //对接口BeanPostProcessor的实现。
  4.     @Override
  5.     public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
  6.         Class<?> targetClass = AopUtils.getTargetClass(bean);
  7.         final TypeMetadata metadata = this.typeCache.computeIfAbsent(targetClass, this::buildMetadata);
  8. //获取被@RabbitListener修饰的方法。
  9.         for (ListenerMethod lm : metadata.listenerMethods) {
  10. //获取方法上的@RabbitListener注解。
  11.             for (RabbitListener rabbitListener : lm.annotations) {
  12.                 //处理Amqp监听:
  13.                 //有这三个参数就以实现监听队列并调用方法。参数:rabbitListener为我们的方法上@RabbitListener注解。参数method和bean可以能过反射的方式调用我们的方法。
  14.                 processAmqpListener(rabbitListener, lm.method, bean, beanName);
  15.             }
  16.         }
  17.         return bean;
  18.     }
  19.     
  20.     //处理Amqp监听
  21.     protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
  22.         Method methodToUse = checkProxy(method, bean);
  23.         
  24.         //Endpoint为终端,像电脑、手机都是终端,他们都可以接受外部信息并响应,如手机来短信了就有提示。
  25.         //这里也用了终端的概念,被@RabbitListener注解修饰方法也有终端的特点可以接受外部信息并响应。
  26.         //MethodRabbitListenerEndpoint名子也很形象,叫方法监听终端。
  27.         MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
  28.         endpoint.setMethod(methodToUse);//终端接收到信息时,会调用 methodToUse
  29.         processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
  30.     }
  31.     //注册员
  32.     private final RabbitListenerEndpointRegistrar registrar = new RabbitListenerEndpointRegistrar();
  33.     
  34.     protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,
  35.             Object adminTarget, String beanName) {
  36.         //下面是为终端注入一些属性
  37.         endpoint.setBean(bean);
  38.         endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
  39.         endpoint.setId(getEndpointId(rabbitListener));
  40.         endpoint.setQueueNames(resolveQueues(rabbitListener));
  41.         endpoint.setConcurrency(resolveExpressionAsStringOrInteger(rabbitListener.concurrency(), "concurrency"));
  42.         endpoint.setBeanFactory(this.beanFactory);
  43.         endpoint.setReturnExceptions(resolveExpressionAsBoolean(rabbitListener.returnExceptions()));
  44.  
  45.         ……
  46.  
  47.         RabbitListenerContainerFactory<?> factory = null;
  48.         String containerFactoryBeanName = resolve(rabbitListener.containerFactory());
  49.         if (StringUtils.hasText(containerFactoryBeanName)) {
  50.             Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
  51.             try {
  52.                 factory = this.beanFactory.getBean(containerFactoryBeanName, RabbitListenerContainerFactory.class);
  53.             }
  54.             catch (NoSuchBeanDefinitionException ex) {
  55.                 throw new BeanInitializationException("Could not register rabbit listener endpoint on [" +
  56.                         adminTarget + "] for bean " + beanName + ", no " + RabbitListenerContainerFactory.class.getSimpleName() + " with id '" +
  57.                         containerFactoryBeanName + "' was found in the application context", ex);
  58.             }
  59.         }
  60.         //终端准备完成后,还要将终端进行注册。    
  61.         //注册员将终端注册到注册处
  62.         this.registrar.registerEndpoint(endpoint, factory);
  63.     }
  64.     
  65.     
  66.     
  67.  
  68.     
  69.     //对SmartInitializingSingleton的实现,该方法在bean的后置处理BeanPostProcessor之后调用。
  70.     @Override
  71.     public void afterSingletonsInstantiated() {
  72.         this.registrar.setBeanFactory(this.beanFactory);
  73.  
  74.         if (this.containerFactoryBeanName != null) {
  75.             //this.containerFactoryBeanName为“rabbitListenerContainerFactory”
  76.             this.registrar.setContainerFactoryBeanName(this.containerFactoryBeanName);
  77.         }
  78.  
  79.         // Actually register all listeners
  80.         this.registrar.afterPropertiesSet();
  81.  
  82.     }
  83.  
  84. }


 
    

RabbitListenerEndpointRegistrar :rabbit 监听器注册员

  1. //终端注册员
  2. public class RabbitListenerEndpointRegistrar   implements  InitializingBean{
  3.  
  4.     private RabbitListenerEndpointRegistry endpointRegistry;//终端注册处
  5.     //终端描述器集合
  6.     private final List<AmqpListenerEndpointDescriptor> endpointDescriptors =
  7.             new ArrayList<AmqpListenerEndpointDescriptor>();
  8.  
  9.     //注册员将终端注册到注册处
  10.     public void registerEndpoint(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory) {
  11.         
  12.         //将 endpoint和 factory 封装成 终端描述器
  13.         AmqpListenerEndpointDescriptor descriptor = new AmqpListenerEndpointDescriptor(endpoint, factory);
  14.         synchronized (this.endpointDescriptors) {
  15.             if (this.startImmediately) { //马下注册
  16.                  //将 终端注册到 终端注册处
  17.                 this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
  18.                         resolveContainerFactory(descriptor), true);
  19.             }
  20.             else {//afterPropertiesSet()方法中统一注册
  21.                                    //放入终端描述器集合
  22.                 this.endpointDescriptors.add(descriptor);
  23.             }
  24.         }
  25.     }
  26.  
  27.      //InitializingBean接口的实现
  28.      @Override
  29.     public void afterPropertiesSet() {
  30.         //统一注册
  31.         registerAllEndpoints();
  32.     }
  33.     //统一注册
  34.     protected void registerAllEndpoints() {
  35.         synchronized (this.endpointDescriptors) {
  36.             for (AmqpListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
  37.                 this.endpointRegistry.registerListenerContainer(
  38.                         descriptor.endpoint, resolveContainerFactory(descriptor));
  39.             }
  40.         }
  41.     }
  42.     
  43.  
  44.  
  45.  
  46.  
  47. }

 RabbitListenerEndpointRegistry:注册处

  1. public class RabbitListenerEndpointRegistry  implements SmartLifecycle{
  2.  
  3.     private final Map<String, MessageListenerContainer> listenerContainers =
  4.             new ConcurrentHashMap<String, MessageListenerContainer>();
  5.     //注册终端
  6.     public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory,
  7.                 boolean startImmediately) {
  8.  
  9.         String id = endpoint.getId();
  10.         synchronized (this.listenerContainers) {
  11.             //创建 listenerContainer
  12.             MessageListenerContainer container = createListenerContainer(endpoint, factory);
  13.             this.listenerContainers.put(id, container);
  14.             ……
  15.  
  16.             if (startImmediately) {
  17.                 startIfNecessary(container);
  18.             }
  19.         }
  20.     }
  21.  
  22.         protected MessageListenerContainer createListenerContainer(RabbitListenerEndpoint endpoint,
  23.             RabbitListenerContainerFactory<?> factory) {
  24.         //调用RabbitListener容器工厂的createListenerContainer方法获取RabbitListener容器
  25.         MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);
  26.  
  27.         return listenerContainer;
  28.     }
  29.  
  30.     //start 是对 smartLifecycle 接口的实现,由 spring 容器调用
  31.        //上面的的代码中已经创建了listenerContainer 并放入了listenerContainers集合中,现在要将集合中的 listenerContainer放入线程池中。
  32.         @Override
  33.     public void start() {
  34.         //遍历listenerContainers集合
  35.         for (MessageListenerContainer listenerContainer : getListenerContainers()) {
  36.             startIfNecessary(listenerContainer);
  37.         }
  38.     }
  39.     //调用集合中的listenerContainer的 start 方法。
  40.       private void startIfNecessary(MessageListenerContainer listenerContainer) {
  41.         if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
  42.             listenerContainer.start();
  43.         }
  44.     }
  45.  
  46. }


分析了RabbitListenerAnnotationBeanPostProcessor:@RabbitListener注解的后置处理器 。从读取被@RabbitListener修饰的方法 => 创建 endpoint =>创建MessageListenerContainer并放入集合中=>遍历集合中的MessageListenerContainer并调用 start方法。

MessageListenerContainer中的 start 方法做了什么?

其子类 SimpleMessageListenerContainer是我们学习的重点

  1. public interface MessageListenerContainer extends SmartLifecycle {
  2.  
  3.     /**
  4.      * Setup the message listener to use. Throws an {@link IllegalArgumentException}
  5.      * if that message listener type is not supported.
  6.      */
  7.     void setupMessageListener(Object messageListener);
  8.  
  9.     /**
  10.      * @return the {@link MessageConverter} that can be used to
  11.      * convert {@link org.springframework.amqp.core.Message}, if any.
  12.      */
  13.     MessageConverter getMessageConverter();
  14.  
  15. }


start 方法在其抽象子类 AbstractMessageListenerContainer 中实现了:

    //MessageListenerContainer 同样也继承了 SmartLifecycle接口,SmartLifecycle中的方法是由 Spring 容器调用的,这里我们手动调用了 start 方法,这意味这 start 会被调用两次。
   

  1. @Override
  2.     public void start() {
  3.         if (isRunning()) {//第二调用时会直接 return。
  4.             return;
  5.         }
  6.         
  7.         try {
  8.             doStart();
  9.         }
  10.         catch (Exception ex) {
  11.         }
  12.     }


doStart 方法子类会覆盖:

在子类:SimpleMessageListenerContainer中:

  1. @Override
  2.     protected void doStart() throws Exception {
  3.         ……
  4.         super.doStart();
  5.         synchronized (this.consumersMonitor) {
  6.             
  7.             int newConsumers = initializeConsumers();
  8.             
  9.             Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
  10.             for (BlockingQueueConsumer consumer : this.consumers) {
  11.                         //AsyncMessageProcessingConsumer是SimpleMessageListenerContainer的内部类
  12.                 AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
  13.                 processors.add(processor);
  14.                         //放入线程池
  15.                 getTaskExecutor().execute(processor);
  16.                 
  17.             }
  18.         
  19.         }
  20.     }


RabbitListenerAnnotationBeanPostProcessor调用SimpleMessageListenerContainer是AsyncMessageProcessingConsumer放入线程池中的。


参考链接:https://blog.csdn.net/hong10086/article/details/93672166

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/381570
推荐阅读
相关标签
  

闽ICP备14008679号