赞
踩
- # mqtt:
- # 服务器连接地址,如果有多个,用逗号隔开
- host: tcp://*********:1883
- # 连接服务器默认客户端ID
- clientId: ******
- # 默认的消息推送主题,实际可在调用接口时指定
- topic: testtopic/***/
- # 用户名
- username: ***
- # 密码
- password: ***
- # 连接超时
- timeout: 30
- # 心跳
- keepalive: 30
- package com.ruoyi.web.controller.Mqtt;
-
- import org.eclipse.paho.client.mqttv3.*;
- import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
- import org.springframework.boot.ApplicationArguments;
- import org.springframework.boot.ApplicationRunner;
- import org.springframework.stereotype.Component;
-
-
- @Component
- public class MqttConsumer implements ApplicationRunner {
-
- private static MqttClient client;
-
- @Override
- public void run(ApplicationArguments args) {
- System.out.println("初始化并启动mqtt......");
- this.connect();
- }
-
- /**
- * 连接mqtt服务器
- */
- private void connect() {
- try {
- // 1 创建客户端
- getClient();
- // 2 设置配置
- MqttConnectOptions options = getOptions();
- String[] topic = PropertiesUtil.MQTT_TOPIC.split(",");
- // 3 消息发布质量
- int[] qos = getQos(topic.length);
- // 4 最后设置
- create(options, topic, qos);
- } catch (Exception e) {
- System.out.println("mqtt连接异常:" + e);
- }
- }
-
- /**
- * 创建客户端 --- 1 ---
- */
- public void getClient() {
- try {
- if (null == client) {
- client = new MqttClient(PropertiesUtil.MQTT_HOST, PropertiesUtil.MQTT_CLIENT_ID, new MemoryPersistence());
- }
-
- System.out.println("创建mqtt客户端:" );
- } catch (Exception e) {
- System.out.println("创建mqtt客户端异常:\" + e:" );
- }
- }
-
- /**
- * 生成配置对象,用户名,密码等 --- 2 ---
- */
- public MqttConnectOptions getOptions() {
- MqttConnectOptions options = new MqttConnectOptions();
- options.setUserName(PropertiesUtil.MQTT_USER_NAME);
- options.setPassword(PropertiesUtil.MQTT_PASSWORD.toCharArray());
- // 设置超时时间
- options.setConnectionTimeout(PropertiesUtil.MQTT_TIMEOUT);
- // 设置会话心跳时间
- options.setKeepAliveInterval(PropertiesUtil.MQTT_KEEP_ALIVE);
- // 是否清除session
- options.setCleanSession(false);
- System.out.println("--生成mqtt配置对象");
- return options;
- }
-
- /**
- * qos --- 3 ---
- */
- public int[] getQos(int length) {
-
- int[] qos = new int[length];
- for (int i = 0; i < length; i++) {
- /**
- * MQTT协议中有三种消息发布服务质量:
- *
- * QOS0: “至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
- * QOS1: “至少一次”,确保消息到达,但消息重复可能会发生。
- * QOS2: “只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果,资源开销大
- */
- qos[i] = 1;
- }
- System.out.println("--设置消息发布质量");
- return qos;
- }
-
- /**
- * 装载各种实例和订阅主题 --- 4 ---
- */
- public void create(MqttConnectOptions options, String[] topic, int[] qos) {
- try {
- client.setCallback(new MqttConsumerCallback(client, options, topic, qos));
- System.out.println("--添加回调处理类");
- client.connect(options);
- } catch (Exception e) {
- System.out.println("装载实例或订阅主题异常:" + e);
- }
- }
- /**
- * 订阅某个主题
- *
- * @param topic
- * @param qos
- */
- public static void subscribe(String topic, int qos) {
- try {
- System.out.println("topic:" + topic);
- client.subscribe(topic, qos);
- } catch (MqttException e) {
- e.printStackTrace();
- }
- }
-
- /**
- * 发布,非持久化
- *
- * qos根据文档设置为1
- *
- * @param topic
- * @param msg
- */
- public static void publish(String topic, String msg) {
- publish(1, false, topic, msg);
- }
-
- /**
- * 发布
- */
- public static void publish(int qos, boolean retained, String topic, String pushMessage) {
- MqttMessage message = new MqttMessage();
- message.setQos(qos);
- message.setRetained(retained);
- message.setPayload(pushMessage.getBytes());
- MqttTopic mTopic = client.getTopic(topic);
- if (null == mTopic) {
- System.out.println("topic:" + topic + " 不存在");
- }
- MqttDeliveryToken token;
- try {
- token = mTopic.publish(message);
- token.waitForCompletion();
-
- if (!token.isComplete()) {
- System.out.println("消息发送成功");
- }
- } catch (MqttPersistenceException e) {
- e.printStackTrace();
- } catch (MqttException e) {
- e.printStackTrace();
- }
- }
- }
-
-
-
-
-
-
-
- package com.ruoyi.web.controller.Mqtt;
-
- import org.eclipse.paho.client.mqttv3.*;
-
- import java.util.Arrays;
-
- /**
- * mqtt回调处理类
- */
-
- public class MqttConsumerCallback implements MqttCallbackExtended {
-
- private MqttClient client;
- private MqttConnectOptions options;
- private String[] topic;
- private int[] qos;
-
- public MqttConsumerCallback(MqttClient client, MqttConnectOptions options, String[] topic, int[] qos) {
- this.client = client;
- this.options = options;
- this.topic = topic;
- this.qos = qos;
- }
-
- /**
- * 断开重连
- */
- @Override
- public void connectionLost(Throwable cause) {
- System.out.println("MQTT连接断开,发起重连......");
- try {
- if (null != client && !client.isConnected()) {
- client.reconnect();
- System.out.println("尝试重新连接");
- } else {
- client.connect(options);
- System.out.println("尝试建立新连接");
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- /**
- * 接收到消息调用令牌中调用
- */
- @Override
- public void deliveryComplete(IMqttDeliveryToken token) {
- System.out.println("deliveryComplete---------" + Arrays.toString(topic));
- }
-
- /**
- * 消息处理
- */
- @Override
- public void messageArrived(String topic, MqttMessage message) {
- try {
- String msg = new String(message.getPayload());
- // System.out.println("收到topic:" + topic + " 消息:" + msg);
- // System.out.println("收到消息后执行具体的业务逻辑操作,比如将消息存储进数据库");
- } catch (Exception e) {
- System.out.println("处理mqtt消息异常:" + e);
- }
- }
-
- /**
- * mqtt连接后订阅主题
- */
- @Override
- public void connectComplete(boolean b, String s) {
- try {
- if (null != topic && null != qos) {
- if (client.isConnected()) {
- client.subscribe(topic, qos);
- System.out.println("mqtt连接成功,客户端ID:" + PropertiesUtil.MQTT_CLIENT_ID);
- System.out.println("--订阅主题::" + Arrays.toString(topic));
- } else {
- System.out.println("mqtt连接失败,客户端ID:" + PropertiesUtil.MQTT_CLIENT_ID);
- }
- }
- } catch (Exception e) {
- System.out.println("mqtt订阅主题异常:" + e);
- }
- }
- }
-
-
-
-
- package com.ruoyi.web.controller.Mqtt;
-
- import java.io.IOException;
- import java.io.InputStream;
- import java.util.Properties;
-
- /**
- * 获取配置信息
- **/
- public class PropertiesUtil {
-
- public static String MQTT_HOST;
- public static String MQTT_CLIENT_ID;
- public static String MQTT_USER_NAME;
- public static String MQTT_PASSWORD;
- public static String MQTT_TOPIC;
- public static Integer MQTT_TIMEOUT;
- public static Integer MQTT_KEEP_ALIVE;
-
- /**
- * mqtt配置
- */
- static {
- Properties properties = loadMqttProperties();
- MQTT_HOST = properties.getProperty("host");
- MQTT_CLIENT_ID = properties.getProperty("clientId");
- MQTT_USER_NAME = properties.getProperty("username");
- MQTT_PASSWORD = properties.getProperty("password");
- MQTT_TOPIC = properties.getProperty("topic");
- MQTT_TIMEOUT = Integer.valueOf(properties.getProperty("timeout"));
- MQTT_KEEP_ALIVE = Integer.valueOf(properties.getProperty("keepalive"));
- }
-
- private static Properties loadMqttProperties() {
- InputStream inputstream = PropertiesUtil.class.getResourceAsStream("/application.yml");
- Properties properties = new Properties();
- try {
- properties.load(inputstream);
- return properties;
- } catch (IOException e) {
- throw new RuntimeException(e);
- } finally {
- try {
- if (inputstream != null) {
- inputstream.close();
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。