当前位置:   article > 正文

spring boot 连接emqx并实现发布订阅_emqx中哪个配置可以让订阅,发布可以同时连接

emqx中哪个配置可以让订阅,发布可以同时连接

1、安装依赖

<dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
            <version>5.5.14</version>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

2、配置连接信息

#MQTT????
#MQTT-???
spring.mqtt.username=xxxxxx
#MQTT-??
spring.mqtt.password=xxxxx
#MQTT-??????????????????????tcp://127.0.0.1:61613?tcp://47.123.33.66:61613
spring.mqtt.url=tcp://127.0.0.1:1883
#MQTT-??????????ID
spring.mqtt.client-id=emq2
#MQTT-?????????????????????
spring.mqtt.topic=xiaomingming
#timeout ??????
spring.mqtt.timeout=20
#keep alive
spring.mqtt.keep-alive=20

spring.mqtt.qos=2
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

3、新建mqtt配置类

package com.example.springbootemq.config;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
@Configuration
public class MqttConfig {
    @Value("${spring.mqtt.username}")
    private String username;

    @Value("${spring.mqtt.password}")
    private String password;

    @Value("${spring.mqtt.url}")
    private String hostUrl;

    @Value("${spring.mqtt.client-id}")
    private String clientId;

    @Value("${spring.mqtt.timeout}")
    private Integer timeout;

    @Value("${spring.mqtt.keep-alive}")
    private Integer keepAlive;

    @Value("${spring.mqtt.qos}")
    private Integer qos;

    @Value("${spring.mqtt.topic}")
    private String topic;

    private MqttClient client;

    /**
     * 项目启动时自动连接 MQTT
     */
    @PostConstruct
    public void init() {
        connect();
    }
    //断线手动重连重新订阅
    public void reConnectSubscribe() {
        while (true){
            try {
                log.warn("开始重连重订阅");
                Thread.sleep(2000);

                this.client.connect(connOpts);
                //订阅
                this.client.subscribe("myTest",2);
                break;
            }catch (Exception ex){
                ex.printStackTrace();
            }

        }
    }


    /**
     * 连接 MQTT
     */
    public void connect() {
        try {
            client = new MqttClient(hostUrl, clientId, new MemoryPersistence());
            // MQTT 连接选项
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setUserName(username);
            connOpts.setPassword(password.toCharArray());
            // 保留会话
            connOpts.setCleanSession(true);
            // 设置超时时间,单位秒
            connOpts.setConnectionTimeout(timeout);
            // 设置心跳时间,单位秒,表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
            connOpts.setKeepAliveInterval(keepAlive);
            // 设置回调
            client.setCallback(new OnMessageCallback());
            // 建立连接
            client.connect(connOpts);
            //订阅
            client.subscribe(topic,2);
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }
    }

    /**
     * 订阅
     *
     * @param topic 主题
     */
    public void subscribe(String topic) {
        try {
            client.subscribe(topic, qos);
        } catch (MqttException me) {
            me.printStackTrace();
        }
    }

    /**
     * 消息发布
     *
     * @param topic 主题
     * @param data  消息
     */
    public void publish(String topic, String data) {
        try {
            MqttMessage message = new MqttMessage(data.getBytes());
            message.setQos(qos); // 消息服务质量等级
            message.setRetained(true); // 保留消息
            client.publish(topic, message);
        } catch (MqttException me) {
            me.printStackTrace();
        }
    }

    /**
     * 断开连接
     */
    public void disconnect() {
        try {
            client.disconnect();
            client.close();
        } catch (MqttException me) {
            me.printStackTrace();
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143

4、定义消息回调类OnMessageCallback

package com.example.springbootemq.config;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class OnMessageCallback implements MqttCallback {

	private MqttConfig mqttConfig;

	//构造函数注入对象
	public OnMessageCallback(MqttConfig mqttConfig) {
        this.mqttConfig = mqttConfig;
    }
    @Override
    public void connectionLost(Throwable cause) {
        // 连接丢失后,一般在这里面进行重连
        System.out.println("连接断开,可以做重连");
        this.mqttConfig.reConnectSubscribe();
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // subscribe后得到的消息会执行到这里面
        System.out.println("接收消息主题:" + topic);
        System.out.println("接收消息Qos:" + message.getQos());
        System.out.println("接收消息内容:" + new String(message.getPayload()));
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete---------" + token.isComplete());
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

5、控制器测试

package com.example.springbootemq.controller;

import com.example.springbootemq.config.MqttConfig;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
@RequestMapping("/mqtt")

public class MqttController {
    @Resource
    private MqttConfig mqttConfig;

    @GetMapping("/send/{message}")
    public void sendMessage(@PathVariable("message") String message) {
        String subTopic = "testtopic/#";
        String pubTopic = "testtopic/1";
        String topicTest="zmtest";
        String data = "hello MQTT test";
        mqttConfig.publish(topicTest, message);

//        // 订阅
//        mqttConfig.subscribe(subTopic);
//        // 发布消息
//        mqttConfig.publish(pubTopic, data);
        // 断开连接
        // mqttConfig.disconnect();
    }

    @GetMapping("/receive")
    public void receiveMessage() {
        String subTopic = "testtopic/#";
        String pubTopic = "testtopic/1";
        String topicTest="zmtest";
        String data = "hello MQTT test";
//        mqttConfig.publish(topicTest, data);

//        // 订阅
        mqttConfig.subscribe(topicTest);
//        // 发布消息
//        mqttConfig.publish(pubTopic, data);
        // 断开连接
        // mqttConfig.disconnect();
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/酷酷是懒虫/article/detail/748081
推荐阅读
相关标签
  

闽ICP备14008679号