赞
踩
本文使用两个 Spring Boot 项目,一个作为发布方,一个作为订阅方,模拟设备通过 MQTT 向服务器上报消息。示例使用的是 EMQX 官网提供的免费公共 MQTT 服务器,实际开发中可以改用自行搭建的 MQTT 服务器。
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
 * No-op implementation of {@link MqttCallback}.
 *
 * <p>Every callback method has an empty body, so a subclass (including an
 * anonymous one) only needs to override the events it actually handles.
 */
public class OnMessageCallback implements MqttCallback {

    /**
     * Called when the connection to the broker is lost. Does nothing here.
     *
     * @param cause the reason the connection was lost
     */
    @Override
    public void connectionLost(Throwable cause) {
        // Intentionally empty: override to react to a dropped connection.
    }

    /**
     * Called when a message arrives on a subscribed topic. Does nothing here.
     *
     * @param topic   the topic the message was published to
     * @param message the received message
     * @throws Exception declared by the interface; never thrown by this no-op
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // Intentionally empty: override to process incoming messages.
    }

    /**
     * Called when delivery of an outgoing message has completed. Does nothing here.
     *
     * @param token the delivery token of the completed message
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        // Intentionally empty: override to be notified of completed deliveries.
    }
}
import com.sci.web.system.controller.dev.OnMessageCallback; import io.swagger.annotations.*; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/system/send") public class TGlzzMenuController { @ApiOperation("发送mqtt消息") @GetMapping("/sendMqtt") public void sendMqtt() { String pubTopic = "publish111/topic"; String content = "Hello World"; int qos = 2; // emqx免费服务器 String broker = "tcp://broker.emqx.io:1883"; String clientId = "publishClient11"; MemoryPersistence persistence = new MemoryPersistence(); try { MqttClient client = new MqttClient(broker, clientId, persistence); // MQTT 连接选项 MqttConnectOptions connOpts = new MqttConnectOptions(); // connOpts.setUserName("test111"); // connOpts.setPassword("test111".toCharArray()); // 保留会话 // 设置回调 client.setCallback(new OnMessageCallback() { @Override public void connectionLost(Throwable cause) { } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { } @Override public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("发送成功"); } }); // 建立连接 client.connect(connOpts); // 消息发布所需参数 MqttMessage message = new MqttMessage(content.getBytes()); message.setQos(qos); client.publish(pubTopic, message); } catch (MqttException me) { // 这里有失败原因 me.printStackTrace(); } } }
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
 * No-op implementation of {@link MqttCallback}.
 *
 * <p>Every callback method has an empty body, so a subclass (including an
 * anonymous one) only needs to override the events it actually handles.
 */
public class OnMessageCallback implements MqttCallback {

    /**
     * Called when the connection to the broker is lost. Does nothing here.
     *
     * @param cause the reason the connection was lost
     */
    @Override
    public void connectionLost(Throwable cause) {
        // Intentionally empty: override to react to a dropped connection.
    }

    /**
     * Called when a message arrives on a subscribed topic. Does nothing here.
     *
     * @param topic   the topic the message was published to
     * @param message the received message
     * @throws Exception declared by the interface; never thrown by this no-op
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // Intentionally empty: override to process incoming messages.
    }

    /**
     * Called when delivery of an outgoing message has completed. Does nothing here.
     *
     * @param token the delivery token of the completed message
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        // Intentionally empty: override to be notified of completed deliveries.
    }
}
import org.eclipse.paho.client.mqttv3.*; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @Component public class MqttClientConfiguration{ private MqttClient client; // emqx免费服务器 private String broker = "tcp://broker.emqx.io:1883"; private String clientId = "receiveClient11"; // 订阅主题 private String subscribeTopic = "publish111/topic"; private MqttConnectOptions connOpts; @PostConstruct public void connect() { try { client = new MqttClient(broker, clientId); // MQTT 连接选项 connOpts = new MqttConnectOptions(); // connOpts.setUserName("test111"); // connOpts.setPassword("test111".toCharArray()); // 保留会话 connOpts.setCleanSession(true); // 建立连接 client.connect(connOpts); // 设置回调 client.setCallback(new OnMessageCallback() { @Override public void connectionLost(Throwable cause) { // 连接丢失后,一般在这里面进行重连 System.out.println("连接断开,可以做重连"); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { // subscribe后得到的消息会执行到这里面 System.out.println("接收消息主题:" + topic); System.out.println("接收消息内容:" + new String(message.getPayload())); } @Override public void deliveryComplete(IMqttDeliveryToken token) { } }); // 订阅 client.subscribe(subscribeTopic); } catch (MqttException me) { // 这里有失败原因 me.printStackTrace(); } } }
在订阅方的 messageArrived 回调中,将收到的数据解析后保存起来即可。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。