赞
踩
1、安装依赖
<!-- MQTT support for the Spring application. NOTE(review): the Java code below imports
     org.eclipse.paho.client.mqttv3 directly; presumably that client arrives transitively
     through this artifact - confirm with `mvn dependency:tree`. -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.5.14</version>
</dependency>
2、配置连接信息
#MQTT配置信息
#MQTT-用户名
spring.mqtt.username=xxxxxx
#MQTT-密码
spring.mqtt.password=xxxxx
#MQTT-服务器连接地址,如果有多个,用逗号隔开,如:tcp://127.0.0.1:61613,tcp://47.123.33.66:61613
spring.mqtt.url=tcp://127.0.0.1:1883
#MQTT-连接服务器默认客户端ID
spring.mqtt.client-id=emq2
#MQTT-默认的消息推送主题,实际可在调用接口时指定
spring.mqtt.topic=xiaomingming
#timeout 连接超时时间(秒)
spring.mqtt.timeout=20
#keep alive 心跳间隔(秒)
spring.mqtt.keep-alive=20
#QoS 消息服务质量等级
spring.mqtt.qos=2
3、新建mqtt配置类
package com.example.springbootemq.config; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @Component @Configuration public class MqttConfig { @Value("${spring.mqtt.username}") private String username; @Value("${spring.mqtt.password}") private String password; @Value("${spring.mqtt.url}") private String hostUrl; @Value("${spring.mqtt.client-id}") private String clientId; @Value("${spring.mqtt.timeout}") private Integer timeout; @Value("${spring.mqtt.keep-alive}") private Integer keepAlive; @Value("${spring.mqtt.qos}") private Integer qos; @Value("${spring.mqtt.topic}") private String topic; private MqttClient client; /** * 项目启动时自动连接 MQTT */ @PostConstruct public void init() { connect(); } //断线手动重连重新订阅 public void reConnectSubscribe() { while (true){ try { log.warn("开始重连重订阅"); Thread.sleep(2000); this.client.connect(connOpts); //订阅 this.client.subscribe("myTest",2); break; }catch (Exception ex){ ex.printStackTrace(); } } } /** * 连接 MQTT */ public void connect() { try { client = new MqttClient(hostUrl, clientId, new MemoryPersistence()); // MQTT 连接选项 MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setUserName(username); connOpts.setPassword(password.toCharArray()); // 保留会话 connOpts.setCleanSession(true); // 设置超时时间,单位秒 connOpts.setConnectionTimeout(timeout); // 设置心跳时间,单位秒,表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线 connOpts.setKeepAliveInterval(keepAlive); // 设置回调 client.setCallback(new OnMessageCallback()); // 建立连接 client.connect(connOpts); //订阅 client.subscribe(topic,2); } catch (MqttException me) { System.out.println("reason " + 
me.getReasonCode()); System.out.println("msg " + me.getMessage()); System.out.println("loc " + me.getLocalizedMessage()); System.out.println("cause " + me.getCause()); System.out.println("excep " + me); me.printStackTrace(); } } /** * 订阅 * * @param topic 主题 */ public void subscribe(String topic) { try { client.subscribe(topic, qos); } catch (MqttException me) { me.printStackTrace(); } } /** * 消息发布 * * @param topic 主题 * @param data 消息 */ public void publish(String topic, String data) { try { MqttMessage message = new MqttMessage(data.getBytes()); message.setQos(qos); // 消息服务质量等级 message.setRetained(true); // 保留消息 client.publish(topic, message); } catch (MqttException me) { me.printStackTrace(); } } /** * 断开连接 */ public void disconnect() { try { client.disconnect(); client.close(); } catch (MqttException me) { me.printStackTrace(); } } }
4、定义消息回调类OnMessageCallback
package com.example.springbootemq.config; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; public class OnMessageCallback implements MqttCallback { private MqttConfig mqttConfig; //构造函数注入对象 public OnMessageCallback(MqttConfig mqttConfig) { this.mqttConfig = mqttConfig; } @Override public void connectionLost(Throwable cause) { // 连接丢失后,一般在这里面进行重连 System.out.println("连接断开,可以做重连"); this.mqttConfig.reConnectSubscribe(); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { // subscribe后得到的消息会执行到这里面 System.out.println("接收消息主题:" + topic); System.out.println("接收消息Qos:" + message.getQos()); System.out.println("接收消息内容:" + new String(message.getPayload())); } @Override public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete---------" + token.isComplete()); } }
5、控制器测试
package com.example.springbootemq.controller; import com.example.springbootemq.config.MqttConfig; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; @RestController @RequestMapping("/mqtt") public class MqttController { @Resource private MqttConfig mqttConfig; @GetMapping("/send/{message}") public void sendMessage(@PathVariable("message") String message) { String subTopic = "testtopic/#"; String pubTopic = "testtopic/1"; String topicTest="zmtest"; String data = "hello MQTT test"; mqttConfig.publish(topicTest, message); // // 订阅 // mqttConfig.subscribe(subTopic); // // 发布消息 // mqttConfig.publish(pubTopic, data); // 断开连接 // mqttConfig.disconnect(); } @GetMapping("/receive") public void receiveMessage() { String subTopic = "testtopic/#"; String pubTopic = "testtopic/1"; String topicTest="zmtest"; String data = "hello MQTT test"; // mqttConfig.publish(topicTest, data); // // 订阅 mqttConfig.subscribe(topicTest); // // 发布消息 // mqttConfig.publish(pubTopic, data); // 断开连接 // mqttConfig.disconnect(); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。