当前位置:   article > 正文

MQTT第一话 -- Docker安装emqx以及Springboot集成emqx_emqx集成

emqx集成

本文主要记录mqtt消息件emqx的安装及使用

1.docker安装emqx

基于liunx centos7,docker-compose,emqx:4.4.4。

1.1 yaml文件

version: '3.7'
services:
  emqx01:
    image: emqx:4.4.4
    container_name: emqx01
    ports:
    - "1893:1883" #tcp连接
    - "18083:18083" #控制台
    - "8083:8083" #控制台工具websocket ws用
    - "8084:8084" #控制台工具websocket wss用
    environment:
    - TZ=Asia/Shanghai
    networks:
    - my-net
networks:
  #新增的网络 内部服务名调用
  my-net:
    external: true
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

1.2 启动日志

[root@m emqx]# docker-compose ps -a
 Name               Command               State                                                                                 Ports                                                                              
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
emqx01   /usr/bin/docker-entrypoint ...   Up      11883/tcp, 0.0.0.0:18093->18083/tcp, 0.0.0.0:1893->1883/tcp, 4369/tcp, 4370/tcp, 5369/tcp, 6369/tcp, 6370/tcp, 8081/tcp, 0.0.0.0:8093->8083/tcp,                 
                                                  0.0.0.0:8094->8084/tcp, 8883/tcp      

[root@m emqx]# docker-compose logs -f
Attaching to emqx01
emqx01    | Starting emqx on node e15d8dee531f@172.19.0.2
emqx01    | Start mqtt:tcp:internal listener on 127.0.0.1:11883 successfully.
emqx01    | Start mqtt:tcp:external listener on 0.0.0.0:1883 successfully.
emqx01    | Start mqtt:ws:external listener on 0.0.0.0:8083 successfully.
emqx01    | Start mqtt:ssl:external listener on 0.0.0.0:8883 successfully.
emqx01    | Start mqtt:wss:external listener on 0.0.0.0:8084 successfully.
emqx01    | Start http:management listener on 8081 successfully.
emqx01    | 2022-06-27T18:40:03.425652+08:00 [warning] [Dashboard] Using default password for dashboard 'admin' user. Please use './bin/emqx_ctl admins' command to change it. NOTE: the default password in config file is only used to initialise the database record, changing the config file after database is initialised has no effect.
emqx01    | Start http:dashboard listener on 18083 successfully.
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

1.3 控制台界面

从日志可以看出,dashboard listener on 18083,admin/public
在这里插入图片描述

2.springboot集成emqx

基于springboot2.5.6

2.1 mqtt依赖如下

<dependency>
	<groupId>org.springframework.integration</groupId>
	<artifactId>spring-integration-mqtt</artifactId>
	<version>5.5.13</version>
</dependency>
<dependency>
	<groupId>org.springframework.integration</groupId>
	<artifactId>spring-integration-stream</artifactId>
	<version>5.5.13</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

2.2 yaml配置文件

mqtt:
  test:
    url: tcp://192.168.0.221:1883
    clientId: testmqtt #连接的一个标识
    topic: mqtt/test
    userName: admin
    passWord: public
    timeout: 5000
    keepAlive: 10
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

2.3 启动注入

@Slf4j
@ConfigurationProperties("mqtt.test")
@Component
@Data
public class MqttConfig {

    String url;
    String clientId;
    String topic;
    String userName;
    String passWord;
    Integer timeout;
    Integer keepAlive;


    @Bean
    public MqttClient initClient() {
        try {
            MqttClient client = new MqttClient(url, clientId);
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(userName);
            options.setPassword(passWord.toCharArray());
            options.setCleanSession(true);
            options.setConnectionTimeout(timeout);
            options.setKeepAliveInterval(keepAlive);
            client.setCallback(new MqttConsume());
            IMqttToken iMqttToken = client.connectWithResult(options);
            boolean complete = iMqttToken.isComplete();
            log.info("mqtt建立连接:{}", complete);

			//我这里就直接订阅了 消息消费者MqttConsume
            client.subscribe(topic, 0);
            log.info("已订阅topic:{}", topic);

            return client;
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException("mqtt 连接异常");
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

2.4 消费者

@Slf4j
public class MqttConsume implements MqttCallback {


	//连接丢失
    @Override
    public void connectionLost(Throwable throwable) {

    }

    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        log.info("收到消息:topic:{},Qos:{},msg:{}",
                topic, mqttMessage.getQos(), new String(mqttMessage.getPayload()));
    }

	//传输完成
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

2.5 生产者

@Autowired
MqttClient client;
@Autowired
MqttConfig mqttConfig;


@GetMapping("/send")
public String send() {
    try {
        MqttMessage message = new MqttMessage();
        message.setQos(0);
        message.setRetained(false);
        for (int i = 0; i < 10; i++) {
            message.setPayload(("1241234&" + i).getBytes());
            client.publish(mqttConfig.getTopic(), message);
            log.info("发送成功:{}", i);
            Thread.sleep(1000);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    return "SUCCESS";
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

2.6 发送日志

com.example.mqtt.config.MqttConfig       : mqtt连接:true
com.example.mqtt.config.MqttConfig       : 已订阅topic:mqtt/test

c.e.mqtt.controller.MqttController       : 发送成功:0
com.example.mqtt.consume.MqttConsume     : 收到消息:topic:mqtt/test,Qos:0,msg:1241234&0
c.e.mqtt.controller.MqttController       : 发送成功:1
com.example.mqtt.consume.MqttConsume     : 收到消息:topic:mqtt/test,Qos:0,msg:1241234&1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

3. Qos分析

QoS0,发送就不管了,最多一次;
QoS1,发送之后依赖MQTT规范,是否启动重传消息,所以至少一次;
QoS2,发送之后依赖MQTT消息机制,确保只有一次。

以上就是本章的全部内容了。

上一篇:RocketMQ第三话 – RocketMQ高可用集群搭建
下一篇:MQTT第二话 – emqx高可用集群实现

人生天地之间,若白驹过隙,忽然而已

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/188693
推荐阅读
相关标签
  

闽ICP备14008679号