当前位置:   article > 正文

SpringCloud微服务组件-消息驱动stream_使用streambridge如何给指定交换机指定队列发送消息和消费

使用streambridge如何给指定交换机指定队列发送消息和消费

SpringCloud微服务组件-消息驱动stream

一 定义

概述

平时我们进行开发的时候,一个庞大的系统可能会有多个不同的技术团队去进行开发和维护,那么一个系统的不同子系统里,所使用的MQ消息队列可能都是不一样的:

RabbitMQ
RocketMQ
Kafka
ActiveMQ
ZeroMQ
JMS
有很多,跨团队跨项目进行MQ的使用其实会有很多的麻烦。不论是对开发还是维护来讲,都会很烦躁。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

所以我们引入了stream这个组件,这个组件相当于是基于mq的封装,通过stream就能适配所有的mq,如此我们只需要关注stream的使用,具体使用了什么mq,我们不需要关注,如此一来减少了开发维护成本,也减少了新人入职的学习成本。

举个例子,如果把各种mq比作不同的数据库,那么stream就可以比作是navcat,不同的数据库都可以在navcat中进行可视化操作,那么我们只需要关注所见即所得就行(鼠标随便点点就行),具体不同数据库执行的什么脚本我们可以不需要在意即可。
基于stream的mq模型
在这里插入图片描述
如上如所示,通过binder绑定器,我们就能去操作切换到不同的mq了,原先我们是通过springboot去结合不同mq然后去实现代码,而现在,只需要结合binder就行,假设我们更换了mq产品,如果我们采用stream,那么我们的业务代码不需要更改,如果没有使用stream,那么所有的代码实现都得改。从中就能看出stream的优势。
output: 消息输出通道,其实也是channel管道,生产者把消息发送给binder。
input: 消息输入通道,也是channel管道,生产者的消息往这里扔,binder把消息通过input发给消费者。
理解output和input:代码层面构建消息以后输出到binder(MQ),再由binder输入到另一个代码层处理消费。
往往在编码之前先去了解熟悉模型,会更有利于编码,这也和数学公式一个道理,公式会了,解答各种代数啊几何啊就迎刃而解。

二 使用配置

1.在生产者和消费者的微服务pom中引入依赖,这里我们使用rabbitMQ,所以直接是用stream-rabbit即可:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
  • 1
  • 2
  • 3
  • 4

2.yml中引入配置

spring:
  cloud:
    stream:
      bindings:         # 绑定通道和交换机
        myInput:        # 定义消费者的通道
          # 自定义交换机名字,也就是消息从底层mq中输出到消费端进行消费
          destination: streamExchange
        myOutput:       #定义生产者的通道
          # 自定义交换机名字,也就是代码里构建消息,交给底层mq的交换机
          destination: streamExchange
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

3.构建自定义通道

/**
 * 构建通道channel
 */
@Component
public interface MyStreamChannel {

    String INPUT = "myInput";
    String OUTPUT = "myOutput";

    @Input(MyStreamChannel.INPUT)
    SubscribableChannel input();

    @Output(MyStreamChannel.OUTPUT)
    MessageChannel output();

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

4.发送消息 - 构建接口与实现

public interface StreamService {
    public void sendStream();
}
  • 1
  • 2
  • 3
/**
 * 开启绑定器
 * 绑定通道channel
 */
@Component
@EnableBinding(MyStreamChannel.class)
public class StreamServiceImpl implements StreamService {

    // 注入管道output,用于发送消息
    @Autowired
    private MyStreamChannel myStreamChannel;

    @Override
    public void sendStream() {
        AppUser user = new AppUser();
        user.setId("test1001");
        user.setNickname("imooc");

        // 发送消息
        myStreamChannel.output().send(MessageBuilder.withPayload(user).build());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

5.消费者消费

@Component
@EnableBinding(MyStreamChannel.class)
public class MyStreamConsumer {
    @StreamListener(MyStreamChannel.INPUT)
    public void receive(AppUser user) {
        System.out.println(user);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

6.通过controller调用并且发送:

@Autowired
private StreamService streamService;

@RequestMapping("/stream/producer")
public Object streamProducer() {
    streamService.sendStream();
    return GraceJSONResult.ok();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

题外话

stream可以对接rabbitmq和kafka,看似很强大,但是我们实际中并不会使用stream,主要看团队需求,我们的项目很大,也有很多子项目,消息框架有RabbitMQ,kafka,JMS,但是我们并不会使用stream。一方面负责消息方面的开发人员本身就需要对这些框架中间件熟悉,其次我们也会有自己的一些对mq的封装,提供一些对外的api封装,所以使用起来并不是麻烦。其次,很少会出现切换mq框架这样的场景,因为技术选型前期是敲定的,如果经历开发阶段,会经常变更一些框架,那就是架构师的选型失败。

三 常用功能

3.1 消息分组和持久化

引子

目前我们可以在多个服务中的消费者去进行消息消费业务处理,那么这其实也就是发布订阅模式,微信公众号就是如此的形式。但是在有些场景下,消息只能被消费一次,就比如说在高并发下,通过mq进行流量削峰,假设有1万个请求都是对订单支付状态做修改,并且现在有10台节点的消费者,那么按照现在的模式,每笔订单都会被处理10次,这样就是重复消费了,我们应该要让不同的消费者只处理1次,也就是10000/10,每个节点消费1千个请求即可。这才是我们的目的。

我们可以通过消息分组,来实现重复消费的问题。通过分组group,只要被group分组了,那么这个group里的所有消费者都只会对消息消费一次,彼此是隔离关系。举个例子,消费者都是人,饭店里的饭桌就是分组group,现在我上了很多饺子,总共1万只饺子,谁吃了饺子就代表这个饺子被消费了,别人无法再吃那个饺子了。

注意:那如果把消息发送给不同的分组呢?那么不同的分组都会去消费,只是组内成员是不会重复消费的。
实现消息分组

首先我们为微服务设置分组,分组是设置在input通道节点下

在文章服务中设置分组为:girls
在用户服务中设置分组为:boys
  • 1
  • 2
myInput:
  destination: streamExchange
  group: grils
  • 1
  • 2
  • 3
myInput:
  destination: streamExchange
  group: boys
  • 1
  • 2
  • 3

修改controller

for (int i = 0 ; i < 10 ; i ++) {
    streamService.eat("我吃了第" + (i + 1) + "只饺子");
}
  • 1
  • 2
  • 3

修改生产者

public void eat(String dumpling);
  • 1
@Override
public void eat(String dumpling) {
    myStreamChannel.output()
            .send(MessageBuilder.withPayload(dumpling).build());
}
  • 1
  • 2
  • 3
  • 4
  • 5

修改消费者:

@StreamListener(MyStreamChannel.INPUT)
public void receive(String dumpling) {
    System.out.println(dumpling);
}
  • 1
  • 2
  • 3
  • 4

消息持久化

使用group分组以后还有一个优势就是支持消息的持久化,看如下操作:

停止2个用户服务集群
发送消息
观察:此时文章服务消费,用户服务由于停了,所以无法消费
恢复2个用户服务集群
观察:此时用户服务可以消费原来的消息,这就是未消费的消息做了持久化,不会造成消息丢失
也就是说,如果你没有使用group,那么如果你宕机重启恢复后,原来的消息就无法消费处理了。其实就是只要在一个桌子上吃饭,哪怕你离开上厕所了,你的饺子还是你的,回来以后还能吃。如果你在饭厅吃大锅饭,你离开了,服务器就把你的碗给收走了。道理也是一样的。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/爱喝兽奶帝天荒/article/detail/840713
推荐阅读
相关标签
  

闽ICP备14008679号