当前位置:   article > 正文

【nacos】5.3 nacos 热更新EMQX服务器地址等配置_nacos mqtt

nacos mqtt

返回首页 > 【笔记】Spring Cloud Alibaba Nacos

上一节问题:

1. 问题

1.1 如果修改一些需要预加载的配置呢,如下场景是否不用启动服务器能立即生效?

  • 修改连接Mqtt服务器,并订阅
  • 修改连接TCP服务器
  • 修改TCP客户端端口
  • 修改mysql等数据库地址,端口,账号,密码

    单使用@RefreshScope + @Value以上4种场景均不能生效(需要重启spring boot项目,mqtt服务器才能重新连接)

1.2 如何才能生效?

当我们修改nacos值时,发现控制台打印信息如下

 查看 RefreshEventListener.class 代码,发现当naocs变化时handle方法会调用,并打印变化的key

  1. public void handle(RefreshEvent event) {
  2. if (this.ready.get()) {
  3. log.debug("Event received " + event.getEventDesc());
  4. Set<String> keys = this.refresh.refresh();
  5. log.info("Refresh keys changed: " + keys);
  6. }
  7. }

所以,只需要参考 RefreshEventListener.class 新建一个监听类 MqttConfigListener.java即可

2. 实战

案例:修改nacos配置,实施重连emqx服务器(mqtt)

说明:关键看

  • MqttConfigListener.handle(RefreshEvent event)
  • emqClient.init();//重新连接EMQX

  • MqttConfigListener.java

  1. package com.hzd.listener;
  2. import com.hzd.mqtt.client.EmqClient;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.apache.commons.logging.Log;
  5. import org.apache.commons.logging.LogFactory;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.boot.context.event.ApplicationReadyEvent;
  8. import org.springframework.cloud.context.refresh.ContextRefresher;
  9. import org.springframework.cloud.endpoint.event.RefreshEvent;
  10. import org.springframework.cloud.endpoint.event.RefreshEventListener;
  11. import org.springframework.context.ApplicationEvent;
  12. import org.springframework.context.event.SmartApplicationListener;
  13. import org.springframework.stereotype.Component;
  14. import java.util.Set;
  15. import java.util.concurrent.atomic.AtomicBoolean;
  16. /**
  17. * Coding by 李炯 on 2022/11/11 11:24
  18. */
  19. @Component
  20. @Slf4j
  21. public class MqttConfigListener implements SmartApplicationListener {
  22. @Autowired
  23. private EmqClient emqClient;
  24. private ContextRefresher refresh;
  25. private AtomicBoolean ready = new AtomicBoolean(false);
  26. public MqttConfigListener(ContextRefresher refresh) {
  27. this.refresh = refresh;
  28. }
  29. public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
  30. return ApplicationReadyEvent.class.isAssignableFrom(eventType) || RefreshEvent.class.isAssignableFrom(eventType);
  31. }
  32. public void onApplicationEvent(ApplicationEvent event) {
  33. if (event instanceof ApplicationReadyEvent) {
  34. this.handle((ApplicationReadyEvent)event);
  35. } else if (event instanceof RefreshEvent) {
  36. this.handle((RefreshEvent)event);
  37. }
  38. }
  39. public void handle(ApplicationReadyEvent event) {
  40. this.ready.compareAndSet(false, true);
  41. }
  42. public void handle(RefreshEvent event) {
  43. if (this.ready.get()) {
  44. Set<String> keys = this.refresh.refresh();
  45. log.info("Refresh keys changed>>>>>>>>: " + keys.toString());
  46. if (keys.toString().contains("mqtt")){
  47. emqClient.disConnect();//断开之前的连接
  48. emqClient.init();//重新连接EMQX
  49. }
  50. }
  51. }
  52. }
  • EmqClient.java
  1. package com.hzd.mqtt.client;
  2. import com.hzd.mqtt.enums.QosEnum;
  3. import com.hzd.mqtt.properties.MqttProperties;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.eclipse.paho.client.mqttv3.*;
  6. import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
  7. import org.slf4j.Logger;
  8. import org.slf4j.LoggerFactory;
  9. import org.springframework.beans.factory.annotation.Autowired;
  10. import org.springframework.cloud.context.config.annotation.RefreshScope;
  11. import org.springframework.context.annotation.Bean;
  12. import org.springframework.context.annotation.Primary;
  13. import org.springframework.stereotype.Component;
  14. import javax.annotation.PostConstruct;
  15. import javax.annotation.PreDestroy;
  16. /**
  17. * Created by
  18. */
  19. @Component
  20. @Slf4j
  21. public class EmqClient {
  22. private IMqttClient mqttClient;
  23. @Autowired
  24. private MqttProperties mqttProperties;
  25. @Autowired
  26. private MqttCallback mqttCallback;
  27. @Autowired
  28. private MqttProperties properties;
  29. @PostConstruct
  30. public void init() {
  31. MqttClientPersistence mempersitence = new MemoryPersistence();
  32. try {
  33. mqttClient = new MqttClient(mqttProperties.getBrokerUrl(), mqttProperties.getClientId(), mempersitence);
  34. } catch (MqttException e) {
  35. log.error("初始化客户端mqttClient对象失败,errormsg={},brokerUrl={},clientId={}", e.getMessage(), mqttProperties.getBrokerUrl(), mqttProperties.getClientId());
  36. }
  37. //连接服务端
  38. connect(properties.getUsername(), properties.getPassword());
  39. //订阅一个主题
  40. String topic_hzd = "/hzd/pub/#";
  41. String topic_device_connect = "/device/connect/#";
  42. subscribe(topic_hzd, QosEnum.QoS2);
  43. subscribe(topic_device_connect, QosEnum.QoS2);
  44. log.info("EmqClient连接成功,订阅成功.topic={},{}", topic_hzd, topic_device_connect);
  45. }
  46. /**
  47. * 连接broker
  48. *
  49. * @param username
  50. * @param password
  51. */
  52. public void connect(String username, String password) {
  53. MqttConnectOptions options = new MqttConnectOptions();
  54. options.setAutomaticReconnect(true);
  55. options.setUserName(username);
  56. options.setPassword(password.toCharArray());
  57. options.setCleanSession(true);
  58. mqttClient.setCallback(mqttCallback);
  59. try {
  60. mqttClient.connect(options);
  61. } catch (MqttException e) {
  62. log.error("mqtt客户端连接服务端失败,失败原因{}", e.getMessage());
  63. }
  64. }
  65. /**
  66. * 断开连接
  67. */
  68. @PreDestroy
  69. public void disConnect() {
  70. try {
  71. mqttClient.disconnect();
  72. } catch (MqttException e) {
  73. log.error("断开连接产生异常,异常信息{}", e.getMessage());
  74. }
  75. }
  76. /**
  77. * 重连
  78. */
  79. public void reConnect() {
  80. try {
  81. mqttClient.reconnect();
  82. } catch (MqttException e) {
  83. log.error("重连失败,失败原因{}", e.getMessage());
  84. }
  85. }
  86. /**
  87. * 发布消息
  88. *
  89. * @param topic
  90. * @param msg
  91. * @param qos
  92. * @param retain
  93. */
  94. public void publish(String topic, String msg, QosEnum qos, boolean retain) {
  95. MqttMessage mqttMessage = new MqttMessage();
  96. mqttMessage.setPayload(msg.getBytes());
  97. mqttMessage.setQos(qos.value());
  98. mqttMessage.setRetained(retain);
  99. try {
  100. mqttClient.publish(topic, mqttMessage);
  101. } catch (MqttException e) {
  102. log.error("发布消息失败,errormsg={},topic={},msg={},qos={},retain={}", e.getMessage(), topic, msg, qos.value(), retain);
  103. }
  104. }
  105. /**
  106. * 订阅
  107. *
  108. * @param topicFilter
  109. * @param qos
  110. */
  111. public void subscribe(String topicFilter, QosEnum qos) {
  112. try {
  113. mqttClient.subscribe(topicFilter, qos.value());
  114. } catch (MqttException e) {
  115. log.error("订阅主题失败,errormsg={},topicFilter={},qos={}", e.getMessage(), topicFilter, qos.value());
  116. }
  117. }
  118. /**
  119. * 取消订阅
  120. *
  121. * @param topicFilter
  122. */
  123. public void unSubscribe(String topicFilter) {
  124. try {
  125. mqttClient.unsubscribe(topicFilter);
  126. } catch (MqttException e) {
  127. log.error("取消订阅失败,errormsg={},topicfiler={}", e.getMessage(), topicFilter);
  128. }
  129. }
  130. }

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号