赞
踩
返回首页 > 【笔记】Spring Cloud Alibaba Nacos
接上一节问题:
单使用@RefreshScope + @Value以上4种场景均不能生效(需要重启spring boot项目,mqtt服务器才能重新连接)
当我们修改nacos值时,发现控制台打印信息如下
查看 RefreshEventListener.class 代码,发现当naocs变化时handle方法会调用,并打印变化的key
- public void handle(RefreshEvent event) {
- if (this.ready.get()) {
- log.debug("Event received " + event.getEventDesc());
- Set<String> keys = this.refresh.refresh();
- log.info("Refresh keys changed: " + keys);
- }
-
- }
所以,只需要参考 RefreshEventListener.class 新建一个监听类 MqttConfigListener.java即可
说明:关键看
MqttConfigListener.java
- package com.hzd.listener;
-
- import com.hzd.mqtt.client.EmqClient;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.context.event.ApplicationReadyEvent;
- import org.springframework.cloud.context.refresh.ContextRefresher;
- import org.springframework.cloud.endpoint.event.RefreshEvent;
- import org.springframework.cloud.endpoint.event.RefreshEventListener;
- import org.springframework.context.ApplicationEvent;
- import org.springframework.context.event.SmartApplicationListener;
- import org.springframework.stereotype.Component;
-
- import java.util.Set;
- import java.util.concurrent.atomic.AtomicBoolean;
-
- /**
- * Coding by 李炯 on 2022/11/11 11:24
- */
- @Component
- @Slf4j
- public class MqttConfigListener implements SmartApplicationListener {
-
- @Autowired
- private EmqClient emqClient;
- private ContextRefresher refresh;
- private AtomicBoolean ready = new AtomicBoolean(false);
-
- public MqttConfigListener(ContextRefresher refresh) {
- this.refresh = refresh;
- }
-
- public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
- return ApplicationReadyEvent.class.isAssignableFrom(eventType) || RefreshEvent.class.isAssignableFrom(eventType);
- }
-
- public void onApplicationEvent(ApplicationEvent event) {
- if (event instanceof ApplicationReadyEvent) {
- this.handle((ApplicationReadyEvent)event);
- } else if (event instanceof RefreshEvent) {
- this.handle((RefreshEvent)event);
- }
-
- }
-
- public void handle(ApplicationReadyEvent event) {
- this.ready.compareAndSet(false, true);
- }
-
- public void handle(RefreshEvent event) {
- if (this.ready.get()) {
- Set<String> keys = this.refresh.refresh();
- log.info("Refresh keys changed>>>>>>>>: " + keys.toString());
- if (keys.toString().contains("mqtt")){
- emqClient.disConnect();//断开之前的连接
- emqClient.init();//重新连接EMQX
- }
- }
-
- }
- }
EmqClient.java
- package com.hzd.mqtt.client;
-
-
- import com.hzd.mqtt.enums.QosEnum;
- import com.hzd.mqtt.properties.MqttProperties;
- import lombok.extern.slf4j.Slf4j;
- import org.eclipse.paho.client.mqttv3.*;
- import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.cloud.context.config.annotation.RefreshScope;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Primary;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.PostConstruct;
- import javax.annotation.PreDestroy;
-
- /**
- * Created by
- */
- @Component
- @Slf4j
- public class EmqClient {
-
- private IMqttClient mqttClient;
-
- @Autowired
- private MqttProperties mqttProperties;
-
- @Autowired
- private MqttCallback mqttCallback;
-
- @Autowired
- private MqttProperties properties;
-
- @PostConstruct
- public void init() {
- MqttClientPersistence mempersitence = new MemoryPersistence();
- try {
- mqttClient = new MqttClient(mqttProperties.getBrokerUrl(), mqttProperties.getClientId(), mempersitence);
- } catch (MqttException e) {
- log.error("初始化客户端mqttClient对象失败,errormsg={},brokerUrl={},clientId={}", e.getMessage(), mqttProperties.getBrokerUrl(), mqttProperties.getClientId());
- }
- //连接服务端
- connect(properties.getUsername(), properties.getPassword());
- //订阅一个主题
- String topic_hzd = "/hzd/pub/#";
- String topic_device_connect = "/device/connect/#";
- subscribe(topic_hzd, QosEnum.QoS2);
- subscribe(topic_device_connect, QosEnum.QoS2);
- log.info("EmqClient连接成功,订阅成功.topic={},{}", topic_hzd, topic_device_connect);
-
- }
-
-
- /**
- * 连接broker
- *
- * @param username
- * @param password
- */
- public void connect(String username, String password) {
- MqttConnectOptions options = new MqttConnectOptions();
- options.setAutomaticReconnect(true);
- options.setUserName(username);
- options.setPassword(password.toCharArray());
- options.setCleanSession(true);
-
- mqttClient.setCallback(mqttCallback);
-
- try {
- mqttClient.connect(options);
- } catch (MqttException e) {
- log.error("mqtt客户端连接服务端失败,失败原因{}", e.getMessage());
- }
- }
-
- /**
- * 断开连接
- */
- @PreDestroy
- public void disConnect() {
- try {
- mqttClient.disconnect();
- } catch (MqttException e) {
- log.error("断开连接产生异常,异常信息{}", e.getMessage());
- }
- }
-
- /**
- * 重连
- */
- public void reConnect() {
- try {
- mqttClient.reconnect();
- } catch (MqttException e) {
- log.error("重连失败,失败原因{}", e.getMessage());
- }
- }
-
- /**
- * 发布消息
- *
- * @param topic
- * @param msg
- * @param qos
- * @param retain
- */
- public void publish(String topic, String msg, QosEnum qos, boolean retain) {
-
- MqttMessage mqttMessage = new MqttMessage();
- mqttMessage.setPayload(msg.getBytes());
- mqttMessage.setQos(qos.value());
- mqttMessage.setRetained(retain);
- try {
- mqttClient.publish(topic, mqttMessage);
- } catch (MqttException e) {
- log.error("发布消息失败,errormsg={},topic={},msg={},qos={},retain={}", e.getMessage(), topic, msg, qos.value(), retain);
- }
-
- }
-
- /**
- * 订阅
- *
- * @param topicFilter
- * @param qos
- */
- public void subscribe(String topicFilter, QosEnum qos) {
- try {
- mqttClient.subscribe(topicFilter, qos.value());
- } catch (MqttException e) {
- log.error("订阅主题失败,errormsg={},topicFilter={},qos={}", e.getMessage(), topicFilter, qos.value());
- }
-
- }
-
- /**
- * 取消订阅
- *
- * @param topicFilter
- */
- public void unSubscribe(String topicFilter) {
- try {
- mqttClient.unsubscribe(topicFilter);
- } catch (MqttException e) {
- log.error("取消订阅失败,errormsg={},topicfiler={}", e.getMessage(), topicFilter);
- }
- }
-
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。