赞
踩
屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。
官方定义 SpringCloud Stream 是一个构建消息驱动微服务的框架。
应用程序通过 inputs 或者 outputs 来与 SpringCloud Stream 中的 binder 对象交互。通过我们配置来绑定,而 SpringCloud Stream 的 binder 对象负责与消息中间件交互。所以,我们只要搞清楚如何与 SpringCloud Stream 交互就可以方便使用消息驱动的方式。
通过使用 Spring Integration 来连接消息代理中间件以实现消息驱动事件。SpringCloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。目前仅支持 RabbitMQ 和 Kafka。
Spring Cloud Stream Reference Documentation
Spring Cloud Stream中文指导手册_书上有云的博客-CSDN博客
a.标准MQ
生产这/消费者之间靠消息(Message)媒介传递信息内容。
消息必须走特定的通道(MessageChannel)。
消息通道里的消息如何被处理呢?谁负责收发处理。
b.为什么用 Cloud Stream
比方说我们用到了 RabbitMQ 和 Kafka,由于这两个消息中间件的架构上不同,我们需要做不同的处理。例如 RabbitMQ 有 exchange,Kafka有Topic和Partitions 分区。
这些中间件的差异性给我们实际项目开发造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求我们想往另外一种消息队列进行迁移,这时候就是灾难性的,一大堆的东西都要重新推到重新做,因为它跟我们的系统耦合了,这时候SpringCloud Stream给我提供了一种解耦的方式。
SpringCloud Stream通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。
Binder中 INPUT对应于消费者,OUTPUT 对应于生成者。
c.Stream 中的消息通信方式遵循了发布-订阅模式
使用topic主题进行广播
Binder:很方便的连接中间件,屏蔽差异。
Channel:通道,是队列Queue的一种抽象,在消息通信系统中就是实现存储和转发的媒介,通过Channel对队列进行配置。
Source和Sink:简单的可理解为输入和输出。从Stream发布消息就是输出,接收Stream的消息就是输入。
MiddleWare:中间件,目前只支持RabbitMQ和Kafka。
Binder:Binder是应用与消息中间件之间的封装,目前实现了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现。
@Input:注解标识输入通道,通过该输入通道接收到的消息进入应用程序。
@Output:注解标识输出通道,发布的消息将通过该通道离开应用程序。
@StreamListener:监听队列,用于消费者的队列的消息接收。
@EnableBinding:指信道channel和exchange绑定在一起。
RabbitMQ环境已经OK
新建3个子模块:
新建Moudle :cloud-stream-rabbitmq-provider8801
POM
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>cloud-study</artifactId>
- <groupId>com.cloud.study</groupId>
- <version>1.0</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>cloud-stream-rabbitmq-provider8801</artifactId>
-
- <dependencies>
- <!-- stream-rabbit -->
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
- </dependency>
-
- <!-- Eureka client -->
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
- </dependency>
- <!-- 引入自己的 cloud-api-commons 模块-->
- <dependency>
- <groupId>com.cloud.study</groupId>
- <artifactId>cloud-api-commons</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-actuator</artifactId>
- </dependency>
-
- <!--开启热部署-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-devtools</artifactId>
- <scope>runtime</scope>
- <optional>true</optional>
- </dependency>
- <!-- lombok -->
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <optional>true</optional>
- </dependency>
- <!-- test -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
- </project>
YAML
- server:
- port: 8801
-
- spring:
- application:
- name: cloud-stream-provider
- cloud:
- stream:
- binders:
- defaultRabbit: #在此处配置要绑定的rabbitMQ的服务信息
- type: rabbit #消息组件类型
- environment: #s设置 rabbitmq的相关的配置环境
- spring:
- rabbitmq:
- host: localhost
- port: 5672
- username: guest
- password: guest
- bindings: #服务的整合处理
- output: #这个名字是一个通道的名称
- destination: studyExchange #表示要适应的 Exchange 名称定义
- content-type: application/json #设置消息类型,本次为json,文本则设置 text/plain
- binder: defaultRabbit #设置要绑定的消息服务的具体设置
-
- eureka:
- client:
- register-with-eureka: true #表示是否将自己注册进EurekaServer,默认为true
- fetch-registry: true #是否从 EurekaServer 抓取自己有的注册信息,默认为true。单节点无所谓,集群必须设置为true才能配合ribbon使用负载均衡
- service-url:
- #设置与 Eureka Server 交互的地址查询服务和注册服务都需要依赖这个地址
- defaultZone: http://eureka7001.com:7001/eureka/,http://eureka7002.com:7002/eureka/ #集群 Eureka 配置
- instance:
- instance-id: ${spring.application.name} #微服务名
- prefer-ip-address: true #显示IP
主启动 StreamMQMain8801
- @SpringBootApplication
- @EnableEurekaClient
- public class StreamMQMain8801 {
- public static void main(String[] args) {
- SpringApplication.run(StreamMQMain8801.class, args);
- }
- }
业务类
- public interface IMessageProvider {
- boolean send(String msg);
- }
- @Slf4j
- @EnableBinding(Source.class) //定义消息的推送管道 org.springframework.cloud.stream.messaging.Source
- public class MessageProviderImpl implements IMessageProvider {
- @Resource
- private MessageChannel output; //消息发送管道
-
- @Override
- public boolean send(String msg) {
- boolean r = output.send(MessageBuilder.withPayload(msg).build()); //org.springframework.messaging.support.MessageBuilder
- log.info("******** sendMsg: "+ msg);
- return r;
- }
- }
- @RestController
- @Slf4j
- public class SendMessageController {
- @Resource
- private IMessageProvider messageProvider;
-
- @GetMapping(value = "sendMessage")
- public String sendMessage(@RequestParam("msg") String msg){
- boolean r = messageProvider.send(msg);
- return (r ? "消息发送成功" : "消息发送失败") + " ,msg:"+msg;
- }
- }
测试
新建模块 cloud-stream-rabbitmq-consumer8802
POM文件
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>cloud-study</artifactId>
- <groupId>com.cloud.study</groupId>
- <version>1.0</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>cloud-stream-rabbitmq-consumer8802</artifactId>
-
- <dependencies>
- <!-- stream-rabbit -->
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
- </dependency>
-
- <!-- Eureka client -->
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
- </dependency>
- <!-- 引入自己的 cloud-api-commons 模块-->
- <dependency>
- <groupId>com.cloud.study</groupId>
- <artifactId>cloud-api-commons</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-actuator</artifactId>
- </dependency>
-
- <!--开启热部署-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-devtools</artifactId>
- <scope>runtime</scope>
- <optional>true</optional>
- </dependency>
- <!-- lombok -->
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <optional>true</optional>
- </dependency>
- <!-- test -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
- </project>
yaml
- server:
- port: 8802
-
- spring:
- application:
- name: cloud-stream-provider
- cloud:
- stream:
- binders:
- defaultRabbit: #在此处配置要绑定的rabbitMQ的服务信息
- type: rabbit #消息组件类型
- environment: #s设置 rabbitmq的相关的配置环境
- spring:
- rabbitmq:
- host: localhost
- port: 5672
- username: guest
- password: guest
- bindings: #服务的整合处理
- input: #这个名字是一个通道的名称, 接收消息
- destination: studyExchange #表示要适应的 Exchange 名称定义
- content-type: application/json #设置消息类型,本次为json,文本则设置 text/plain
- binder: defaultRabbit #设置要绑定的消息服务的具体设置
-
- eureka:
- client:
- register-with-eureka: true #表示是否将自己注册进EurekaServer,默认为true
- fetch-registry: true #是否从 EurekaServer 抓取自己有的注册信息,默认为true。单节点无所谓,集群必须设置为true才能配合ribbon使用负载均衡
- service-url:
- #设置与 Eureka Server 交互的地址查询服务和注册服务都需要依赖这个地址
- defaultZone: http://eureka7001.com:7001/eureka/,http://eureka7002.com:7002/eureka/ #集群 Eureka 配置
- instance:
- instance-id: ${spring.application.name} #微服务名
- prefer-ip-address: true #显示IP
主启动类
- @SpringBootApplication
- @EnableEurekaClient
- public class StreamMQMain8802 {
- public static void main(String[] args) {
- SpringApplication.run(StreamMQMain8802.class, args);
- }
- }
业务类:消息监听
- @Slf4j
- @Component
- @EnableBinding({Sink.class}) //org.springframework.cloud.stream.messaging.Sink
- public class ReceiverMessageListener {
- @Value("${server.port}")
- private String serverPort;
-
- @StreamListener(Sink.INPUT)
- public void input(Message<String> message){ //org.springframework.messaging.Message;
- log.info("消费者1号 ----->接收到消息: "+ message.getPayload() + "\t port:"+serverPort);
- }
- }
测试
按照上面的步骤创建 cloud-stream-rabbitmq-consumer8803 模块
启动项目:
访问 http://localhost:8801/sendMessage?msg=12 ,发现 8002/8003 都收到了消息,此时就存在重复消费的问题。
比如在如下场景中,订单系统我们做机器部署,产生一个订单时,如果一个订单同时被两个订单服务获取到,那么就会造成错误的数据,我们得避免这种情况。这时我们就可以使用 Stream 终端消息分组来解决。
在Stream 中处于统一group 中的多个消费者是竞争关系,就能保证消息只会被其中的一个应用消费一次。不同分组是可以重复消费的。
使用 分组和持久化属性 group 解决重复消费问题。
8002 为 testA,yaml中加入 group配置
- spring:
- application:
- name: cloud-stream-provider
- cloud:
- stream:
- binders:
- defaultRabbit: #在此处配置要绑定的rabbitMQ的服务信息
- type: rabbit #消息组件类型
- environment: #s设置 rabbitmq的相关的配置环境
- spring:
- rabbitmq:
- host: localhost
- port: 5672
- username: guest
- password: guest
- bindings: #服务的整合处理
- input: #这个名字是一个通道的名称, 接收消息
- destination: studyExchange #表示要使用的 Exchange 名称定义
- content-type: application/json #设置消息类型,本次为json,文本则设置 text/plain
- binder: defaultRabbit #设置要绑定的消息服务的具体设置
- group: testA #定义分组的名称为 testA
8002 为 testB,yaml中加入 group配置
- spring:
- application:
- name: cloud-stream-provider
- cloud:
- stream:
- binders:
- defaultRabbit: #在此处配置要绑定的rabbitMQ的服务信息
- type: rabbit #消息组件类型
- environment: #s设置 rabbitmq的相关的配置环境
- spring:
- rabbitmq:
- host: localhost
- port: 5672
- username: guest
- password: guest
- bindings: #服务的整合处理
- input: #这个名字是一个通道的名称, 接收消息
- destination: studyExchange #表示要使用的 Exchange 名称定义
- content-type: application/json #设置消息类型,本次为json,文本则设置 text/plain
- binder: defaultRabbit #设置要绑定的消息服务的具体设置
- group: testB #定义分组的名称为 testB
重启8802/8803 ,在 RabbitMQ 管理后台的 Exchanges 的Binder ,如下:
此时他们属于不同的分组,发送消息后,8802/8803都收到了消息。
修改 8803 的yaml,将 group 改成 testA,此时发送消息,只能被其中的一个微服务收到。
消息接收会按照客户端负载均衡原则,将不同的消息发布到同组的不同的微服务上。
比如,发送了消息A、B、C
- 8802会先接受到 A
- 8803再接收到 B
- 最后,8802接收到C
通过上述,解决了重复消费问题,再看看持久化。
在 yaml 里面配置了 group ,则就会将消息持久化。因此 group 分组信息强烈建议配置。
先按照如下步骤操作
在微服务框架中,一个由客户端发起的请求在后端系统中会经过多个不同的服务节点调用,来协同产生最后的请求结果,每一个前段请求都会形成一个复杂的分布式服务调用链路,链路中的任何一个环节出现高延时或错误都会引起整个请求最后的失败。
https://github.com/spring-cloud/spring-cloud-sleuthhttps://github.com/spring-cloud/spring-cloud-sleuthSpringCloud Sleuth 提供了一套完整的服务跟踪的解决方案。
在分布式系统中提供追踪解决方案并兼容支持了 zipkin。
SpringCloud 从 F 版其已不需要自己构建 Zipkin Server 了,只需要调用jar包即可。
下载地址: Central Repository: io/zipkin/zipkin-server
我们下载最新的 2.23.4 中的 zipkin-server-2.23.4-exec.jar
进入 jar 包所在目录,输入如下命令即可启动 zipkin
java -jar zipkin-server-2.23.4-exec.jar
出现如下页面说明启动成功
访问 http://localhost:9411/
完整的调用链路:表示请求链路,一条链路通过 Trace Id 唯一标识,Span 标识发起的请求信息,各Span 通过 parent id 关联起来。
我们精简一下上图调用过程
整个链路的依赖关系如下:
名词解释:
新建 Module :cloud-provider-sleuth-payment8001
pom
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>cloud-study</artifactId>
- <groupId>com.cloud.study</groupId>
- <version>1.0</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>cloud-provider-sleuth-payment8001</artifactId>
-
- <dependencies>
- <!-- sleuth+zipkin 依赖 -->
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-zipkin</artifactId>
- </dependency>
- <!-- Eureka client -->
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
- </dependency>
- <!-- 引入自己的 cloud-api-commons 模块-->
- <dependency>
- <groupId>com.cloud.study</groupId>
- <artifactId>cloud-api-commons</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-actuator</artifactId>
- </dependency>
- <!--开启热部署-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-devtools</artifactId>
- <scope>runtime</scope>
- <optional>true</optional>
- </dependency>
- <!-- lombok -->
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <optional>true</optional>
- </dependency>
- <!-- test -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
- </project>
yaml
- server:
- port: 8001
- spring:
- application:
- name: cloud-payment-sleuth-service
- zipkin:
- base-url: http://localhost:9411/ #zipkin服务地址
- sleuth:
- sampler:
- probability: 1 #采样率,0~1 ,1表示全部采集,一般使用 0.5
-
- eureka:
- client:
- register-with-eureka: true #表示是否将自己注册进EurekaServer,默认为true
- fetch-registry: true #是否从 EurekaServer 抓取自己有的注册信息,默认为true。单节点无所谓,集群必须设置为true才能配合ribbon使用负载均衡
- service-url:
- #设置与 Eureka Server 交互的地址查询服务和注册服务都需要依赖这个地址
- defaultZone: http://eureka7001.com:7001/eureka/,http://eureka7002.com:7002/eureka/ #集群 Eureka 配置
- instance:
- instance-id: ${spring.application.name}
- prefer-ip-address: true #显示IP
主启动
- @EnableEurekaClient
- @SpringBootApplication
- public class PaymentSleuthMain8001 {
- public static void main(String[] args) {
- SpringApplication.run(PaymentSleuthMain8001.class, args);
- }
- }
业务类
- @RestController
- @Slf4j
- public class PaymentController {
-
- @GetMapping("/payment/zipkin")
- public String paymentZipkin() {
- return "你好,我是服务提供者";
- }
- }
新建 Module:cloud-consumer-sleuth-order80
POM
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>cloud-study</artifactId>
- <groupId>com.cloud.study</groupId>
- <version>1.0</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>cloud-consumer-sleuth-order80</artifactId>
-
- <dependencies>
- <!-- sleuth+zipkin 依赖 -->
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-zipkin</artifactId>
- </dependency>
- <!-- Eureka client -->
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
- </dependency>
- <!-- 引入自己的 cloud-api-commons 模块-->
- <dependency>
- <groupId>com.cloud.study</groupId>
- <artifactId>cloud-api-commons</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-actuator</artifactId>
- </dependency>
- <!--开启热部署-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-devtools</artifactId>
- <scope>runtime</scope>
- <optional>true</optional>
- </dependency>
- <!-- lombok -->
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <optional>true</optional>
- </dependency>
- <!-- test -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
- </project>
yaml
- server:
- port: 80
-
- spring:
- application:
- name: cloud-consume-sleuth-order
- zipkin:
- base-url: http://localhost:9411/ #zipkin服务地址
- sleuth:
- sampler:
- probability: 1 #采样率,0~1 ,1表示全部采集,一般使用 0.5
-
- eureka:
- client:
- register-with-eureka: true #表示是否将自己注册进EurekaServer,默认为true
- fetch-registry: true #是否从 EurekaServer 抓取自己有的注册信息,默认为true。单节点无所谓,集群必须设置为true才能配合ribbon使用负载均衡
- service-url:
- #设置与 Eureka Server 交互的地址查询服务和注册服务都需要依赖这个地址
- defaultZone: http://eureka7001.com:7001/eureka/,http://eureka7002.com:7002/eureka/ #集群 Eureka 配置
- instance:
- instance-id: ${spring.application.name}
- prefer-ip-address: true #显示IP
主启动
- @EnableEurekaClient
- @SpringBootApplication
- public class OrderSleuthMain80 {
- public static void main(String[] args) {
- SpringApplication.run(OrderSleuthMain80.class, args);
- }
- }
配置类
- @Configuration
- public class ApplicationConfig {
- @Bean
- @LoadBalanced //赋予RestTemplate负载均衡的能力
- public RestTemplate getRestTemplate() {
- return new RestTemplate();
- }
- }
业务类
- @Slf4j
- @RestController
- public class OrderController {
- private static final String PAYMENT_URL = "http://cloud-payment-sleuth-service";//要写微服务的注册名称,即服务提供者的 spring.application.name
-
- @Autowired
- private RestTemplate restTemplate;
-
- @GetMapping("/consumer/payment/zipkin")
- public String paymentZipkin() {
- return restTemplate.getForObject(PAYMENT_URL+"/payment/zipkin", String.class);
- }
- }
分别启动 7001/7002 ,8001,80
多访问几次 http://localhost/consumer/payment/zipkin
打开 http://localhost:9411/ 的依赖面板,可以看到链路调用。可以自行研究各个按钮的具体功能。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。