赞
踩
Spring Cloud Stream 是一个构建消息驱动微服务的框架。
Spring Cloud Stream解决了开发人员无感知的使用消息中间件的问题,因为Spring Cloud Stream对消息中间件的进一步封装,可以做到代码层面对消息中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为kafka等),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程;
不需要自己写配置类,直接在配置文件中配置关键信息即可
应用程序通过input(相当于消费者consumer)、output(相当于生产者producer)来与Spring Cloud Stream中Binder交互,而Binder负责与消息中间件交互,因此,我们只需关注如何与Binder交互即可,而无需关注与具体消息中间件的交互。
组成 | 说明 |
---|---|
Binder | Binder是应用与消息中间件之间的封装,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现; |
@Input | 该注解标识输入通道,通过该输入通道接收消息进入应用程序 |
@Output | 该注解标识输出通道,发布的消息将通过该通道离开应用程序 |
@StreamListener | 监听队列,用于消费者的队列的消息接收 |
@EnableBinding | 将信道channel和exchange绑定在一起 |
消息推送流程
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
spring: #rabbitmq配置 rabbitmq: host: 192.168.10.1 port: 30002 username: rabbitmq password: rabbitmq cloud: #消息生产者 stream: binders: # rabbitmqBinder是一个key,这个名字是一个通道的名称,在代码中会用到 rabbitmqBinder: # 中间件类型 type: rabbit # bindings: # @Output注解中的值 myOutput: #设置要绑定的消息服务的binder,就是 rabbitmqBinder binder: rabbitmqBinder #destination表示要使用的Exchange名称定义,注意默认是Topic模式 destination: myExchange
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
* @Description: 自定义消息通道
*/
public interface MessageSource {
//channel名称
String OUTPUT = "myOutput";
@Output(MessageSource.OUTPUT)
MessageChannel output();
}
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.messaging.support.MessageBuilder; import java.util.Date; /** * @Description: 消息推送 */ @EnableBinding(MessageSource.class) public class MessageSender { /** * 消息的发送管道 */ @Autowired private MessageSource messageSource; public void publish(String msg) { messageSource.output().send (MessageBuilder.withPayload(msg).build()); System.out.println("消息发送:<" + msg + "> 完成,时间:" + new Date()); } }
@RestController
public class IndexController{
@Autowired
MessageSender messageSender;
@RequestMapping("Sender")
public void gen(){
messageSender.publish("hello Stream");
}
}
spring: rabbitmq: host: 192.168.10.1 port: 30002 username: rabbitmq password: rabbitmq cloud: #消息生产者 stream: binders: # rabbitmqBinder是一个key,这个名字是一个通道的名称,在代码中会用到 rabbitmqBinder: # 中间件类型 type: rabbit bindings: # 与@Input注解中值对应 myInput: #设置要绑定的消息服务的binder,就是 rabbitmqBinder binder: rabbitmqBinder #destination表示要使用的Exchange名称定义,注意默认是Topic模式 destination: myExchange
import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel; /** * @Description: 自定义可订阅通道 */ public interface MessageSink { /** * Input channel name. */ String INPUT = "myInput"; /** * 消费者信道 */ @Input(MessageSink.INPUT) SubscribableChannel input(); }
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import java.util.Date;
@EnableBinding(MessageSink.class)
public class MessageReceiver {
@StreamListener(MessageSink.INPUT)
public void input(Message message) {
System.out.println("消息接收:<" + message.getPayload() + "> 完成,时间:" + new Date());
}
}
rabbitmq中消息是非持久化数据,当消费者服务宕机或关闭服务重启无消费者后,生产者继续推送数据,重启后的消费者无法接受到之前的消息,造成消息丢失
该模式多了一个路由routeKey,当消息发送到交换机时,交换机在准备发送给绑定的队列时,会再判断指定的routeKey是否已和改队列进行绑定,如果未绑定,则不发送数据过去。
和路由模式类似,同样是一个生产者多个消费者,中间多了个交换机(exchange),一条消息可以被多个消费者获取。同样是传key,但是他的key是可以模糊匹配的
可以使用通配符进行模糊匹配
符号’#" 匹配一个或多个词
符号"*”匹配不多不少一个词
不处理路由键,只需要简单的将队列绑定到交换机上发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。
Fanout交换机转发消息是最快的。
消费者修改配置文件,增加group进行数据持久化
spring:
cloud:
#消息生产者
stream:
bindings:
# 与@Input注解中值对应
myInput:
# 指定分组可以进行消息持久化,分组相同只有一个消费者会消费消息
# null或空字符串值表示匿名组不被共享。
# null或空字符串值表示匿名组不被共享。
# 队列名称会变为为destination.group(交换机.分组)
group: rabbitmqGroupTwo
如果不同的消费者都想接受消息,则可以配置不同的group
此时消费端的微服务宕机或重启,该队列信息依然会被保留在 RabbitMQ中,后续依然可以进行消费;
当项目集群部署了很多份,那么就会变成多个消费者,但是业务可能需要的是一个消息只消费一次,所以此时需要加个分组,就可以实现同一个分组里面的消费者只会有一个消费者能接收到消息;
默认情况下点击rabbitmq的交换机查看如图
spring:
cloud:
#消息生产者
stream:
## rabbitmq 特有的
rabbit:
bindings:
# 与@Input注解中值对应
myInput:
# 附加特定于消费者的属性
consumer:
# 设置一个RoutingKey路由key
bindingRoutingKey: stream.routingKey.#
更多rabbitmq配置信息请参考如下类
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
配置类参考
KafkaBinderConfigurationProperties
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。