赞
踩
屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型
生产者、消费者之间靠消息媒介传递信息内容
Message
消息必须走特定的通道
消息通道MessageChannel
消息通道MessageChannel的子接口SubscribableChannel,由消息处理器MessageHandler消息处理器所订阅
stream凭什么可以统一底层差异?
Binder:
input:消息消费者 output:消息生产者
Topic主题进行广播:
Binder:
Channel:
Source和Sink:
生产者模块:cloud-stream-rabbitmq-provider8801
消费者模块:cloud-stream-rabbitmq-consumer8802
消费者模块:cloud-stream-rabbitmq-consumer8803
新建moudle: cloud-stream-rabbitmq-provider8801
pom:
<!--stream rabbit-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!--eureka client-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
yml:
server: port: 8801 spring: application: name: cloud-stream-provider cloud: #配置要绑定的rabbitmq的服务信息 stream: binders: defaultRabbit: #表示定义的名称,用于binding整合 type: rabbit #消息组件类型 environment: #设置rabbitmq的相关的环境配置 bindings: #服务的整合处理 output: #这个名字是一个通道的名称,output表示这是一个消息的生产者 destination: studyExchange #表示要使用的Exchange名称定义 content-type: application/json #设置消息类型,本次为json,文本则设置为“text/plain” binder: defaultRabbit #设置要绑定的消息服务的具体设置 rabbitmq: host: localhost port: 5672 username: guest password: guest eureka: client: service-url: defaultZone: http://eureka7001.com:7001/eureka/ instance: lease-expiration-duration-in-seconds: 5 #设置五秒的时间间隔 lease-renewal-interval-in-seconds: 2 #设置心跳的间隔时间 instance-id: send-8801.com #在信息列表显示的主机名称 prefer-ip-address: true #访问的路径变为ip地址
业务类:
发送消息接口:
public interface MessageProvider {
public String send();
}
实现类:
@Service public class MessageProviderImpl implements MessageProvider { /** * 消息推送的桥梁 */ @Autowired private StreamBridge streamBridge; @Override public String send() { String serial = UUID.randomUUID().toString(); //send方法第一个参数是binding的通道名称,在yml文件中定义,第二个参数是想要发送的消息,利用MessageBuilder构建消息 streamBridge.send("output", MessageBuilder.withPayload(serial).build()); System.out.println("消息serial: "+serial); return null; } }
控制层:
@RestController
public class SendMessageController {
@Autowired
private MessageProvider messageProvider;
@RequestMapping("/sendMsg")
public String sendMessage(){
return messageProvider.send();
}
}
测试:
启动eureka7001
启动rabbitmq
启动消息生产者8801
查看RabbitMQ后台,此处的交换机名称为yml中配置的交换机名称:
访问:http://localhost:8801/sendMessage
可以看到多次访问之后消息波峰发生了变化,说明消息发送成功
新建module:cloud-stream-rabbitmq-consumer8802
pom:
<!--stream rabbit-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!--eureka client-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
yml:
server: port: 8802 spring: application: name: cloud-stream-consumer cloud: #配置要绑定的rabbitmq的服务信息 stream: binders: defaultRabbit: #表示定义的名称,用于binding整合 type: rabbit #消息组件类型 environment: #设置rabbitmq的相关的环境配置 bindings: #服务的整合处理 input: #这个名字是一个通道的名称,output表示这是一个消息的生产者 destination: studyExchange #表示要使用的Exchange名称定义 content-type: application/json #设置消息类型,本次为json,文本则设置为“text/plain” binder: defaultRabbit #设置要绑定的消息服务的具体设置 rabbitmq: host: localhost port: 5672 username: guest password: guest eureka: client: service-url: defaultZone: http://eureka7001.com:7001/eureka/ instance: lease-expiration-duration-in-seconds: 5 #设置五秒的时间间隔 lease-renewal-interval-in-seconds: 2 #设置心跳的间隔时间 instance-id: receive-8802.com #在信息列表显示的主机名称 prefer-ip-address: true #访问的路径变为ip地址
业务类:
@Slf4j
@Service
public class StreamConsumerService {
/**
* 方法名(即Consumer的bean实例名)需要是yml配置中的通道名,应用程序启动后会自动接收生产者发送的消息
*/
@Bean
public Consumer<String> output(){
return message -> log.info("消息:"+message);
}
}
测试:
先后启动 eureka7001,provider8801,consumer8802
测试8801向8802发送消息:访问 http://localhost:8801/sendMsg
查看控制台是否打印出消息发送和接受的信息
依照 consumer8802,clone出来一份 consumer8803:
启动:
运行后有两个问题:
重复消费问题:
目前是8802、8803同事都收到了消息,存在重复消费的问题
解决方法:分组和持久化属性group
生产实际案例:
结论: 不同组是可以全面消费的(重复消费),同一组内会发生竞争关系,只有其中一个可以消费
分组
原理:微服务应用放置于同一个group中,就能保证消息只会被其中一个应用消费一次。不同的组是可以消费的,同一个组内会发生竞争关系,只有其中一个可以消费。
8802、8803都变成不同组,group不相同:
consumer8802修改yml:
在 binder 下面添加分组信息,与binder对齐:
binder: defaultRabbit #设置要绑定的消息服务的具体设置
group: groupA #设置分组名
consumer8803修改yml:
binder: defaultRabbit #设置要绑定的消息服务的具体设置
group: groupB #设置分组名
在两个消费者分组不同的情况下,消息会被两个微服务重复消费
设置两个微服务分组相同时,消息不会被重复消费,消息会以轮询的方式分发到多个微服务中去
持久化
停止8802、8803并去除掉8802的分组
8801先发送四条消息到rabbitmq
先启动8802,无分组属性配置,后台没有打出来消息
再启动8803,有分组属性配置,后台打出来了rabbitmq上的消息
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。