当前位置:   article > 正文

【实际应用-第七篇 物联网 mqtt模拟实现单向通信 】

【实际应用-第七篇 物联网 mqtt模拟实现单向通信 】

概要

两个springboot项目,一个作为发布方,一个作为订阅方,模拟设备向服务器发送mqtt请求上报消息,示例使用的是emqx官网的免费mqtt服务器,实际开发中可以使用搭建的mqtt服务器

发布方(模拟设备)

引入pom

 <dependency>
    <groupId>org.eclipse.paho</groupId>
     <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
     <version>1.2.5</version>
 </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

回调MqttCallback

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class OnMessageCallback implements MqttCallback {
    @Override
    public void connectionLost(Throwable cause) {
    }
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
    }
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

发布的方法


import com.sci.web.system.controller.dev.OnMessageCallback;
import io.swagger.annotations.*;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/system/send")
public class TGlzzMenuController {


    @ApiOperation("发送mqtt消息")
    @GetMapping("/sendMqtt")
    public void sendMqtt() {
        String pubTopic = "publish111/topic";
        String content = "Hello World";
        int qos = 2;
//      emqx免费服务器
        String broker = "tcp://broker.emqx.io:1883";
        String clientId = "publishClient11";
        MemoryPersistence persistence = new MemoryPersistence();

        try {
            MqttClient client = new MqttClient(broker, clientId, persistence);

            // MQTT 连接选项
            MqttConnectOptions connOpts = new MqttConnectOptions();
//            connOpts.setUserName("test111");
//            connOpts.setPassword("test111".toCharArray());
            // 保留会话

            // 设置回调
            client.setCallback(new OnMessageCallback() {
                @Override
                public void connectionLost(Throwable cause) {
                }

                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    System.out.println("发送成功");
                }
            });

            // 建立连接
            client.connect(connOpts);
            // 消息发布所需参数
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            client.publish(pubTopic, message);

        } catch (MqttException me) {
//          这里有失败原因
            me.printStackTrace();
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64

订阅方(模拟服务器)

引入pom

  <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.5</version>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

回调MqttCallback


import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class OnMessageCallback implements MqttCallback {
    @Override
    public void connectionLost(Throwable cause) {
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

订阅的方法


import org.eclipse.paho.client.mqttv3.*;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;

@Component
public class MqttClientConfiguration{
    
    private MqttClient client;
//  emqx免费服务器
    private String broker = "tcp://broker.emqx.io:1883";
    private String clientId = "receiveClient11";
    //  订阅主题
    private String subscribeTopic = "publish111/topic";
    private MqttConnectOptions connOpts;

    @PostConstruct
    public void connect() {
        try {
            client = new MqttClient(broker, clientId);

//          MQTT 连接选项
            connOpts = new MqttConnectOptions();
//          connOpts.setUserName("test111");
//          connOpts.setPassword("test111".toCharArray());
//          保留会话
            connOpts.setCleanSession(true);

//          建立连接
            client.connect(connOpts);
//          设置回调
            client.setCallback(new OnMessageCallback() {
                @Override
                public void connectionLost(Throwable cause) {
                    // 连接丢失后,一般在这里面进行重连
                    System.out.println("连接断开,可以做重连");
                }
                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    // subscribe后得到的消息会执行到这里面
                    System.out.println("接收消息主题:" + topic);
                    System.out.println("接收消息内容:" + new String(message.getPayload()));
                }
                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                }
            });
//          订阅
            client.subscribe(subscribeTopic);
        } catch (MqttException me) {
//          这里有失败原因
            me.printStackTrace();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55

测试结果

  • 发布方
    在这里插入图片描述
  • 订阅方
    在这里插入图片描述

实际应用

在订阅方的回调中把收到的数据解析存起来
在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Li_阴宅/article/detail/842166
推荐阅读
相关标签
  

闽ICP备14008679号