当前位置:   article > 正文

spring cloud bus的使用及使用bus发布自定义事件_springcloudbus自定义消息

springcloudbus自定义消息

1. spring cloud bus介绍

Spring cloud bus使用轻量级消息代理将分布式系统的节点连接起来,可以使用此代理,广播状态更改(例如配置更改)或其他管理指令。它可以用作应用程序之间的通信通道。该项目提供了两种消息传输处理:AMQP broker 和Kafka 2. 如果你想使用activemq或其他的消息中间件作为消息传输,那么需要实现spring cloud stream消息驱动的绑定(spring cloud bus其实也是使用stream实现的rabbitmq和kafka)

2. 构建项目

我们使用rabbitmq作为中间件

引入依赖:

  1. <dependency>
  2. <groupId>org.springframework.cloud</groupId>
  3. <artifactId>spring-cloud-starter-bus-amqp</artifactId>
  4. </dependency>

application.yml中添加连接rabbitmq的配置:

  1. spring:
  2. rabbitmq:
  3. host: mybroker.com
  4. port: 5672
  5. username: user
  6. password: secret

3. 端点

spring cloud bus 提供了两个端点/actuator/bus-refresh and /actuator/bus-env,他们分别对应了Spring Cloud Commons中的/actuator/refresh和/actuator/env

 

3.1 /actuator/bus-refresh

可以清除RefreshScope中的缓存并且重新绑定@ConfigurationProperties,一般是配合config组件使用,当config server 中的配置修改时可动态刷新config client中加载的yml配置

暴露端口:

management.endpoints.web.exposure.include=bus-refresh

 

3.2 /actuator/bus-env

更新每个实例中指定的environment 键值对

暴露端口:

management.endpoints.web.exposure.include=bus-env

3.3 destination参数

可以为端口后添加destination参数来指定你想广播的服务实例。

假设我们现在使用/bus-refresh端点

当你想指定某个具体服务实例更新时:/actuator/bus-refresh?destination=customers:9000

   参数值为(spring.application.name:server.port)

当你想指定服务更新时:/actuator/bus-refresh?destination=customers:**

   **表示customers服务的所有实例

4. 事件追踪

bus中的事件大致只有3中:

  • EnvironmentChangeRemoteApplicationEvent:对应/bus-env的端点事件
  • RefreshRemoteApplicationEvent:对应/bus-refresh端点的事件
  • AckRemoteApplicationEvent:这是一个确认事件,没有什么含义,只是说确认事件已发送或已接受
 
spring.cloud.bus.trace.enabled=true

默认是关闭的,因为它是靠本地项目的内存来存储追踪记录的,会比较消耗资源。

开启后会显示发送的每个事件和来自每个服务实例的所有确认事件(ack)

通过访问端点/actuator/trace可查看追踪记录

  1. {
  2. "timestamp": "2015-11-26T10:24:44.411+0000",
  3. "info": {
  4. "signal": "spring.cloud.bus.ack",
  5. "type": "RefreshRemoteApplicationEvent",
  6. "id": "c4d374b7-58ea-4928-a312-31984def293b",
  7. "origin": "stores:8081",
  8. "destination": "*:**"
  9. }
  10. },
  11. {
  12. "timestamp": "2015-11-26T10:24:41.864+0000",
  13. "info": {
  14. "signal": "spring.cloud.bus.sent",
  15. "type": "RefreshRemoteApplicationEvent",
  16. "id": "c4d374b7-58ea-4928-a312-31984def293b",
  17. "origin": "customers:9000",
  18. "destination": "*:**"
  19. }
  20. },
  21. {
  22. "timestamp": "2015-11-26T10:24:41.862+0000",
  23. "info": {
  24. "signal": "spring.cloud.bus.ack",
  25. "type": "RefreshRemoteApplicationEvent",
  26. "id": "c4d374b7-58ea-4928-a312-31984def293b",
  27. "origin": "customers:9000",
  28. "destination": "*:**"
  29. }
  30. }

这里打印的追踪大致意思就是RefreshRemoteApplicationEvent 事件从customers:9000服务实例发出(signal=...sent),并广播了所有的服务实例。

轨迹:我们这里是查看的customers:9000该实例的追踪记录,当请求bus-refresh端口后,该实例显示确认了该事件(第3个json),然后发送了事件到消息中间件中(第2个json),最后因为接受到了来自消息中间件的远程事件,所以再次打印了ack确认日志(第一个json)

5. 源码分析

我们分析下/bus-refresh的整个操作流程

5.1 Spring本地事件发布

需查看类:RefreshBusEndpoint

因为我们是通过/bus-refresh端点作为入口更新系统的配置信息的,所以我们可以找到RefreshBusEndpoint类:

  1. @Endpoint(
  2. id = "bus-refresh"
  3. )
  4. public class RefreshBusEndpoint extends AbstractBusEndpoint {
  5. ...
  6. @WriteOperation
  7. public void busRefresh() {
  8. this.publish(new RefreshRemoteApplicationEvent(this, this.getInstanceId(), (String)null));
  9. }
  10. }

通过该类,我们发现,其实在我们调用/bus-refresh端口时,它是发布了一个RefreshRemoteApplicationEvent事件(spring本地事件),注意是本地事件。RefreshRemoteApplicationEvent的父类是RemoteApplicationEvent

5.2 通过Spring本地事件监听,将消息发送到了消息中间件中

查看类:BusAutoConfiguration

在BusAutoConfiguration配置类中,我们找到了这么一段代码:

  1. @EventListener(
  2. classes = {RemoteApplicationEvent.class}
  3. )
  4. public void acceptLocal(RemoteApplicationEvent event) {
  5. if (this.serviceMatcher.isFromSelf(event) && !(event instanceof AckRemoteApplicationEvent)) {
  6. this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build());
  7. }
  8. }

发现在这个配置类中,有个本地事件监听器,且监听的是RemoteApplicationEvent事件,而在RefreshBusEndpoint中发布的事件对象的父类正好就是RemoteApplicationEvent事件,所以我们可以断定,这个监听器是能够监听到刚刚发布的事件的。

 从代码中可以看出来它进行了判断,从方法名上就可以看出来,判断了这个监听到的事件是否是自己发的,并且是否是Ack事件,当然我们这里是满足的,所以执行了this.cloudBusOutboundChannel.send()方法,这个方法的意思就是向管道中发送消息,这个管道其实就是spring cloud stream 与 消息中间件绑定的管道,通俗来讲就是它把这个事件发送到rabbitmq上去了。

 

补充:this.cloudBusOutboundChannel对象其实是和消息中间件的channel通道相连接的,查找BusAutoConfiguration类发现,其实该对象和rabbitmq相连接的exchange名称是springCloudBus

  1. @Autowired
  2. @Output("springCloudBusOutput")
  3. public void setCloudBusOutboundChannel(MessageChannel cloudBusOutboundChannel) {
  4. this.cloudBusOutboundChannel = cloudBusOutboundChannel;
  5. }

5.3 Stream远程事件监听

查看类:BusAutoConfiguration

通过上面的操作,事件已经被发送到rabbitmq上去了,那我们本地就要进行远程事件的监听了,监听的就是this.cloudBusOutboundChannel对象和rabbitmq相连接的通道,远程事件的监听使用 @StreamListener()注解

  1. @StreamListener("springCloudBusInput")
  2. public void acceptRemote(RemoteApplicationEvent event) {
  3. if (event instanceof AckRemoteApplicationEvent) {
  4. if (this.bus.getTrace().isEnabled() && !this.serviceMatcher.isFromSelf(event) && this.applicationEventPublisher != null) {
  5. this.applicationEventPublisher.publishEvent(event);
  6. }
  7. } else {
  8. if (this.serviceMatcher.isForSelf(event) && this.applicationEventPublisher != null) {
  9. if (!this.serviceMatcher.isFromSelf(event)) {
  10. this.applicationEventPublisher.publishEvent(event);
  11. }
  12. if (this.bus.getAck().isEnabled()) {
  13. AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this, this.serviceMatcher.getServiceId(), this.bus.getAck().getDestinationService(), event.getDestinationService(), event.getId(), event.getClass());
  14. this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(ack).build());
  15. this.applicationEventPublisher.publishEvent(ack);
  16. }
  17. }
  18. if (this.bus.getTrace().isEnabled() && this.applicationEventPublisher != null) {
  19. this.applicationEventPublisher.publishEvent(new SentApplicationEvent(this, event.getOriginService(), event.getDestinationService(), event.getId(), event.getClass()));
  20. }
  21. }
  22. }
查看代码我们可以发现接受到事件后会经过很多判断,判断是否是ack事件,或该事件是不是自己发送的(this.serviceMatcher.isFromSelf(event),或者该事件是否为我而来,即消息是不是发给我的(his.serviceMatcher.isForSelf(event):上面讲过当/bus-refresh时可以指定服务实例发送事件,所以虽然能监听到事件,但不一定这个事件就是针对你的

截取下代码:

  1. if (this.serviceMatcher.isForSelf(event) && this.applicationEventPublisher != null) {
  2. if (!this.serviceMatcher.isFromSelf(event)) {
  3. this.applicationEventPublisher.publishEvent(event);
  4. }
  5. }
这段代码意思是:事件就是针对我来的且不是我自己发送的这个事件,那么就又会在本地发送该事件,发送后会被5.4节中的RefreshListener所监听到。如果是自己发的,那么就不做任何处理了,因为他在发送消息到中间件时,就已经被RefreshListener监听器处理过了。

5.4 Spring本地事件监听

查看:RefreshListener类

  1. public class RefreshListener implements ApplicationListener<RefreshRemoteApplicationEvent> {
  2. private static Log log = LogFactory.getLog(RefreshListener.class);
  3. private ContextRefresher contextRefresher;
  4. public RefreshListener(ContextRefresher contextRefresher) {
  5. this.contextRefresher = contextRefresher;
  6. }
  7. public void onApplicationEvent(RefreshRemoteApplicationEvent event) {
  8. Set<String> keys = this.contextRefresher.refresh();
  9. log.info("Received remote refresh request. Keys refreshed " + keys);
  10. }
  11. }

该类是通过实现ApplicationListener的接口来监听RefreshRemoteApplicationEvent事件的,然后执行了this.contextRefresher.refresh()方法来刷新程序中的context上下文。

5.5 流程总结

1. 假设我们访问端口为8080的user服务,当访问端点/bus-refresh时,会被RefreshBusEndpoint执行,该类会发送一个spring本地事件,事件名为RefreshRemoteApplicationEvent(在8080这个实例中进行事件传播)

2. 发送后,会被两个位置的监听器监听:

  • 一个是RefreshListener,该监听器监听到后直接refresh上下文
  • 一个是BusAutoConfiguration,该配置类中存在监听RemoteApplicationEvent事件的监听器,被这个监听器监听后,会向rabbitmq发送事件消息实体

3. 在bus中的应用都会进行strem的远程事件监听(@StreamListener("springCloudBusInput"),应用监听到strem事件后,会判断事件的发送和发往的情况

  • 如果该事件是发向自己的,并且不是自己发出去的,那么会再次将这个event通过spring本地事件发出去,让当前实例的RefreshListener监听器监听到并执行refresh操作
  • 如果该事件既是自己发出去的,也是发向自己的,那么就不执行任何操作了,因为在发出去的时候已经被RefreshListener监听器监听过了
  • 如果事件不是发向自己的,不执行任何操作

6. 通过bus实现自定义事件发送

6.1 创建事件

事件需要继承RemoteApplicationEvent类

  1. public class CustomEvent extends RemoteApplicationEvent {
  2. private CustomEvent() {
  3. //一定要有,序列化时会用到
  4. }
  5. public CustomEvent(String msg, String originService,
  6. String destinationService) {
  7. super(msg, originService, destinationService);
  8. }
  9. }

6.2 添加配置类

  1. @Configuration
  2. @RemoteApplicationEventScan(basePackageClasses = CustomEvent.class)
  3. public class BusConfiguration {
  4. }
@RemoteApplicationEventScan(basePackageClasses = CustomEvent.class)用来告诉bus自己实现的事件在哪个包下,也可直接写在启动类上

6.3  添加事件监听器

  1. @Configuration
  2. public class CustomEventListener {
  3. @Value("${server.port}")
  4. private String port;
  5. @EventListener
  6. public void onCustomRemoteApplicationEvent(CustomEvent event) {
  7. System.out.printf("CustomRemoteApplicationEvent - " +
  8. " port : %s , Source : %s , originService : %s , destinationService : %s \n",
  9. port,
  10. event.getSource(),
  11. event.getOriginService(),
  12. event.getDestinationService());
  13. }
  14. }

我们这里监听器只是打印了一句话

6.4 事件发布者

  1. @RestController
  2. public class Controller {
  3. /**
  4. * Spring Cloud bus 外部化配置
  5. */
  6. @Autowired
  7. private BusProperties busProperties;
  8. /**
  9. * 事件发布者
  10. */
  11. @Autowired
  12. private ApplicationEventPublisher eventPublisher;
  13. @PostMapping("/bus/event/publish/custom")
  14. public boolean publishUserEvent(String msg, @RequestParam(value = "destination", required = false) String destination) {
  15. //这里由于我没有定义ID ,这里Spring Cloud Bus 自己默认实现了ID
  16. String instanceId = busProperties.getId();
  17. CustomEvent event = new CustomEvent(msg, instanceId, destination);
  18. eventPublisher.publishEvent(event);
  19. return true;
  20. }
  21. }

代码中的instanceId可以通过spring.cloud.bus.id属性设置:

下面是官网的原话,我谷歌翻译了以下

6.5 配置文件

  1. server:
  2. port: 8082
  3. management:
  4. endpoints:
  5. web:
  6. exposure:
  7. include: "*"
  8. spring:
  9. rabbitmq:
  10. host: 127.0.0.1
  11. port: 5672
  12. username: springcloud
  13. password: springcloud
  14. application:
  15. name: bus-custom-event

6.5启动测试 

分别以8082和8083端口启动项目(ideal中可以通过修改program arguments参数来对同一个项目并行启动)

启动后,访问:http://localhost:8082/bus/event/publish/custom?destination=bus-custom-event:8083

会发现启动的两个项目都可以监听到事件并打印信息

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/在线问答5/article/detail/1011864
推荐阅读
相关标签
  

闽ICP备14008679号