当前位置:   article > 正文

Spring Cloud Stream的介绍和使用

spring cloud stream

Spring Cloud Stream是什么

Spring Cloud Stream 是一个用于构建消息驱动型微服务的框架。它是 Spring Cloud 生态系统中的一个组件,建立在 Spring Boot 之上,旨在简化和统一消息中间件的集成和使用。

Spring Cloud Stream的优点

  1. 提供了一种声明式的方式来定义输入和输出消息通道,使开发人员能够更专注于业务逻辑的实现,而不必关心底层消息传递机制。
  2. 通过抽象和封装消息中间件的细节,屏蔽了不同消息中间件之间的差异,降低了切换消息中间件的成本。
  3. 开发人员可以通过注解或配置文件来定义消息通道、消息转换、消息分组等属性。
  4. 提供了一些机制来处理消息消费过程中的错误情况,例如消息重试、错误通知和死信队列等,确保消息的可靠性和可恢复性。
  5. 简化了消息驱动型微服务的开发和集成,提供了一种统一的编程模式,使得开发人员能够更轻松地构建可伸缩、可靠的分布式系统。

Spring Cloud Stream 支持多种常见的消息中间件,例如 Apache Kafka、RabbitMQ 等,开发人员可以根据需求选择合适的消息中间件进行集成。

Spring Cloud Stream设计结构

img

通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。

Middleware:中间件,目前只支持RabbitMQ和Kafka

Binder:Binder是应用与消息中间件之间的封装,目前实现了Kafka和RabbitMQ的Binder,通过Binder可以很方便的与中间件进行连接。

官方对应的pom依赖

与RabbitMQ整合依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
  • 1
  • 2
  • 3
  • 4

与Kafka整合依赖

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
  • 1
  • 2
  • 3
  • 4

Spring Cloud Stream 中常用的注解

  1. @EnableBinding:用于在应用程序中启用绑定器。通过该注解,可以将应用程序绑定到指定的消息通道,以实现消息的发送和接收。
  2. @Input:用于定义输入消息通道。通过该注解,可以将当前组件或方法绑定到指定的输入通道,以接收消息。
  3. @Output:用于定义输出消息通道。通过该注解,可以将当前组件或方法绑定到指定的输出通道,以发送消息。
  4. @StreamListener:用于定义消息监听器方法。通过该注解,可以将方法标记为消息监听器,当从输入通道接收到消息时,会自动触发该方法的执行。
  5. @EnableBinding(Sink.class):是一种简化的启用输入通道绑定的方式,将应用程序绑定到默认的输入通道 Sink.INPUT。
  6. @EnableBinding(Source.class):是一种简化的启用输出通道绑定的方式,将应用程序绑定到默认的输出通道 Source.OUTPUT。

Spring Cloud Stream具体使用

这里以对接RabbitMQ为例

消息生产者

添加依赖
<!--rabbit依赖        -->
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
application配置
spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      # 配置要绑定的rabbitmq的服务信息(可以配置多个)
      binders:
        # 定义的名称,用户Binding整合
        defaultRabbit:
          # 消息组件类型
          type: rabbit
          # 设置rabbitmq相关的环境配置
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
        # 配置多个展示 例如:
#        rabbit1:
#          type: rabbit
#          environment:
#            spring:
#              rabbitmq:
#                host: <host1>
#          rabbit2:
#            type: rabbit
#            environment:
#              spring:
#                rabbitmq:
#                  host: <host2>
      # 服务的整合信息
      bindings:
        # 信道的名称(订单通道)
        order_output:
          # Exchange名称(交换机名称)
          destination: order_exchange
          # 设置消息类型,本次为Json
          content-type: application/json
          # 设置要绑定的消息服务(前面的binders中配置的服务信息defaultRabbit)
          binder: defaultRabbit
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
添加接口
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Component;

@Component
public interface StreamClient {

    /**
     * 订单
     */
    String ORDER_OUTPUT = "order_output";

    /**
     * 订单(消息发送通道)
     * @return
     */
    @Output(ORDER_OUTPUT)
    MessageChannel orderOutput();



}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
添加实现类
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@EnableBinding(StreamClient.class)
@Component
public class MessageProvider {

    private static final Integer NO = 0;

    @Resource
    private StreamClient streamClient;

    /**
     * 发送消息
     */
    public void sendMessage() {
        // 构建message对象传给MQ
        String number = "订单号:" + (NO + 1);
        boolean send = streamClient.orderOutput().send(MessageBuilder.withPayload(number).build());
        System.out.println(send);
        System.out.println(number);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
添加测试类
@RestController
public class TestController {

    @Resource
    private MessageProvider messageProvider;

    @GetMapping("/send")
    public void sendTest() throws InterruptedException {
        for (int i = 0; i < 5; i++){
            messageProvider.sendMessage();
            Thread.sleep(1000);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
启动RabbitMQ服务

访问地址http://localhost:15672 ,保证能正常访问即可

img

启动生产者服务

可以看到链接RabbitMQ服务的一些信息

img

自动创建交换机

在RabbitMQ点击Exchanges 可以看到创建了order_exchange交换机

http://localhost:15672/#/exchanges

img

消息消费者

添加依赖

跟生产者一样的

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
  • 1
  • 2
  • 3
  • 4
application配置
server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      # 配置要绑定的rabbitmq的服务信息
      binders:
        # 定义的名称,用户Binding整合
        defaultRabbit:
          # 消息组件类型
          type: rabbit
          # 设置rabbitmq相关的环境配置
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings:
        # 信道的名称(订单通道)
        order_input:
          # 通道绑定的交换机
          destination: order_exchange
          # 设置消息类型,本次为Json
          content-type: application/json
          # 设置要绑定的消息服务(前面的binders中配置的服务信息defaultRabbit)
          binder: defaultRabbit
          # 队列名就是:destination+group也就是(order_exchange.order_queue)
          # 配置了group队列就会持久化,不会丢失
          group: order_queue
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
添加接口
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface StreamClient {

    /**
     * 订单
     */
    String ORDER_INPUT = "order_input";

    /**
     * 订单(消息接收通道)
     * @return
     */
    @Input(ORDER_INPUT)
    SubscribableChannel orderInput();

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
添加实现类
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;

// 绑定指定的消息通道
@EnableBinding(StreamClient.class)
@Slf4j
public class ReceiverMessageServer {

    @StreamListener(StreamClient.ORDER_INPUT)
    public void receive(String message) {
        log.info("订单消费者:{}", message);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
启动消费者服务

img

自动创建队列

启动后会自动向绑定的交换机添加一个队列

img

img

生产者发送消息img
消费者接收消息

img

消息重复消费

生产中为了提到效率往往会有多个消费者来处理消息

新建一个消费者

再建一个消费者2(直接复制一份前面消费者代码即可)

修改application配置

修改端口号、服务名

与前面的区别:SpringCloudStream中的配置,这里紧紧把group: order_queue这个给注释了

server:
  port: 8803

spring:
  application:
    name: cloud-stream-consumer2
  cloud:
    stream:
      # 配置要绑定的rabbitmq的服务信息
      binders:
        # 定义的名称,用户Binding整合
        defaultRabbit:
          # 消息组件类型
          type: rabbit
          # 设置rabbitmq相关的环境配置
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings:
        # 信道的名称(订单通道)
        order_input:
          # 通道绑定的交换机
          destination: order_exchange
          # 设置消息类型,本次为Json
          content-type: application/json
          # 设置要绑定的消息服务(前面的binders中配置的服务信息defaultRabbit)
          binder: defaultRabbit
#          # 队列名就是:destination+group也就是(order_exchange.order_queue)
#          # 配置了group队列就会持久化,不会丢失
#          group: order_queue
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

启动服务

img

自动创建队列

RabbitMQ中的order_exchange交换机中多绑定了一个队列

img

img

生产者发送消息

这时生产者发送消息,发现两个消费者都消费了相同的消息

img

img

解决重复消费问题

很简单只要在消费者2的配置文件中中添加分组配置即可

原先配置

img

修改后配置

img

再次发送消息

img

img

发现消费者1、消费者2不在重复消费消息,问题解决

消息持久化

问题

解决了重复消费的问题后,再去看RabbitMQ中的队列,发现刚才消费者2启动时创建的队列不见了,队列不见了,在往这个队列中发送消息肯定就收不到了,消息也就不能持久化,消息持久化前提是队列持久化

img

原因

刚才配置中去掉了group配置,因为配置了group这个队列就不是临时队列了,会持久化

队列名称是以destination.group 前面的配置对应的队列名也就是:order_exchange.order_queue

img

假如不设置group属性的时候,默认是启动一个消费者,就会创建一个消费队列,启动多个服务就会创建多个队列。stream默认使用的是RabbitMQ的topic交换机。当发送者向这个交换机发送消息的时候,两个队列就都会接收到

验证

把消费者2中的group修改一下,验证上面的结果

这里改成user_queue

验证两个事情:

  1. 队列名称是不是:destination.group 也就是 (order_exchange.uesr_queue)
  2. 队列会不会持久化

img

启动服务

img

img

根据上面结果验证队列名正确

停掉消费者2服务

img

img

img

发现队列还在,验证队列持久化成功

本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号