赞
踩
SpringCloudStream 是一个构建高扩展和事件驱动的微服务系统的框架,用于连接共有消息系统,官网地址: spring.io/projects/sp… 。整体上是把各种花里胡哨的MQ产品抽象成了一套非常简单的统一的编程框架,以实现事件驱动的编程模型。社区官方实现了RabbitMQ,Apache Kafka,Kafka Stream和Amazon Kinesis这几种产品,而其他还有很多产品比如RocketMQ,都是由产品方自行提供扩展实现。
所以可以看到,对于RabbitMQ,使用SpringCloudStream框架算是一种比较成熟的集成方案。但是需要主要注意的是,SpringCloudStream框架集成的版本通常是比RabbitMQ落后几个版本的,使用时需要注意。
SpringCloudStream框架封装出了三个最基础的概念来对各种消息中间件提供统一的抽象:
可以看到,这个模型非常简单,使用时也会非常方便。但是简单,意味着SCStream中的各种概念模型,与RabbitMQ的基础概念之间是有比较大的差距的,例如Exchange、Queue这些原生概念,集成到SCStream框架时,都需要注意如何配置,如何转换。
RabbitMQ的SpringCloudStream支持是由Spring社区官网提供的,所以这也是相当成熟的一种集成方案。但是要注意,SpringCloudStream框架集成的版本通常是比RabbitMQ产品本身落后几个版本的,使用时需要注意。
他的核心依赖也就一个:
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <!-- artifactId>spring-cloud-starter-stream-rabbit</artifactId -->
- <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
- </dependency>
- 复制代码
这两个Maven依赖没有什么特别大的区别,实际上,他们的github代码库是在一起的。仓库地址:github.com/spring-clou…
依赖的版本通常建议使用SpringCloud的整体版本控制。 org.springframework.cloud#spring-cloud-dependencies#Hoxton.SR6,这样各个组件之间的版本比较安全。不建议贸然尝试新版本。
- spring.rabbitmq.addresses=192.168.253.131:5672,192.168.253.132:5672,192.168.253.133:5672
- spring.rabbitmq.username=admin
- spring.rabbitmq.password=admin
- spring.rabbitmq.virtual-host=/mirror
- 复制代码
需要在springboot启动类上加上如下注解
- @EnableBinding({Source.class, Sink.class})
- 复制代码
- @Component
- @EnableBinding(Sink.class)
- public class MessageReceiver {
- private Logger logger = LoggerFactory.getLogger(MessageReceiver.class);
-
- @EventListener
- @StreamListener(Sink.INPUT)
- public void process(Object message) {
- System.out.println("received message : " + message);
- logger.info("received message : {}", message);
- }
- }
- 复制代码
- @RestController
- @EnableBinding(Source.class)
- public class SendMessageController {
-
- @Autowired
- private Source source;
-
- @GetMapping("/send")
- public Object send(String message) {
- MessageBuilder<String> messageBuilder = MessageBuilder.withPayload(message);
- source.output().send(messageBuilder.build());
- return "message sended : "+message;
- }
- 复制代码
启动服务之后,会在RabbitMQ服务中自动创建topic类型的交换机(scstreamExchange)及一个匹配所有routingKey(#)的队列(scstreamExchange.myinput-1),并且会进行绑定,如下:
这里可以看到,当前消费者不光收到了MQ消息,还收到了一些系统事件(received message相关信息)。这些系统事件需要添加@EventListener注解才能接收到。
下面去掉@EventListener再次测试一下
SpringCloudStream在使用的时候默认会创建自己的交换机和队列,如果要使用我们自己已有的,就需要进行一下配置,如一个fanout类型的exchange,绑定了四个队列的模式
配置信息:
- #-----设置消息生产者
- spring.cloud.stream.bindings.output.destination=fanoutExchange
- #队列类型
- spring.cloud.stream.rabbit.bindings.output.producer.exchange-type=fanout
- #不用自己创建、用现有
- spring.cloud.stream.rabbit.bindings.output.producer.bind-queue=false
-
-
- #-----设置消息消费者
- spring.cloud.stream.bindings.input.destination=fanoutExchange
- spring.cloud.stream.rabbit.bindings.input.consumer.exchange-type=fanout
-
- #接收消息的队列
- spring.cloud.stream.bindings.input.group=fanout.q1
- #不自动创建队列
- spring.cloud.stream.rabbit.bindings.input.consumer.bind-queue=false
- #设置queue的名字只有group的名字,不包括destination
- spring.cloud.stream.rabbit.bindings.input.consumer.queue-name-group-only=true
- spring.cloud.stream.bindings.input.content-type=text/plain
- 复制代码
通过以上的设置,发送消息的时候就可以给fanoutExchange交换机发送消息,这样和fanoutExchange交换机绑定的队列就都可以收到消息。
需要注意的是,SpringCloudStream中创建交换机和队列的时候,会将交换机的名称作为前缀如下:
因此使用我们自己创建的交换机和队列的时候,需要观察一下是否也是按照如上规则创建的,如果队列的前缀没有交换机的名称,则需要加如下配置
- spring.cloud.stream.rabbit.bindings.input.consumer.queue-name-group-only=true
- 复制代码
如下,测试项fanoutExchange发送消息之后,队列fanout.q1就可以收到消息
而和fanoutExchange绑定的其他三个队列的消息则仍处于待消费状态,如下
1-7-2-1、配置文件
使用topic或者direct模式的时候,都会使用routingkey,但是使用SpringCloudStream的时候是无法直接穿routingKey的,这就需要在消息发送的时候设置header来进行设置
如使用topic的模式来发送,首先需要修改配置信息,如下:
- #--------------使用routingkey------
- #-----设置消息生产者
- spring.cloud.stream.bindings.output.destination=topicExchange
- #队列类型
- spring.cloud.stream.rabbit.bindings.output.producer.exchange-type=topic
- #是否持久化
- spring.cloud.stream.rabbit.bindings.output.producer.exchange-durable=true
- #不用自己创建、用现有
- spring.cloud.stream.rabbit.bindings.output.producer.bind-queue=false
- #设置routingkey
- spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression=headers.routingkey
-
- #-----设置消息消费者
- spring.cloud.stream.bindings.input.destination=topicExchange
- spring.cloud.stream.rabbit.bindings.input.consumer.exchange-type=topic
- spring.cloud.stream.rabbit.bindings.input.consumer.exchange-durable=true
- #接收消息的队列
- spring.cloud.stream.bindings.input.group=hebei.eco
- #不自动创建队列
- spring.cloud.stream.rabbit.bindings.input.consumer.bind-queue=false
- #设置queue的名字只有group的名字,不包括destination
- spring.cloud.stream.rabbit.bindings.input.consumer.queue-name-group-only=true
- #设置接收消息的routingkey
- spring.cloud.stream.rabbit.bindings.input.consumer.binding-routing-key=*.eco
- spring.cloud.stream.bindings.input.content-type=text/plain
- 复制代码
有routingkey的配置和fanout类型没有routingkey配置不同的有
1、在发送端需要指定routingkey,headers为固定设置,routingkey为具体的key值,如name=zhangsan,则可以设置headers.name
- #设置routingkey
- spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression=headers.routingkey
- 复制代码
2、在消费端配置消费的routingkey,此处配置的routingkey,就可以设置*
或者#
进行匹配,如下
- #设置接收消息的routingkey
- spring.cloud.stream.rabbit.bindings.input.consumer.binding-routing-key=*.eco
- 复制代码
1-7-2-2、发送端代码修改
如下在MessageBuilder中需要设置Header的key和value,也就是routingkey的key和value值,在上配置中设置的key为routing,则在发送端的代码中header就设置为routingkey
- @RestController
- @EnableBinding(Source.class)
- public class SendMessageController {
-
- @Autowired
- private Source source;
-
- @GetMapping("/send")
- public Object send(String message,String routingkey) {
- // MessageBuilder<String> messageBuilder = MessageBuilder.withPayload(message);
- MessageBuilder<String> messageBuilder = MessageBuilder.withPayload(message).setHeader("routingkey",routingkey);
- source.output().send(messageBuilder.build());
- return "message sended : "+message;
- }
- 复制代码
1-7-2-3、测试
首先设置routingkey的值为abcd,这样是无法收到消息的,因为消费端设置的routingkey的值为:*.eco
设置routingkey为:abcd.eco,消息就可以正常接收了,如果设置abcd.123.eco,消息就无法接收了,除非将routingkey设置为#.eco
在SpringBoot的autoconfigure包当中,有个 RabbitProperties类,这个类就会解析application.properties中以spring.rabbitmq开头的配置。里面配置了跟RabbitMQ相关的主要参数,包含服务器地址等。里面对每个参数也都提供了默认值。如果不进行配置,默认就是访问本地的RabbitMQ服务。
- #这几个是默认配置。
- spring.rabbitmq.host=localhost
- spring.rabbitmq.port=5672
- spring.rabbitmq.username=guest
- spring.rabbitmq.password=guest
- spring.rabbitmq.virtual-host=/
- 复制代码
既然是要对接RabbitMQ,那么最终还是需要与RabbitMQ服务器进行交互的。从RabbitMQ的管理页面上来看,SCStream帮我们在RabbitMQ的根虚拟机上创建了一个topic类型的scstreamExchange交换机,然后在这个交换机上绑定了一个scstreamExchange.stream队列,绑定的RoutingKey是#。 而程序中的消息发送者是将消息发送到scstreamExchange交换机,然后RabbitMQ将消息转发到scstreamExchange.stream队列,消息接收者从队列接收到消息。这个流程,就是Spring Cloud Stream在背后为我们做的事情。 在这里可以尝试对应RabbitMQ的基础概念以及SCStream框架中的基础概念,整理一下他们之间的对应关系。
SCStream框架帮我们屏蔽了与消息中间件的交互细节,开发人员甚至都不需要感知消息中间件的存在,将更多的关注点放到业务处理的细节里。实际上,就我们这个简单的示例,只需要将maven中的spring-cloud-starter-stream-rabbit依赖,换成spring-cloud-starter-stream-kafka,就可以完成与本地Kafka服务的交互,代码不需要做任何的改动。
在RabbitMQ的实现中,所有个性化的属性配置实现都是以spring.cloud.stream.rabbit开头,支持对binder、producer、consumer进行单独配置。
- #绑定exchange
- spring.cloud.stream.binding.<bindingName>.destination=fanoutExchange
- #绑定queue
- spring.cloud.stream.binding.<bindingName>.group=myQueue
- #不自动创建queue
- spring.cloud.stream.rabbit.bindings.<bindingName>.consumer.bindQueue=false
- #不自动声明exchange(自动声明的exchange都是topic)
- spring.cloud.stream.rabbit.bindings.<bindingName>.consumer.declareExchange=false
- #队列名只声明组名(前面不带destination前缀)
- spring.cloud.stream.rabbit.bindings.<bindingName>.consumer.queueNameGroupOnly=true
- #绑定rouytingKey
- spring.cloud.stream.rabbit.bindings.<bindingName>.consumer.bindingRoutingKey=myRoutingKey
- #绑定exchange类型
- spring.cloud.stream.rabbit.bindings.<bindingName>.consumer.exchangeType=<type>
- #绑定routingKey
- spring.cloud.stream.rabbit.bindings.<bindingName>.producer.routingKeyExpression='myRoutingKey'
- 复制代码
通过这些配置可以按照RabbitMQ原生的方式进行声明。例如,SCStream自动创建的Exchange都是Topic类型的,如果想要用其他类型的Exchange交换机,就可以手动创建交换机,然后在应用中声明不自动创建交换机。
所有可配置的属性,参见github仓库中的说明。例如,如果需要声明一个Quorum仲裁队列,那么只要给这个Binding配置quorum.enabled属性,值为true就可以了。
Stream队列目前尚不支持。RabbitMQ周边生态的发展肯定是比产品自身的发展速度要慢的,由此也可见,目前阶段,Stream队列离大规模使用还是有一点距离的。
分组可以让消息实现负载均衡的策略,例如大并发过来之后,生成端会发送大量消息,而消费端消费速度较慢就可以生成多个分组,然后生产端根据策略向不同的分组发送消息,就可以加快消息的消费速度
SCStream中的消费者分组策略,其实整体来看是一种类似于Kafka的分组消费机制。即,不同group的消费者,都会消费到所有的message消息,而在同一个goup中,每个message消息,只会被消费一次。这种分组消费的策略,严格来说,在RabbitMQ中是不存在的,RabbitMQ是通过不同类型的Exchange来实现不同的消费策略。而使用SCStream框架,就可以直接在RabbitMQ中实现这种分组消费的策略
- #消息生产者端配置
- #启动发送者分区
- spring.cloud.stream.bindings.output.producer.partitioned=true
- #指定参与消息分区的消费端节点数量
- spring.cloud.stream.bindings.output.producer.partition-count=2
- #只有消费端分区ID为1的消费端能接收到消息
- spring.cloud.stream.bindings.output.producer.partition-key-expression=1
-
- #消息消费者端配置
- #启动消费分区
- spring.cloud.stream.bindings.input.consumer.partitioned=true
- #参与分区的消费端节点个数
- spring.cloud.stream.bindings.input.consumer.instance-count=2
- #设置该实例的消费端分区ID
- spring.cloud.stream.bindings.input.consumer.instance-index=1
- 复制代码
通过这样的分组策略,当前这个消费者实例就只会消费奇数编号的消息,而偶数编号的消息则不会发送到这个消费者中。**注意:**这并不是说偶数编号的消息就不会被消费,只是不会被当前这个实例消费而已。
SCStream框架虽然实现了这种分组策略机制,但是其实是不太严谨的,当把分区数量和分区ID不按套路分配时,并没有太多的检查和日志信息,但是就是收不到消息。
另外,在@StreamListener注解中还有condition属性也可以配置消费者的分配逻辑,该属性支持一个SPELl表达式,只接收满足条件的消息。
当设置了分组消费的时候,绑定的队列及routingkey就变成了如下关系
上面的配置只设置固定的消费分组,实际场景中显然是不行的,这样就可以通过使用header来进行处理
可以配置headers.routingkey来进行动态发送
- spring.cloud.stream.bindings.output.destination=scstreamExchange
- #指定参与消息分区的消费端节点数量
- spring.cloud.stream.bindings.output.producer.partition-count=2
- #只有消费端分区ID为1的消费端能接收到消息
- #spring.cloud.stream.bindings.output.producer.partition-key-expression=0
- spring.cloud.stream.bindings.output.producer.partition-key-expression=headers.routingkey
-
- #这个input就对应Sink.INPUT strem中默认的消费队列
- spring.cloud.stream.bindings.input.destination=scstreamExchange
- spring.cloud.stream.bindings.input.group=myinput
- #参与分区的消费端节点个数
- spring.cloud.stream.bindings.input.consumer.instance-count=2
- #设置该实例的消费端分区ID
- spring.cloud.stream.bindings.input.consumer.instance-index=0
- #启动消费分区
- spring.cloud.stream.bindings.input.consumer.partitioned=true
- 复制代码
然后在发送端代码就可以通过设置header的routingkey来指定发送的分组了
- @GetMapping("/send")
- public Object send(String message,String routingkey) {
- // MessageBuilder<String> messageBuilder = MessageBuilder.withPayload(message);
- MessageBuilder<String> messageBuilder = MessageBuilder.withPayload(message).setHeader("routingkey",routingkey);
- source.output().send(messageBuilder.build());
- return "message sended : "+message;
- }
- 复制代码
测试:
无法收到消息: http://localhost:8080/send?message=fdsfgg&routingkey=1
可以收到消息: http://localhost:8080/send?message=fdsfgg&routingkey=0
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。