当前位置:   article > 正文

springcloud 入门(8) springcloud Stream_colud stream : 'id' header is read-only

colud stream : 'id' header is read-only

项目版本

1、jdk:1.8
2、springboot 2.1.6.RELEASE ,springcloud Greenwich.SR6

介绍

在系统开发里面难免用到消息队列,但各个的消息队列又有所区别,SpringCloudStream 的 作用就是屏蔽各种消息队列的区别,对消息队列的 API进行进一步的抽象,使得在springcloud 里面能更加方便的集成各种消息系统。通过使用springcloud Stream ,可以有效简化开发人员对消息中间件的使用复杂程度,让系统开发人员能够有更多精力去关注核心业务逻辑的处理。目前springcloud Stream只支持两大著名的消息中间件,rabbitmq 和 kafka。

Spring Cloud Stream 应用模型

Spring Cloud Stream 应用程序由一个中间件中立的核心组成。 应用程序通过在外部代理公开的目标和代码中的输入/输出参数之间建立绑定来与外部世界进行通信。 建立绑定所需的代理特定细节由特定于中间件的 Binder 实现处理。
以下图片来自官方
来自官方

入门使用

消息生产者

这里使用rabbit作为消息中间件,自行安装rabbitmq
1、新建一个cloud-stream-provider,添加依赖

<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

2、创建消息消费者类

@EnableBinding(Sink.class)
public class SinkReceiver {

    @StreamListener(Sink.INPUT)
    public void receive(Object receivedMessage){
        System.out.println("receivedMessage="+receivedMessage);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

3、启动CloudStreamProviderApplication
启动之后登录rabbitmq后台http://localhost:15672/,使用默认账号密码guest登录
connections一栏能看到我们的连接
在这里插入图片描述点击127.0.0.1:50433可以查看connection详情
在这里插入图片描述再点击127.0.0.1:5672 (1)可以查看Channel详情
在这里插入图片描述点击input.anonymous.leanfVQMRM-6dPfz1XIkCw 就能查看到具体的队列详情
在这里插入图片描述下拉有个Publish message,在这里就可以发布数据
在这里插入图片描述发布完之后就能在控制台看到发布的数据了
在这里插入图片描述
怎么知道这个队列input.anonymous.leanfVQMRM-6dPfz1XIkCw一定是启动的那个呢
其实在启动的时候默认就给我们分配了一个,查看控制台信息可以看到
在这里插入图片描述Stream尝鲜完成了,接下来是个简单入门使用
1、创建一个消息生产者类

@EnableBinding(Source.class)
public class SourceProvider {

    @Resource
    @Qualifier("output")
    private MessageChannel messageChannel;

    public void send(Object sendMessage){
        messageChannel.send(MessageBuilder.withPayload(sendMessage).build());
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

2、修改配置文件

server.port=8301
spring.application.name=cloud-stream-provider

#消息组件类型 rabbitmq1为自定义的rabbitmq实例名称,如果有多个消息队列实例的话可以参照下面这样
# type:消息中间件类型
spring.cloud.stream.binders.rabbitmq1.type=rabbit
spring.cloud.stream.binders.rabbitmq1.environment.spring.rabbitmq.host=localhost
spring.cloud.stream.binders.rabbitmq1.environment.spring.rabbitmq.username=guest
spring.cloud.stream.binders.rabbitmq1.environment.spring.rabbitmq.password=guest
spring.cloud.stream.binders.rabbitmq1.environment.spring.rabbitmq.port=5672
spring.cloud.stream.binders.rabbitmq1.environment.spring.rabbitmq.virtual-host=/

# 实例2
#spring.cloud.stream.binders.rabbitmq2.type=rabbit
#spring.cloud.stream.binders.rabbitmq2.environment.spring.rabbitmq.host=localhost
#spring.cloud.stream.binders.rabbitmq2.environment.spring.rabbitmq.username=guest
#spring.cloud.stream.binders.rabbitmq2.environment.spring.rabbitmq.password=guest
#spring.cloud.stream.binders.rabbitmq2.environment.spring.rabbitmq.port=5672

# 要使用的 Exchange 名称
spring.cloud.stream.bindings.output.destination=streamExchange
#设置消息类型
spring.cloud.stream.bindings.output.content-type=application/json
#要绑定的消息服务的实例名
spring.cloud.stream.bindings.output.binder=rabbitmq1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

3、创建测试类用于发送消息

@RunWith(SpringRunner.class)
@SpringBootTest(classes = CloudStreamProviderApplication.class)
public class CloudStreamProviderTest{

    @Autowired
    private SourceProvider sourceProvider;

    @Test
    public void test(){
        sourceProvider.send("hello,this is first message");
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

只有生产者没有消费者是没用的,接下来创建消费者

消息消费者

1、新建一个stream-consumer,添加依赖

<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

2、创建消息接收类SinkReceiver


@EnableBinding(Sink.class)
public class SinkReceiver {

    @StreamListener(Sink.INPUT)
    public void receive(Message<?> receivedMessage){
        System.out.println("receivedMessage="+receivedMessage.getPayload());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

3、修改配置文件

server.port=8401

spring.application.name=cloud-stream-consumer

#消息组件类型 rabbitmq1为自定义的rabbitmq实例名称,如果有多个消息队列实例的话可以参照下面这样
# type:消息中间件类型
spring.cloud.stream.binders.rabbitmq1.type=rabbit
spring.cloud.stream.binders.rabbitmq1.environment.spring.rabbitmq.host=localhost
spring.cloud.stream.binders.rabbitmq1.environment.spring.rabbitmq.username=guest
spring.cloud.stream.binders.rabbitmq1.environment.spring.rabbitmq.password=guest
spring.cloud.stream.binders.rabbitmq1.environment.spring.rabbitmq.port=5672

# 要使用的 Exchange 名称 ,input是rabbitmq的channel名称,后面可以自定义
spring.cloud.stream.bindings.input.destination=streamExchange
#设置消息类型
spring.cloud.stream.bindings.input.content-type=application/json
#要绑定的消息服务的实例名
spring.cloud.stream.bindings.input.binder=rabbitmq1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

启动CloudStreamConsumerApplication,可以看到rabbitmq首页下方有一个消费者
在这里插入图片描述调用CloudStreamProviderTest#test,查看stream-consumer控制台,会发现收到一条消息

receivedMessage=hello,this is first message
  • 1

注解解释

  1. @EnableBinding:用来指定一个或多个定义了@input或@output注解接口,以此实现对消息通道(channel)的绑定。

  2. @StreamListener:主要定义在方法上,作用是将被修饰的方法注册为消息中间件上数据流的事件监听器,注解中的属性值对应了监听的消息通道名。

自定义消息管道

改造cloud-stream-provider、stream-consumer

1、新建IProcessor.java

public interface IProcessor {

    /**
     * output channel name.
     */
    String OUTPUT = "stream_output";

    /**
     * Input channel name.
     */
    String INPUT = "stream_input";


    /**
     * @return output channel
     */
    @Output(IProcessor.OUTPUT)
    MessageChannel output();


    /**
     * @return input channel.
     */
    @Input(IProcessor.INPUT)
    SubscribableChannel input();

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

2、修改配置文件
修改cloud-stream-provider配置文件,把channel名称从out改为stream_output

# 要使用的 Exchange 名称 ,output是rabbitmq的channel名称,后面可以自定义
spring.cloud.stream.bindings.stream_output.destination=streamExchange
#设置消息类型
spring.cloud.stream.bindings.stream_output.content-type=application/json
#要绑定的消息服务的实例名
spring.cloud.stream.bindings.stream_output.binder=rabbitmq1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

修改stream-consumer配置文件,把channel名称从out改为stream_input

# 要使用的 Exchange 名称 ,input是rabbitmq的channel名称,后面可以自定义
spring.cloud.stream.bindings.stream_input.destination=streamExchange
#设置消息类型
spring.cloud.stream.bindings.stream_input.content-type=application/json
#要绑定的消息服务的实例名
spring.cloud.stream.bindings.stream_input.binder=rabbitmq1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

3、新建类用于测试
3.1 cloud-stream-provider消息生产者端
ProcessorProvider.java

@EnableBinding(IProcessor.class)
public class ProcessorProvider {
    @Resource
    @Qualifier(IProcessor.OUTPUT)
    private MessageChannel messageChannel;
    
    public void send(Object sendMessage){
        messageChannel.send(MessageBuilder.withPayload(sendMessage).build());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

修改测试类

@RunWith(SpringRunner.class)
@SpringBootTest(classes = CloudStreamProviderApplication.class)
public class CloudStreamProviderTest {

    @Autowired
    private SourceProvider sourceProvider;
    @Autowired
    private ProcessorProvider processorProvider;

    @Test
    public void test(){
        sourceProvider.send("hello,this is first message");
    }

    @Test
    public void testProcessor(){
        processorProvider.send("hello,this is testProcessor message");
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

3.2 stream-consumer 消息消费端
消息消费类 ProcessorConsumer.java

@EnableBinding(IProcessor.class)
public class ProcessorConsumer {

    @StreamListener(IProcessor.INPUT)
    public void receive(Message<?> receivedMessage){
        System.out.println("receivedMessage="+receivedMessage.getPayload());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

4、测试
调用CloudStreamProviderTest#testProcessor方法,stream-consumer受到消息,控制台显示

receivedMessage=hello,this is testProcessor message
  • 1

这样一个简单的自定义消息管道就完成了

消费分组

在现实的微服务架构中,我们的每一个微服务应用为了实现高可用和负载均衡,实际上都会部署多个实例。在很多情况下,消息生产者发送消息给某个具体微服务时,只希望被消费一次,当进行集群部署时,虽然他们都属于同一个项目,但是消息会被多次消费,为了解决这个问题,springcloud Stream 提出了消费组的概念。
可以通过 spring.cloud.stream.bindings.input.group 属性为应用指定一个组名,这样这个应用的多个实例在接收到消息的时候,只会有一个成员真正收到消息并进行处理。

测试:
启动两个cloud-stream-consumer消费端,关于springboot 同一个项目在idea多个运行,请看这篇文章
调用CloudStreamProviderTest#testProcessor,会发现cloud-stream-consumer控制台一个会打印收到的消费消息,一个没有

消息分区

通过多次调用CloudStreamProviderTest#testProcessor,会发现多个cloud-stream-consumer实例的话会轮流收到消息,没有办法控制到底是哪一个实例消费,也就是说,对于同一条消息,它多次到达之后可能是由不同的实例进行消费的。但是对于一些业务场景,需要对一些具有相同特征的消息设置每次都被同一个消费实例处理,比如,一些用于监控服务,为了统计某段时间内消息生产者发送的报告内容,监控服务需要在自身聚合这些数据,那么消息生产者可以为消息增加一个固有的特征 ID 来进行分区,使得拥有这些 ID 的消息每次都能被发送到一个特定的实例上实现累计统计的效果,否则这些数据就会分散到各个不同的节点导致监控结果不一致的情况。而分区概念的引入就是为了解决这样的问题:当生产者将消息数据发送给多个消费者实例时,保证拥有共同特征的消息数据始终是由同一个消费者实例接收和处理。

修改消费者cloud-stream-consumer端配置文件,增加如下配置

# 是否开启消息分区 Default: 'false'
spring.cloud.stream.bindings.stream_input.consumer.partitioned=true
# 当前消费者总的实例数量
spring.cloud.stream.instance-count=2
# 当前实例的索引号,最大值为spring.cloud.stream.instance-count 减 1
spring.cloud.stream.instance-index=1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

修改生产者端cloud-stream-provider配置文件,增加如下配置

# 分区键的表达式规则,可以根据实际的输出消息规则配置spEL来生成合适的分区键
spring.cloud.stream.bindings.stream_output.producer.partition-key-expression=payload
# 消息分区数量
spring.cloud.stream.bindings.stream_output.producer.partition-count=1
  • 1
  • 2
  • 3
  • 4

修改完成之后,同时启动多个消费者并为每个消费者指定不同的索引,会发现消息被发送到消息组时,只有一个实例在接收和处理消息。

设置RoutingKey

RoutingKey 其 实 是 RabbitMq 的 概 念 , 在 RabbitMq 里 面 其 实 有 好 几 种 Exchange , 在 SpringCloudStream 里面默认就是使用的最通用的 Topic 如果没有配置 RoutingKey,它使用的 RoutingKey 其实就是#,既类似于 fanout 的广播类型 其实也可以使用 RoutingKey 来实现类似于 direct 直连类型

修改消费端配置文件

#绑定routing-key
spring.cloud.stream.rabbit.bindings.stream_input.consumer.binding-routing-key=routing_key_a
  • 1
  • 2

修改消息生产端配置文件

#绑定routing-key
spring.cloud.stream.rabbit.bindings.stream_output.producer.routing-key-expression='routing_key_a'
  • 1
  • 2

springcloudstream

springcloud Stream 使用rabbitmq作为消息中间件的部分就简单完成了,有兴趣的也可以使用kafka作为消息中间件。

GitHub地址:
https://github.com/ArronSun/micro-services-practice.git

参考书籍:

《springcloud微服务实战》

能力一般,水平有限,如有错误,请多指出。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/185319
推荐阅读
相关标签
  

闽ICP备14008679号