赞
踩
Spring Cloud Stream 是一个用于构建消息驱动型微服务的框架。它是 Spring Cloud 生态系统中的一个组件,建立在 Spring Boot 之上,旨在简化和统一消息中间件的集成和使用。
Spring Cloud Stream 支持多种常见的消息中间件,例如 Apache Kafka、RabbitMQ 等,开发人员可以根据需求选择合适的消息中间件进行集成。
通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。
Middleware:中间件,目前只支持RabbitMQ和Kafka
Binder:Binder是应用与消息中间件之间的封装,目前实现了Kafka和RabbitMQ的Binder,通过Binder可以很方便的与中间件进行连接。
与RabbitMQ整合依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
与Kafka整合依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
这里以对接RabbitMQ为例
<!--rabbit依赖 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
spring: application: name: cloud-stream-provider cloud: stream: # 配置要绑定的rabbitmq的服务信息(可以配置多个) binders: # 定义的名称,用户Binding整合 defaultRabbit: # 消息组件类型 type: rabbit # 设置rabbitmq相关的环境配置 environment: spring: rabbitmq: host: localhost port: 5672 username: guest password: guest # 配置多个展示 例如: # rabbit1: # type: rabbit # environment: # spring: # rabbitmq: # host: <host1> # rabbit2: # type: rabbit # environment: # spring: # rabbitmq: # host: <host2> # 服务的整合信息 bindings: # 信道的名称(订单通道) order_output: # Exchange名称(交换机名称) destination: order_exchange # 设置消息类型,本次为Json content-type: application/json # 设置要绑定的消息服务(前面的binders中配置的服务信息defaultRabbit) binder: defaultRabbit
import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; import org.springframework.stereotype.Component; @Component public interface StreamClient { /** * 订单 */ String ORDER_OUTPUT = "order_output"; /** * 订单(消息发送通道) * @return */ @Output(ORDER_OUTPUT) MessageChannel orderOutput(); }
import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; import javax.annotation.Resource; @EnableBinding(StreamClient.class) @Component public class MessageProvider { private static final Integer NO = 0; @Resource private StreamClient streamClient; /** * 发送消息 */ public void sendMessage() { // 构建message对象传给MQ String number = "订单号:" + (NO + 1); boolean send = streamClient.orderOutput().send(MessageBuilder.withPayload(number).build()); System.out.println(send); System.out.println(number); } }
@RestController
public class TestController {
@Resource
private MessageProvider messageProvider;
@GetMapping("/send")
public void sendTest() throws InterruptedException {
for (int i = 0; i < 5; i++){
messageProvider.sendMessage();
Thread.sleep(1000);
}
}
}
访问地址http://localhost:15672 ,保证能正常访问即可
可以看到链接RabbitMQ服务的一些信息
在RabbitMQ点击Exchanges 可以看到创建了order_exchange交换机
http://localhost:15672/#/exchanges
跟生产者一样的
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
server: port: 8802 spring: application: name: cloud-stream-consumer cloud: stream: # 配置要绑定的rabbitmq的服务信息 binders: # 定义的名称,用户Binding整合 defaultRabbit: # 消息组件类型 type: rabbit # 设置rabbitmq相关的环境配置 environment: spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 信道的名称(订单通道) order_input: # 通道绑定的交换机 destination: order_exchange # 设置消息类型,本次为Json content-type: application/json # 设置要绑定的消息服务(前面的binders中配置的服务信息defaultRabbit) binder: defaultRabbit # 队列名就是:destination+group也就是(order_exchange.order_queue) # 配置了group队列就会持久化,不会丢失 group: order_queue
import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel; public interface StreamClient { /** * 订单 */ String ORDER_INPUT = "order_input"; /** * 订单(消息接收通道) * @return */ @Input(ORDER_INPUT) SubscribableChannel orderInput(); }
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
// 绑定指定的消息通道
@EnableBinding(StreamClient.class)
@Slf4j
public class ReceiverMessageServer {
@StreamListener(StreamClient.ORDER_INPUT)
public void receive(String message) {
log.info("订单消费者:{}", message);
}
}
启动后会自动向绑定的交换机添加一个队列
生产中为了提到效率往往会有多个消费者来处理消息
再建一个消费者2(直接复制一份前面消费者代码即可)
修改端口号、服务名
与前面的区别:SpringCloudStream中的配置,这里紧紧把group: order_queue这个给注释了
server: port: 8803 spring: application: name: cloud-stream-consumer2 cloud: stream: # 配置要绑定的rabbitmq的服务信息 binders: # 定义的名称,用户Binding整合 defaultRabbit: # 消息组件类型 type: rabbit # 设置rabbitmq相关的环境配置 environment: spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 信道的名称(订单通道) order_input: # 通道绑定的交换机 destination: order_exchange # 设置消息类型,本次为Json content-type: application/json # 设置要绑定的消息服务(前面的binders中配置的服务信息defaultRabbit) binder: defaultRabbit # # 队列名就是:destination+group也就是(order_exchange.order_queue) # # 配置了group队列就会持久化,不会丢失 # group: order_queue
RabbitMQ中的order_exchange交换机中多绑定了一个队列
这时生产者发送消息,发现两个消费者都消费了相同的消息
很简单只要在消费者2的配置文件中中添加分组配置即可
发现消费者1、消费者2不在重复消费消息,问题解决
解决了重复消费的问题后,再去看RabbitMQ中的队列,发现刚才消费者2启动时创建的队列不见了,队列不见了,在往这个队列中发送消息肯定就收不到了,消息也就不能持久化,消息持久化前提是队列持久化
刚才配置中去掉了group配置,因为配置了group这个队列就不是临时队列了,会持久化
队列名称是以destination.group 前面的配置对应的队列名也就是:order_exchange.order_queue
假如不设置group属性的时候,默认是启动一个消费者,就会创建一个消费队列,启动多个服务就会创建多个队列。stream默认使用的是RabbitMQ的topic交换机。当发送者向这个交换机发送消息的时候,两个队列就都会接收到
把消费者2中的group修改一下,验证上面的结果
这里改成user_queue
验证两个事情:
启动服务
根据上面结果验证队列名正确
停掉消费者2服务
发现队列还在,验证队列持久化成功
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。