当前位置:   article > 正文

MQTT配置_mqtt中options设置

mqtt中options设置
  1. # mqtt:
  2. # 服务器连接地址,如果有多个,用逗号隔开
  3. host: tcp://*********:1883
  4. # 连接服务器默认客户端ID
  5. clientId: ******
  6. # 默认的消息推送主题,实际可在调用接口时指定
  7. topic: testtopic/***/
  8. # 用户名
  9. username: ***
  10. # 密码
  11. password: ***
  12. # 连接超时
  13. timeout: 30
  14. # 心跳
  15. keepalive: 30
  1. package com.ruoyi.web.controller.Mqtt;
  2. import org.eclipse.paho.client.mqttv3.*;
  3. import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
  4. import org.springframework.boot.ApplicationArguments;
  5. import org.springframework.boot.ApplicationRunner;
  6. import org.springframework.stereotype.Component;
  7. @Component
  8. public class MqttConsumer implements ApplicationRunner {
  9. private static MqttClient client;
  10. @Override
  11. public void run(ApplicationArguments args) {
  12. System.out.println("初始化并启动mqtt......");
  13. this.connect();
  14. }
  15. /**
  16. * 连接mqtt服务器
  17. */
  18. private void connect() {
  19. try {
  20. // 1 创建客户端
  21. getClient();
  22. // 2 设置配置
  23. MqttConnectOptions options = getOptions();
  24. String[] topic = PropertiesUtil.MQTT_TOPIC.split(",");
  25. // 3 消息发布质量
  26. int[] qos = getQos(topic.length);
  27. // 4 最后设置
  28. create(options, topic, qos);
  29. } catch (Exception e) {
  30. System.out.println("mqtt连接异常:" + e);
  31. }
  32. }
  33. /**
  34. * 创建客户端 --- 1 ---
  35. */
  36. public void getClient() {
  37. try {
  38. if (null == client) {
  39. client = new MqttClient(PropertiesUtil.MQTT_HOST, PropertiesUtil.MQTT_CLIENT_ID, new MemoryPersistence());
  40. }
  41. System.out.println("创建mqtt客户端:" );
  42. } catch (Exception e) {
  43. System.out.println("创建mqtt客户端异常:\" + e:" );
  44. }
  45. }
  46. /**
  47. * 生成配置对象,用户名,密码等 --- 2 ---
  48. */
  49. public MqttConnectOptions getOptions() {
  50. MqttConnectOptions options = new MqttConnectOptions();
  51. options.setUserName(PropertiesUtil.MQTT_USER_NAME);
  52. options.setPassword(PropertiesUtil.MQTT_PASSWORD.toCharArray());
  53. // 设置超时时间
  54. options.setConnectionTimeout(PropertiesUtil.MQTT_TIMEOUT);
  55. // 设置会话心跳时间
  56. options.setKeepAliveInterval(PropertiesUtil.MQTT_KEEP_ALIVE);
  57. // 是否清除session
  58. options.setCleanSession(false);
  59. System.out.println("--生成mqtt配置对象");
  60. return options;
  61. }
  62. /**
  63. * qos --- 3 ---
  64. */
  65. public int[] getQos(int length) {
  66. int[] qos = new int[length];
  67. for (int i = 0; i < length; i++) {
  68. /**
  69. * MQTT协议中有三种消息发布服务质量:
  70. *
  71. * QOS0: “至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
  72. * QOS1: “至少一次”,确保消息到达,但消息重复可能会发生。
  73. * QOS2: “只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果,资源开销大
  74. */
  75. qos[i] = 1;
  76. }
  77. System.out.println("--设置消息发布质量");
  78. return qos;
  79. }
  80. /**
  81. * 装载各种实例和订阅主题 --- 4 ---
  82. */
  83. public void create(MqttConnectOptions options, String[] topic, int[] qos) {
  84. try {
  85. client.setCallback(new MqttConsumerCallback(client, options, topic, qos));
  86. System.out.println("--添加回调处理类");
  87. client.connect(options);
  88. } catch (Exception e) {
  89. System.out.println("装载实例或订阅主题异常:" + e);
  90. }
  91. }
  92. /**
  93. * 订阅某个主题
  94. *
  95. * @param topic
  96. * @param qos
  97. */
  98. public static void subscribe(String topic, int qos) {
  99. try {
  100. System.out.println("topic:" + topic);
  101. client.subscribe(topic, qos);
  102. } catch (MqttException e) {
  103. e.printStackTrace();
  104. }
  105. }
  106. /**
  107. * 发布,非持久化
  108. *
  109. * qos根据文档设置为1
  110. *
  111. * @param topic
  112. * @param msg
  113. */
  114. public static void publish(String topic, String msg) {
  115. publish(1, false, topic, msg);
  116. }
  117. /**
  118. * 发布
  119. */
  120. public static void publish(int qos, boolean retained, String topic, String pushMessage) {
  121. MqttMessage message = new MqttMessage();
  122. message.setQos(qos);
  123. message.setRetained(retained);
  124. message.setPayload(pushMessage.getBytes());
  125. MqttTopic mTopic = client.getTopic(topic);
  126. if (null == mTopic) {
  127. System.out.println("topic:" + topic + " 不存在");
  128. }
  129. MqttDeliveryToken token;
  130. try {
  131. token = mTopic.publish(message);
  132. token.waitForCompletion();
  133. if (!token.isComplete()) {
  134. System.out.println("消息发送成功");
  135. }
  136. } catch (MqttPersistenceException e) {
  137. e.printStackTrace();
  138. } catch (MqttException e) {
  139. e.printStackTrace();
  140. }
  141. }
  142. }
  143. package com.ruoyi.web.controller.Mqtt;
  144. import org.eclipse.paho.client.mqttv3.*;
  145. import java.util.Arrays;
  146. /**
  147. * mqtt回调处理类
  148. */
  149. public class MqttConsumerCallback implements MqttCallbackExtended {
  150. private MqttClient client;
  151. private MqttConnectOptions options;
  152. private String[] topic;
  153. private int[] qos;
  154. public MqttConsumerCallback(MqttClient client, MqttConnectOptions options, String[] topic, int[] qos) {
  155. this.client = client;
  156. this.options = options;
  157. this.topic = topic;
  158. this.qos = qos;
  159. }
  160. /**
  161. * 断开重连
  162. */
  163. @Override
  164. public void connectionLost(Throwable cause) {
  165. System.out.println("MQTT连接断开,发起重连......");
  166. try {
  167. if (null != client && !client.isConnected()) {
  168. client.reconnect();
  169. System.out.println("尝试重新连接");
  170. } else {
  171. client.connect(options);
  172. System.out.println("尝试建立新连接");
  173. }
  174. } catch (Exception e) {
  175. e.printStackTrace();
  176. }
  177. }
  178. /**
  179. * 接收到消息调用令牌中调用
  180. */
  181. @Override
  182. public void deliveryComplete(IMqttDeliveryToken token) {
  183. System.out.println("deliveryComplete---------" + Arrays.toString(topic));
  184. }
  185. /**
  186. * 消息处理
  187. */
  188. @Override
  189. public void messageArrived(String topic, MqttMessage message) {
  190. try {
  191. String msg = new String(message.getPayload());
  192. // System.out.println("收到topic:" + topic + " 消息:" + msg);
  193. // System.out.println("收到消息后执行具体的业务逻辑操作,比如将消息存储进数据库");
  194. } catch (Exception e) {
  195. System.out.println("处理mqtt消息异常:" + e);
  196. }
  197. }
  198. /**
  199. * mqtt连接后订阅主题
  200. */
  201. @Override
  202. public void connectComplete(boolean b, String s) {
  203. try {
  204. if (null != topic && null != qos) {
  205. if (client.isConnected()) {
  206. client.subscribe(topic, qos);
  207. System.out.println("mqtt连接成功,客户端ID:" + PropertiesUtil.MQTT_CLIENT_ID);
  208. System.out.println("--订阅主题::" + Arrays.toString(topic));
  209. } else {
  210. System.out.println("mqtt连接失败,客户端ID:" + PropertiesUtil.MQTT_CLIENT_ID);
  211. }
  212. }
  213. } catch (Exception e) {
  214. System.out.println("mqtt订阅主题异常:" + e);
  215. }
  216. }
  217. }
  218. package com.ruoyi.web.controller.Mqtt;
  219. import java.io.IOException;
  220. import java.io.InputStream;
  221. import java.util.Properties;
  222. /**
  223. * 获取配置信息
  224. **/
  225. public class PropertiesUtil {
  226. public static String MQTT_HOST;
  227. public static String MQTT_CLIENT_ID;
  228. public static String MQTT_USER_NAME;
  229. public static String MQTT_PASSWORD;
  230. public static String MQTT_TOPIC;
  231. public static Integer MQTT_TIMEOUT;
  232. public static Integer MQTT_KEEP_ALIVE;
  233. /**
  234. * mqtt配置
  235. */
  236. static {
  237. Properties properties = loadMqttProperties();
  238. MQTT_HOST = properties.getProperty("host");
  239. MQTT_CLIENT_ID = properties.getProperty("clientId");
  240. MQTT_USER_NAME = properties.getProperty("username");
  241. MQTT_PASSWORD = properties.getProperty("password");
  242. MQTT_TOPIC = properties.getProperty("topic");
  243. MQTT_TIMEOUT = Integer.valueOf(properties.getProperty("timeout"));
  244. MQTT_KEEP_ALIVE = Integer.valueOf(properties.getProperty("keepalive"));
  245. }
  246. private static Properties loadMqttProperties() {
  247. InputStream inputstream = PropertiesUtil.class.getResourceAsStream("/application.yml");
  248. Properties properties = new Properties();
  249. try {
  250. properties.load(inputstream);
  251. return properties;
  252. } catch (IOException e) {
  253. throw new RuntimeException(e);
  254. } finally {
  255. try {
  256. if (inputstream != null) {
  257. inputstream.close();
  258. }
  259. } catch (IOException e) {
  260. throw new RuntimeException(e);
  261. }
  262. }
  263. }
  264. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/188655
推荐阅读
相关标签
  

闽ICP备14008679号