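Spring Cloud Stream is a framework for building message-driven microservices.
An application talks to a Spring Cloud Stream binder through inputs and outputs. Our configuration defines the bindings, and the binder handles all interaction with the message middleware. So to work in a message-driven style, we only need to understand how to interact with Spring Cloud Stream itself.
It uses Spring Integration to connect to message brokers and so provides message- and event-driven processing. Spring Cloud Stream supplies opinionated auto-configuration for several vendors' message middleware, and introduces three core concepts: publish-subscribe, consumer groups, and partitions. The release analyzed here ships binders for RabbitMQ and Kafka only.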
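To make the three concepts concrete, here is a minimal plain-Java sketch. It is not how Spring Cloud Stream or any broker implements them. The class `TopicSketch`, the round-robin choice of group member, and the hash-modulo partition rule are all assumptions made for illustration: every group receives each message once (publish-subscribe), only one member of a group handles it (consumer group), and a key hash selects the partition (partitioning).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of publish-subscribe, consumer groups and partitioning. */
final class TopicSketch {
    // group name -> inboxes of the members that joined that group
    private final Map<String, List<List<String>>> groups = new LinkedHashMap<>();
    // group name -> counter used to pick the next member in turn
    private final Map<String, Integer> nextMember = new LinkedHashMap<>();

    /** Adds a new member to a group and returns its inbox. */
    List<String> join(String group) {
        List<String> inbox = new ArrayList<>();
        groups.computeIfAbsent(group, g -> new ArrayList<>()).add(inbox);
        nextMember.putIfAbsent(group, 0);
        return inbox;
    }

    /** Every group gets the message once; members of one group take turns. */
    void publish(String message) {
        for (Map.Entry<String, List<List<String>>> entry : groups.entrySet()) {
            List<List<String>> members = entry.getValue();
            int index = nextMember.get(entry.getKey());
            members.get(index % members.size()).add(message);
            nextMember.put(entry.getKey(), index + 1);
        }
    }

    /** Assumed partition rule: non-negative hash of the key modulo the partition count. */
    static int partitionFor(Object key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        TopicSketch topic = new TopicSketch();
        List<String> billingA = topic.join("billing");
        List<String> billingB = topic.join("billing");
        List<String> audit = topic.join("audit");
        topic.publish("m1");
        topic.publish("m2");
        // billingA and billingB split the messages; audit sees both.
        System.out.println(billingA + " " + billingB + " " + audit);
        System.out.println(partitionFor("order-42", 3));
    }
}
```

In the sketch the two "billing" members share the stream while "audit" receives every message, which is the behaviour a consumer group is meant to give when several instances of one service run side by side.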
Let's start the analysis by setting up the pom file.
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>1.5.3.RELEASE</version>
- <relativePath/> <!-- lookup parent from repository -->
- </parent>
- <groupId>eureka.stream</groupId>
- <artifactId>stream</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- <name>springstream</name>
- <description>Demo project for Spring Boot</description>
-
- <properties>
- <java.version>1.8</java.version>
- <spring-cloud.version>Brixton.SR5</spring-cloud.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- </dependency>
- </dependencies>
-
- <dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-dependencies</artifactId>
- <version>${spring-cloud.version}</version>
- <type>pom</type>
- <scope>import</scope>
- </dependency>
- </dependencies>
- </dependencyManagement>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
-
- </project>
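Here spring-cloud-starter-stream-rabbit is the dependency that pulls in the Stream framework; spring-cloud-starter-stream-kafka can be used instead. This analysis uses the RabbitMQ binder.
Let's begin with a simple example.
First, create StreamSendClient, the interface that declares the input and output message channels.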
- import org.springframework.cloud.stream.annotation.Input;
- import org.springframework.cloud.stream.annotation.Output;
- import org.springframework.cloud.stream.messaging.Sink;
- import org.springframework.messaging.MessageChannel;
- import org.springframework.messaging.SubscribableChannel;
-
-
- public interface StreamSendClient {
-
- @Output("testMessage")
- MessageChannel output();
-
- @Input("testMessage")
- MessageChannel input();
- }
Next, create the message handling class SinkReceiver and annotate it with @EnableBinding, passing StreamSendClient as the annotation's value.
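- import org.springframework.cloud.stream.annotation.EnableBinding;
- import org.springframework.cloud.stream.annotation.StreamListener;
-
- @EnableBinding({StreamSendClient.class})
- public class SinkReceiver {
-
- @StreamListener("testMessage")
- public void receive(Object payload){
- System.out.println("Received:" + payload);
- }
- }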
Create the startup class StreamApplication.
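- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
- import org.springframework.context.ConfigurableApplicationContext;
- import org.springframework.messaging.support.MessageBuilder;
-
- @SpringBootApplication
- public class StreamApplication {
-
- public static void main(String[] args) {
- ConfigurableApplicationContext run = SpringApplication.run(StreamApplication.class, args);
- // The bean name is the fully qualified class name of the channel interface
- StreamSendClient streamClient = (StreamSendClient)run.getBean("com.springcloud.eurekaclient.StreamSendClient");
- streamClient.output().send(MessageBuilder.withPayload("from streamClient").build());
- }
-
- }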
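After running it, the console prints Received:from streamClient, and the queues listed in the RabbitMQ management console include testMessage.
Now for the analysis.
The startup class carries no additional annotations; the @EnableBinding annotation sits on SinkReceiver.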
- @Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE})
- @Retention(RetentionPolicy.RUNTIME)
- @Documented
- @Inherited
- @Configuration
- @Import({ChannelBindingServiceConfiguration.class, BindingBeansRegistrar.class, BinderFactoryConfiguration.class,
- SpelExpressionConverterConfiguration.class})
- @EnableIntegration
- public @interface EnableBinding {
-
- /**
- * A list of interfaces having methods annotated with {@link Input} and/or
- * {@link Output} to indicate bindable components.
- */
- Class<?>[] value() default {};
-
- }
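From this we can tell:
1. @EnableBinding is meta-annotated with @Configuration, so the annotated class is itself registered as a @Component.
2. It imports ChannelBindingServiceConfiguration.class, BindingBeansRegistrar.class, BinderFactoryConfiguration.class, and SpelExpressionConverterConfiguration.class.
3. It switches on @EnableIntegration. Spring Integration is positioned as a lightweight counterpart to an enterprise service bus (ESB). It models channels in two forms, PollableChannel and SubscribableChannel, both of which extend MessageChannel.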
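The difference between the two channel forms can be sketched in plain Java. The types below are simplified stand-ins written for this illustration, not the Spring Integration interfaces: the push-style channel calls its subscribers as soon as a message is sent, while the pull-style channel buffers messages until someone asks for one.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

/** Simplified stand-ins for the two channel styles; not the Spring Integration types. */
final class ChannelStylesSketch {

    /** Common sending side, loosely analogous to a message channel. */
    interface SimpleChannel {
        boolean send(String payload);
    }

    /** Push style: subscribers run on the sender's thread when a message is sent. */
    static final class PushChannel implements SimpleChannel {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> handler) {
            subscribers.add(handler);
        }

        @Override
        public boolean send(String payload) {
            // Simplification: deliver to every subscriber.
            subscribers.forEach(subscriber -> subscriber.accept(payload));
            return !subscribers.isEmpty();
        }
    }

    /** Pull style: messages wait in a buffer until a consumer asks for one. */
    static final class PullChannel implements SimpleChannel {
        private final Queue<String> buffer = new ArrayDeque<>();

        @Override
        public boolean send(String payload) {
            return buffer.offer(payload);
        }

        /** Returns the oldest buffered payload, or null when the buffer is empty. */
        String receive() {
            return buffer.poll();
        }
    }

    public static void main(String[] args) {
        PushChannel push = new PushChannel();
        push.subscribe(payload -> System.out.println("pushed: " + payload));
        push.send("hello");

        PullChannel pull = new PullChannel();
        pull.send("world");
        System.out.println("pulled: " + pull.receive());
    }
}
```

The @StreamListener mechanism analyzed in this article relies on the push style: a handler is subscribed once and then invoked for every incoming message.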
- @Configuration
- @EnableConfigurationProperties(ChannelBindingServiceProperties.class)
- public class ChannelBindingServiceConfiguration {
-
- private static final String ERROR_CHANNEL_NAME = "error";
-
- @Autowired
- private MessageBuilderFactory messageBuilderFactory;
-
- @Autowired(required = false)
- private ObjectMapper objectMapper;
-
- /**
- * User defined custom message converters
- */
- @Autowired(required = false)
- private List<AbstractFromMessageConverter> customMessageConverters;
-
- @Bean
- // This conditional is intentionally not in an autoconfig (usually a bad idea) because
- // it is used to detect a ChannelBindingService in the parent context (which we know
- // already exists).
- @ConditionalOnMissingBean(ChannelBindingService.class)
- public ChannelBindingService bindingService(ChannelBindingServiceProperties channelBindingServiceProperties,
- BinderFactory<MessageChannel> binderFactory) {
- return new ChannelBindingService(channelBindingServiceProperties, binderFactory);
- }
-
-
- @Bean
- public BindableChannelFactory channelFactory(CompositeMessageChannelConfigurer compositeMessageChannelConfigurer) {
- return new DefaultBindableChannelFactory(compositeMessageChannelConfigurer);
- }
-
- @Bean
- public CompositeMessageChannelConfigurer compositeMessageChannelConfigurer(
- MessageConverterConfigurer messageConverterConfigurer) {
- List<MessageChannelConfigurer> configurerList = new ArrayList<>();
- configurerList.add(messageConverterConfigurer);
- return new CompositeMessageChannelConfigurer(configurerList);
- }
-
- @Bean
- @DependsOn("bindingService")
- public OutputBindingLifecycle outputBindingLifecycle() {
- return new OutputBindingLifecycle();
- }
-
- @Bean
- @DependsOn("bindingService")
- public InputBindingLifecycle inputBindingLifecycle() {
- return new InputBindingLifecycle();
- }
-
- @Bean
- @DependsOn("bindingService")
- public ContextStartAfterRefreshListener contextStartAfterRefreshListener() {
- return new ContextStartAfterRefreshListener();
- }
-
- @Bean
- public static StreamListenerAnnotationBeanPostProcessor bindToAnnotationBeanPostProcessor(
- @Lazy BinderAwareChannelResolver binderAwareChannelResolver,
- @Lazy MessageHandlerMethodFactory messageHandlerMethodFactory) {
- return new StreamListenerAnnotationBeanPostProcessor(binderAwareChannelResolver,
- messageHandlerMethodFactory);
- }
-
- }
ChannelBindingServiceConfiguration registers several important beans:
1. ChannelBindingService: creates the MessageChannel bindings for producers and consumers, along with the RabbitMQ exchange, queue, and so on.
2. inputBindingLifecycle and outputBindingLifecycle: after startup, they call ChannelBindingService to perform that creation.
3. StreamListenerAnnotationBeanPostProcessor: links methods annotated with @StreamListener to the RabbitMQ consumer channel, so that when RabbitMQ pushes a message, the @StreamListener method is invoked.
BindingBeansRegistrar analyzes the classes listed in @EnableBinding(StreamSendClient.class).
- public class BindingBeansRegistrar implements ImportBeanDefinitionRegistrar {
-
- @Override
- public void registerBeanDefinitions(AnnotationMetadata metadata,
- BeanDefinitionRegistry registry) {
- AnnotationAttributes attrs = AnnotatedElementUtils.getMergedAnnotationAttributes(
- ClassUtils.resolveClassName(metadata.getClassName(), null),
- EnableBinding.class);
- for (Class<?> type : collectClasses(attrs, metadata.getClassName())) {
- BindingBeanDefinitionRegistryUtils.registerChannelBeanDefinitions(type,
- type.getName(), registry);
- BindingBeanDefinitionRegistryUtils.registerChannelsQualifiedBeanDefinitions(
- ClassUtils.resolveClassName(metadata.getClassName(), null), type,
- registry);
- }
- }
-
- private Class<?>[] collectClasses(AnnotationAttributes attrs, String className) {
- EnableBinding enableBinding = AnnotationUtils.synthesizeAnnotation(attrs,
- EnableBinding.class, ClassUtils.resolveClassName(className, null));
- return enableBinding.value();
- }
-
- }
collectClasses reads enableBinding.value() from the @EnableBinding annotation, which in the earlier example is the StreamSendClient class. Each class is then passed to BindingBeanDefinitionRegistryUtils.registerChannelBeanDefinitions:
- public static void registerChannelBeanDefinitions(Class<?> type,
- final String channelInterfaceBeanName, final BeanDefinitionRegistry registry) {
- ReflectionUtils.doWithMethods(type, new MethodCallback() {
- @Override
- public void doWith(Method method) throws IllegalArgumentException,
- IllegalAccessException {
- Input input = AnnotationUtils.findAnnotation(method, Input.class);
- if (input != null) {
- String name = getChannelName(input, method);
- registerInputChannelBeanDefinition(input.value(), name,
- channelInterfaceBeanName, method.getName(), registry);
- }
- Output output = AnnotationUtils.findAnnotation(method, Output.class);
- if (output != null) {
- String name = getChannelName(output, method);
- registerOutputChannelBeanDefinition(output.value(), name,
- channelInterfaceBeanName, method.getName(), registry);
- }
- }
-
- });
- }
- public static void registerInputChannelBeanDefinition(String qualifierValue,
- String name, String channelInterfaceBeanName,
- String channelInterfaceMethodName, BeanDefinitionRegistry registry) {
- registerChannelBeanDefinition(Input.class, qualifierValue, name,
- channelInterfaceBeanName, channelInterfaceMethodName, registry);
- }
-
- private static void registerChannelBeanDefinition(
- Class<? extends Annotation> qualifier, String qualifierValue, String name,
- String channelInterfaceBeanName, String channelInterfaceMethodName,
- BeanDefinitionRegistry registry) {
-
- RootBeanDefinition rootBeanDefinition = new RootBeanDefinition();
- rootBeanDefinition.setFactoryBeanName(channelInterfaceBeanName);
- rootBeanDefinition.setUniqueFactoryMethodName(channelInterfaceMethodName);
- rootBeanDefinition.addQualifier(new AutowireCandidateQualifier(qualifier,
- qualifierValue));
- registry.registerBeanDefinition(name, rootBeanDefinition);
- }
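This finds the @Input and @Output annotations in StreamSendClient and registers a bean definition for each annotated method. The bean name is the annotation's value, here testMessage. The BeanDefinition names the StreamSendClient bean as the factory bean, and setUniqueFactoryMethodName sets the factory method that produces the testMessage bean, for example input().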
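The idea of "a bean produced by calling a method on another bean" can be sketched without Spring. The registry below is a hypothetical miniature written for this article, not Spring's BeanFactory; the names `FactoryMethodRegistrySketch`, `Definition`, `DemoClient`, and the bean name `demo.StreamSendClient` are assumptions. It only shows the lookup order: resolve the factory bean first, then invoke the named method on it.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical mini-registry: a bean is created by calling a method on another bean. */
final class FactoryMethodRegistrySketch {

    /** Records which bean and which no-argument method produce the named bean. */
    static final class Definition {
        final String factoryBeanName;
        final String factoryMethodName;

        Definition(String factoryBeanName, String factoryMethodName) {
            this.factoryBeanName = factoryBeanName;
            this.factoryMethodName = factoryMethodName;
        }
    }

    private final Map<String, Object> singletons = new HashMap<>();
    private final Map<String, Definition> definitions = new HashMap<>();

    void registerSingleton(String name, Object bean) {
        singletons.put(name, bean);
    }

    void registerDefinition(String name, Definition definition) {
        definitions.put(name, definition);
    }

    /** Resolves a bean, invoking the factory method on first access and caching the result. */
    Object getBean(String name) {
        Object existing = singletons.get(name);
        if (existing != null) {
            return existing;
        }
        Definition definition = definitions.get(name);
        if (definition == null) {
            throw new IllegalArgumentException("No bean named " + name);
        }
        // Step 1: resolve the factory bean. Step 2: call the factory method on it.
        Object factory = getBean(definition.factoryBeanName);
        try {
            Method method = factory.getClass().getMethod(definition.factoryMethodName);
            Object created = method.invoke(factory);
            singletons.put(name, created);
            return created;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot create bean " + name, e);
        }
    }

    /** Stand-in for the channel interface implementation. */
    public static final class DemoClient {
        public String input() {
            return "channel:testMessage";
        }
    }

    public static void main(String[] args) {
        FactoryMethodRegistrySketch registry = new FactoryMethodRegistrySketch();
        registry.registerSingleton("demo.StreamSendClient", new DemoClient());
        registry.registerDefinition("testMessage", new Definition("demo.StreamSendClient", "input"));
        System.out.println(registry.getBean("testMessage"));
    }
}
```

The same two-step resolution explains why looking up testMessage in the real application ends up calling a method on the StreamSendClient bean.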
The BindingBeanDefinitionRegistryUtils.registerChannelsQualifiedBeanDefinitions method:
- public static void registerChannelsQualifiedBeanDefinitions(Class<?> parent,
- Class<?> type, final BeanDefinitionRegistry registry) {
-
- if (type.isInterface()) {
- RootBeanDefinition rootBeanDefinition = new RootBeanDefinition(
- BindableProxyFactory.class);
- rootBeanDefinition.addQualifier(new AutowireCandidateQualifier(
- Bindings.class, parent));
- rootBeanDefinition.getConstructorArgumentValues().addGenericArgumentValue(
- type);
- registry.registerBeanDefinition(type.getName(), rootBeanDefinition);
- }
- else {
- RootBeanDefinition rootBeanDefinition = new RootBeanDefinition(type);
- rootBeanDefinition.addQualifier(new AutowireCandidateQualifier(
- Bindings.class, parent));
- registry.registerBeanDefinition(type.getName(), rootBeanDefinition);
- }
- }
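This registers a bean whose name is the fully qualified class name of StreamSendClient (type.getName()), whose qualifier is Bindings, and whose bean class is BindableProxyFactory. BindableProxyFactory implements the Bindable interface.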
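Because StreamSendClient is only an interface, something has to stand behind it at runtime. A minimal sketch of that idea, assuming a JDK dynamic proxy rather than the Spring AOP ProxyFactory the framework actually uses, looks like this. The names `BindableProxySketch`, `ChannelName`, and `DemoClient` are hypothetical, and a StringBuilder stands in for a message channel.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: a proxy that answers annotated interface methods from a map. */
final class BindableProxySketch {

    /** Stand-in for @Input/@Output; carries the channel name. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface ChannelName {
        String value();
    }

    /** Stand-in for a user-declared channel interface. */
    interface DemoClient {
        @ChannelName("testMessage")
        StringBuilder output();
    }

    /** Creates one holder per annotated method, then serves the holders through a proxy. */
    static <T> T createProxy(Class<T> type) {
        Map<String, StringBuilder> holders = new HashMap<>();
        for (Method method : type.getMethods()) {
            ChannelName name = method.getAnnotation(ChannelName.class);
            if (name != null) {
                // A real factory would create a message channel here.
                holders.put(name.value(), new StringBuilder());
            }
        }
        Object proxy = Proxy.newProxyInstance(
                type.getClassLoader(),
                new Class<?>[] {type},
                (proxyInstance, method, args) -> {
                    if (method.getDeclaringClass() == Object.class) {
                        // Keep toString/equals/hashCode usable by delegating to the map.
                        return method.invoke(holders, args);
                    }
                    ChannelName name = method.getAnnotation(ChannelName.class);
                    return name == null ? null : holders.get(name.value());
                });
        return type.cast(proxy);
    }

    public static void main(String[] args) {
        DemoClient client = createProxy(DemoClient.class);
        client.output().append("from streamClient");
        // Both calls return the same holder, so the appended text is visible.
        System.out.println(client.output());
    }
}
```

Every call to output() returns the same pre-built object, which mirrors how the channel interface always hands back the channel created when the bean was initialized.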
Now let's look at the BindableProxyFactory source.
- public class BindableProxyFactory implements MethodInterceptor, FactoryBean<Object>, Bindable, InitializingBean {
-
- private Map<String, ChannelHolder> inputHolders = new HashMap<>();
-
- private Map<String, ChannelHolder> outputHolders = new HashMap<>();
-
- // Acts as the proxy's method interceptor, so calls to the input and output methods obtain their channel here
- public synchronized Object invoke(MethodInvocation invocation) throws Throwable {
- MessageChannel messageChannel = null;
- Method method = invocation.getMethod();
- if (MessageChannel.class.isAssignableFrom(method.getReturnType())) {
- Input input = AnnotationUtils.findAnnotation(method, Input.class);
- if (input != null) {
- String name = BindingBeanDefinitionRegistryUtils.getChannelName(input, method);
- messageChannel = this.inputHolders.get(name).getMessageChannel();
- }
- Output output = AnnotationUtils.findAnnotation(method, Output.class);
- if (output != null) {
- String name = BindingBeanDefinitionRegistryUtils.getChannelName(output, method);
- messageChannel = this.outputHolders.get(name).getMessageChannel();
- }
- }
- //ignore
- return messageChannel;
- }
-
- // Implements InitializingBean: once the bean is created, a channel is built for each @Input and @Output method;
- // by default it is a DirectChannel
- public void afterPropertiesSet() throws Exception {
- ReflectionUtils.doWithMethods(type, new ReflectionUtils.MethodCallback() {
- @Override
- public void doWith(Method method) throws IllegalArgumentException {
- Assert.notNull(channelFactory, "Channel Factory cannot be null");
- Input input = AnnotationUtils.findAnnotation(method, Input.class);
- if (input != null) {
- String name = BindingBeanDefinitionRegistryUtils.getChannelName(input, method);
- validateChannelType(method.getReturnType());
- MessageChannel sharedChannel = locateSharedChannel(name);
- if (sharedChannel == null) {
- inputHolders.put(name, new ChannelHolder(channelFactory.createSubscribableChannel(name), true));
- }
- else {
- inputHolders.put(name, new ChannelHolder(sharedChannel, false));
- }
- }
- }
- });
- ReflectionUtils.doWithMethods(type, new ReflectionUtils.MethodCallback() {
- @Override
- public void doWith(Method method) throws IllegalArgumentException {
- Output output = AnnotationUtils.findAnnotation(method, Output.class);
- if (output != null) {
- String name = BindingBeanDefinitionRegistryUtils.getChannelName(output, method);
- validateChannelType(method.getReturnType());
- MessageChannel sharedChannel = locateSharedChannel(name);
- if (sharedChannel == null) {
- outputHolders.put(name, new ChannelHolder(channelFactory.createSubscribableChannel(name), true));
- }
- else {
- outputHolders.put(name, new ChannelHolder(sharedChannel, false));
- }
- }
- }
-
- });
- }
-
- private void validateChannelType(Class<?> channelType) {
- Assert.isTrue(SubscribableChannel.class.equals(channelType) || MessageChannel.class.equals(channelType),
- "A bound channel should be either a '" + MessageChannel.class.getName() + "', " +
- " or a '" + SubscribableChannel.class.getName() + "'");
- }
-
- private MessageChannel locateSharedChannel(String name) {
- return this.sharedChannelRegistry != null ?
- this.sharedChannelRegistry.get(getNamespacePrefixedChannelName(name)) : null;
- }
-
- private String getNamespacePrefixedChannelName(String name) {
- return this.channelNamespace + "." + name;
- }
-
- @Override
- public synchronized Object getObject() throws Exception {
- if (this.proxy == null) {
- ProxyFactory factory = new ProxyFactory(this.type, this);
- this.proxy = factory.getProxy();
- }
- return this.proxy;
- }
-
- @Override
- public Class<?> getObjectType() {
- return this.type;
- }
-
- @Override
- public boolean isSingleton() {
- return true;
- }
-
-
- public void bindInputs(ChannelBindingService channelBindingService) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Binding inputs for %s:%s", this.channelNamespace, this.type));
- }
- for (Map.Entry<String, ChannelHolder> channelHolderEntry : this.inputHolders.entrySet()) {
- String inputChannelName = channelHolderEntry.getKey();
- ChannelHolder channelHolder = channelHolderEntry.getValue();
- if (channelHolder.isBindable()) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Binding %s:%s:%s", this.channelNamespace, this.type, inputChannelName));
- }
- channelBindingService.bindConsumer(channelHolder.getMessageChannel(), inputChannelName);
- }
- }
- }
-
- @Override
- public void bindOutputs(ChannelBindingService channelBindingService) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Binding outputs for %s:%s", this.channelNamespace, this.type));
- }
- for (Map.Entry<String, ChannelHolder> channelHolderEntry : this.outputHolders.entrySet()) {
- ChannelHolder channelHolder = channelHolderEntry.getValue();
- String outputChannelName = channelHolderEntry.getKey();
- if (channelHolderEntry.getValue().isBindable()) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Binding %s:%s:%s", this.channelNamespace, this.type, outputChannelName));
- }
- channelBindingService.bindProducer(channelHolder.getMessageChannel(), outputChannelName);
- }
- }
- }
-
- @Override
- public void unbindInputs(ChannelBindingService channelBindingService) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Unbinding inputs for %s:%s", this.channelNamespace, this.type));
- }
- for (Map.Entry<String, ChannelHolder> channelHolderEntry : this.inputHolders.entrySet()) {
- if (channelHolderEntry.getValue().isBindable()) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Unbinding %s:%s:%s", this.channelNamespace, this.type, channelHolderEntry.getKey()));
- }
- channelBindingService.unbindConsumers(channelHolderEntry.getKey());
- }
- }
- }
-
- @Override
- public void unbindOutputs(ChannelBindingService channelBindingService) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Unbinding outputs for %s:%s", this.channelNamespace, this.type));
- }
- for (Map.Entry<String, ChannelHolder> channelHolderEntry : this.outputHolders.entrySet()) {
- if (channelHolderEntry.getValue().isBindable()) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Binding %s:%s:%s", this.channelNamespace, this.type, channelHolderEntry.getKey()));
- }
- channelBindingService.unbindProducers(channelHolderEntry.getKey());
- }
- }
- }
- }
BinderFactoryConfiguration is the last of the three imported classes covered here:
- @Configuration
- public class BinderFactoryConfiguration {
-
- @Bean
- @ConditionalOnMissingBean(BinderFactory.class)
- public BinderFactory<?> binderFactory(BinderTypeRegistry binderTypeRegistry,
- ChannelBindingServiceProperties channelBindingServiceProperties) {
- Map<String, BinderConfiguration> binderConfigurations = new HashMap<>();
- Map<String, BinderProperties> declaredBinders = channelBindingServiceProperties.getBinders();
- boolean defaultCandidatesExist = false;
- Iterator<Map.Entry<String, BinderProperties>> binderPropertiesIterator = declaredBinders.entrySet().iterator();
- while (!defaultCandidatesExist && binderPropertiesIterator.hasNext()) {
- defaultCandidatesExist = binderPropertiesIterator.next().getValue().isDefaultCandidate();
- }
- for (Map.Entry<String, BinderProperties> binderEntry : declaredBinders.entrySet()) {
- BinderProperties binderProperties = binderEntry.getValue();
- if (binderTypeRegistry.get(binderEntry.getKey()) != null) {
- binderConfigurations.put(binderEntry.getKey(),
- new BinderConfiguration(binderTypeRegistry.get(binderEntry.getKey()),
- binderProperties.getEnvironment(), binderProperties.isInheritEnvironment(),
- binderProperties.isDefaultCandidate()));
- }
- else {
- Assert.hasText(binderProperties.getType(),
- "No 'type' property present for custom binder " + binderEntry.getKey());
- BinderType binderType = binderTypeRegistry.get(binderProperties.getType());
- Assert.notNull(binderType, "Binder type " + binderProperties.getType() + " is not defined");
- binderConfigurations.put(binderEntry.getKey(),
- new BinderConfiguration(binderType, binderProperties.getEnvironment(),
- binderProperties.isInheritEnvironment(), binderProperties.isDefaultCandidate()));
- }
- }
- if (!defaultCandidatesExist) {
- for (Map.Entry<String, BinderType> entry : binderTypeRegistry.getAll().entrySet()) {
- binderConfigurations.put(entry.getKey(),
- new BinderConfiguration(entry.getValue(), new Properties(), true, true));
- }
- }
- DefaultBinderFactory<?> binderFactory = new DefaultBinderFactory<>(binderConfigurations);
- binderFactory.setDefaultBinder(channelBindingServiceProperties.getDefaultBinder());
- return binderFactory;
- }
- }
Its main job is to create a DefaultBinderFactory.
That covers the beans loaded by ChannelBindingServiceConfiguration, BindingBeansRegistrar, and BinderFactoryConfiguration. Next comes the loading process:
1. ChannelBindingServiceConfiguration registers the outputBindingLifecycle and inputBindingLifecycle beans. We analyze inputBindingLifecycle; outputBindingLifecycle works the same way.
- @Bean
- @DependsOn("bindingService")
- public OutputBindingLifecycle outputBindingLifecycle() {
- return new OutputBindingLifecycle();
- }
-
- @Bean
- @DependsOn("bindingService")
- public InputBindingLifecycle inputBindingLifecycle() {
- return new InputBindingLifecycle();
- }
InputBindingLifecycle implements the SmartLifecycle interface, so its start method runs once the Spring context has started.
- public class InputBindingLifecycle implements SmartLifecycle, ApplicationContextAware {
- public void start() {
- if (!running) {
- // retrieve the ChannelBindingService lazily, avoiding early initialization
- try {
- ChannelBindingService channelBindingService = this.applicationContext
- .getBean(ChannelBindingService.class);
- Map<String, Bindable> bindables = this.applicationContext
- .getBeansOfType(Bindable.class);
- for (Bindable bindable : bindables.values()) {
- // bindables.values() holds the bean registered for @EnableBinding({StreamSendClient.class}); its bean class is BindableProxyFactory
- bindable.bindInputs(channelBindingService);
- }
- }
- catch (BeansException e) {
- throw new IllegalStateException(
- "Cannot perform binding, no proper implementation found", e);
- }
- this.running = true;
- }
- }
- }
-
BindableProxyFactory.bindInputs is as follows:
- public void bindInputs(ChannelBindingService channelBindingService) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Binding inputs for %s:%s", this.channelNamespace, this.type));
- }
- for (Map.Entry<String, ChannelHolder> channelHolderEntry : this.inputHolders.entrySet()) {
- String inputChannelName = channelHolderEntry.getKey();
- ChannelHolder channelHolder = channelHolderEntry.getValue();
- if (channelHolder.isBindable()) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Binding %s:%s:%s", this.channelNamespace, this.type, inputChannelName));
- }
- // execution continues into bindConsumer
- channelBindingService.bindConsumer(channelHolder.getMessageChannel(), inputChannelName);
- }
- }
- }
ChannelBindingService.bindConsumer then delegates to the binder:
- public Collection<Binding<MessageChannel>> bindConsumer(MessageChannel inputChannel, String inputChannelName) {
- ....................
- validate(consumerProperties);
- for (String target : channelBindingTargets) {
- // execution continues into binder.bindConsumer
- Binding<MessageChannel> binding = binder.bindConsumer(target, channelBindingServiceProperties.getGroup(inputChannelName), inputChannel, consumerProperties);
- bindings.add(binding);
- }
- this.consumerBindings.put(inputChannelName, bindings);
- return bindings;
- }
With the RabbitMQ binder, that call reaches RabbitMessageChannelBinder.bindConsumer, which delegates to doBindConsumer:
- public Binding<MessageChannel> doBindConsumer(String name, String group, MessageChannel inputChannel,
- ExtendedConsumerProperties<RabbitConsumerProperties> properties) {
-
- String prefix = properties.getExtension().getPrefix();
- String exchangeName = applyPrefix(prefix, name);
- TopicExchange exchange = new TopicExchange(exchangeName);
- // declare the exchange
- declareExchange(exchangeName, exchange);
-
- String queueName = applyPrefix(prefix, baseQueueName);
- boolean partitioned = !anonymousConsumer && properties.isPartitioned();
- boolean durable = !anonymousConsumer && properties.getExtension().isDurableSubscription();
- Queue queue;
-
- ......................
- // declare the queue
- declareQueue(queueName, queue);
-
- if (partitioned) {
- String bindingKey = String.format("%s-%d", name, properties.getInstanceIndex());
- declareBinding(queue.getName(), BindingBuilder.bind(queue).to(exchange).with(bindingKey));
- }
- else {
- declareBinding(queue.getName(), BindingBuilder.bind(queue).to(exchange).with("#"));
- }
- Binding<MessageChannel> binding = doRegisterConsumer(baseQueueName, group, inputChannel, queue, properties);
- .................
- return binding;
- }
As shown, inputBindingLifecycle results in the RabbitMQ exchange and queue being created.
Likewise, outputBindingLifecycle creates the producer once it starts.
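The binding-key choice at the end of doBindConsumer is easy to miss, so here is a small plain-Java sketch of it. `BindingKeySketch` and both of its methods are hypothetical helpers, not binder code. The `matches` method covers only the two key shapes that appear in the listing (the catch-all "#" and a literal key) and is not a full topic-exchange matcher.

```java
/** Hypothetical helper mirroring the binding-key choice made in doBindConsumer. */
final class BindingKeySketch {

    /** Partitioned consumers bind with "name-instanceIndex"; all others use the catch-all "#". */
    static String bindingKey(String name, boolean partitioned, int instanceIndex) {
        return partitioned ? String.format("%s-%d", name, instanceIndex) : "#";
    }

    /** Limited matcher: "#" accepts any routing key, a literal key must match exactly. */
    static boolean matches(String bindingKey, String routingKey) {
        return "#".equals(bindingKey) || bindingKey.equals(routingKey);
    }

    public static void main(String[] args) {
        String plain = bindingKey("testMessage", false, 0);
        String partitioned = bindingKey("testMessage", true, 1);
        System.out.println(plain + " " + partitioned);
        // A partitioned queue only receives messages routed to its own index.
        System.out.println(matches(partitioned, "testMessage-1"));
        System.out.println(matches(partitioned, "testMessage-0"));
    }
}
```

This is why an unpartitioned consumer sees every message published to the exchange, while a partitioned instance sees only the messages routed to its instance index.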
2. ChannelBindingServiceConfiguration also registers the StreamListenerAnnotationBeanPostProcessor bean. It implements the BeanPostProcessor interface, so its postProcessAfterInitialization method runs for each bean.
- public class StreamListenerAnnotationBeanPostProcessor implements BeanPostProcessor, ApplicationContextAware, SmartInitializingSingleton {
-
- public Object postProcessAfterInitialization(final Object bean, String beanName) throws BeansException {
- Class<?> targetClass = AopUtils.isAopProxy(bean) ? AopUtils.getTargetClass(bean) : bean.getClass();
- ReflectionUtils.doWithMethods(targetClass, new ReflectionUtils.MethodCallback() {
- @Override
- public void doWith(final Method method) throws IllegalArgumentException, IllegalAccessException {
- // Step 1
- StreamListener streamListener = AnnotationUtils.findAnnotation(method, StreamListener.class);
- if (streamListener != null) {
- Method targetMethod = checkProxy(method, bean);
- Assert.hasText(streamListener.value(), "The binding name cannot be null");
- // Step 2
- final InvocableHandlerMethod invocableHandlerMethod = messageHandlerMethodFactory.createInvocableHandlerMethod(bean, targetMethod);
- if (!StringUtils.hasText(streamListener.value())) {
- throw new BeanInitializationException("A bound component name must be specified");
- }
- if (mappedBindings.containsKey(streamListener.value())) {
- throw new BeanInitializationException("Duplicate @" + StreamListener.class.getSimpleName() +
- " mapping for '" + streamListener.value() + "' on " + invocableHandlerMethod.getShortLogMessage() +
- " already existing for " + mappedBindings.get(streamListener.value()).getShortLogMessage());
- }
- mappedBindings.put(streamListener.value(), invocableHandlerMethod);
- // Step 3
- SubscribableChannel channel = applicationContext.getBean(streamListener.value(),
- SubscribableChannel.class);
- final String defaultOutputChannel = extractDefaultOutput(method);
- if (invocableHandlerMethod.isVoid()) {
- Assert.isTrue(StringUtils.isEmpty(defaultOutputChannel), "An output channel cannot be specified for a method that " +
- "does not return a value");
- }
- else {
- Assert.isTrue(!StringUtils.isEmpty(defaultOutputChannel), "An output channel must be specified for a method that " +
- "can return a value");
- }
- // Step 4
- StreamListenerMessageHandler handler = new StreamListenerMessageHandler(invocableHandlerMethod);
- handler.setApplicationContext(applicationContext);
- handler.setChannelResolver(binderAwareChannelResolver);
- if (!StringUtils.isEmpty(defaultOutputChannel)) {
- handler.setOutputChannelName(defaultOutputChannel);
- }
- handler.afterPropertiesSet();
- // Step 5
- channel.subscribe(handler);
- }
- }
- });
- return bean;
- }
- }
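The postProcessAfterInitialization method proceeds as follows:
1. It finds the methods annotated with @StreamListener("testMessage").
2. It creates an InvocableHandlerMethod, a wrapper that invokes the concrete method we wrote.
3. streamListener.value() is testMessage. The bean fetched under that name is the testMessage bean produced by the StreamSendClient factory bean through its input() method; as described above, its definition was registered in BindingBeanDefinitionRegistryUtils.registerChannelBeanDefinitions. In other words, this step looks up the SubscribableChannel that BindableProxyFactory created in afterPropertiesSet for the @Input or @Output annotation.
PS: resolving the testMessage bean first locates the StreamSendClient bean and then calls its input method, because the bean definition was configured earlier with rootBeanDefinition.setFactoryBeanName(StreamSendClient) and rootBeanDefinition.setUniqueFactoryMethodName(input).
4. It creates a StreamListenerMessageHandler, passing the InvocableHandlerMethod to the constructor.
5. It subscribes the StreamListenerMessageHandler to the SubscribableChannel.
When a message arrives, the InvocableHandlerMethod inside StreamListenerMessageHandler locates the concrete method and invokes it.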
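The five steps above can be sketched in plain Java. This is a simplified model under stated assumptions, not the post-processor's real code: `Listener` stands in for @StreamListener, `Channel` for a SubscribableChannel, and the reflective lambda for the InvocableHandlerMethod and StreamListenerMessageHandler pair. All names are hypothetical.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical sketch of wiring an annotated listener method to a channel. */
final class ListenerWiringSketch {

    /** Stand-in for @StreamListener; the value names the channel to subscribe to. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Listener {
        String value();
    }

    /** Stand-in for the user's receiver class. */
    public static final class Receiver {
        final List<Object> received = new ArrayList<>();

        @Listener("testMessage")
        public void receive(Object payload) {
            received.add(payload);
        }
    }

    /** Minimal push channel: every subscribed handler is called on send. */
    static final class Channel {
        private final List<Consumer<Object>> handlers = new ArrayList<>();

        void subscribe(Consumer<Object> handler) {
            handlers.add(handler);
        }

        void send(Object payload) {
            handlers.forEach(handler -> handler.accept(payload));
        }
    }

    /** Finds @Listener methods on the bean and subscribes a reflective handler for each. */
    static void wire(Object bean, Map<String, Channel> channels) {
        for (Method method : bean.getClass().getMethods()) {
            Listener listener = method.getAnnotation(Listener.class);   // step 1
            if (listener == null) {
                continue;
            }
            Channel channel = channels.get(listener.value());           // step 3
            if (channel == null) {
                throw new IllegalStateException("No channel named " + listener.value());
            }
            channel.subscribe(payload -> {                               // steps 2, 4 and 5
                try {
                    method.invoke(bean, payload);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException(e);
                }
            });
        }
    }

    public static void main(String[] args) {
        Map<String, Channel> channels = new HashMap<>();
        channels.put("testMessage", new Channel());
        Receiver receiver = new Receiver();
        wire(receiver, channels);
        channels.get("testMessage").send("from streamClient");
        System.out.println("Received:" + receiver.received);
    }
}
```

Once wire has run, sending to the channel is all it takes to reach the annotated method, which is the relationship the post-processor sets up between the testMessage channel and SinkReceiver.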
Recall the RabbitMessageChannelBinder.doBindConsumer method from the binding step:
- public Binding<MessageChannel> doBindConsumer(String name, String group, MessageChannel inputChannel,
- ExtendedConsumerProperties<RabbitConsumerProperties> properties) {
-
- String prefix = properties.getExtension().getPrefix();
- String exchangeName = applyPrefix(prefix, name);
- TopicExchange exchange = new TopicExchange(exchangeName);
- // declare the exchange
- declareExchange(exchangeName, exchange);
-
- String queueName = applyPrefix(prefix, baseQueueName);
- boolean partitioned = !anonymousConsumer && properties.isPartitioned();
- boolean durable = !anonymousConsumer && properties.getExtension().isDurableSubscription();
- Queue queue;
-
- ......................
- // declare the queue
- declareQueue(queueName, queue);
-
- if (partitioned) {
- String bindingKey = String.format("%s-%d", name, properties.getInstanceIndex());
- declareBinding(queue.getName(), BindingBuilder.bind(queue).to(exchange).with(bindingKey));
- }
- else {
- declareBinding(queue.getName(), BindingBuilder.bind(queue).to(exchange).with("#"));
- }
- Binding<MessageChannel> binding = doRegisterConsumer(baseQueueName, group, inputChannel, queue, properties);
- .................
- return binding;
- }
The final doRegisterConsumer call is where RabbitMQ messages get connected to the method that handles them.
- private Binding<MessageChannel> doRegisterConsumer(final String name, String group, MessageChannel moduleInputChannel, Queue queue,
- final ExtendedConsumerProperties<RabbitConsumerProperties> properties) {
- DefaultBinding<MessageChannel> consumerBinding;
- SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(
- this.connectionFactory);
- listenerContainer.setAcknowledgeMode(properties.getExtension().getAcknowledgeMode());
- listenerContainer.setChannelTransacted(properties.getExtension().isTransacted());
- listenerContainer.setDefaultRequeueRejected(properties.getExtension().isRequeueRejected());
- int concurrency = properties.getConcurrency();
- concurrency = concurrency > 0 ? concurrency : 1;
- listenerContainer.setConcurrentConsumers(concurrency);
- int maxConcurrency = properties.getExtension().getMaxConcurrency();
- if (maxConcurrency > concurrency) {
- listenerContainer.setMaxConcurrentConsumers(maxConcurrency);
- }
- listenerContainer.setPrefetchCount(properties.getExtension().getPrefetch());
- listenerContainer.setRecoveryInterval(properties.getExtension().getRecoveryInterval());
- listenerContainer.setTxSize(properties.getExtension().getTxSize());
- listenerContainer.setTaskExecutor(new SimpleAsyncTaskExecutor(queue.getName() + "-"));
- listenerContainer.setQueues(queue);
- int maxAttempts = properties.getMaxAttempts();
- if (maxAttempts > 1 || properties.getExtension().isRepublishToDlq()) {
- RetryOperationsInterceptor retryInterceptor = RetryInterceptorBuilder.stateless()
- .maxAttempts(maxAttempts)
- .backOffOptions(properties.getBackOffInitialInterval(),
- properties.getBackOffMultiplier(),
- properties.getBackOffMaxInterval())
- .recoverer(determineRecoverer(name, properties.getExtension().getPrefix(), properties.getExtension().isRepublishToDlq()))
- .build();
- listenerContainer.setAdviceChain(new Advice[] { retryInterceptor });
- }
- listenerContainer.setAfterReceivePostProcessors(this.decompressingPostProcessor);
- listenerContainer.setMessagePropertiesConverter(RabbitMessageChannelBinder.inboundMessagePropertiesConverter);
- listenerContainer.afterPropertiesSet();
- AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
- adapter.setBeanFactory(this.getBeanFactory());
- DirectChannel bridgeToModuleChannel = new DirectChannel();
- bridgeToModuleChannel.setBeanFactory(this.getBeanFactory());
- bridgeToModuleChannel.setBeanName(name + ".bridge");
- adapter.setOutputChannel(bridgeToModuleChannel);
- adapter.setBeanName("inbound." + name);
- DefaultAmqpHeaderMapper mapper = new DefaultAmqpHeaderMapper();
- mapper.setRequestHeaderNames(properties.getExtension().getRequestHeaderPatterns());
- mapper.setReplyHeaderNames(properties.getExtension().getReplyHeaderPatterns());
- adapter.setHeaderMapper(mapper);
- adapter.afterPropertiesSet();
- consumerBinding = new DefaultBinding<MessageChannel>(name, group, moduleInputChannel, adapter) {
- @Override
- protected void afterUnbind() {
- cleanAutoDeclareContext(properties.getExtension().getPrefix(), name);
- }
- };
- ReceivingHandler convertingBridge = new ReceivingHandler();
- convertingBridge.setOutputChannel(moduleInputChannel);
- convertingBridge.setBeanName(name + ".convert.bridge");
- convertingBridge.afterPropertiesSet();
- bridgeToModuleChannel.subscribe(convertingBridge);
- adapter.start();
- return consumerBinding;
- }
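The retry block near the end of the listing above (maxAttempts, the three back-off values, and the recoverer) can be pictured with a small plain-Java sketch. This does not use Spring Retry; the class `StatelessRetrySketch` and its methods are hypothetical names chosen for illustration, and the delays are only recorded rather than slept. It assumes the usual exponential back-off shape: the delay grows by the multiplier after each failure and is capped at the maximum interval, and the recoverer runs once the attempts are exhausted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class StatelessRetrySketch {

    /** Delay to wait after the given failed attempt (1-based), capped at maxInterval. */
    static long backOffDelay(int attempt, long initialInterval, double multiplier, long maxInterval) {
        double delay = initialInterval * Math.pow(multiplier, attempt - 1);
        return (long) Math.min(delay, (double) maxInterval);
    }

    /**
     * Calls the listener up to maxAttempts times. If every attempt fails, the message
     * is handed to the recoverer. Returns the delays that would separate the attempts.
     */
    static List<Long> deliver(String message, int maxAttempts, long initialInterval, double multiplier,
            long maxInterval, Consumer<String> listener, Consumer<String> recoverer) {
        List<Long> delays = new ArrayList<>();
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                listener.accept(message);
                return delays; // success: no further attempts
            } catch (RuntimeException e) {
                if (attempt == maxAttempts) {
                    recoverer.accept(message); // attempts exhausted
                } else {
                    delays.add(backOffDelay(attempt, initialInterval, multiplier, maxInterval));
                }
            }
        }
        return delays;
    }

    /** Runs a listener that always fails, with 4 attempts and a 1000/2.0/3000 back-off. */
    static String demo() {
        List<String> deadLetters = new ArrayList<>();
        List<Long> delays = deliver("payload", 4, 1000L, 2.0, 3000L,
                m -> { throw new IllegalStateException("listener failed"); },
                deadLetters::add);
        return delays + " " + deadLetters;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch a maxAttempts of 1 means the recoverer runs right after the first failure, which mirrors why the listing also installs the interceptor when only republishToDlq is set.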
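1. As the code shows, Spring Cloud Stream creates a SimpleMessageListenerContainer. This class receives messages from RabbitMQ and hands them to the corresponding handler method; in Spring Rabbit, that method is the one located through the @RabbitListener annotation. For a detailed analysis, see the earlier article on integrating Spring Boot with RabbitMQ.
For now it is enough to know that SimpleMessageListenerContainer is the container that mediates between RabbitMQ and the local listener method.
2. Next, an AmqpInboundChannelAdapter is created with the SimpleMessageListenerContainer as its constructor argument. A DirectChannel, bridgeToModuleChannel, is also created and set as the adapter's output channel. The adapter's afterPropertiesSet method is then called, which in turn calls the adapter's own onInit method.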
- protected void onInit() {
- this.messageListenerContainer.setMessageListener(new ChannelAwareMessageListener() {
-
- @Override
- public void onMessage(Message message, Channel channel) throws Exception {
- Object payload = AmqpInboundChannelAdapter.this.messageConverter.fromMessage(message);
- Map<String, Object> headers =
- AmqpInboundChannelAdapter.this.headerMapper.toHeadersFromRequest(message.getMessageProperties());
- if (AmqpInboundChannelAdapter.this.messageListenerContainer.getAcknowledgeMode()
- == AcknowledgeMode.MANUAL) {
- headers.put(AmqpHeaders.DELIVERY_TAG, message.getMessageProperties().getDeliveryTag());
- headers.put(AmqpHeaders.CHANNEL, channel);
- }
- sendMessage(getMessageBuilderFactory().withPayload(payload).copyHeaders(headers).build());
- }
-
- });
- this.messageListenerContainer.afterPropertiesSet();
- super.onInit();
- }
The onInit method sets the message listener of the SimpleMessageListenerContainer to an anonymous implementation of the ChannelAwareMessageListener interface. In the plain Spring Boot and RabbitMQ integration, by contrast, the object passed to setMessageListener is a MessagingMessageListenerAdapter.
3. A ReceivingHandler is created; it implements the MessageHandler interface. Its output channel is set to the message channel created by BindableProxyFactory. As analyzed earlier, StreamListenerAnnotationBeanPostProcessor.postProcessAfterInitialization has already subscribed a StreamListenerMessageHandler to that message channel, and the StreamListenerMessageHandler in turn holds the concrete target bean and method.
4. The ReceivingHandler is subscribed to bridgeToModuleChannel as its observer.
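The four wiring steps above can be sketched without any Spring classes. Everything in the following block is a hypothetical stand-in: `SimpleChannel` plays the role of a DirectChannel with a single subscriber, `ListenerContainer` plays the role of SimpleMessageListenerContainer, and the byte-to-String conversion stands in for whatever ReceivingHandler does to the payload. It is only meant to show who holds a reference to whom.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class BindingWiringSketch {

    /** Stand-in for a DirectChannel: synchronous hand-off to its one subscriber. */
    static class SimpleChannel {
        private final String name;
        private Consumer<Object> handler;

        SimpleChannel(String name) { this.name = name; }

        void subscribe(Consumer<Object> handler) { this.handler = handler; }

        void send(Object message) {
            if (handler == null) {
                throw new IllegalStateException("no subscriber on channel " + name);
            }
            handler.accept(message);
        }
    }

    /** Stand-in for the listener container: it only knows which listener to call. */
    static class ListenerContainer {
        private Consumer<byte[]> listener;

        void setMessageListener(Consumer<byte[]> listener) { this.listener = listener; }

        void simulateDelivery(byte[] body) { listener.accept(body); }
    }

    /** Mirrors steps 1 to 4: container, bridge channel, converting handler, subscription. */
    static ListenerContainer bind(String name, SimpleChannel moduleInputChannel) {
        ListenerContainer container = new ListenerContainer();       // step 1
        SimpleChannel bridge = new SimpleChannel(name + ".bridge");  // step 2
        container.setMessageListener(bridge::send);                  // what onInit arranges
        // steps 3 and 4: a converting handler observes the bridge and forwards the result
        bridge.subscribe(raw ->
                moduleInputChannel.send(new String((byte[]) raw, StandardCharsets.UTF_8)));
        return container;
    }

    /** Delivers one message through the chain and returns what the final handler saw. */
    static List<String> demo(String text) {
        List<String> received = new ArrayList<>();
        SimpleChannel input = new SimpleChannel("input");
        // plays the role of StreamListenerMessageHandler on the module channel
        input.subscribe(payload -> received.add("handled:" + payload));
        ListenerContainer container = bind("input", input);
        container.simulateDelivery(text.getBytes(StandardCharsets.UTF_8));
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo("hello"));
    }
}
```

The point of the sketch is that the container never sees the user's method directly; it only reaches it through two channel hops.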
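The following traces the steps a single MQ message goes through when it arrives:
1. First, SimpleMessageListenerContainer invokes the onMessage method of the listener registered in onInit. For a detailed analysis of the container side, see the earlier article on integrating Spring Boot with RabbitMQ.
2. onMessage calls the sendMessage method.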
- public void onMessage(Message message, Channel channel) throws Exception {
- Object payload = AmqpInboundChannelAdapter.this.messageConverter.fromMessage(message);
- Map<String, Object> headers =
- AmqpInboundChannelAdapter.this.headerMapper.toHeadersFromRequest(message.getMessageProperties());
- if (AmqpInboundChannelAdapter.this.messageListenerContainer.getAcknowledgeMode()
- == AcknowledgeMode.MANUAL) {
- headers.put(AmqpHeaders.DELIVERY_TAG, message.getMessageProperties().getDeliveryTag());
- headers.put(AmqpHeaders.CHANNEL, channel);
- }
- sendMessage(getMessageBuilderFactory().withPayload(payload).copyHeaders(headers).build());
- }
- protected void sendMessage(Message<?> message) {
- ................
- try {
- // The output channel here is the bridgeToModuleChannel created earlier
- this.messagingTemplate.send(getOutputChannel(), message);
- }
- ................
- }
- public void send(D destination, Message<?> message) {
- doSend(destination, message);
- }
- protected final void doSend(MessageChannel channel, Message<?> message) {
- ......................
-
- boolean sent = (timeout >= 0 ? channel.send(message, timeout) : channel.send(message));
-
- ..................
- }
The channel.send(message) call ends up in the send method of AbstractMessageChannel.
①:
- public final boolean send(Message<?> message, long timeout) {
- Assert.notNull(message, "message must not be null");
- Assert.notNull(message.getPayload(), "message payload must not be null");
- .............
- sent = this.doSend(message, timeout);
- if (countsEnabled) {
- channelMetrics.afterSend(metrics, sent);
- metricsProcessed = true;
- }
-
- if (debugEnabled) {
- logger.debug("postSend (sent=" + sent + ") on channel '" + this + "', message: " + message);
- }
- if (interceptorStack != null) {
- interceptors.postSend(message, this, sent);
- interceptors.afterSendCompletion(message, this, sent, null, interceptorStack);
- }
- return sent;
-
- ....................
- }
The doSend call here resolves to the doSend method of AbstractSubscribableChannel.
②:
- protected boolean doSend(Message<?> message, long timeout) {
- try {
- return getRequiredDispatcher().dispatch(message);
- }
- catch (MessageDispatchingException e) {
- String description = e.getMessage() + " for channel '" + this.getFullChannelName() + "'.";
- throw new MessageDeliveryException(message, description, e);
- }
- }
- // dispatch method of the UnicastingDispatcher class
- public final boolean dispatch(final Message<?> message) {
- if (this.executor != null) {
- Runnable task = createMessageHandlingTask(message);
- this.executor.execute(task);
- return true;
- }
- return this.doDispatch(message);
- }
-
- private boolean doDispatch(Message<?> message) {
- if (this.tryOptimizedDispatch(message)) {
- return true;
- }
- ....................
- return success;
- }
- protected boolean tryOptimizedDispatch(Message<?> message) {
- MessageHandler handler = this.theOneHandler;
- if (handler != null) {
- try {
- handler.handleMessage(message);
- return true;
- }
- catch (Exception e) {
- throw wrapExceptionIfNecessary(message, e);
- }
- }
- return false;
- }
Finally, handler.handleMessage is called. From the wiring shown earlier, the handler of bridgeToModuleChannel is the ReceivingHandler, so ReceivingHandler.handleMessage is invoked. ReceivingHandler extends AbstractReplyProducingMessageHandler, which in turn extends AbstractMessageHandler, so the method that actually runs is AbstractMessageHandler.handleMessage.
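The single-handler fast path in tryOptimizedDispatch can be illustrated with a plain-Java sketch. The class `UnicastDispatchSketch` is hypothetical. The rule that `theOneHandler` is set only while exactly one handler is subscribed is an assumption of this sketch, and the fallback branch (always picking the first handler) is a deliberate simplification, not a reconstruction of the part elided in the listing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class UnicastDispatchSketch {

    private final List<Consumer<String>> handlers = new ArrayList<>();
    // Assumption: non-null only while exactly one handler is subscribed.
    private Consumer<String> theOneHandler;
    int optimizedDispatches = 0;

    void addHandler(Consumer<String> handler) {
        handlers.add(handler);
        theOneHandler = handlers.size() == 1 ? handler : null;
    }

    boolean dispatch(String message) {
        if (tryOptimizedDispatch(message)) {
            return true;
        }
        if (handlers.isEmpty()) {
            throw new IllegalStateException("Dispatcher has no subscribers");
        }
        // Simplification: a unicast dispatcher delivers to one handler only;
        // this sketch just takes the first one.
        handlers.get(0).accept(message);
        return true;
    }

    /** Fast path: with a single subscriber there is nothing to select or iterate. */
    private boolean tryOptimizedDispatch(String message) {
        Consumer<String> handler = theOneHandler;
        if (handler == null) {
            return false;
        }
        handler.accept(message);
        optimizedDispatches++;
        return true;
    }

    /** Dispatches once with one subscriber and once with two, then reports the fast-path count. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        UnicastDispatchSketch dispatcher = new UnicastDispatchSketch();
        dispatcher.addHandler(m -> log.add("a:" + m));
        dispatcher.dispatch("m1"); // fast path
        dispatcher.addHandler(m -> log.add("b:" + m));
        dispatcher.dispatch("m2"); // general path
        log.add("optimized=" + dispatcher.optimizedDispatches);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In the binding discussed here, bridgeToModuleChannel has exactly one subscriber, which is why the walkthrough follows the fast path.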
③:
- public final void handleMessage(Message<?> message) {
- ................
- try {
- ............
- this.handleMessageInternal(message);
- ............
- }
- catch (Exception e) {
- ...........
- }
- }
Next, AbstractReplyProducingMessageHandler.handleMessageInternal is executed.
④:
- protected final void handleMessageInternal(Message<?> message) {
- Object result;
- if (this.advisedRequestHandler == null) {
- result = handleRequestMessage(message);
- }
- else {
- result = doInvokeAdvisedRequestHandler(message);
- }
- if (result != null) {
- sendOutputs(result, message);
- }
- else if (this.requiresReply && !isAsync()) {
- throw new ReplyRequiredException(message, "No reply produced by handler '" +
- getComponentName() + "', and its 'requiresReply' property is set to true.");
- }
- else if (!isAsync() && logger.isDebugEnabled()) {
- logger.debug("handler '" + this + "' produced no reply for request Message: " + message);
- }
- }
This calls ReceivingHandler.handleRequestMessage, which deserializes the message (among other things), and then calls sendOutputs.
- protected void sendOutputs(Object result, Message<?> requestMessage) {
- if (result instanceof Iterable<?> && shouldSplitOutput((Iterable<?>) result)) {
- for (Object o : (Iterable<?>) result) {
- this.produceOutput(o, requestMessage);
- }
- }
- else if (result != null) {
- this.produceOutput(result, requestMessage);
- }
- }
-
- protected void produceOutput(Object reply, final Message<?> requestMessage) {
- final MessageHeaders requestHeaders = requestMessage.getHeaders();
-
- Object replyChannel = null;
- if (getOutputChannel() == null) {
- ............
-
- }
-
- if (this.async && reply instanceof ListenableFuture<?>) {
- .......................
- }
- else {
- sendOutput(createOutputMessage(reply, requestHeaders), replyChannel, false);
- }
- }
- protected void sendOutput(Object output, Object replyChannel, boolean useArgChannel) {
- MessageChannel outputChannel = getOutputChannel();
- ....................
-
- if (replyChannel instanceof MessageChannel) {
- if (output instanceof Message<?>) {
- this.messagingTemplate.send((MessageChannel) replyChannel, (Message<?>) output);
- }
- else {
- this.messagingTemplate.convertAndSend((MessageChannel) replyChannel, output);
- }
- }
- ..................
- }
Here, the channel returned by getOutputChannel in ReceivingHandler.sendOutput is the message channel created by BindableProxyFactory.
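The division of labor between handleMessageInternal, handleRequestMessage and the output channel is a template-method pattern, which the following plain-Java sketch tries to capture. `ReplyProducingSketch` and its nested `ReplyProducingHandler` are hypothetical names; reply-channel headers, async replies and splitting of Iterable results are left out, and the String-to-Integer conversion merely stands in for deserialization.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ReplyProducingSketch {

    /** The base class fixes the flow; subclasses only supply handleRequestMessage. */
    abstract static class ReplyProducingHandler implements Consumer<Object> {
        private Consumer<Object> outputChannel;
        private boolean requiresReply;

        void setOutputChannel(Consumer<Object> outputChannel) { this.outputChannel = outputChannel; }

        void setRequiresReply(boolean requiresReply) { this.requiresReply = requiresReply; }

        @Override
        public final void accept(Object request) {
            Object result = handleRequestMessage(request);
            if (result != null) {
                if (outputChannel == null) {
                    throw new IllegalStateException("no output channel configured");
                }
                outputChannel.accept(result); // forward the reply downstream
            } else if (requiresReply) {
                throw new IllegalStateException("No reply produced by handler");
            }
            // otherwise: no reply and none required, so nothing is sent
        }

        protected abstract Object handleRequestMessage(Object request);
    }

    /** Sends one raw value through a converting handler and returns what reached the output. */
    static List<Object> demo(String raw) {
        List<Object> delivered = new ArrayList<>();
        ReplyProducingHandler converting = new ReplyProducingHandler() {
            @Override
            protected Object handleRequestMessage(Object request) {
                // stand-in for the conversion done before forwarding
                return Integer.valueOf(((String) request).trim());
            }
        };
        converting.setOutputChannel(delivered::add);
        converting.accept(raw);
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(demo(" 42 "));
    }
}
```

Because the output channel is just another channel with its own subscriber, chaining two such handlers gives the two-hop flow the walkthrough describes.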
3. Then this.messagingTemplate.send((MessageChannel) replyChannel, (Message<?>) output) is called, which repeats steps ①, ②, ③ and ④ above and arrives once more at AbstractReplyProducingMessageHandler.handleMessageInternal.
- protected final void handleMessageInternal(Message<?> message) {
- Object result;
- if (this.advisedRequestHandler == null) {
- result = handleRequestMessage(message);
- }
- else {
- result = doInvokeAdvisedRequestHandler(message);
- }
- if (result != null) {
- sendOutputs(result, message);
- }
- else if (this.requiresReply && !isAsync()) {
- throw new ReplyRequiredException(message, "No reply produced by handler '" +
- getComponentName() + "', and its 'requiresReply' property is set to true.");
- }
- else if (!isAsync() && logger.isDebugEnabled()) {
- logger.debug("handler '" + this + "' produced no reply for request Message: " + message);
- }
- }
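This time, however, the message channel is the one created by BindableProxyFactory, and its observer is the StreamListenerMessageHandler, so the handleRequestMessage that runs is the following one.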
- protected Object handleRequestMessage(Message<?> requestMessage) {
- try {
- return invocableHandlerMethod.invoke(requestMessage);
- }
- catch (Exception e) {
- if (e instanceof MessagingException) {
- throw (MessagingException) e;
- }
- else {
- throw new MessagingException(requestMessage, "Exception thrown while invoking " + invocableHandlerMethod.getShortLogMessage(), e);
- }
- }
- }
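As the earlier source analysis showed, invocableHandlerMethod already wraps the target bean and method. This completes the path from RabbitMQ through Spring Cloud Stream to the matching handler method.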
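What "wrapping the bean and method" amounts to can be shown with plain reflection. The sketch below is not Spring's InvocableHandlerMethod: `BoundMethod` and `OrderListener` are hypothetical names, and argument resolution and payload conversion are omitted. It only shows a bean and a Method being resolved once and invoked per message.

```java
import java.lang.reflect.Method;

public class InvocableMethodSketch {

    /** Hypothetical listener bean, standing in for a class with an annotated listener method. */
    public static class OrderListener {
        public String onOrder(String payload) {
            return "processed " + payload;
        }
    }

    /** Pairs a bean with one of its public methods, resolved once up front. */
    static class BoundMethod {
        private final Object bean;
        private final Method method;

        BoundMethod(Object bean, String methodName, Class<?>... parameterTypes) {
            this.bean = bean;
            try {
                this.method = bean.getClass().getMethod(methodName, parameterTypes);
            } catch (NoSuchMethodException e) {
                throw new IllegalArgumentException("no such method: " + methodName, e);
            }
        }

        /** Invokes the bound method, wrapping reflection failures in an unchecked exception. */
        Object invoke(Object... args) {
            try {
                return method.invoke(bean, args);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException(
                        "Exception thrown while invoking " + method.getName(), e);
            }
        }
    }

    /** Binds OrderListener.onOrder once and invokes it with the given payload. */
    static Object demo(String payload) {
        BoundMethod handlerMethod = new BoundMethod(new OrderListener(), "onOrder", String.class);
        return handlerMethod.invoke(payload);
    }

    public static void main(String[] args) {
        System.out.println(demo("order-1"));
    }
}
```

Resolving the Method at start-up rather than per message is the design choice that matters here: the lookup cost is paid once, and each delivery is a single reflective call.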
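1. Spring Cloud Stream creates a SimpleMessageListenerContainer to listen to the RabbitMQ server.
2. An AmqpInboundChannelAdapter is created with the SimpleMessageListenerContainer as its argument. This links the two: when the SimpleMessageListenerContainer receives a message, it calls the onMessage method of the listener that AmqpInboundChannelAdapter registers in onInit.
3. A DirectChannel, bridgeToModuleChannel, is created and set as the adapter's output channel.
4. A ReceivingHandler is created as the observer of bridgeToModuleChannel, and its output channel is set to the message channel created by BindableProxyFactory.
5. When RabbitMQ delivers a message, the first hop reaches ReceivingHandler.handleRequestMessage, which deserializes the message (among other things), and then sendOutputs is executed.
6. That sendOutputs call reaches StreamListenerMessageHandler, the observer of the message channel created by BindableProxyFactory. StreamListenerMessageHandler.handleRequestMessage uses invocableHandlerMethod, which already wraps the target bean and method, to invoke the matching class method.
That concludes this brief analysis of Spring Cloud Stream. Spring Cloud Stream is used together with Spring Integration, so the two are best studied side by side.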