
Spring Cloud Stream Source Code Analysis


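Spring Cloud Stream is a framework for building message-driven microservices.
An application talks to a Spring Cloud Stream binder through inputs and outputs. Our configuration determines the bindings, and the binder handles all interaction with the message middleware. To work in a message-driven style, we therefore only need to understand how to interact with Spring Cloud Stream itself.
Under the hood it uses Spring Integration to connect to the message broker and deliver event-driven messaging. Spring Cloud Stream ships vendor-specific auto-configuration for several message middleware products and introduces three core concepts: publish-subscribe, consumer groups, and partitions. At the time of writing, only RabbitMQ and Kafka are supported.

Let's begin the analysis. First, the pom file: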

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>1.5.3.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>eureka.stream</groupId>
        <artifactId>stream</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>springstream</name>
        <description>Demo project for Spring Boot</description>
        <properties>
            <java.version>1.8</java.version>
            <spring-cloud.version>Brixton.SR5</spring-cloud.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
            </dependency>
        </dependencies>
        <dependencyManagement>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-dependencies</artifactId>
                    <version>${spring-cloud.version}</version>
                    <type>pom</type>
                    <scope>import</scope>
                </dependency>
            </dependencies>
        </dependencyManagement>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    </project>

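Here spring-cloud-starter-stream-rabbit is the dependency that pulls in the Stream framework; spring-cloud-starter-stream-kafka is supported as well. This article analyzes the Rabbit binder.

Let's start with a simple example.

First, create the interface StreamSendClient, which declares the input and output channels.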

    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.SubscribableChannel;

    public interface StreamSendClient {

        @Output("testMessage")
        MessageChannel output();

        @Input("testMessage")
        MessageChannel input();
    }

Next, create a message handler class, SinkReceiver, and annotate it with @EnableBinding, passing StreamSendClient as the annotation value.

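    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;

    @EnableBinding({StreamSendClient.class})
    public class SinkReceiver {

        // Invoked for every message that arrives on the testMessage channel
        @StreamListener("testMessage")
        public void receive(Object payload) {
            System.out.println("Received:" + payload);
        }
    }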

Create the startup class StreamApplication.

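    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ConfigurableApplicationContext;
    import org.springframework.messaging.support.MessageBuilder;

    @SpringBootApplication
    public class StreamApplication {
        public static void main(String[] args) {
            ConfigurableApplicationContext run = SpringApplication.run(StreamApplication.class, args);
            // The bean is registered under the fully qualified name of the bound interface
            StreamSendClient streamClient = (StreamSendClient) run.getBean("com.springcloud.eurekaclient.StreamSendClient");
            streamClient.output().send(MessageBuilder.withPayload("from streamClient").build());
        }
    }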

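After running it, the console prints: Received:from streamClient. The RabbitMQ management console also lists a queue for testMessage.

Now for the analysis.

The startup class carries no new annotations; the only one is @EnableBinding on SinkReceiver.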

    @Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Inherited
    @Configuration
    @Import({ChannelBindingServiceConfiguration.class, BindingBeansRegistrar.class, BinderFactoryConfiguration.class,
            SpelExpressionConverterConfiguration.class})
    @EnableIntegration
    public @interface EnableBinding {

        /**
         * A list of interfaces having methods annotated with {@link Input} and/or
         * {@link Output} to indicate bindable components.
         */
        Class<?>[] value() default {};
    }

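From this we can see:

1. The annotation is meta-annotated with @Configuration (which is itself a @Component), so a class carrying @EnableBinding is registered as a configuration bean.

2. It imports ChannelBindingServiceConfiguration.class, BindingBeansRegistrar.class, BinderFactoryConfiguration.class, and SpelExpressionConverterConfiguration.class.

3. It turns on @EnableIntegration. Spring Integration plays the role of a lightweight enterprise service bus (ESB) inside the application. It abstracts channels into two forms, PollableChannel and SubscribableChannel, both of which extend MessageChannel.

Analysis of ChannelBindingServiceConfiguration: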
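To make the two channel shapes concrete, here is a minimal plain-Java sketch. It does not use Spring Integration; every type name in it (ChannelSketch, SimpleChannel, SimpleSubscribableChannel, SimplePollableChannel) is a hypothetical stand-in, and the subscribable variant simply delivers to every subscriber, which is a simplification of how the real channels dispatch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical, simplified stand-ins for the two channel styles.
class ChannelSketch {

    /** Common parent: anything a message can be sent to. */
    interface SimpleChannel {
        boolean send(String message);
    }

    /** Push model: subscribers are invoked at the moment a message is sent. */
    static class SimpleSubscribableChannel implements SimpleChannel {
        private final List<Consumer<String>> handlers = new ArrayList<>();

        void subscribe(Consumer<String> handler) {
            handlers.add(handler);
        }

        @Override
        public boolean send(String message) {
            // Synchronous dispatch on the sender's thread
            for (Consumer<String> handler : handlers) {
                handler.accept(message);
            }
            return !handlers.isEmpty();
        }
    }

    /** Pull model: messages are buffered until a consumer asks for one. */
    static class SimplePollableChannel implements SimpleChannel {
        private final Queue<String> buffer = new ArrayDeque<>();

        @Override
        public boolean send(String message) {
            return buffer.offer(message);
        }

        String receive() {
            return buffer.poll(); // null when nothing is buffered
        }
    }

    public static void main(String[] args) {
        SimpleSubscribableChannel pushed = new SimpleSubscribableChannel();
        pushed.subscribe(m -> System.out.println("Received:" + m));
        pushed.send("hello");

        SimplePollableChannel pulled = new SimplePollableChannel();
        pulled.send("hello");
        System.out.println("Polled:" + pulled.receive());
    }
}
```

The @StreamListener mechanism analyzed in this article relies on the push model: a handler is subscribed once and is then called for each incoming message.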

    @Configuration
    @EnableConfigurationProperties(ChannelBindingServiceProperties.class)
    public class ChannelBindingServiceConfiguration {

        private static final String ERROR_CHANNEL_NAME = "error";

        @Autowired
        private MessageBuilderFactory messageBuilderFactory;

        @Autowired(required = false)
        private ObjectMapper objectMapper;

        /**
         * User defined custom message converters
         */
        @Autowired(required = false)
        private List<AbstractFromMessageConverter> customMessageConverters;

        @Bean
        // This conditional is intentionally not in an autoconfig (usually a bad idea) because
        // it is used to detect a ChannelBindingService in the parent context (which we know
        // already exists).
        @ConditionalOnMissingBean(ChannelBindingService.class)
        public ChannelBindingService bindingService(ChannelBindingServiceProperties channelBindingServiceProperties,
                BinderFactory<MessageChannel> binderFactory) {
            return new ChannelBindingService(channelBindingServiceProperties, binderFactory);
        }

        @Bean
        public BindableChannelFactory channelFactory(CompositeMessageChannelConfigurer compositeMessageChannelConfigurer) {
            return new DefaultBindableChannelFactory(compositeMessageChannelConfigurer);
        }

        @Bean
        public CompositeMessageChannelConfigurer compositeMessageChannelConfigurer(
                MessageConverterConfigurer messageConverterConfigurer) {
            List<MessageChannelConfigurer> configurerList = new ArrayList<>();
            configurerList.add(messageConverterConfigurer);
            return new CompositeMessageChannelConfigurer(configurerList);
        }

        @Bean
        @DependsOn("bindingService")
        public OutputBindingLifecycle outputBindingLifecycle() {
            return new OutputBindingLifecycle();
        }

        @Bean
        @DependsOn("bindingService")
        public InputBindingLifecycle inputBindingLifecycle() {
            return new InputBindingLifecycle();
        }

        @Bean
        @DependsOn("bindingService")
        public ContextStartAfterRefreshListener contextStartAfterRefreshListener() {
            return new ContextStartAfterRefreshListener();
        }

        @Bean
        public static StreamListenerAnnotationBeanPostProcessor bindToAnnotationBeanPostProcessor(
                @Lazy BinderAwareChannelResolver binderAwareChannelResolver,
                @Lazy MessageHandlerMethodFactory messageHandlerMethodFactory) {
            return new StreamListenerAnnotationBeanPostProcessor(binderAwareChannelResolver,
                    messageHandlerMethodFactory);
        }
    }

ChannelBindingServiceConfiguration registers several important beans:

1. ChannelBindingService: creates the producer and consumer MessageChannel bindings, along with the RabbitMQ exchange, queue, and so on.

2. inputBindingLifecycle and outputBindingLifecycle: once the context has started, they call ChannelBindingService to perform the binding.

3. StreamListenerAnnotationBeanPostProcessor: links methods annotated with @StreamListener to the RabbitMQ consumer channel, so that when RabbitMQ pushes a message, the annotated method is invoked.

Analysis of BindingBeansRegistrar:

BindingBeansRegistrar processes the classes listed in @EnableBinding(StreamSendClient.class).

    public class BindingBeansRegistrar implements ImportBeanDefinitionRegistrar {

        @Override
        public void registerBeanDefinitions(AnnotationMetadata metadata,
                BeanDefinitionRegistry registry) {
            AnnotationAttributes attrs = AnnotatedElementUtils.getMergedAnnotationAttributes(
                    ClassUtils.resolveClassName(metadata.getClassName(), null),
                    EnableBinding.class);
            for (Class<?> type : collectClasses(attrs, metadata.getClassName())) {
                BindingBeanDefinitionRegistryUtils.registerChannelBeanDefinitions(type,
                        type.getName(), registry);
                BindingBeanDefinitionRegistryUtils.registerChannelsQualifiedBeanDefinitions(
                        ClassUtils.resolveClassName(metadata.getClassName(), null), type,
                        registry);
            }
        }

        private Class<?>[] collectClasses(AnnotationAttributes attrs, String className) {
            EnableBinding enableBinding = AnnotationUtils.synthesizeAnnotation(attrs,
                    EnableBinding.class, ClassUtils.resolveClassName(className, null));
            return enableBinding.value();
        }
    }

The collectClasses method reads enableBinding.value() from @EnableBinding, which is the StreamSendClient class in our example. Next comes the BindingBeanDefinitionRegistryUtils.registerChannelBeanDefinitions method:

    public static void registerChannelBeanDefinitions(Class<?> type,
            final String channelInterfaceBeanName, final BeanDefinitionRegistry registry) {
        ReflectionUtils.doWithMethods(type, new MethodCallback() {
            @Override
            public void doWith(Method method) throws IllegalArgumentException,
                    IllegalAccessException {
                Input input = AnnotationUtils.findAnnotation(method, Input.class);
                if (input != null) {
                    String name = getChannelName(input, method);
                    registerInputChannelBeanDefinition(input.value(), name,
                            channelInterfaceBeanName, method.getName(), registry);
                }
                Output output = AnnotationUtils.findAnnotation(method, Output.class);
                if (output != null) {
                    String name = getChannelName(output, method);
                    registerOutputChannelBeanDefinition(output.value(), name,
                            channelInterfaceBeanName, method.getName(), registry);
                }
            }
        });
    }

    public static void registerInputChannelBeanDefinition(String qualifierValue,
            String name, String channelInterfaceBeanName,
            String channelInterfaceMethodName, BeanDefinitionRegistry registry) {
        registerChannelBeanDefinition(Input.class, qualifierValue, name,
                channelInterfaceBeanName, channelInterfaceMethodName, registry);
    }

    private static void registerChannelBeanDefinition(
            Class<? extends Annotation> qualifier, String qualifierValue, String name,
            String channelInterfaceBeanName, String channelInterfaceMethodName,
            BeanDefinitionRegistry registry) {
        RootBeanDefinition rootBeanDefinition = new RootBeanDefinition();
        rootBeanDefinition.setFactoryBeanName(channelInterfaceBeanName);
        rootBeanDefinition.setUniqueFactoryMethodName(channelInterfaceMethodName);
        rootBeanDefinition.addQualifier(new AutowireCandidateQualifier(qualifier,
                qualifierValue));
        registry.registerBeanDefinition(name, rootBeanDefinition);
    }

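This finds the methods annotated with @Input and @Output in StreamSendClient and registers each one as a bean. The bean name is the annotation value, testMessage in our example. The BeanDefinition names the StreamSendClient bean as its factory bean and, through setUniqueFactoryMethodName, names input() as the factory method that produces the testMessage bean.

The BindingBeanDefinitionRegistryUtils.registerChannelsQualifiedBeanDefinitions method: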
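The factory-bean plus factory-method idea can be sketched without Spring. The mini container below is purely illustrative: FactoryMethodSketch, Definition, and Client are hypothetical names, and the lookup uses plain reflection rather than Spring's actual bean factory.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Hypothetical mini container illustrating factory-bean + factory-method lookup.
class FactoryMethodSketch {

    /** What a registrar would record: which bean to ask, and which method to call on it. */
    static final class Definition {
        final String factoryBeanName;
        final String factoryMethodName;

        Definition(String factoryBeanName, String factoryMethodName) {
            this.factoryBeanName = factoryBeanName;
            this.factoryMethodName = factoryMethodName;
        }
    }

    /** Stand-in for the bound interface's proxy; input() plays the factory method. */
    public static class Client {
        public String input() {
            return "channel:testMessage";
        }
    }

    private final Map<String, Object> singletons = new HashMap<>();
    private final Map<String, Definition> definitions = new HashMap<>();

    void registerSingleton(String name, Object bean) {
        singletons.put(name, bean);
    }

    void registerDefinition(String name, Definition definition) {
        definitions.put(name, definition);
    }

    Object getBean(String name) {
        Object existing = singletons.get(name);
        if (existing != null) {
            return existing;
        }
        Definition def = definitions.get(name);
        if (def == null) {
            throw new IllegalArgumentException("No bean named " + name);
        }
        // Resolve the factory bean first, then call the factory method on it
        Object factory = getBean(def.factoryBeanName);
        try {
            Method method = factory.getClass().getMethod(def.factoryMethodName);
            Object bean = method.invoke(factory);
            singletons.put(name, bean); // cache, like a singleton-scoped bean
            return bean;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot create bean " + name, e);
        }
    }

    public static void main(String[] args) {
        FactoryMethodSketch container = new FactoryMethodSketch();
        container.registerSingleton("client", new Client());
        container.registerDefinition("testMessage", new Definition("client", "input"));
        System.out.println(container.getBean("testMessage"));
    }
}
```

Asking for testMessage never constructs anything directly; it delegates to the client bean's input() method, which mirrors how the channel bean is obtained through the bound interface.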

    public static void registerChannelsQualifiedBeanDefinitions(Class<?> parent,
            Class<?> type, final BeanDefinitionRegistry registry) {
        if (type.isInterface()) {
            RootBeanDefinition rootBeanDefinition = new RootBeanDefinition(
                    BindableProxyFactory.class);
            rootBeanDefinition.addQualifier(new AutowireCandidateQualifier(
                    Bindings.class, parent));
            rootBeanDefinition.getConstructorArgumentValues().addGenericArgumentValue(
                    type);
            registry.registerBeanDefinition(type.getName(), rootBeanDefinition);
        }
        else {
            RootBeanDefinition rootBeanDefinition = new RootBeanDefinition(type);
            rootBeanDefinition.addQualifier(new AutowireCandidateQualifier(
                    Bindings.class, parent));
            registry.registerBeanDefinition(type.getName(), rootBeanDefinition);
        }
    }

This registers a bean whose name is the fully qualified name of StreamSendClient, whose qualifier is @Bindings, and whose bean class is BindableProxyFactory. BindableProxyFactory implements the Bindable interface.

Now let's look at the BindableProxyFactory source.

    public class BindableProxyFactory implements MethodInterceptor, FactoryBean<Object>, Bindable, InitializingBean {

        private Map<String, ChannelHolder> inputHolders = new HashMap<>();

        private Map<String, ChannelHolder> outputHolders = new HashMap<>();

        // Acts as the interceptor behind a dynamic proxy: calls to the input() and
        // output() methods are answered here with the matching channel
        public synchronized Object invoke(MethodInvocation invocation) throws Throwable {
            MessageChannel messageChannel = null;
            Method method = invocation.getMethod();
            if (MessageChannel.class.isAssignableFrom(method.getReturnType())) {
                Input input = AnnotationUtils.findAnnotation(method, Input.class);
                if (input != null) {
                    String name = BindingBeanDefinitionRegistryUtils.getChannelName(input, method);
                    messageChannel = this.inputHolders.get(name).getMessageChannel();
                }
                Output output = AnnotationUtils.findAnnotation(method, Output.class);
                if (output != null) {
                    String name = BindingBeanDefinitionRegistryUtils.getChannelName(output, method);
                    messageChannel = this.outputHolders.get(name).getMessageChannel();
                }
            }
            //ignore
            return messageChannel;
        }

        // Implements InitializingBean: once the bean has been created, a channel is built
        // for each @Input and @Output annotation (a DirectChannel by default)
        public void afterPropertiesSet() throws Exception {
            ReflectionUtils.doWithMethods(type, new ReflectionUtils.MethodCallback() {
                @Override
                public void doWith(Method method) throws IllegalArgumentException {
                    Assert.notNull(channelFactory, "Channel Factory cannot be null");
                    Input input = AnnotationUtils.findAnnotation(method, Input.class);
                    if (input != null) {
                        String name = BindingBeanDefinitionRegistryUtils.getChannelName(input, method);
                        validateChannelType(method.getReturnType());
                        MessageChannel sharedChannel = locateSharedChannel(name);
                        if (sharedChannel == null) {
                            inputHolders.put(name, new ChannelHolder(channelFactory.createSubscribableChannel(name), true));
                        }
                        else {
                            inputHolders.put(name, new ChannelHolder(sharedChannel, false));
                        }
                    }
                }
            });
            ReflectionUtils.doWithMethods(type, new ReflectionUtils.MethodCallback() {
                @Override
                public void doWith(Method method) throws IllegalArgumentException {
                    Output output = AnnotationUtils.findAnnotation(method, Output.class);
                    if (output != null) {
                        String name = BindingBeanDefinitionRegistryUtils.getChannelName(output, method);
                        validateChannelType(method.getReturnType());
                        MessageChannel sharedChannel = locateSharedChannel(name);
                        if (sharedChannel == null) {
                            outputHolders.put(name, new ChannelHolder(channelFactory.createSubscribableChannel(name), true));
                        }
                        else {
                            outputHolders.put(name, new ChannelHolder(sharedChannel, false));
                        }
                    }
                }
            });
        }

        private void validateChannelType(Class<?> channelType) {
            Assert.isTrue(SubscribableChannel.class.equals(channelType) || MessageChannel.class.equals(channelType),
                    "A bound channel should be either a '" + MessageChannel.class.getName() + "', " +
                            " or a '" + SubscribableChannel.class.getName() + "'");
        }

        private MessageChannel locateSharedChannel(String name) {
            return this.sharedChannelRegistry != null ?
                    this.sharedChannelRegistry.get(getNamespacePrefixedChannelName(name)) : null;
        }

        private String getNamespacePrefixedChannelName(String name) {
            return this.channelNamespace + "." + name;
        }

        @Override
        public synchronized Object getObject() throws Exception {
            if (this.proxy == null) {
                ProxyFactory factory = new ProxyFactory(this.type, this);
                this.proxy = factory.getProxy();
            }
            return this.proxy;
        }

        @Override
        public Class<?> getObjectType() {
            return this.type;
        }

        @Override
        public boolean isSingleton() {
            return true;
        }

        public void bindInputs(ChannelBindingService channelBindingService) {
            if (log.isDebugEnabled()) {
                log.debug(String.format("Binding inputs for %s:%s", this.channelNamespace, this.type));
            }
            for (Map.Entry<String, ChannelHolder> channelHolderEntry : this.inputHolders.entrySet()) {
                String inputChannelName = channelHolderEntry.getKey();
                ChannelHolder channelHolder = channelHolderEntry.getValue();
                if (channelHolder.isBindable()) {
                    if (log.isDebugEnabled()) {
                        log.debug(String.format("Binding %s:%s:%s", this.channelNamespace, this.type, inputChannelName));
                    }
                    channelBindingService.bindConsumer(channelHolder.getMessageChannel(), inputChannelName);
                }
            }
        }

        @Override
        public void bindOutputs(ChannelBindingService channelBindingService) {
            if (log.isDebugEnabled()) {
                log.debug(String.format("Binding outputs for %s:%s", this.channelNamespace, this.type));
            }
            for (Map.Entry<String, ChannelHolder> channelHolderEntry : this.outputHolders.entrySet()) {
                ChannelHolder channelHolder = channelHolderEntry.getValue();
                String outputChannelName = channelHolderEntry.getKey();
                if (channelHolderEntry.getValue().isBindable()) {
                    if (log.isDebugEnabled()) {
                        log.debug(String.format("Binding %s:%s:%s", this.channelNamespace, this.type, outputChannelName));
                    }
                    channelBindingService.bindProducer(channelHolder.getMessageChannel(), outputChannelName);
                }
            }
        }

        @Override
        public void unbindInputs(ChannelBindingService channelBindingService) {
            if (log.isDebugEnabled()) {
                log.debug(String.format("Unbinding inputs for %s:%s", this.channelNamespace, this.type));
            }
            for (Map.Entry<String, ChannelHolder> channelHolderEntry : this.inputHolders.entrySet()) {
                if (channelHolderEntry.getValue().isBindable()) {
                    if (log.isDebugEnabled()) {
                        log.debug(String.format("Unbinding %s:%s:%s", this.channelNamespace, this.type, channelHolderEntry.getKey()));
                    }
                    channelBindingService.unbindConsumers(channelHolderEntry.getKey());
                }
            }
        }

        @Override
        public void unbindOutputs(ChannelBindingService channelBindingService) {
            if (log.isDebugEnabled()) {
                log.debug(String.format("Unbinding outputs for %s:%s", this.channelNamespace, this.type));
            }
            for (Map.Entry<String, ChannelHolder> channelHolderEntry : this.outputHolders.entrySet()) {
                if (channelHolderEntry.getValue().isBindable()) {
                    if (log.isDebugEnabled()) {
                        log.debug(String.format("Binding %s:%s:%s", this.channelNamespace, this.type, channelHolderEntry.getKey()));
                    }
                    channelBindingService.unbindProducers(channelHolderEntry.getKey());
                }
            }
        }
    }
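
The proxy trick is easier to see in isolation. The following sketch uses the JDK's java.lang.reflect.Proxy instead of Spring's ProxyFactory, and all names in it (BindableProxySketch, In, Out, NamedChannel, Client, createProxy, and the channel names orders and payments) are hypothetical. It only illustrates the two phases: build one channel per annotated method up front, then answer each interface call from the prepared maps.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of an annotation-driven channel proxy.
class BindableProxySketch {

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface In {
        String value();
    }

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Out {
        String value();
    }

    /** Trivial channel stand-in that only remembers its name. */
    static final class NamedChannel {
        final String name;

        NamedChannel(String name) {
            this.name = name;
        }
    }

    /** Hypothetical bound interface, shaped like StreamSendClient. */
    interface Client {
        @Out("orders")
        NamedChannel output();

        @In("payments")
        NamedChannel input();
    }

    static <T> T createProxy(Class<T> type) {
        Map<String, NamedChannel> inputs = new HashMap<>();
        Map<String, NamedChannel> outputs = new HashMap<>();
        // Phase 1 (like afterPropertiesSet): create one channel per annotated method
        for (Method method : type.getMethods()) {
            In in = method.getAnnotation(In.class);
            if (in != null) {
                inputs.put(in.value(), new NamedChannel(in.value()));
            }
            Out out = method.getAnnotation(Out.class);
            if (out != null) {
                outputs.put(out.value(), new NamedChannel(out.value()));
            }
        }
        // Phase 2 (like invoke): answer interface calls from the prepared maps
        Object proxy = Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type},
                (self, method, args) -> {
                    In in = method.getAnnotation(In.class);
                    if (in != null) {
                        return inputs.get(in.value());
                    }
                    Out out = method.getAnnotation(Out.class);
                    if (out != null) {
                        return outputs.get(out.value());
                    }
                    // Simplification: Object methods such as toString() are not handled
                    return null;
                });
        return type.cast(proxy);
    }

    public static void main(String[] args) {
        Client client = createProxy(Client.class);
        System.out.println(client.output().name);
        System.out.println(client.input().name);
    }
}
```

Because the channels are created once and cached, repeated calls to input() or output() hand back the same channel instance, which is what lets a listener and a sender meet on one channel.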

Analysis of BinderFactoryConfiguration:

    @Configuration
    public class BinderFactoryConfiguration {

        @Bean
        @ConditionalOnMissingBean(BinderFactory.class)
        public BinderFactory<?> binderFactory(BinderTypeRegistry binderTypeRegistry,
                ChannelBindingServiceProperties channelBindingServiceProperties) {
            Map<String, BinderConfiguration> binderConfigurations = new HashMap<>();
            Map<String, BinderProperties> declaredBinders = channelBindingServiceProperties.getBinders();
            boolean defaultCandidatesExist = false;
            Iterator<Map.Entry<String, BinderProperties>> binderPropertiesIterator = declaredBinders.entrySet().iterator();
            while (!defaultCandidatesExist && binderPropertiesIterator.hasNext()) {
                defaultCandidatesExist = binderPropertiesIterator.next().getValue().isDefaultCandidate();
            }
            for (Map.Entry<String, BinderProperties> binderEntry : declaredBinders.entrySet()) {
                BinderProperties binderProperties = binderEntry.getValue();
                if (binderTypeRegistry.get(binderEntry.getKey()) != null) {
                    binderConfigurations.put(binderEntry.getKey(),
                            new BinderConfiguration(binderTypeRegistry.get(binderEntry.getKey()),
                                    binderProperties.getEnvironment(), binderProperties.isInheritEnvironment(),
                                    binderProperties.isDefaultCandidate()));
                }
                else {
                    Assert.hasText(binderProperties.getType(),
                            "No 'type' property present for custom binder " + binderEntry.getKey());
                    BinderType binderType = binderTypeRegistry.get(binderProperties.getType());
                    Assert.notNull(binderType, "Binder type " + binderProperties.getType() + " is not defined");
                    binderConfigurations.put(binderEntry.getKey(),
                            new BinderConfiguration(binderType, binderProperties.getEnvironment(),
                                    binderProperties.isInheritEnvironment(), binderProperties.isDefaultCandidate()));
                }
            }
            if (!defaultCandidatesExist) {
                for (Map.Entry<String, BinderType> entry : binderTypeRegistry.getAll().entrySet()) {
                    binderConfigurations.put(entry.getKey(),
                            new BinderConfiguration(entry.getValue(), new Properties(), true, true));
                }
            }
            DefaultBinderFactory<?> binderFactory = new DefaultBinderFactory<>(binderConfigurations);
            binderFactory.setDefaultBinder(channelBindingServiceProperties.getDefaultBinder());
            return binderFactory;
        }
    }

Its main job is to create a DefaultBinderFactory.

That covers the beans registered by ChannelBindingServiceConfiguration, BindingBeansRegistrar, and BinderFactoryConfiguration. Now for the loading process:

1. ChannelBindingServiceConfiguration registers the outputBindingLifecycle and inputBindingLifecycle beans. We analyze inputBindingLifecycle here; outputBindingLifecycle works the same way.

    @Bean
    @DependsOn("bindingService")
    public OutputBindingLifecycle outputBindingLifecycle() {
        return new OutputBindingLifecycle();
    }

    @Bean
    @DependsOn("bindingService")
    public InputBindingLifecycle inputBindingLifecycle() {
        return new InputBindingLifecycle();
    }

InputBindingLifecycle implements the SmartLifecycle interface, so its start method runs once the Spring context has started.

    public class InputBindingLifecycle implements SmartLifecycle, ApplicationContextAware {

        public void start() {
            if (!running) {
                // retrieve the ChannelBindingService lazily, avoiding early initialization
                try {
                    ChannelBindingService channelBindingService = this.applicationContext
                            .getBean(ChannelBindingService.class);
                    Map<String, Bindable> bindables = this.applicationContext
                            .getBeansOfType(Bindable.class);
                    for (Bindable bindable : bindables.values()) {
                        // bindables.values() holds the beans registered for
                        // @EnableBinding({StreamSendClient.class}); their bean class is BindableProxyFactory
                        bindable.bindInputs(channelBindingService);
                    }
                }
                catch (BeansException e) {
                    throw new IllegalStateException(
                            "Cannot perform binding, no proper implementation found", e);
                }
                this.running = true;
            }
        }
    }

The BindableProxyFactory.bindInputs method is as follows:

    public void bindInputs(ChannelBindingService channelBindingService) {
        if (log.isDebugEnabled()) {
            log.debug(String.format("Binding inputs for %s:%s", this.channelNamespace, this.type));
        }
        for (Map.Entry<String, ChannelHolder> channelHolderEntry : this.inputHolders.entrySet()) {
            String inputChannelName = channelHolderEntry.getKey();
            ChannelHolder channelHolder = channelHolderEntry.getValue();
            if (channelHolder.isBindable()) {
                if (log.isDebugEnabled()) {
                    log.debug(String.format("Binding %s:%s:%s", this.channelNamespace, this.type, inputChannelName));
                }
                // continue into ChannelBindingService.bindConsumer
                channelBindingService.bindConsumer(channelHolder.getMessageChannel(), inputChannelName);
            }
        }
    }

    public Collection<Binding<MessageChannel>> bindConsumer(MessageChannel inputChannel, String inputChannelName) {
        // ...
        validate(consumerProperties);
        for (String target : channelBindingTargets) {
            // continue into binder.bindConsumer
            Binding<MessageChannel> binding = binder.bindConsumer(target, channelBindingServiceProperties.getGroup(inputChannelName), inputChannel, consumerProperties);
            bindings.add(binding);
        }
        this.consumerBindings.put(inputChannelName, bindings);
        return bindings;
    }

This in turn enters the RabbitMessageChannelBinder.bindConsumer method:

    public Binding<MessageChannel> doBindConsumer(String name, String group, MessageChannel inputChannel,
            ExtendedConsumerProperties<RabbitConsumerProperties> properties) {
        String prefix = properties.getExtension().getPrefix();
        String exchangeName = applyPrefix(prefix, name);
        TopicExchange exchange = new TopicExchange(exchangeName);
        // declare the exchange
        declareExchange(exchangeName, exchange);
        String queueName = applyPrefix(prefix, baseQueueName);
        boolean partitioned = !anonymousConsumer && properties.isPartitioned();
        boolean durable = !anonymousConsumer && properties.getExtension().isDurableSubscription();
        Queue queue;
        // ...
        // declare the queue
        declareQueue(queueName, queue);
        if (partitioned) {
            String bindingKey = String.format("%s-%d", name, properties.getInstanceIndex());
            declareBinding(queue.getName(), BindingBuilder.bind(queue).to(exchange).with(bindingKey));
        }
        else {
            declareBinding(queue.getName(), BindingBuilder.bind(queue).to(exchange).with("#"));
        }
        Binding<MessageChannel> binding = doRegisterConsumer(baseQueueName, group, inputChannel, queue, properties);
        // ...
        return binding;
    }

As shown, inputBindingLifecycle is what creates the RabbitMQ exchange and queue.

Likewise, outputBindingLifecycle creates the producer binding on startup.
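
The naming rules visible in doBindConsumer can be restated as two small pure functions. This is only a sketch: RoutingKeySketch is a hypothetical class, and the body of applyPrefix (plain concatenation) is an assumption, since the source listing calls that helper without showing it. The binding-key rule is taken directly from the listing.

```java
// Hypothetical restatement of the naming rules seen in doBindConsumer.
class RoutingKeySketch {

    /** Assumption: the prefix is simply prepended to the name. */
    static String applyPrefix(String prefix, String name) {
        return prefix + name;
    }

    /**
     * Partitioned consumers bind with "<name>-<instanceIndex>";
     * everyone else binds with "#", which matches every routing key on a topic exchange.
     */
    static String bindingKey(String name, boolean partitioned, int instanceIndex) {
        return partitioned ? String.format("%s-%d", name, instanceIndex) : "#";
    }

    public static void main(String[] args) {
        System.out.println(applyPrefix("dev.", "testMessage"));
        System.out.println(bindingKey("testMessage", false, 0));
        System.out.println(bindingKey("testMessage", true, 2));
    }
}
```

With the "#" key, every message published to the exchange reaches the queue; with a partitioned key, each application instance only receives the messages routed to its own index.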

2. ChannelBindingServiceConfiguration also registers StreamListenerAnnotationBeanPostProcessor. It implements the BeanPostProcessor interface, so its postProcessAfterInitialization method runs for each bean after initialization.

    public class StreamListenerAnnotationBeanPostProcessor implements BeanPostProcessor, ApplicationContextAware, SmartInitializingSingleton {

        public Object postProcessAfterInitialization(final Object bean, String beanName) throws BeansException {
            Class<?> targetClass = AopUtils.isAopProxy(bean) ? AopUtils.getTargetClass(bean) : bean.getClass();
            ReflectionUtils.doWithMethods(targetClass, new ReflectionUtils.MethodCallback() {
                @Override
                public void doWith(final Method method) throws IllegalArgumentException, IllegalAccessException {
                    // Step 1
                    StreamListener streamListener = AnnotationUtils.findAnnotation(method, StreamListener.class);
                    if (streamListener != null) {
                        Method targetMethod = checkProxy(method, bean);
                        Assert.hasText(streamListener.value(), "The binding name cannot be null");
                        // Step 2
                        final InvocableHandlerMethod invocableHandlerMethod = messageHandlerMethodFactory.createInvocableHandlerMethod(bean, targetMethod);
                        if (!StringUtils.hasText(streamListener.value())) {
                            throw new BeanInitializationException("A bound component name must be specified");
                        }
                        if (mappedBindings.containsKey(streamListener.value())) {
                            throw new BeanInitializationException("Duplicate @" + StreamListener.class.getSimpleName() +
                                    " mapping for '" + streamListener.value() + "' on " + invocableHandlerMethod.getShortLogMessage() +
                                    " already existing for " + mappedBindings.get(streamListener.value()).getShortLogMessage());
                        }
                        mappedBindings.put(streamListener.value(), invocableHandlerMethod);
                        // Step 3
                        SubscribableChannel channel = applicationContext.getBean(streamListener.value(),
                                SubscribableChannel.class);
                        final String defaultOutputChannel = extractDefaultOutput(method);
                        if (invocableHandlerMethod.isVoid()) {
                            Assert.isTrue(StringUtils.isEmpty(defaultOutputChannel), "An output channel cannot be specified for a method that " +
                                    "does not return a value");
                        }
                        else {
                            Assert.isTrue(!StringUtils.isEmpty(defaultOutputChannel), "An output channel must be specified for a method that " +
                                    "can return a value");
                        }
                        // Step 4
                        StreamListenerMessageHandler handler = new StreamListenerMessageHandler(invocableHandlerMethod);
                        handler.setApplicationContext(applicationContext);
                        handler.setChannelResolver(binderAwareChannelResolver);
                        if (!StringUtils.isEmpty(defaultOutputChannel)) {
                            handler.setOutputChannelName(defaultOutputChannel);
                        }
                        handler.afterPropertiesSet();
                        // Step 5
                        channel.subscribe(handler);
                    }
                }
            });
            return bean;
        }
    }

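The postProcessAfterInitialization method proceeds as follows:

1. Find the methods annotated with @StreamListener("testMessage").

2. Create an InvocableHandlerMethod wrapper that invokes the concrete method we wrote.

3. streamListener.value() is testMessage. The bean fetched here is the testMessage bean produced by the StreamSendClient factory bean through its input() method, as registered in BindingBeanDefinitionRegistryUtils.registerChannelBeanDefinitions described above. In other words, this step looks up the SubscribableChannel that BindableProxyFactory created in afterPropertiesSet for the @Input and @Output annotations.

PS: To obtain the testMessage bean, Spring first locates the StreamSendClient bean and then calls its input method, because the definition was set up earlier with rootBeanDefinition.setFactoryBeanName(StreamSendClient) and rootBeanDefinition.setUniqueFactoryMethodName(input).

4. Create a StreamListenerMessageHandler, passing the InvocableHandlerMethod to its constructor.

5. Subscribe the StreamListenerMessageHandler to the SubscribableChannel.

From then on, when a message arrives, the InvocableHandlerMethod inside StreamListenerMessageHandler locates the concrete method that handles it.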
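
The five steps above can be sketched in plain Java with reflection. Nothing here is Spring's API: ListenerWiringSketch, the Listener annotation, Channel, Receiver, and wire are hypothetical stand-ins, and the handler is reduced to a lambda that invokes the annotated method.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical miniature of annotation-driven listener wiring.
class ListenerWiringSketch {

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Listener {
        String value();
    }

    /** Minimal subscribable channel: delivers each payload to every handler. */
    static final class Channel {
        private final List<Consumer<Object>> handlers = new ArrayList<>();

        void subscribe(Consumer<Object> handler) {
            handlers.add(handler);
        }

        void send(Object payload) {
            for (Consumer<Object> handler : handlers) {
                handler.accept(payload);
            }
        }
    }

    /** Example bean with one annotated handler method. */
    static final class Receiver {
        final List<Object> received = new ArrayList<>();

        @Listener("testMessage")
        void receive(Object payload) {
            received.add(payload);
        }
    }

    static void wire(Object bean, Map<String, Channel> channels) {
        for (Method method : bean.getClass().getDeclaredMethods()) {
            // Step 1: find annotated methods
            Listener listener = method.getAnnotation(Listener.class);
            if (listener == null) {
                continue;
            }
            // Step 3: look up the channel by the annotation value
            Channel channel = channels.get(listener.value());
            if (channel == null) {
                throw new IllegalStateException("No channel named " + listener.value());
            }
            method.setAccessible(true);
            // Steps 2, 4 and 5: wrap the method in a handler and subscribe it
            channel.subscribe(payload -> {
                try {
                    method.invoke(bean, payload);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException(e);
                }
            });
        }
    }

    public static void main(String[] args) {
        Map<String, Channel> channels = new HashMap<>();
        Channel channel = new Channel();
        channels.put("testMessage", channel);
        Receiver receiver = new Receiver();
        wire(receiver, channels);
        channel.send("from streamClient");
        System.out.println("Received:" + receiver.received.get(0));
    }
}
```

The wiring happens once at startup; after that, sending to the channel is all it takes to reach the annotated method.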

======== Divider: in-depth source analysis of how a Rabbit message finds its handler method ========

Recall the RabbitMessageChannelBinder.bindConsumer method from earlier:

    // RabbitMessageChannelBinder.bindConsumer method
    public Binding<MessageChannel> doBindConsumer(String name, String group, MessageChannel inputChannel,
            ExtendedConsumerProperties<RabbitConsumerProperties> properties) {
        String prefix = properties.getExtension().getPrefix();
        String exchangeName = applyPrefix(prefix, name);
        TopicExchange exchange = new TopicExchange(exchangeName);
        // declare the exchange
        declareExchange(exchangeName, exchange);
        String queueName = applyPrefix(prefix, baseQueueName);
        boolean partitioned = !anonymousConsumer && properties.isPartitioned();
        boolean durable = !anonymousConsumer && properties.getExtension().isDurableSubscription();
        Queue queue;
        ......................
        // declare the queue
        declareQueue(queueName, queue);
        if (partitioned) {
            String bindingKey = String.format("%s-%d", name, properties.getInstanceIndex());
            declareBinding(queue.getName(), BindingBuilder.bind(queue).to(exchange).with(bindingKey));
        }
        else {
            declareBinding(queue.getName(), BindingBuilder.bind(queue).to(exchange).with("#"));
        }
        Binding<MessageChannel> binding = doRegisterConsumer(baseQueueName, group, inputChannel, queue, properties);
        .................
        return binding;
    }
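The binding-key choice in the listing above can be looked at in isolation. The following is a minimal sketch, not binder code: bindingKey copies the String.format rule from the listing, while matches is a deliberately reduced topic matcher written for this example (it understands only the catch-all "#" and exact keys). It also assumes that messages for a partition are published with a routing key of the same name-index form, which this section of the source does not show.

```java
// Minimal sketch; BindingKeySketch and its methods are hypothetical names used for illustration.
class BindingKeySketch {

    // Same rule as the listing: a partitioned consumer binds with "<name>-<instanceIndex>",
    // every other consumer binds with the catch-all topic pattern "#".
    static String bindingKey(String name, boolean partitioned, int instanceIndex) {
        return partitioned ? String.format("%s-%d", name, instanceIndex) : "#";
    }

    // Reduced topic matching (assumption for illustration): handles only "#" and exact keys,
    // not "*" and not "#" embedded in a longer pattern.
    static boolean matches(String bindingKey, String routingKey) {
        return "#".equals(bindingKey) || bindingKey.equals(routingKey);
    }

    public static void main(String[] args) {
        String partitionKey = bindingKey("testMessage", true, 1);
        System.out.println(partitionKey + " accepts testMessage-1: " + matches(partitionKey, "testMessage-1"));
        System.out.println(partitionKey + " accepts testMessage-0: " + matches(partitionKey, "testMessage-0"));
        System.out.println("# accepts testMessage-0: " + matches(bindingKey("testMessage", false, 1), "testMessage-0"));
    }
}
```

Under these assumptions a partitioned queue only sees messages addressed to its own instance index, whereas a queue bound with "#" sees everything sent to the exchange.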

The final call in this method, doRegisterConsumer, is where RabbitMQ messages get wired to their handler method.

    private Binding<MessageChannel> doRegisterConsumer(final String name, String group, MessageChannel moduleInputChannel, Queue queue,
            final ExtendedConsumerProperties<RabbitConsumerProperties> properties) {
        DefaultBinding<MessageChannel> consumerBinding;
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(
                this.connectionFactory);
        listenerContainer.setAcknowledgeMode(properties.getExtension().getAcknowledgeMode());
        listenerContainer.setChannelTransacted(properties.getExtension().isTransacted());
        listenerContainer.setDefaultRequeueRejected(properties.getExtension().isRequeueRejected());
        int concurrency = properties.getConcurrency();
        concurrency = concurrency > 0 ? concurrency : 1;
        listenerContainer.setConcurrentConsumers(concurrency);
        int maxConcurrency = properties.getExtension().getMaxConcurrency();
        if (maxConcurrency > concurrency) {
            listenerContainer.setMaxConcurrentConsumers(maxConcurrency);
        }
        listenerContainer.setPrefetchCount(properties.getExtension().getPrefetch());
        listenerContainer.setRecoveryInterval(properties.getExtension().getRecoveryInterval());
        listenerContainer.setTxSize(properties.getExtension().getTxSize());
        listenerContainer.setTaskExecutor(new SimpleAsyncTaskExecutor(queue.getName() + "-"));
        listenerContainer.setQueues(queue);
        int maxAttempts = properties.getMaxAttempts();
        if (maxAttempts > 1 || properties.getExtension().isRepublishToDlq()) {
            RetryOperationsInterceptor retryInterceptor = RetryInterceptorBuilder.stateless()
                    .maxAttempts(maxAttempts)
                    .backOffOptions(properties.getBackOffInitialInterval(),
                            properties.getBackOffMultiplier(),
                            properties.getBackOffMaxInterval())
                    .recoverer(determineRecoverer(name, properties.getExtension().getPrefix(), properties.getExtension().isRepublishToDlq()))
                    .build();
            listenerContainer.setAdviceChain(new Advice[] { retryInterceptor });
        }
        listenerContainer.setAfterReceivePostProcessors(this.decompressingPostProcessor);
        listenerContainer.setMessagePropertiesConverter(RabbitMessageChannelBinder.inboundMessagePropertiesConverter);
        listenerContainer.afterPropertiesSet();
        AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
        adapter.setBeanFactory(this.getBeanFactory());
        DirectChannel bridgeToModuleChannel = new DirectChannel();
        bridgeToModuleChannel.setBeanFactory(this.getBeanFactory());
        bridgeToModuleChannel.setBeanName(name + ".bridge");
        adapter.setOutputChannel(bridgeToModuleChannel);
        adapter.setBeanName("inbound." + name);
        DefaultAmqpHeaderMapper mapper = new DefaultAmqpHeaderMapper();
        mapper.setRequestHeaderNames(properties.getExtension().getRequestHeaderPatterns());
        mapper.setReplyHeaderNames(properties.getExtension().getReplyHeaderPatterns());
        adapter.setHeaderMapper(mapper);
        adapter.afterPropertiesSet();
        consumerBinding = new DefaultBinding<MessageChannel>(name, group, moduleInputChannel, adapter) {
            @Override
            protected void afterUnbind() {
                cleanAutoDeclareContext(properties.getExtension().getPrefix(), name);
            }
        };
        ReceivingHandler convertingBridge = new ReceivingHandler();
        convertingBridge.setOutputChannel(moduleInputChannel);
        convertingBridge.setBeanName(name + ".convert.bridge");
        convertingBridge.afterPropertiesSet();
        bridgeToModuleChannel.subscribe(convertingBridge);
        adapter.start();
        return consumerBinding;
    }
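The retry settings passed to RetryInterceptorBuilder in the listing above (maxAttempts plus the initial interval, multiplier and maximum interval) describe a bounded retry with exponential back-off followed by a recoverer. The sketch below shows that general idea in plain Java. It is not Spring Retry's implementation: BackOffSketch, intervals and retry are hypothetical names, the argument values are arbitrary examples, and the actual sleeping between attempts is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Sketch of the general technique only; not the Spring Retry classes used by the binder.
class BackOffSketch {

    // Pauses between attempts: maxAttempts attempts leave room for maxAttempts - 1 pauses.
    // Each pause grows by the multiplier and is capped at max.
    static List<Long> intervals(int maxAttempts, long initial, double multiplier, long max) {
        List<Long> result = new ArrayList<>();
        double next = initial;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            result.add((long) Math.min(next, max));
            next = next * multiplier;
        }
        return result;
    }

    // Runs the task up to maxAttempts times; once the attempts are used up the recoverer decides
    // what happens with the failure (in the listing that role belongs to determineRecoverer(...)).
    // The pauses computed by intervals(...) are intentionally not slept here.
    static <T> T retry(int maxAttempts, Supplier<T> task, Function<RuntimeException, T> recoverer) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        return recoverer.apply(last);
    }

    public static void main(String[] args) {
        System.out.println(intervals(5, 1000L, 2.0, 3000L));
        System.out.println(retry(2, () -> { throw new IllegalStateException("boom"); }, e -> "recovered"));
    }
}
```

Note that in the listing the interceptor is only installed when maxAttempts is greater than 1 or republishToDlq is enabled; otherwise the container runs without this advice.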

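1. Spring Cloud Stream creates a SimpleMessageListenerContainer. This class receives messages from RabbitMQ and hands them to the appropriate handler method; in plain Spring Rabbit it serves the methods annotated with @RabbitListener. For details, see the earlier article on integrating Spring Boot with Rabbit.

Here it is enough to know that SimpleMessageListenerContainer is the container that mediates between RabbitMQ and the local listener method.

2. Next an AmqpInboundChannelAdapter is created, taking the SimpleMessageListenerContainer as its constructor argument. A DirectChannel named bridgeToModuleChannel is also created and set as the adapter's OutputChannel. The adapter's afterPropertiesSet method is then called, which in turn calls its own onInit method.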

    protected void onInit() {
        this.messageListenerContainer.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                Object payload = AmqpInboundChannelAdapter.this.messageConverter.fromMessage(message);
                Map<String, Object> headers =
                        AmqpInboundChannelAdapter.this.headerMapper.toHeadersFromRequest(message.getMessageProperties());
                if (AmqpInboundChannelAdapter.this.messageListenerContainer.getAcknowledgeMode()
                        == AcknowledgeMode.MANUAL) {
                    headers.put(AmqpHeaders.DELIVERY_TAG, message.getMessageProperties().getDeliveryTag());
                    headers.put(AmqpHeaders.CHANNEL, channel);
                }
                sendMessage(getMessageBuilderFactory().withPayload(payload).copyHeaders(headers).build());
            }
        });
        this.messageListenerContainer.afterPropertiesSet();
        super.onInit();
    }

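As onInit shows, the message listener of the SimpleMessageListenerContainer is set to an anonymous implementation of the ChannelAwareMessageListener interface. In the Spring Boot and RabbitMQ integration, by contrast, the object passed to setMessageListener is a MessagingMessageListenerAdapter.

3. A ReceivingHandler is created; it implements the MessageHandler interface. Its OutputChannel is set to the message channel created by BindableProxyFactory. As analyzed earlier, StreamListenerAnnotationBeanPostProcessor.postProcessAfterInitialization has already subscribed a StreamListenerMessageHandler to that channel, and the StreamListenerMessageHandler in turn holds the concrete bean and Method that process the message.

4. The ReceivingHandler is subscribed to bridgeToModuleChannel as an observer.

The following traces the steps a message goes through when it arrives from the broker:

1. First, the onMessage method of the listener registered on the SimpleMessageListenerContainer is invoked. For details, see the earlier article on integrating Spring Boot with Rabbit.

2. onMessage calls sendMessage.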

    public void onMessage(Message message, Channel channel) throws Exception {
        Object payload = AmqpInboundChannelAdapter.this.messageConverter.fromMessage(message);
        Map<String, Object> headers =
                AmqpInboundChannelAdapter.this.headerMapper.toHeadersFromRequest(message.getMessageProperties());
        if (AmqpInboundChannelAdapter.this.messageListenerContainer.getAcknowledgeMode()
                == AcknowledgeMode.MANUAL) {
            headers.put(AmqpHeaders.DELIVERY_TAG, message.getMessageProperties().getDeliveryTag());
            headers.put(AmqpHeaders.CHANNEL, channel);
        }
        sendMessage(getMessageBuilderFactory().withPayload(payload).copyHeaders(headers).build());
    }

    protected void sendMessage(Message<?> message) {
        ................
        try {
            // the OutputChannel here is the bridgeToModuleChannel created earlier
            this.messagingTemplate.send(getOutputChannel(), message);
        }
        ................
    }

    public void send(D destination, Message<?> message) {
        doSend(destination, message);
    }

    protected final void doSend(MessageChannel channel, Message<?> message) {
        ......................
        boolean sent = (timeout >= 0 ? channel.send(message, timeout) : channel.send(message));
        ..................
    }

channel.send(message) ends up in the send method of AbstractMessageChannel.

①:

    public final boolean send(Message<?> message, long timeout) {
        Assert.notNull(message, "message must not be null");
        Assert.notNull(message.getPayload(), "message payload must not be null");
        .............
        sent = this.doSend(message, timeout);
        if (countsEnabled) {
            channelMetrics.afterSend(metrics, sent);
            metricsProcessed = true;
        }
        if (debugEnabled) {
            logger.debug("postSend (sent=" + sent + ") on channel '" + this + "', message: " + message);
        }
        if (interceptorStack != null) {
            interceptors.postSend(message, this, sent);
            interceptors.afterSendCompletion(message, this, sent, null, interceptorStack);
        }
        return sent;
        ....................
    }

The doSend call here resolves to AbstractSubscribableChannel.doSend.

②:

    protected boolean doSend(Message<?> message, long timeout) {
        try {
            return getRequiredDispatcher().dispatch(message);
        }
        catch (MessageDispatchingException e) {
            String description = e.getMessage() + " for channel '" + this.getFullChannelName() + "'.";
            throw new MessageDeliveryException(message, description, e);
        }
    }

    // dispatch method of the UnicastingDispatcher class
    public final boolean dispatch(final Message<?> message) {
        if (this.executor != null) {
            Runnable task = createMessageHandlingTask(message);
            this.executor.execute(task);
            return true;
        }
        return this.doDispatch(message);
    }

    private boolean doDispatch(Message<?> message) {
        if (this.tryOptimizedDispatch(message)) {
            return true;
        }
        ....................
        return success;
    }

    protected boolean tryOptimizedDispatch(Message<?> message) {
        MessageHandler handler = this.theOneHandler;
        if (handler != null) {
            try {
                handler.handleMessage(message);
                return true;
            }
            catch (Exception e) {
                throw wrapExceptionIfNecessary(message, e);
            }
        }
        return false;
    }
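The tryOptimizedDispatch shortcut in the listing above can be modeled with a few lines of plain Java. This is a sketch under stated assumptions, not the Spring Integration dispatcher: Handler and Dispatcher are hypothetical types, and the rule that theOneHandler is only set while exactly one handler is registered is an assumption suggested by the field name. The general dispatch path that the listing elides is deliberately not modeled.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the single-handler fast path; hypothetical types, not Spring Integration code.
class UnicastDispatchSketch {

    interface Handler {
        void handleMessage(String message);
    }

    static class Dispatcher {
        private final List<Handler> handlers = new ArrayList<>();
        // Assumption: non-null only while exactly one handler is registered.
        private Handler theOneHandler;

        void addHandler(Handler handler) {
            handlers.add(handler);
            theOneHandler = handlers.size() == 1 ? handler : null;
        }

        // Mirrors the shape of tryOptimizedDispatch: call the single handler directly,
        // or report false so that a caller could fall back to a general path (not modeled here).
        boolean tryOptimizedDispatch(String message) {
            Handler handler = theOneHandler;
            if (handler != null) {
                handler.handleMessage(message);
                return true;
            }
            return false;
        }
    }

    public static void main(String[] args) {
        Dispatcher dispatcher = new Dispatcher();
        dispatcher.addHandler(message -> System.out.println("handled " + message));
        System.out.println(dispatcher.tryOptimizedDispatch("m1"));
    }
}
```

In the scenario analyzed here bridgeToModuleChannel has a single subscriber, so the direct call is the path a message takes.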

Eventually handler.handleMessage is called. From the wiring shown earlier, the handler of bridgeToModuleChannel is the ReceivingHandler, so ReceivingHandler.handleMessage is invoked. ReceivingHandler extends AbstractReplyProducingMessageHandler, which extends AbstractMessageHandler, so the call lands in AbstractMessageHandler.handleMessage.

③:

    public final void handleMessage(Message<?> message) {
        ................
        try {
            ............
            this.handleMessageInternal(message);
            ............
        }
        catch (Exception e) {
            ...........
        }
    }

Next, AbstractReplyProducingMessageHandler.handleMessageInternal runs.

④:

    protected final void handleMessageInternal(Message<?> message) {
        Object result;
        if (this.advisedRequestHandler == null) {
            result = handleRequestMessage(message);
        }
        else {
            result = doInvokeAdvisedRequestHandler(message);
        }
        if (result != null) {
            sendOutputs(result, message);
        }
        else if (this.requiresReply && !isAsync()) {
            throw new ReplyRequiredException(message, "No reply produced by handler '" +
                    getComponentName() + "', and its 'requiresReply' property is set to true.");
        }
        else if (!isAsync() && logger.isDebugEnabled()) {
            logger.debug("handler '" + this + "' produced no reply for request Message: " + message);
        }
    }

This calls ReceivingHandler.handleRequestMessage, which deserializes the message among other things. sendOutputs is executed afterwards.

    protected void sendOutputs(Object result, Message<?> requestMessage) {
        if (result instanceof Iterable<?> && shouldSplitOutput((Iterable<?>) result)) {
            for (Object o : (Iterable<?>) result) {
                this.produceOutput(o, requestMessage);
            }
        }
        else if (result != null) {
            this.produceOutput(result, requestMessage);
        }
    }

    protected void produceOutput(Object reply, final Message<?> requestMessage) {
        final MessageHeaders requestHeaders = requestMessage.getHeaders();
        Object replyChannel = null;
        if (getOutputChannel() == null) {
            ............
        }
        if (this.async && reply instanceof ListenableFuture<?>) {
            .......................
        }
        else {
            sendOutput(createOutputMessage(reply, requestHeaders), replyChannel, false);
        }
    }

    protected void sendOutput(Object output, Object replyChannel, boolean useArgChannel) {
        MessageChannel outputChannel = getOutputChannel();
        ....................
        if (replyChannel instanceof MessageChannel) {
            if (output instanceof Message<?>) {
                this.messagingTemplate.send((MessageChannel) replyChannel, (Message<?>) output);
            }
            else {
                this.messagingTemplate.convertAndSend((MessageChannel) replyChannel, output);
            }
        }
        ..................
    }

In ReceivingHandler.sendOutput, the channel returned by getOutputChannel is the message channel created by BindableProxyFactory.
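The shape of handleMessageInternal followed by sendOutputs is a template method: the base class fixes the order of the steps and a subclass supplies only handleRequestMessage. Below is a minimal plain-Java sketch of that pattern. ReplyProducingHandler and Channel are hypothetical stand-ins rather than the Spring Integration types, and the advice, async and split-output branches of the real method are left out.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Template-method sketch; hypothetical stand-ins, not AbstractReplyProducingMessageHandler itself.
class ReplyProducingSketch {

    interface Channel {
        void send(Object message);
    }

    abstract static class ReplyProducingHandler {
        private Channel outputChannel;

        void setOutputChannel(Channel outputChannel) {
            this.outputChannel = outputChannel;
        }

        // Fixed skeleton: produce a result, then forward it if there is one.
        final void handleMessage(Object message) {
            Object result = handleRequestMessage(message);
            if (result != null) {
                if (outputChannel == null) {
                    throw new IllegalStateException("no output channel configured");
                }
                outputChannel.send(result);
            }
        }

        // The only step a subclass has to provide.
        protected abstract Object handleRequestMessage(Object requestMessage);
    }

    // Plays the ReceivingHandler role: turns raw bytes into a String payload.
    static class DecodingHandler extends ReplyProducingHandler {
        @Override
        protected Object handleRequestMessage(Object requestMessage) {
            return new String((byte[]) requestMessage, StandardCharsets.UTF_8);
        }
    }

    static List<Object> demo(byte[] raw) {
        List<Object> forwarded = new ArrayList<>();
        DecodingHandler handler = new DecodingHandler();
        handler.setOutputChannel(forwarded::add); // stand-in for the module input channel
        handler.handleMessage(raw);
        return forwarded;
    }

    public static void main(String[] args) {
        System.out.println(demo("hi".getBytes(StandardCharsets.UTF_8)));
    }
}
```

The same skeleton runs twice in the flow traced here, once with the ReceivingHandler and once with the StreamListenerMessageHandler; only the handleRequestMessage step differs.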

3. Next, this.messagingTemplate.send((MessageChannel) replyChannel, (Message<?>) output); is called, which repeats steps ①, ②, ③ and ④ above and again reaches AbstractReplyProducingMessageHandler.handleMessageInternal.

    protected final void handleMessageInternal(Message<?> message) {
        Object result;
        if (this.advisedRequestHandler == null) {
            result = handleRequestMessage(message);
        }
        else {
            result = doInvokeAdvisedRequestHandler(message);
        }
        if (result != null) {
            sendOutputs(result, message);
        }
        else if (this.requiresReply && !isAsync()) {
            throw new ReplyRequiredException(message, "No reply produced by handler '" +
                    getComponentName() + "', and its 'requiresReply' property is set to true.");
        }
        else if (!isAsync() && logger.isDebugEnabled()) {
            logger.debug("handler '" + this + "' produced no reply for request Message: " + message);
        }
    }

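This time, however, the message channel is the one created by BindableProxyFactory, and its observer is the StreamListenerMessageHandler, so handleRequestMessage resolves to the StreamListenerMessageHandler implementation: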

    protected Object handleRequestMessage(Message<?> requestMessage) {
        try {
            return invocableHandlerMethod.invoke(requestMessage);
        }
        catch (Exception e) {
            if (e instanceof MessagingException) {
                throw (MessagingException) e;
            }
            else {
                throw new MessagingException(requestMessage, "Exception thrown while invoking " + invocableHandlerMethod.getShortLogMessage(), e);
            }
        }
    }

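As the earlier source analysis showed, invocableHandlerMethod already wraps the target bean and Method. This completes the trace from RabbitMQ through Spring Cloud Stream to the handler method.

Summary of how a Rabbit message finds its handler method in Spring Cloud Stream:

1. Spring Cloud Stream creates a SimpleMessageListenerContainer to listen to the RabbitMQ server.

2. An AmqpInboundChannelAdapter is created with the SimpleMessageListenerContainer as its constructor argument. This links the two, so that when the container receives a message it calls the onMessage callback created in the adapter's onInit method.

3. A DirectChannel named bridgeToModuleChannel is created and set as the adapter's OutputChannel.

4. An observer, ReceivingHandler, is created and subscribed to bridgeToModuleChannel. Its OutputChannel is set to the message channel created by BindableProxyFactory.

5. When a message arrives from RabbitMQ, it first reaches ReceivingHandler.handleRequestMessage, which deserializes it among other things. sendOutputs runs afterwards.

6. sendOutputs delivers the message to the StreamListenerMessageHandler, the observer of the message channel created by BindableProxyFactory. StreamListenerMessageHandler.handleRequestMessage then uses invocableHandlerMethod, which wraps the target bean and Method, to invoke the matching method.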
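The six steps above can be condensed into a small plain-Java model of the inbound chain. It is only an illustration of the order in which the pieces are called: DirectChannelModel and the lambdas are hypothetical stand-ins for DirectChannel, the adapter's onMessage callback, ReceivingHandler and StreamListenerMessageHandler, and the "deserialization" is reduced to decoding UTF-8 bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model of the inbound chain; none of these types are the real Spring classes.
class InboundChainSketch {

    // Stand-in for DirectChannel: delivers synchronously to its single subscriber.
    static class DirectChannelModel<T> {
        private Consumer<T> subscriber;

        void subscribe(Consumer<T> subscriber) {
            this.subscriber = subscriber;
        }

        void send(T message) {
            if (subscriber == null) {
                throw new IllegalStateException("no subscriber");
            }
            subscriber.accept(message);
        }
    }

    // Wires the chain and pushes the raw messages through it, returning the order of calls.
    static List<String> run(byte[]... rawMessages) {
        List<String> trace = new ArrayList<>();
        DirectChannelModel<byte[]> bridgeToModuleChannel = new DirectChannelModel<>();
        DirectChannelModel<String> moduleInputChannel = new DirectChannelModel<>();

        // ReceivingHandler role: convert the payload, then forward to the module channel.
        bridgeToModuleChannel.subscribe(raw -> {
            trace.add("ReceivingHandler");
            moduleInputChannel.send(new String(raw, StandardCharsets.UTF_8));
        });

        // StreamListenerMessageHandler role: invoke the listener method with the payload.
        moduleInputChannel.subscribe(payload -> trace.add("listener method got: " + payload));

        // Adapter role: the callback the listener container calls for each broker message.
        Consumer<byte[]> onMessage = raw -> {
            trace.add("adapter.onMessage");
            bridgeToModuleChannel.send(raw);
        };

        for (byte[] raw : rawMessages) {
            onMessage.accept(raw);
        }
        return trace;
    }

    public static void main(String[] args) {
        for (String step : run("hi".getBytes(StandardCharsets.UTF_8))) {
            System.out.println(step);
        }
    }
}
```

Because both channels deliver synchronously in this model, the listener method runs on the same call stack as the adapter callback, which matches the nested send and dispatch calls traced in the listings.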

 

That concludes this brief analysis of Spring Cloud Stream. Spring Cloud Stream is meant to be used together with Spring Integration.

 
