当前位置:   article > 正文

Spring RabbitMQ 源码分析_parameter 1 of method simplerabbitlistenercontaine

parameter 1 of method simplerabbitlistenercontainerfactory in org.springfram

目录

 

一、知识点

1、RabbitListenerEndpoint :

2、RabbitListenerContainerFactory

二、开始

1、HandlerMethod

2、ChannelAwareMessageListener 

3、SimpleMessageListenerContainer

4、RabbitListenerAnnotationBeanPostProcessor


一、知识点

1、RabbitListenerEndpoint :

Endpoint为终端,像电脑、手机都是终端,他们都可以接受外部信息并响应,如手机来短信了就有提示。这里也用了终端的概念,例如:被@RabbitListener注解修饰方法也有终端的特点 可以接受外部信息并响应。

  1. public interface RabbitListenerEndpoint {
  2. /**
  3. * the id of this endpoint
  4. */
  5. String getId();
  6. /**
  7. * the group of this endpoint or null if not in a group.
  8. */
  9. String getGroup();
  10. /**
  11. *the concurrency of this endpoint.
  12. */
  13. String getConcurrency();
  14. /**
  15. * Override of the default autoStartup property.
  16. */
  17. Boolean getAutoStartup();
  18. /**
  19. * Setup the specified message listener container with the model
  20. * defined by this endpoint.
  21. */
  22. void setupListenerContainer(MessageListenerContainer listenerContainer);
  23. }

这里的终端有很多种,刚才说到 『被@RabbitListener注解修饰方法也是终端』 就是 MethodRabbitListenerEnpoint 方法终端。

接口方法中 void setupListenerContainer(MessageListenerContainer listenerContainer) 方法,中的MessageListenerContainer就是用来接收并处理消息的。

 

  1. 在抽象类中AbstractRabbitListenerEndpoint对setupListenerContainer方法进了实现:
  2.     @Override
  3.     public void setupListenerContainer(MessageListenerContainer listenerContainer)
  4.       ……
  5.         setupMessageListener(listenerContainer);
  6.     }
  7.     private void setupMessageListener(MessageListenerContainer container) {
  8.      //MessageListenerContainer为容器,容器是要装东西的,这里显然是装MessageListener。
  9. //createMessageListener(container)为抽象方法,用于子类覆盖。
  10.         MessageListener messageListener = createMessageListener(container);
  11.         container.setupMessageListener(messageListener);
  12.         
  13.     }
  14.     
  15.   

子类MethodRabbitListenerEndpoint中覆盖了createMessageListener。

  1.     @Override
  2.     protected MessagingMessageListenerAdapter createMessageListener(MessageListenerContainer container) {
  3. //创建 messageListener
  4.         MessagingMessageListenerAdapter messageListener = createMessageListenerInstance();
  5. //        
  6.         messageListener.setHandlerMethod(configureListenerAdapter(messageListener));
  7.         
  8.         return messageListener;
  9.     }
  10.     protected MessagingMessageListenerAdapter createMessageListenerInstance() {
  11.         return new MessagingMessageListenerAdapter(this.bean, this.method, this.returnExceptions, this.errorHandler);
  12.     }
  13.     protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapter messageListener) {
  14.         InvocableHandlerMethod invocableHandlerMethod =
  15.                 this.messageHandlerMethodFactory.createInvocableHandlerMethod(getBean(), getMethod());
  16.         return new HandlerAdapter(invocableHandlerMethod);
  17.     }
  18.     
  19.     


2、RabbitListenerContainerFactory

RabbitListenerContainerFactory为:rabbit 监听器容器工厂。既然为 监听器容器工厂 那一定是生产监听器容器。

监听器容器工厂:

  1. public interface RabbitListenerContainerFactory<C extends MessageListenerContainer> {
  2. /**
  3. * Create a {@link MessageListenerContainer} for the given {@link RabbitListenerEndpoint}.
  4. */
  5. C createListenerContainer(RabbitListenerEndpoint endpoint);
  6. }

监听器容器: 

  1. public interface MessageListenerContainer extends SmartLifecycle {
  2. /**
  3. * Setup the message listener to use. Throws an {@link IllegalArgumentException}
  4. * if that message listener type is not supported.
  5. */
  6. void setupMessageListener(Object messageListener);
  7. /**
  8. * @return the {@link MessageConverter} that can be used to
  9. * convert {@link org.springframework.amqp.core.Message}, if any.
  10. */
  11. MessageConverter getMessageConverter();
  12. }

 监听器容器工厂:

 在其抽象子类AbstractRabbitListenerContainerFactory中对 createListenerContainer 方法已做了实现。

我常用的是其子类SimpleRabbitListenerContainerFactory

  1. public class SimpleRabbitListenerContainerFactory
  2. extends AbstractRabbitListenerContainerFactory<SimpleMessageListenerContainer> {}

SimpleRabbitListenerContainerFactory工厂生产的是SimpleMessageListenerContainer

  1. public class AbstractRabbitListenerContainerFactory<C extends AbstractMessageListenerContainer> implements RabbitListenerContainerFactory<C>{
  2. //在使用SimpleRabbitListenerContainerFactory时,下面的 C 就是 SimpleMessageListenerContainer
  3. @Override
  4. public C createListenerContainer(RabbitListenerEndpoint endpoint) {
  5. //创建SimpleMessageListenerContainer
  6. C instance = createContainerInstance();
  7. if (this.connectionFactory != null) {
  8. instance.setConnectionFactory(this.connectionFactory);
  9. }
  10. if (this.errorHandler != null) {
  11. instance.setErrorHandler(this.errorHandler);
  12. }
  13. if (this.messageConverter != null) {
  14. instance.setMessageConverter(this.messageConverter);
  15. }
  16. ……
  17. //setupListenerContainer方法参数为 MessageListenerContainer类型,
  18.          //SimpleMessageListenerContainer是MessageListenerContainer的子类。
  19. endpoint.setupListenerContainer(instance);
  20. if (instance.getMessageListener() instanceof AbstractAdaptableMessageListener) {
  21. AbstractAdaptableMessageListener messageListener = (AbstractAdaptableMessageListener) instance
  22. .getMessageListener();
  23. if (this.beforeSendReplyPostProcessors != null) {
  24. messageListener.setBeforeSendReplyPostProcessors(this.beforeSendReplyPostProcessors);
  25. }
  26. if (this.retryTemplate != null) {
  27. messageListener.setRetryTemplate(this.retryTemplate);
  28. if (this.recoveryCallback != null) {
  29. messageListener.setRecoveryCallback(this.recoveryCallback);
  30. }
  31. }
  32. }
  33. initializeContainer(instance, endpoint);
  34. return instance;
  35. }
  36. }

 

二、开始

 

1、HandlerMethod

  在被@RabbitListener标注的方法。该方法会被封装成HanderMethod。

  1. HandlerMethod{
  2.   private final Object bean;
  3.   private final Method method;
  4.   public HandlerMethod(Object bean, Method method) {
  5.   this.bean = bean;
  6.   this.method = method;
  7. }
  8. }

我们常用其子类InvocableHandlerMethod。该类提供一个Object invoke(Message<?> message, Object... providedArgs) 方法,
 可以通过该invoke方法调用handlerMethod 中的method:method.invoke(bean,args)

HandlerAdapter称为hander适配器,该适配器中有InvocableHandlerMethod、DelegatingInvocableHandler这两个hander方法。注:InvocableHandlerMethod上文刚说过。

  1. public class HandlerAdapter {
  2.   private final InvocableHandlerMethod invokerHandlerMethod;
  3.   private final DelegatingInvocableHandler delegatingHandler;
  4.   public Object invoke(Message<?> message, Object... providedArgs) throws Exception
  5. {
  6.   if (this.invokerHandlerMethod != null) {
  7.   //InvocableHandlerMethod不为null,就调用invokerHandlerMethod.invoke方法。
  8.   return this.invokerHandlerMethod.invoke(message, providedArgs);
  9. }else if (this.delegatingHandler.hasDefaultHandler()) {
  10. //……
  11. }else {
  12.   //……
  13. }
  14. }
  15. }

这里可以简单的理解调用HandlerAdapter.invoke方法可以间接调用@RabbitListener修饰的方法。

 

2、ChannelAwareMessageListener 

消费端接收到mq的消息会调用ChannelAwareMessageListener 接口中的onMessage方法。

  1. public interface ChannelAwareMessageListener {
  2. /**
  3. * 回调函数处理接受的消息
  4. */
  5. void onMessage(Message message, Channel channel) throws Exception;
  6. }

下面要说的是其子类:MessagingMessageListenerAdapter

  1. public class MessagingMessageListenerAdapter extends AbstractAdaptableMessageListener {
  2. private HandlerAdapter handlerMethod;
  3. @Override
  4. public void onMessage(org.springframework.amqp.core.Message amqpMessage, Channel channel) throws Exception {
  5. //ChannelAwareMessageListener.onMessage()方法中的message是org.springframework.amqp.core.Message类型。
  6. //这里转换成org.springframework.messaging.Message类型。
  7. Message<?> message = toMessagingMessage(amqpMessage);
  8. try {
  9. Object result = invokeHandler(amqpMessage, channel, message);
  10. if (result != null) {
  11. handleResult(result, amqpMessage, channel, message);
  12. }
  13. }
  14. catch () {}
  15. }
  16. private Object invokeHandler(org.springframework.amqp.core.Message amqpMessage, Channel channel,
  17. Message<?> message) {
  18. try {
  19. return this.handlerMethod.invoke(message, amqpMessage, channel);
  20. }
  21. catch () {}
  22. }
  23. }

onMessage->invokehandler->this.handlerMethod.invoke 。this.handlerMethod为 HandlerAdapter类型。上面刚说完:调用HandlerAdapter.invoke方法可以间接调用@RabbitListener修饰的方法

现在我们就已经梳洗清楚从onMessage到调用被@RabbitListener修饰的方法的整个流程

 

3、SimpleMessageListenerContainer

 

SimpleMessageListenerContainer 中有一个内部类AsyncMessageProcessingConsumer :

  1.  //当终端接受到信息时,一个线程会执行该run方法。
  2.     private final class AsyncMessageProcessingConsumer implements Runnable {
  3.         @Override
  4.         public void run() {
  5. //这个 while 可以无限的循环,
  6.             while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
  7.              boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
  8. }
  9.         }
  10.     }

run方法中调用了receiveAndExecute方法,方法名直译:接受并执行。

  1. SimpleMessageListenerContainer {
  2.      //接受并执行
  3.     private boolean receiveAndExecute(final BlockingQueueConsumer consumer) throws Throwable {
  4.        //do接受并执行
  5.         return doReceiveAndExecute(consumer);
  6.     }
  7.      //do接受并执行
  8.     private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Throwable {
  9.         Channel channel = consumer.getChannel();
  10.         for (int i = 0; i < this.txSize; i++) {//txSize为一次事务接受的消息个数
  11.             //读取消息,这里阻塞的,但是有一个超时时间。
  12.             Message message = consumer.nextMessage(this.receiveTimeout);
  13. if (message == null) {//阻塞超时
  14. break;
  15. }
  16.             try {
  17.                 executeListener(channel, message);//消息接收已完成,现在开始处理消息。
  18.             }
  19.             catch (Exception e) {}
  20.         }
  21.         return consumer.commitIfNecessary(isChannelLocallyTransacted());
  22.     }
  23.         
  24.     
  25.     //处理消息开始。该方法在其父类中
  26.     protected void executeListener(Channel channel, Message messageIn) throws Exception {
  27.         try {
  28.             Message message = messageIn;
  29.             if (……) {
  30.                 //批处理信息,这个不研究
  31.             }else {
  32.                 invokeListener(channel, message);
  33.             }
  34.         }catch (Exception ex) {}
  35.     }
  36.     //在其父类中
  37.     protected void invokeListener(Channel channel, Message message) throws Exception {
  38.         //这里this.proxy.invokeListener最终会调用actualInvokeListener方法。
  39.         this.proxy.invokeListener(channel, message);
  40.     }
  41.     //在其父类中
  42.     protected void actualInvokeListener(Channel channel, Message message) throws Exception {
  43.         Object listener = getMessageListener();
  44.         if (listener instanceof ChannelAwareMessageListener) {
  45.             doInvokeListener((ChannelAwareMessageListener) listener, channel, message);
  46.         }
  47.         else if (listener instanceof MessageListener) {
  48.             //……
  49.             doInvokeListener((MessageListener) listener, message)
  50.         }else{
  51.             //……
  52.         }
  53.         
  54.     }    
  55.     protected void doInvokeListener(ChannelAwareMessageListener listener, Channel channel, Message message)
  56.             throws Exception {
  57.             Channel channelToUse = channel;
  58.             try {
  59.                 listener.onMessage(message, channelToUse);
  60.             }
  61.             catch (Exception e) {
  62.                 throw wrapToListenerExecutionFailedExceptionIfNeeded(e, message);
  63.             }
  64.     }
  65. }

在doInvokeListener方中最终调用了onMessage方法。

到这我知道了SimpleMessageListenerContainer内部类AsyncMessageProcessingConsumer被放入线程池中运行,是谁把AsyncMessageProcessingConsumer放入线程池中的呢?


4、RabbitListenerAnnotationBeanPostProcessor

RabbitListenerAnnotationBeanPostProcessor 听名得意:@RabbitListener注解的后置处理器。

  1. public class RabbitListenerAnnotationBeanPostProcessor implements BeanPostProcessor,SmartInitializingSingleton {
  2.     //对接口BeanPostProcessor的实现。
  3.     @Override
  4.     public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
  5.         Class<?> targetClass = AopUtils.getTargetClass(bean);
  6.         final TypeMetadata metadata = this.typeCache.computeIfAbsent(targetClass, this::buildMetadata);
  7.         for (ListenerMethod lm : metadata.listenerMethods) {//获取被@RabbitListener修饰的方法。
  8.             for (RabbitListener rabbitListener : lm.annotations) {//获取方法上的@RabbitListener注解。
  9. //处理Amqp监听:
  10.                 //有这三个参数就以实现监听队列并调用方法。参数:rabbitListener为我们的方法上@RabbitListener注解。参数method和bean可以能过反射的方式调用我们的方法。
  11.                 processAmqpListener(rabbitListener, lm.method, bean, beanName);
  12.             }
  13.         }
  14.         return bean;
  15.     }
  16.     
  17.     //处理Amqp监听
  18.     protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
  19.         Method methodToUse = checkProxy(method, bean);
  20.         
  21.         //Endpoint为终端,像电脑、手机都是终端,他们都可以接受外部信息并响应,如手机来短信了就有提示。
  22.         //这里也用了终端的概念,被@RabbitListener注解修饰方法也有终端的特点可以接受外部信息并响应。
  23. //MethodRabbitListenerEndpoint名子也很形象,叫方法监听终端。
  24.         MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
  25.         endpoint.setMethod(methodToUse);//终端接收到信息时,会调用 methodToUse
  26.         processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
  27.     }
  28.     //注册员
  29.     private final RabbitListenerEndpointRegistrar registrar = new RabbitListenerEndpointRegistrar();
  30.     protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,
  31.             Object adminTarget, String beanName) {
  32. //下面是为终端注入一些属性
  33.         endpoint.setBean(bean);
  34.         endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
  35.         endpoint.setId(getEndpointId(rabbitListener));
  36.         endpoint.setQueueNames(resolveQueues(rabbitListener));
  37.         endpoint.setConcurrency(resolveExpressionAsStringOrInteger(rabbitListener.concurrency(), "concurrency"));
  38.         endpoint.setBeanFactory(this.beanFactory);
  39.         endpoint.setReturnExceptions(resolveExpressionAsBoolean(rabbitListener.returnExceptions()));
  40. ……
  41. RabbitListenerContainerFactory<?> factory = null;
  42. String containerFactoryBeanName = resolve(rabbitListener.containerFactory());
  43. if (StringUtils.hasText(containerFactoryBeanName)) {
  44. Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
  45. try {
  46. factory = this.beanFactory.getBean(containerFactoryBeanName, RabbitListenerContainerFactory.class);
  47. }
  48. catch (NoSuchBeanDefinitionException ex) {
  49. throw new BeanInitializationException("Could not register rabbit listener endpoint on [" +
  50. adminTarget + "] for bean " + beanName + ", no " + RabbitListenerContainerFactory.class.getSimpleName() + " with id '" +
  51. containerFactoryBeanName + "' was found in the application context", ex);
  52. }
  53. }
  54.         //终端准备完成后,还要将终端进行注册。
  55. //注册员将终端注册到注册处
  56.         this.registrar.registerEndpoint(endpoint, factory);
  57.     }
  58.     
  59.     
  60.     
  61.     
  62.     //对SmartInitializingSingleton的实现,该方法在bean的后置处理BeanPostProcessor之后调用。
  63.     @Override
  64.     public void afterSingletonsInstantiated() {
  65.         this.registrar.setBeanFactory(this.beanFactory);
  66.         if (this.containerFactoryBeanName != null) {
  67.             //this.containerFactoryBeanName为“rabbitListenerContainerFactory”
  68.             this.registrar.setContainerFactoryBeanName(this.containerFactoryBeanName);
  69.         }
  70.         // Actually register all listeners
  71.         this.registrar.afterPropertiesSet();
  72.     }
  73. }

    

RabbitListenerEndpointRegistrar :rabbit 监听器注册员

  1. //终端注册员
  2. public class RabbitListenerEndpointRegistrar implements  InitializingBean{
  3. private RabbitListenerEndpointRegistry endpointRegistry;//终端注册处
  4. //终端描述器集合
  5. private final List<AmqpListenerEndpointDescriptor> endpointDescriptors =
  6. new ArrayList<AmqpListenerEndpointDescriptor>();
  7. //注册员将终端注册到注册处
  8. public void registerEndpoint(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory) {
  9. //将 endpoint和 factory 封装成 终端描述器
  10. AmqpListenerEndpointDescriptor descriptor = new AmqpListenerEndpointDescriptor(endpoint, factory);
  11. synchronized (this.endpointDescriptors) {
  12. if (this.startImmediately) { //马下注册
  13. //将 终端注册到 终端注册处
  14. this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
  15. resolveContainerFactory(descriptor), true);
  16. }
  17. else {//afterPropertiesSet()方法中统一注册
  18. //放入终端描述器集合
  19. this.endpointDescriptors.add(descriptor);
  20. }
  21. }
  22. }
  23. //InitializingBean接口的实现
  24.  @Override
  25.     public void afterPropertiesSet() {
  26. //统一注册
  27.         registerAllEndpoints();
  28.     }
  29.     //统一注册
  30.     protected void registerAllEndpoints() {
  31.         synchronized (this.endpointDescriptors) {
  32.             for (AmqpListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
  33.                 this.endpointRegistry.registerListenerContainer(
  34.                         descriptor.endpoint, resolveContainerFactory(descriptor));
  35.             }
  36.         }
  37.     }
  38.     
  39. }

 

RabbitListenerEndpointRegistry:注册处

  1. public class RabbitListenerEndpointRegistry implements SmartLifecycle{
  2. private final Map<String, MessageListenerContainer> listenerContainers =
  3. new ConcurrentHashMap<String, MessageListenerContainer>();
  4. //注册终端
  5. public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory,
  6. boolean startImmediately) {
  7. String id = endpoint.getId();
  8. synchronized (this.listenerContainers) {
  9. //创建 listenerContainer
  10. MessageListenerContainer container = createListenerContainer(endpoint, factory);
  11. this.listenerContainers.put(id, container);
  12. ……
  13. if (startImmediately) {
  14. startIfNecessary(container);
  15. }
  16. }
  17. }
  18. protected MessageListenerContainer createListenerContainer(RabbitListenerEndpoint endpoint,
  19.             RabbitListenerContainerFactory<?> factory) {
  20.         //调用RabbitListener容器工厂的createListenerContainer方法获取RabbitListener容器
  21.         MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);
  22.         return listenerContainer;
  23.     }
  24. //start 是对 smartLifecycle 接口的实现,由 spring 容器调用
  25. //上面的的代码中已经创建了listenerContainer 并放入了listenerContainers集合中,现在要将集合中的 listenerContainer放入线程池中。
  26. @Override
  27. public void start() {
  28. //遍历listenerContainers集合
  29. for (MessageListenerContainer listenerContainer : getListenerContainers()) {
  30. startIfNecessary(listenerContainer);
  31. }
  32. }
  33. //调用集合中的listenerContainer的 start 方法。
  34. private void startIfNecessary(MessageListenerContainer listenerContainer) {
  35. if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
  36. listenerContainer.start();
  37. }
  38. }
  39. }

分析了RabbitListenerAnnotationBeanPostProcessor:@RabbitListener注解的后置处理器 。从读取被@RabbitListener修饰的方法 => 创建 endpoint =>创建MessageListenerContainer并放入集合中=>遍历集合中的MessageListenerContainer并调用 start方法。

 

MessageListenerContainer中的 start 方法做了什么?

其子类 SimpleMessageListenerContainer是我们学习的重点

  1. public interface MessageListenerContainer extends SmartLifecycle {
  2. /**
  3. * Setup the message listener to use. Throws an {@link IllegalArgumentException}
  4. * if that message listener type is not supported.
  5. */
  6. void setupMessageListener(Object messageListener);
  7. /**
  8. * @return the {@link MessageConverter} that can be used to
  9. * convert {@link org.springframework.amqp.core.Message}, if any.
  10. */
  11. MessageConverter getMessageConverter();
  12. }

start 方法在其抽象子类 AbstractMessageListenerContainer 中实现了:

  1. //MessageListenerContainer 同样也继承了 SmartLifecycle接口,SmartLifecycle中的方法是由 Spring 容器调用的,这里我们手动调用了 start 方法,这意味这 start 会被调用两次。
  2. @Override
  3. public void start() {
  4. if (isRunning()) {//第二调用时会直接 return。
  5. return;
  6. }
  7. try {
  8. doStart();
  9. }
  10. catch (Exception ex) {
  11. }
  12. }

doStart 方法子类会覆盖:

在子类:SimpleMessageListenerContainer中:

  1. @Override
  2. protected void doStart() throws Exception {
  3. ……
  4. super.doStart();
  5. synchronized (this.consumersMonitor) {
  6. int newConsumers = initializeConsumers();
  7. Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
  8. for (BlockingQueueConsumer consumer : this.consumers) {
  9. //AsyncMessageProcessingConsumer是SimpleMessageListenerContainer的内部类
  10. AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
  11. processors.add(processor);
  12. //放入线程池
  13. getTaskExecutor().execute(processor);
  14. }
  15. }
  16. }

RabbitListenerAnnotationBeanPostProcessor调用SimpleMessageListenerContainer是AsyncMessageProcessingConsumer放入线程池中的。

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/716863
推荐阅读
相关标签
  

闽ICP备14008679号