赞
踩
之前在写Spring Boot基础教程的时候写过一篇《Spring Boot中使用RabbitMQ》。在该文中,我们通过简单的配置和注解就能实现向RabbitMQ中生产和消费消息。实际上我们使用的对RabbitMQ的starter就是通过Spring Cloud Stream中对RabbitMQ的支持来实现的。下面我们就通过本文来了解一下Spring Cloud Stream。
Spring Cloud Stream是一个用来为微服务应用构建消息驱动能力的框架。它可以基于Spring Boot来创建独立的、可用于生产的Spring应用程序。它通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动的微服务应用。Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并且引入了发布-订阅、消费组以及消息分区这三个核心概念。简单的说,Spring Cloud Stream本质上就是整合了Spring Boot和Spring Integration,实现了一套轻量级的消息驱动的微服务框架。通过使用Spring Cloud Stream,可以有效地简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。由于Spring Cloud Stream基于Spring Boot实现,所以它秉承了Spring Boot的优点,实现了自动化配置的功能帮忙我们可以快速的上手使用,但是目前为止Spring Cloud Stream只支持下面两个著名的消息中间件的自动化配置:
下面我们通过构建一个简单的示例来对Spring Cloud Stream有一个初步认识。该示例主要目标是构建一个基于Spring Boot的微服务应用,这个微服务应用将通过使用消息中间件RabbitMQ来接收消息并将消息打印到日志中。所以,在进行下面步骤之前请先确认已经在本地安装了RabbitMQ,具体安装步骤请参考此文。
创建一个基础的Spring Boot工程,命名为:stream-hello
编辑pom.xml中的依赖关系,引入Spring Cloud Stream对RabbitMQ的支持,具体如下:
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.9.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Dalston.SR4</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
@EnableBinding(Sink.class) //本代码中的注解会在本文的末尾进行讲解
public class SinkReceiver {
private static Logger logger = LoggerFactory.getLogger(SinkReceiver.class);
@StreamListener(Sink.INPUT)
public void receive(Object payload) {
logger.info("Received: " + payload);
}
}
@SpringBootApplication
public class SinkApplication {
public static void main(String[] args) {
SpringApplication.run(SinkApplication.class, args);
}
}
到这里,我们快速入门示例的编码任务就已经完成了。下面我们分别启动RabbitMQ以及该Spring Boot应用,然后做下面的试验,看看它们是如何运作的。
我们先来看一下Spring Boot应用的启动日志。
从上面的日志内容中,我们可以获得以下信息:
在顺利完成上面快速入门的示例后,我们简单解释一下上面的步骤是如何将我们的Spring Boot应用连接上RabbitMQ来消费消息以实现消息驱动业务逻辑的。
首先,我们对Spring Boot应用做的就是引入spring-cloud-starter-stream-rabbit依赖,该依赖包是Spring Cloud Stream对RabbitMQ支持的封装,其中包含了对RabbitMQ的自动化配置等内容。从下面它定义的依赖关系中,我们还可以知道它等价于spring-cloud-stream-binder-rabbit依赖。
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
</dependencies>
接着,我们再来看看这里用到的几个Spring Cloud Stream的核心注解,它们都被定义在SinkReceiver中:
public interface Sink {
String INPUT = "input"; //这个input代表“输入的通道”的名称
@Input(Sink.INPUT)
SubscribableChannel input();//这个input()方法是生产者输入的function,是固定的写法,和上面的那个input不是一个东西哦
}
它通过@Input
注解绑定了一个名为input
的通道。除了Sink
之外,Spring Cloud Stream还默认实现了绑定output
通道的Source
接口,还有结合了Sink
和Source
的Processor
接口,实际使用时我们也可以自己通过@Input
和@Output
注解来定义绑定消息通道的接口。当我们需要为@EnableBinding
指定多个接口来绑定消息通道的时候,可以这样定义:@EnableBinding(value = {Sink.class, Source.class})
。
@StreamListener
:该注解主要定义在方法上,作用是将被修饰的方法注册为消息中间件上数据流的事件监听器,注解中的属性值对应了监听的消息通道名。在上面的例子中,我们通过@StreamListener(Sink.INPUT)
注解将receive
方法注册为对input
消息通道的监听处理器,所以当我们在RabbitMQ的控制页面中发布消息的时候,receive
方法会做出对应的响应动作。上面我们通过RabbitMQ的控制台完成了发送消息来验证了消息消费程序的功能,虽然这种方法比较low,但是通过上面的步骤,相信大家对RabbitMQ和Spring Cloud Stream的消息消费已经有了一些基础的认识。下面我们通过编写生产消息的单元测试用例来完善我们的入门内容。
package com.liu.streamhello; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.Output; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.MessageChannel; import org.springframework.stereotype.Service; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @EnableBinding(value = {SinkApplicationTests.SinkSender.class}) public class SinkApplicationTests { @Autowired private SinkSender sinkSender; @Test public void sinkSenderTester() { sinkSender.output().send(MessageBuilder.withPayload("produce a message :http://blog.alliance.com").build()); } @Service public interface SinkSender { String OUTPUT = "input"; @Output(SinkSender.OUTPUT) MessageChannel output(); } }
2019-07-10 16:31:58.753 INFO 7288 --- [h-QvZLWrh2qDw-1] com.liu.streamhello.SinkReceiver : Received: produce a message :http://blog.alliance.com
在上面的单元测试中,我们通过@Output(SinkSender.OUTPUT)定义了一个输出通过,而该输出通道的名称为input,与前文中的Sink中定义的消费通道同名,所以这里的单元测试与前文的消费者程序组成了一对生产者与消费者。到这里,本文的内容就次结束,如果您能够独立的完成上面的例子,那么对于Spring Cloud Stream的基础使用算是入门了。
本文转载地址来源于:
http://blog.didispace.com/spring-cloud-starter-dalston-7-1/
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。