赞
踩
概述
平时我们进行开发的时候,一个庞大的系统可能会有多个不同的技术团队去进行开发和维护,那么一个系统的不同子系统里,所使用的MQ消息队列可能都是不一样的:
RabbitMQ
RocketMQ
Kafka
ActiveMQ
ZeroMQ
JMS
有很多,跨团队跨项目进行MQ的使用其实会有很多的麻烦。不论是对开发还是维护来讲,都会很烦躁。
所以我们引入了stream这个组件,这个组件相当于是基于mq的封装,通过stream就能适配所有的mq,如此我们只需要关注stream的使用,具体使用了什么mq,我们不需要关注,如此一来减少了开发维护成本,也减少了新人入职的学习成本。
举个例子,如果把各种mq比作不同的数据库,那么stream就可以比作是navcat,不同的数据库都可以在navcat中进行可视化操作,那么我们只需要关注所见即所得就行(鼠标随便点点就行),具体不同数据库执行的什么脚本我们可以不需要在意即可。
基于stream的mq模型
如上如所示,通过binder绑定器,我们就能去操作切换到不同的mq了,原先我们是通过springboot去结合不同mq然后去实现代码,而现在,只需要结合binder就行,假设我们更换了mq产品,如果我们采用stream,那么我们的业务代码不需要更改,如果没有使用stream,那么所有的代码实现都得改。从中就能看出stream的优势。
output: 消息输出通道,其实也是channel管道,生产者把消息发送给binder。
input: 消息输入通道,也是channel管道,生产者的消息往这里扔,binder把消息通过input发给消费者。
理解output和input:代码层面构建消息以后输出到binder(MQ),再由binder输入到另一个代码层处理消费。
往往在编码之前先去了解熟悉模型,会更有利于编码,这也和数学公式一个道理,公式会了,解答各种代数啊几何啊就迎刃而解。
1.在生产者和消费者的微服务pom中引入依赖,这里我们使用rabbitMQ,所以直接是用stream-rabbit即可:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
2.yml中引入配置
spring:
cloud:
stream:
bindings: # 绑定通道和交换机
myInput: # 定义消费者的通道
# 自定义交换机名字,也就是消息从底层mq中输出到消费端进行消费
destination: streamExchange
myOutput: #定义生产者的通道
# 自定义交换机名字,也就是代码里构建消息,交给底层mq的交换机
destination: streamExchange
3.构建自定义通道
/** * 构建通道channel */ @Component public interface MyStreamChannel { String INPUT = "myInput"; String OUTPUT = "myOutput"; @Input(MyStreamChannel.INPUT) SubscribableChannel input(); @Output(MyStreamChannel.OUTPUT) MessageChannel output(); }
4.发送消息 - 构建接口与实现
public interface StreamService {
public void sendStream();
}
/** * 开启绑定器 * 绑定通道channel */ @Component @EnableBinding(MyStreamChannel.class) public class StreamServiceImpl implements StreamService { // 注入管道output,用于发送消息 @Autowired private MyStreamChannel myStreamChannel; @Override public void sendStream() { AppUser user = new AppUser(); user.setId("test1001"); user.setNickname("imooc"); // 发送消息 myStreamChannel.output().send(MessageBuilder.withPayload(user).build()); } }
5.消费者消费
@Component
@EnableBinding(MyStreamChannel.class)
public class MyStreamConsumer {
@StreamListener(MyStreamChannel.INPUT)
public void receive(AppUser user) {
System.out.println(user);
}
}
6.通过controller调用并且发送:
@Autowired
private StreamService streamService;
@RequestMapping("/stream/producer")
public Object streamProducer() {
streamService.sendStream();
return GraceJSONResult.ok();
}
题外话
stream可以对接rabbitmq和kafka,看似很强大,但是我们实际中并不会使用stream,主要看团队需求,我们的项目很大,也有很多子项目,消息框架有RabbitMQ,kafka,JMS,但是我们并不会使用stream。一方面负责消息方面的开发人员本身就需要对这些框架中间件熟悉,其次我们也会有自己的一些对mq的封装,提供一些对外的api封装,所以使用起来并不是麻烦。其次,很少会出现切换mq框架这样的场景,因为技术选型前期是敲定的,如果经历开发阶段,会经常变更一些框架,那就是架构师的选型失败。
引子
目前我们可以在多个服务中的消费者去进行消息消费业务处理,那么这其实也就是发布订阅模式,微信公众号就是如此的形式。但是在有些场景下,消息只能被消费一次,就比如说在高并发下,通过mq进行流量削峰,假设有1万个请求都是对订单支付状态做修改,并且现在有10台节点的消费者,那么按照现在的模式,每笔订单都会被处理10次,这样就是重复消费了,我们应该要让不同的消费者只处理1次,也就是10000/10,每个节点消费1千个请求即可。这才是我们的目的。
我们可以通过消息分组,来实现重复消费的问题。通过分组group,只要被group分组了,那么这个group里的所有消费者都只会对消息消费一次,彼此是隔离关系。举个例子,消费者都是人,饭店里的饭桌就是分组group,现在我上了很多饺子,总共1万只饺子,谁吃了饺子就代表这个饺子被消费了,别人无法再吃那个饺子了。
注意:那如果把消息发送给不同的分组呢?那么不同的分组都会去消费,只是组内成员是不会重复消费的。
实现消息分组
首先我们为微服务设置分组,分组是设置在input通道节点下
在文章服务中设置分组为:girls
在用户服务中设置分组为:boys
myInput:
destination: streamExchange
group: grils
myInput:
destination: streamExchange
group: boys
修改controller
for (int i = 0 ; i < 10 ; i ++) {
streamService.eat("我吃了第" + (i + 1) + "只饺子");
}
修改生产者
public void eat(String dumpling);
@Override
public void eat(String dumpling) {
myStreamChannel.output()
.send(MessageBuilder.withPayload(dumpling).build());
}
修改消费者:
@StreamListener(MyStreamChannel.INPUT)
public void receive(String dumpling) {
System.out.println(dumpling);
}
消息持久化
使用group分组以后还有一个优势就是支持消息的持久化,看如下操作:
停止2个用户服务集群
发送消息
观察:此时文章服务消费,用户服务由于停了,所以无法消费
恢复2个用户服务集群
观察:此时用户服务可以消费原来的消息,这就是未消费的消息做了持久化,不会造成消息丢失
也就是说,如果你没有使用group,那么如果你宕机重启恢复后,原来的消息就无法消费处理了。其实就是只要在一个桌子上吃饭,哪怕你离开上厕所了,你的饺子还是你的,回来以后还能吃。如果你在饭厅吃大锅饭,你离开了,服务器就把你的碗给收走了。道理也是一样的。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。