This article records how to install and use EMQX, an MQTT message broker.
Environment: Linux (CentOS 7), docker-compose, emqx:4.4.4.
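version: '3.7'
services:
  emqx01:
    image: emqx:4.4.4
    container_name: emqx01
    ports:
      - "1893:1883"   # MQTT over TCP
      - "18083:18083" # dashboard
      - "8083:8083"   # WebSocket (ws), used by the dashboard's WebSocket tool
      - "8084:8084"   # secure WebSocket (wss), used by the dashboard's WebSocket tool
    environment:
      - TZ=Asia/Shanghai
    networks:
      - my-net
networks: # added network so that services can call each other by service name
  my-net:
    external: true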
[root@m emqx]# docker-compose ps -a
Name     Command                          State   Ports
--------------------------------------------------------------------------------
emqx01   /usr/bin/docker-entrypoint ...   Up      11883/tcp, 0.0.0.0:18093->18083/tcp, 0.0.0.0:1893->1883/tcp, 4369/tcp, 4370/tcp, 5369/tcp, 6369/tcp, 6370/tcp, 8081/tcp, 0.0.0.0:8093->8083/tcp, 0.0.0.0:8094->8084/tcp, 8883/tcp
[root@m emqx]# docker-compose logs -f
Attaching to emqx01
emqx01 | Starting emqx on node e15d8dee531f@172.19.0.2
emqx01 | Start mqtt:tcp:internal listener on 127.0.0.1:11883 successfully.
emqx01 | Start mqtt:tcp:external listener on 0.0.0.0:1883 successfully.
emqx01 | Start mqtt:ws:external listener on 0.0.0.0:8083 successfully.
emqx01 | Start mqtt:ssl:external listener on 0.0.0.0:8883 successfully.
emqx01 | Start mqtt:wss:external listener on 0.0.0.0:8084 successfully.
emqx01 | Start http:management listener on 8081 successfully.
emqx01 | 2022-06-27T18:40:03.425652+08:00 [warning] [Dashboard] Using default password for dashboard 'admin' user. Please use './bin/emqx_ctl admins' command to change it. NOTE: the default password in config file is only used to initialise the database record, changing the config file after database is initialised has no effect.
emqx01 | Start http:dashboard listener on 18083 successfully.
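The log shows the dashboard listening on container port 18083. Open it through the host port published for 18083 (see the docker-compose ps output) and log in with the default account admin/public. As the warning in the log says, this default password should be changed with ./bin/emqx_ctl admins.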
The Spring Boot integration is based on Spring Boot 2.5.6.
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>5.5.13</version>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
    <version>5.5.13</version>
</dependency>
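mqtt:
  test:
    url: tcp://192.168.0.221:1883 # broker address; when connecting through the Docker host, use the host port published for 1883 (1893 in the compose file above)
    clientId: testmqtt # identifier for this client connection
    topic: mqtt/test
    userName: admin
    passWord: public
    timeout: 5000 # passed to setConnectionTimeout, which Paho interprets in seconds
    keepAlive: 10 # passed to setKeepAliveInterval, also in seconds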
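Because the compose file publishes the broker's 1883 port on a different host port, a wrong scheme or port in mqtt.test.url is an easy mistake to make. The following is a minimal sketch, using only the JDK, of checking the configured URL before it is handed to the client. The BrokerUrl class and its list of accepted schemes are assumptions made for illustration and are not part of the original project.

```java
import java.net.URI;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper, not part of the original project.
final class BrokerUrl {
    // Schemes assumed acceptable here; adjust to what your client library supports.
    private static final Set<String> SCHEMES =
            new HashSet<>(Arrays.asList("tcp", "ssl", "ws", "wss"));

    final String scheme;
    final String host;
    final int port;

    private BrokerUrl(String scheme, String host, int port) {
        this.scheme = scheme;
        this.host = host;
        this.port = port;
    }

    /** Parses a broker URL such as tcp://192.168.0.221:1883 and rejects incomplete ones. */
    static BrokerUrl parse(String url) {
        URI uri = URI.create(url);
        String scheme = uri.getScheme();
        if (scheme == null || !SCHEMES.contains(scheme)) {
            throw new IllegalArgumentException("unsupported scheme in " + url);
        }
        if (uri.getHost() == null) {
            throw new IllegalArgumentException("missing host in " + url);
        }
        if (uri.getPort() == -1) {
            throw new IllegalArgumentException("missing port in " + url);
        }
        return new BrokerUrl(scheme, uri.getHost(), uri.getPort());
    }

    public static void main(String[] args) {
        BrokerUrl broker = parse("tcp://192.168.0.221:1883");
        System.out.println(broker.scheme + " " + broker.host + " " + broker.port);
    }
}
```

Printing the parsed port at startup makes it easy to compare against the host port shown by docker-compose ps.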
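import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Slf4j
@ConfigurationProperties("mqtt.test")
@Component
@Data
public class MqttConfig {

    String url;
    String clientId;
    String topic;
    String userName;
    String passWord;
    Integer timeout;
    Integer keepAlive;

    @Bean
    public MqttClient initClient() {
        try {
            MqttClient client = new MqttClient(url, clientId);
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(userName);
            options.setPassword(passWord.toCharArray());
            options.setCleanSession(true);
            options.setConnectionTimeout(timeout);
            options.setKeepAliveInterval(keepAlive);
            // Register the callback before connecting so that no message is missed
            client.setCallback(new MqttConsume());
            IMqttToken iMqttToken = client.connectWithResult(options);
            boolean complete = iMqttToken.isComplete();
            log.info("MQTT connection established: {}", complete);
            // Subscribe right here; messages are handled by the MqttConsume callback
            client.subscribe(topic, 0);
            log.info("Subscribed to topic: {}", topic);
            return client;
        } catch (Exception e) {
            // Keep the original exception as the cause instead of only printing it
            log.error("MQTT connection failed", e);
            throw new RuntimeException("MQTT connection failed", e);
        }
    }
}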
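import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

@Slf4j
public class MqttConsume implements MqttCallback {

    // Called when the connection to the broker is lost
    @Override
    public void connectionLost(Throwable throwable) {
        log.warn("MQTT connection lost", throwable);
    }

    // Called for every message that arrives on a subscribed topic
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        log.info("Message received: topic:{}, Qos:{}, msg:{}", topic, mqttMessage.getQos(), new String(mqttMessage.getPayload()));
    }

    // Called when delivery of an outgoing message has completed
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    }
}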
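The connectionLost callback above does not try to reconnect, and since the client connects with a clean session, the subscription has to be made again after any reconnect. Paho's MqttConnectOptions offers setAutomaticReconnect(true) for the reconnect itself. If you prefer to retry by hand, the delay between attempts is usually increased step by step. The following is a minimal sketch of such a capped exponential backoff using only the JDK. The ReconnectBackoff class and the 1 s / 30 s values are hypothetical and not part of the original project.

```java
// Hypothetical helper: capped exponential backoff for manual reconnect attempts.
final class ReconnectBackoff {
    private final long initialMillis;
    private final long maxMillis;

    ReconnectBackoff(long initialMillis, long maxMillis) {
        if (initialMillis <= 0 || maxMillis < initialMillis) {
            throw new IllegalArgumentException("require 0 < initialMillis <= maxMillis");
        }
        this.initialMillis = initialMillis;
        this.maxMillis = maxMillis;
    }

    /** Delay before the given 0-based attempt: initial * 2^attempt, capped at maxMillis. */
    long delayMillis(int attempt) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must be >= 0");
        }
        long delay = initialMillis;
        for (int i = 0; i < attempt && delay < maxMillis; i++) {
            // Compare against half of the cap first so that doubling cannot overflow
            delay = (delay > maxMillis / 2) ? maxMillis : delay * 2;
        }
        return delay;
    }

    public static void main(String[] args) {
        ReconnectBackoff backoff = new ReconnectBackoff(1_000, 30_000);
        for (int attempt = 0; attempt < 7; attempt++) {
            System.out.println("attempt " + attempt + ": wait " + backoff.delayMillis(attempt) + " ms");
        }
    }
}
```

A retry loop would sleep for delayMillis(attempt), try to connect, and subscribe to the topic again once the connection succeeds.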
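import com.example.mqtt.config.MqttConfig;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
public class MqttController {

    @Autowired
    MqttClient client;

    @Autowired
    MqttConfig mqttConfig;

    // Publishes ten test messages to the configured topic, one per second
    @GetMapping("/send")
    public String send() {
        try {
            MqttMessage message = new MqttMessage();
            message.setQos(0);
            message.setRetained(false);
            for (int i = 0; i < 10; i++) {
                message.setPayload(("1241234&" + i).getBytes());
                client.publish(mqttConfig.getTopic(), message);
                log.info("Sent: {}", i);
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            log.error("Failed to publish", e);
        }
        return "SUCCESS";
    }
}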
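An MQTT payload is just a byte array, so the publisher and the consumer have to agree on how text is encoded. Calling getBytes() and new String(bytes) without arguments relies on the JVM's default charset, which can differ between machines. The following is a minimal sketch of encoding and decoding with an explicit UTF-8 charset. The PayloadCodec class is a hypothetical helper and not part of the original project.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: converts between text and MQTT payload bytes using UTF-8.
final class PayloadCodec {
    private PayloadCodec() {
    }

    /** Encodes text into payload bytes, independent of the platform default charset. */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes payload bytes that were produced by encode. */
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = encode("1241234&0");
        System.out.println(payload.length + " bytes -> " + decode(payload));
    }
}
```

In the controller this would correspond to message.setPayload(PayloadCodec.encode("1241234&" + i)), and in the callback to PayloadCodec.decode(mqttMessage.getPayload()).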
com.example.mqtt.config.MqttConfig : MQTT connection established: true
com.example.mqtt.config.MqttConfig : Subscribed to topic: mqtt/test
c.e.mqtt.controller.MqttController : Sent: 0
com.example.mqtt.consume.MqttConsume : Message received: topic:mqtt/test, Qos:0, msg:1241234&0
c.e.mqtt.controller.MqttController : Sent: 1
com.example.mqtt.consume.MqttConsume : Message received: topic:mqtt/test, Qos:0, msg:1241234&1
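QoS 0: at most once. The sender transmits the message once and does not wait for an acknowledgement, so the message may be lost.
QoS 1: at least once. The sender keeps the message and retransmits it until it receives a PUBACK, so the receiver may see duplicates.
QoS 2: exactly once. A four-packet handshake (PUBLISH, PUBREC, PUBREL, PUBCOMP) ensures that the message is delivered once and only once.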
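The difference between the three levels is easiest to see when packets get lost. The following is a toy model, using only the JDK, that counts transmissions and deliveries for one message. It is not a protocol implementation: QoS 2 is simplified to de-duplication by packet id, and the QosSketch class with its loss parameters is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of the three QoS levels; all names here are hypothetical.
final class QosSketch {

    /** Receiving side: records every payload handed to the application. */
    static final class Receiver {
        final List<String> delivered = new ArrayList<>();
        private final Set<Integer> seenPacketIds = new HashSet<>();
        private final boolean deduplicate;

        /** @param deduplicate true models QoS 2: a repeated packet id is not delivered again */
        Receiver(boolean deduplicate) {
            this.deduplicate = deduplicate;
        }

        void onPublish(int packetId, String payload) {
            if (deduplicate && !seenPacketIds.add(packetId)) {
                return; // this packet id was already delivered
            }
            delivered.add(payload);
        }
    }

    /**
     * Sends one message and returns how many times it was transmitted.
     * The first lostPublishes transmissions never arrive; after that, the
     * first lostAcks acknowledgements are lost on the way back.
     */
    static int send(Receiver receiver, int qos, int packetId, String payload,
                    int lostPublishes, int lostAcks) {
        int transmissions = 0;
        while (true) {
            transmissions++;
            boolean arrived = lostPublishes == 0;
            if (arrived) {
                receiver.onPublish(packetId, payload);
            } else {
                lostPublishes--;
            }
            if (qos == 0) {
                return transmissions; // fire and forget: no acknowledgement, no retry
            }
            if (arrived) {
                if (lostAcks == 0) {
                    return transmissions; // acknowledgement received
                }
                lostAcks--; // acknowledgement lost, so the sender retransmits
            }
        }
    }

    public static void main(String[] args) {
        Receiver qos0 = new Receiver(false);
        send(qos0, 0, 1, "m", 1, 0);
        Receiver qos1 = new Receiver(false);
        send(qos1, 1, 1, "m", 0, 2);
        Receiver qos2 = new Receiver(true);
        send(qos2, 2, 1, "m", 0, 2);
        System.out.println("QoS 0 deliveries: " + qos0.delivered.size());
        System.out.println("QoS 1 deliveries: " + qos1.delivered.size());
        System.out.println("QoS 2 deliveries: " + qos2.delivered.size());
    }
}
```

With one lost transmission, QoS 0 delivers nothing. With two lost acknowledgements, QoS 1 delivers the same message three times, while the de-duplicating receiver that stands in for QoS 2 delivers it once.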
That is all for this chapter.