当前位置:   article > 正文

stream 多个字段分组_SpringCloud 进阶:消息驱动之Spring Cloud Stream 消费者分组

springboot stream 根据多个字段group

我的博客:程序员笑笑生,欢迎浏览博客!

上一章 SpringCloud进阶:Spring Cloud Stream 核心组件当中,我们了解了Spring Cloud Stream的核心组件和Spring Integration的简介,本章我们将聊一聊消费者分组相关的知识。

前言

在实际的企业应用场景下,一条消息只能被一个消费者消费,但是在我们部署的应用中,通常会一个消费者应用部署了多台实例。Spring Cloud Stream利用消费者分组就解决这个问题,确保当生产者发送一条消息后,多个实例当中只有一个能够消费到这样的消息。

一 、多实例未分组消费者测试

在我们之前的章节中, SpringCloud进阶-消息驱动pring Cloud Stream中,我们创建了消费者服务:server-receiver和生产者:server-sender,接下来我们结合之前的注册中心Eureka搭建多实例的消费者,首先在server-receiver引入Eureka客户端的依赖:

  1. <dependency>
  2. <groupId>org.springframework.cloud</groupId>
  3. <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
  4. </dependency>

在resource目录下新建applicaiton-s1.yml 、applicaiton-s2.yml 通过applicaiton.yml中配置spring.profiles.active属性激活不同的配置文件:

applicaiton-s1.yml

  1. spring:
  2. cloud:
  3. stream:
  4. bindings:
  5. input:
  6. destination: mytopic
  7. binder: defaultRabbit
  8. binders:
  9. defaultRabbit:
  10. type: rabbit
  11. environment:
  12. spring:
  13. rabbitmq:
  14. host: localhost
  15. port: 5672
  16. server:
  17. port: 8081
  18. eureka:
  19. instance:
  20. hostname: eureka7001.com #eureka服务端的实例名称
  21. instance-id: receiver1
  22. client:
  23. service-url:
  24. # 与注册中心交互的url
  25. defaultZone: http://eureka7001.com:7001/eureka/
  26. enabled: true

applicaiton-s2.yml

  1. spring:
  2. cloud:
  3. stream:
  4. bindings:
  5. input:
  6. destination: mytopic
  7. binder: defaultRabbit
  8. binders:
  9. defaultRabbit:
  10. type: rabbit
  11. environment:
  12. spring:
  13. rabbitmq:
  14. host: localhost
  15. port: 5672
  16. server:
  17. port: 8082
  18. eureka:
  19. instance:
  20. hostname: eureka7001.com #eureka服务端的实例名称
  21. instance-id: receiver2
  22. client:
  23. service-url:
  24. # 与注册中心交互的url
  25. defaultZone: http://eureka7001.com:7001/eureka/
  26. enabled: true

生产者:server-sender的代码不变化,也不需要注册到Eureak中,

  1. @RestController
  2. public class SenderController {
  3. @Autowired
  4. SenderSource source;
  5. @RequestMapping("/send")
  6. public String sender(String msg) {
  7. source.output().send(MessageBuilder.withPayload(msg).build());
  8. return "ok";
  9. }

先后启动Eureak和服务消费者server-receiver 两 个实例,最后在启动生产者:server-sender,我们看看Eureka中:

80fe47319e2ceff7ae4f0cb12269fcfa.png

显示了2个消费者,我们通过HTTP调用生产者的发送接口: http://localhost:8081/send?msg=test

我们看到receiver1:日志

917b7a9460783386ec8b4cb96b2bdc13.png

我们看到receiver2:日志

9aa81096f2ac39378fb594791ce94059.png

我们看到每个实例都会受到消息。这不是我们想要的,我们需要不管消费者服务有多少实例,确保只有一个实例消费信息。

二、添加分组配置

在Spring Cloud Stream中,如果不给消费者指定一个组Group,那么Spring Cloud Stream将会给当前的实例分配一个匿名的、独立的只有一个成员的消费组,这就导致了一个服务n个实例,就会有n个消费者分组

怎么样去配置是的所有的实例都是一个组呢?我们可以通过配置 spring.cloud.stream.bindings.input.group=group1就可以实现,我们在applicaiton-s1.yml 和 applicaiton-s2.yml 添加:

  1. spring:
  2. cloud:
  3. stream:
  4. bindings:
  5. input:
  6. group: group1

重启项目后,再次通过生产者发送消息后,就能确保只有一个消费者收到消息了。

我们可以看到未分组之前是这样的:

我们看到每个实例都会受到消息。这不是我们想要的,我们需要不管消费者服务有多少实例,确保只有一个实例消费信息。

二、添加分组配置

在Spring Cloud Stream中,如果不给消费者指定一个组Group,那么Spring Cloud Stream将会给当前的实例分配一个匿名的、独立的只有一个成员的消费组,这就导致了一个服务n个实例,就会有n个消费者分组

怎么样去配置是的所有的实例都是一个组呢?我们可以通过配置 spring.cloud.stream.bindings.input.group=group1就可以实现,我们在applicaiton-s1.yml 和 applicaiton-s2.yml 添加:

  1. spring:
  2. cloud:
  3. stream:
  4. bindings:
  5. input:
  6. group: group1

重启项目后,再次通过生产者发送消息后,就能确保只有一个消费者收到消息了。

我们可以看到未分组之前是这样的:

e23bb51c1ecd24dcc0306c574c285f2b.png

分组之后:

0864fb72b79d8528748894df40ca47bf.png

总结

本章主要是通过示例的方式,使用Spring Cloud Stream如何实现消费者分组,这也是在实际的开发中需要考虑的问题。

----END----

6ecd1b5c58704622a316cf68546c3169.png
SpringCloud基础教程(一)-微服务与SpringCloud​mp.weixin.qq.com
af826725e4dfb218a43d223b4cafa1b0.png
SpringCloud基础教程(二)-服务发现 Eureka​mp.weixin.qq.com
812df7adab73dd53dd886d891931a033.png
SpringCloud基础教程(三)-Eureka进阶​mp.weixin.qq.com
62b2589e47a369f0ad11e2516f3c97f8.png
SpringCloud 基础教程(四)-配置中心入门​mp.weixin.qq.com
5a23ae55e11360328b57ea95db712c2a.png
SpringCloud基础教程(五)-配置中心热生效和高可用​mp.weixin.qq.com
99aa439c5e6b4446247e370010b607c7.png
SpringCloud 基础教程(六)-负载均衡Ribbon​mp.weixin.qq.com
7b4625b108b9660567058ed74d1062b9.png
SpringCloud 基础教程(七)-Feign声明式服务调用​mp.weixin.qq.com
06d197f8febec45d99858c14d71c9918.png
SpringCloud 基础教程(八)-Hystrix熔断器(上)​mp.weixin.qq.com
05efa3ddd5b1e0e7ad16ccedb9a0faac.png
SpringCloud 基础教程(九)-Hystrix服务监控(下)​mp.weixin.qq.com
cff46afff195ee9be89fda6fed2d9060.png
SpringCloud 基础教程(十)-Zuul 服务网关​mp.weixin.qq.com
436f6aa9f1e271d08012d3036a7447fc.png
SpringCloud 基础教程(十一)- Sleuth 调用链追踪简介​mp.weixin.qq.com
51d1b1f9ffb630cec70590d88a515f10.png
SpringCloud 基础教程(十二)-Zipkin 分布式链路追踪系统搭建​mp.weixin.qq.com
3c94780062f1bc278b7dcbdd2aa52dd3.png
SpringCloud 进阶: 消息驱动(入门) Spring Cloud Stream【Greenwich.SR3】​mp.weixin.qq.com
87f6868abf922cadcb2b2ef7d91fd428.png
SpringCloud进阶:Spring Cloud Stream核心组件​mp.weixin.qq.com
243c9c580b7c1d76683e94e101f3cef8.png

更多精彩内容,请期待...

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/神奇cpp/article/detail/840773
推荐阅读
相关标签
  

闽ICP备14008679号