当前位置:   article > 正文

mqtt 入门+开发_mqtt开发

mqtt开发

mqtt介绍

简介

MQTT(消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。

它工作在TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议,为此,它需要一个消息中间件。

实现方式

实现MQTT协议需要客户端和服务器端通讯完成。

在通讯过程中,MQTT协议中有三种身份:发布者(Publish)代理(Broker)(服务器)订阅者(Subscribe)

其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。

MQTT传输的消息分为:主题(Topic)负载(payload)两部分:
(1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);
(2)payload,可以理解为消息的内容,是指订阅者具体要使用的内容。

订阅(Subscription)

订阅包含主题筛选器(Topic Filter)和最大服务质量(QoS)。订阅会与一个会话(Session)关联。一个会话可以包含多个订阅。每一个会话中的每个订阅都有一个不同的主题筛选器。

消息发布质量(Qos):
qos=0:“至多一次”,这一级别会发生消息丢失或重复,消息发布依赖于TCP/IP网络
qos=1:“至少一次”,确保消息到达,但消息重复可能会发生
qos=2:“只有一次”,确保消息到达一次

订阅普通主题。例如:订阅 hello 主题。
订阅通配符主题。例如:订阅通配符主题 testtopic/#,并给 testtopic/1 主题发送消息,会接收到该消息。

会话(Session)

每个客户端与服务器建立连接后就是一个会话,客户端和服务器之间有状态交互。会话存在于一个网络之间,也可能在客户端和服务器之间跨越多个连续的网络连接。

主题名(Topic Name)

连接到一个应用程序消息的标签,该标签与服务器的订阅相匹配。服务器会将消息发送给订阅所匹配标签的每个客户端。

主题筛选器(Topic Filter)

一个对主题名通配符筛选器,在订阅表达式中使用,表示订阅所匹配到的多个主题。

  1. 所有的主题名和主题过滤器必须至少包含一个字符
  2. 主题名或主题过滤器以前置或后置斜杠 “/” 区分
  3. 只包含斜杠 “/” 的主题名或主题过滤器是合法的
  4. 主题名和主题过滤器是 UTF-8 编码字符串, 它们不能超过 65535 字节
  5. 主题名和主题过滤器是区分大小写的

参考:MQTT 主题与主题过滤_amwha的博客-CSDN博客

负载(Payload)

消息订阅者所具体接收的内容。

包含CONNECT、SUBSCRIBE、SUBACK、UNSUBSCRIBE四种类型的消息:

(1)CONNECT,消息体内容主要是:客户端的ClientID、订阅的Topic、Message以及用户名和密码。
(2)SUBSCRIBE,消息体内容是一系列的要订阅的主题以及QoS。
(3)SUBACK,消息体内容是服务器对于SUBSCRIBE所申请的主题及QoS进行确认和回复。
(4)UNSUBSCRIBE,消息体内容是要订阅的主题。

服务器(Broker) 

也称为 MQTT 消息服务器,它可以是运行了 MQTT 消息服务器软件的一台服务器或一个服务器集群。MQTT Broker 负责接收来自客户端的网络连接,并处理客户端的订阅/取消订阅(Subscribe/Unsubscribe)、消息发布(Publish)请求,同时也会将客户端发布的消息转发给其他订阅者。 

 服务器选型

1、基本需求
1)支持 mqtt3.1 / mqtt3.1.1协议(可选 mqtt5.0)
3.1和3.1.1是最常见的协议版本,几乎目前生产的IoT设备都支持,所以Broker也必须支持。至于5.0版本,目前各大Broker都在努力支持,不过还需要一些时间才会普及。

2)支持QoS0、QoS1(可选QoS2)
各大厂商都至少支持了QoS1,保证消息到达。一般的场景下不会用到QoS2,所以可以选择性地考虑支持QoS2

3)支持遗嘱消息
这是必须支持的功能,通常设备断开都不是主动断开的,而是没有电了才断开,属于异常断开,需要设置遗嘱消息来通知后端服务或者其他设备进行后续处理。

4)支持持久化
一些数据如QoS1消息、持久Session,需要支持持久化,这是MQTT协议规定的。

5)支持多种连接方式
MQTT over TCP:最基础的连接方式
MQTT over Websocket:在Websocket之上做MQTT封装,对APP这种客户端来说很友好
MQTT over TCP/SSL:基础连接方式做通信加密,通常SSL采用TLS
MQTT over Websocket/SSL:Websocket做通信加密,通常SSL采用TLS

6)保留消息(可选)
保留消息的利用场景几乎可以忽略,而带来的查询成本会很高(每次订阅主题都要查一遍有没有保留消息,再加上通配符匹配,时延很高),所以不一定需要支持,具体应用具体分析。

7)支持集群
Broker要支持保持海量MQTT连接,需要做集群。集群的难点在于Session的持久化和集群通信。

8)支持自定义验证方式
验证客户端的合法性有三点:CONNECT阶段验证是否允许连接、PUBLISH阶段验证是否允许发布、SUBSCRIBE阶段验证是否允许订阅。
CONNECT阶段需要验证ClientID、Username、Password、IP四项,不过大部分开源Broker都只支持Username和Password的验证。
PUBLISH、SUBSCRIBE的验证的目的是防止非法客户端订阅别人的主题,向别人的主题发布消息。但每台设备每次订阅、发布都要验证一次频率巨高,所以需要设计Cache和高效查询机制。

2、高级功能:支持共享订阅
共享订阅的具体含义是,多个客户端订阅同一个主题,消息只会被分发给其中的一个客户端。
共享订阅主要针对的是需要客户端负载均衡的场景,比如后端服务多个Worker,需要共享订阅来只让一个Worker得到数据。但仔细地想一想,后端服务一定有大量消息扇入,在Broker端用共享订阅可能会导致内存爆炸,还不如直接发到Kafka,利用Kafka的负载均衡来做。不过现在的Broker都在逐渐支持共享订阅,所以也是一个趋势吧。

参考:MQTT Broker 选型 - 简书

MQTT整合java开发

依赖

org.eclipse.paho.client.mqttv3.jar

下载地址:Index of /repositories/paho-releases/org/eclipse/paho/org.eclipse.paho.client.mqttv3

  1. <dependency>
  2. <groupId>org.eclipse.paho</groupId>
  3. <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  4. <version>1.2.2</version>
  5. </dependency>

MQTT客户端 

  1. import org.eclipse.paho.client.mqttv3.*;
  2. import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
  3. /**
  4. * 创建一个MQTT客户端
  5. *
  6. */
  7. @Slf4j
  8. public class MqttPushClient {
  9. public static String MQTT_HOST = "";
  10. public static String MQTT_CLIENTID = "";
  11. public static String MQTT_USERNAME = "";
  12. public static String MQTT_PASSWORD = "";
  13. public static int MQTT_TIMEOUT = 10;
  14. public static int MQTT_KEEPALIVE = 10;
  15. private MqttClient client;
  16. private static volatile MqttPushClient mqttClient = null;
  17. public static MqttPushClient getInstance() {
  18. if (mqttClient == null) {
  19. synchronized (MqttPushClient.class) {
  20. if (mqttClient == null) {
  21. mqttClient = new MqttPushClient();
  22. }
  23. }
  24. }
  25. return mqttClient;
  26. }
  27. private MqttPushClient() {
  28. log.info("Connect MQTT: " + this);
  29. connect();
  30. }
  31. /**
  32. * 创建连接
  33. */
  34. private void connect() {
  35. try {
  36. client = new MqttClient(MQTT_HOST, MQTT_CLIENTID, new MemoryPersistence());
  37. MqttConnectOptions option = new MqttConnectOptions();
  38. option.setCleanSession(true);//设置是否清空session,false表示服务器会保留客户端的连接记录,true表示每次连接到服务器都以新的身份连接
  39. option.setUserName(MQTT_USERNAME);//设置连接的用户名
  40. option.setPassword(MQTT_PASSWORD.toCharArray());//设置连接的密码
  41. option.setConnectionTimeout(MQTT_TIMEOUT);// 设置超时时间
  42. option.setKeepAliveInterval(MQTT_KEEPALIVE);// 设置会话心跳时间
  43. option.setAutomaticReconnect(true);// 自动重连
  44. try {
  45. client.setCallback(new MqttPushCallback());
  46. client.connect(option);
  47. } catch (Exception e) {
  48. e.printStackTrace();
  49. }
  50. } catch (Exception e) {
  51. e.printStackTrace();
  52. }
  53. }
  54. /**
  55. * 发布主题,用于通知<br>
  56. * 默认qos为1 非持久化
  57. *
  58. * @param topic
  59. * @param data
  60. */
  61. public void publish(String topic, String data) {
  62. publish(topic, data, 1, false);
  63. }
  64. /**
  65. * 发布
  66. *
  67. * @param topic
  68. * @param data
  69. * @param qos
  70. * @param retained
  71. */
  72. public void publish(String topic, String data, int qos, boolean retained) {
  73. MqttMessage message = new MqttMessage();
  74. message.setQos(qos);//设置qos,决定消息到达次数。
  75. message.setRetained(retained);// 服务器是否保存消息
  76. message.setPayload(data.getBytes());//设置消息内容
  77. MqttTopic mqttTopic = client.getTopic(topic);// 设置消息主题
  78. if (null == mqttTopic) {
  79. log.error("Topic Not Exist");
  80. }
  81. MqttDeliveryToken token;
  82. try {
  83. token = mqttTopic.publish(message);
  84. token.waitForCompletion();
  85. } catch (Exception e) {
  86. e.printStackTrace();
  87. }
  88. }
  89. /**
  90. * 订阅某个主题 qos默认为1
  91. *
  92. * @param topic
  93. */
  94. public void subscribe(String topic) {
  95. subscribe(topic, 1);
  96. }
  97. /**
  98. * 订阅某个主题
  99. *
  100. * @param topic
  101. * @param qos
  102. */
  103. public void subscribe(String topic, int qos) {
  104. try {
  105. client.subscribe(topic, qos);
  106. } catch (Exception e) {
  107. e.printStackTrace();
  108. }
  109. }
  110. }

MQTT 回调类

  1. import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
  2. import org.eclipse.paho.client.mqttv3.MqttCallback;
  3. import org.eclipse.paho.client.mqttv3.MqttMessage;
  4. /**
  5. * MQTT 推送回调
  6. */
  7. public class MqttPushCallback implements MqttCallback {
  8. private static final Logger log = LoggerFactory.getLogger(MqttPushCallback.class);
  9. //连接断开时的回调
  10. @Override
  11. public void connectionLost(Throwable cause) {
  12. log.info("断开连接,建议重连" + this);
  13. //断开连接,建议重连
  14. }
  15. //消息发送成功时的回调
  16. @Override
  17. public void deliveryComplete(IMqttDeliveryToken token) {
  18. //log.info(token.isComplete() + "");
  19. }
  20. //收到下推消息时的回调
  21. @Override
  22. public void messageArrived(String topic, MqttMessage message) throws Exception {
  23. log.info("Topic: " + topic);
  24. log.info("Message: " + new String(message.getPayload()));
  25. }
  26. }

参考:MQTT Java Client开发 - 星辰大海mark-shi - 博客园

物联网架构成长之路(32)-SpringBoot集成MQTT客户端 - 无脑仔的小明 - 博客园

MQTT整合springboot开发

依赖

  1. <dependency>
  2. <groupId>org.springframework.integration</groupId>
  3. <artifactId>spring-integration-stream</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.integration</groupId>
  7. <artifactId>spring-integration-mqtt</artifactId>
  8. </dependency>

 配置文件

  1. #MQTT配置信息
  2. #MQTT-用户名
  3. spring.mqtt.username=xxxx
  4. #MQTT-密码
  5. spring.mqtt.password=xxxx
  6. #MQTT-服务器连接地址
  7. spring.mqtt.url=tcp://xxx.xxx.xxx:18831
  8. #MQTT-连接服务器默认客户端ID
  9. spring.mqtt.client.id=xxxxxx
  10. #MQTT-默认的消息推送主题,实际可在调用接口时指定
  11. spring.mqtt.default.topic=/public/TEST/#
  12. #连接超时
  13. spring.mqtt.completionTimeout=3000

消息接收

配置类

  1. import lombok.extern.slf4j.Slf4j;
  2. import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  3. import org.springframework.beans.factory.annotation.Value;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. import org.springframework.integration.annotation.IntegrationComponentScan;
  7. import org.springframework.integration.annotation.ServiceActivator;
  8. import org.springframework.integration.channel.DirectChannel;
  9. import org.springframework.integration.core.MessageProducer;
  10. import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
  11. import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
  12. import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
  13. import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
  14. import org.springframework.messaging.Message;
  15. import org.springframework.messaging.MessageChannel;
  16. import org.springframework.messaging.MessageHandler;
  17. import org.springframework.messaging.MessagingException;
  18. import java.text.SimpleDateFormat;
  19. //接收配置类
  20. @Configuration
  21. @IntegrationComponentScan
  22. @Slf4j
  23. public class MqttReceiveConfig {
  24. @Value("${spring.mqtt.username}")
  25. private String username;
  26. @Value("${spring.mqtt.password}")
  27. private String password;
  28. @Value("${spring.mqtt.url}")
  29. private String hostUrl;
  30. @Value("${spring.mqtt.client.id}")
  31. private String clientId;
  32. @Value("${spring.mqtt.default.topic}")
  33. private String defaultTopic;
  34. @Value("${spring.mqtt.completionTimeout}")
  35. private int completionTimeout; //连接超时
  36. SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss zzz");
  37. @Bean
  38. public MqttConnectOptions getMqttConnectOptions() {
  39. // MQTT的连接设置
  40. MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
  41. // 设置连接的用户名
  42. mqttConnectOptions.setUserName(username);
  43. // 设置连接的密码
  44. mqttConnectOptions.setPassword(password.toCharArray());
  45. // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
  46. // 把配置里的 cleanSession 设为false,客户端掉线后 服务器端不会清除session,
  47. // 当重连后可以接收之前订阅主题的消息。当客户端上线后会接受到它离线的这段时间的消息
  48. mqttConnectOptions.setCleanSession(false);
  49. // 设置发布端地址
  50. mqttConnectOptions.setServerURIs(new String[]{hostUrl});
  51. // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
  52. mqttConnectOptions.setKeepAliveInterval(20);
  53. return mqttConnectOptions;
  54. }
  55. @Bean
  56. public MqttPahoClientFactory mqttClientFactory() {
  57. // 客户端工厂类,根据配置的选项(用户名、密码、服务器集群地址等)创建一个默认的客户端。
  58. DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
  59. factory.setConnectionOptions(getMqttConnectOptions());
  60. return factory;
  61. }
  62. //接收通道
  63. @Bean
  64. public MessageChannel mqttInputChannel() {
  65. return new DirectChannel();
  66. }
  67. //配置监听的topic
  68. @Bean
  69. public MessageProducer inbound() {
  70. String[] defaultTopics= defaultTopic.split(",");
  71. MqttPahoMessageDrivenChannelAdapter adapter =
  72. new MqttPahoMessageDrivenChannelAdapter(clientId + "_inbound",
  73. mqttClientFactory(), defaultTopics);
  74. adapter.setCompletionTimeout(completionTimeout);
  75. adapter.setConverter(new DefaultPahoMessageConverter());
  76. adapter.setQos(1);
  77. adapter.setOutputChannel(mqttInputChannel());
  78. return adapter;
  79. }
  80. //通过通道获取数据
  81. @Bean
  82. @ServiceActivator(inputChannel = "mqttInputChannel")
  83. public MessageHandler handler() {
  84. return new MessageHandler() {
  85. @Override
  86. public void handleMessage(Message<?> message) throws MessagingException {
  87. String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
  88. System.out.println("Topic: " + topic);
  89. System.out.println("Message: " + topic);
  90. }
  91. };
  92. }
  93. }

消息推送

配置类

  1. import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  2. import org.springframework.beans.factory.annotation.Value;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. import org.springframework.integration.annotation.IntegrationComponentScan;
  6. import org.springframework.integration.annotation.ServiceActivator;
  7. import org.springframework.integration.channel.DirectChannel;
  8. import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
  9. import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
  10. import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
  11. import org.springframework.messaging.MessageChannel;
  12. import org.springframework.messaging.MessageHandler;
  13. //MQTT消息推送配置类
  14. @Configuration
  15. @IntegrationComponentScan
  16. public class MqttSenderConfig {
  17. @Value("${spring.mqtt.username}")
  18. private String username;
  19. @Value("${spring.mqtt.password}")
  20. private String password;
  21. @Value("${spring.mqtt.url}")
  22. private String hostUrl;
  23. @Value("${spring.mqtt.client.id}")
  24. private String clientId;
  25. @Value("${spring.mqtt.default.topic}")
  26. private String defaultTopic;
  27. @Bean
  28. public MqttConnectOptions getMqttConnectOptions() {
  29. MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
  30. mqttConnectOptions.setUserName(username);
  31. mqttConnectOptions.setPassword(password.toCharArray());
  32. mqttConnectOptions.setServerURIs(new String[]{hostUrl});
  33. mqttConnectOptions.setKeepAliveInterval(2);
  34. return mqttConnectOptions;
  35. }
  36. @Bean
  37. public MqttPahoClientFactory mqttClientFactory() {
  38. DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
  39. factory.setConnectionOptions(getMqttConnectOptions());
  40. return factory;
  41. }
  42. // 消息推送客户端
  43. @Bean
  44. @ServiceActivator(inputChannel = "mqttOutboundChannel")
  45. public MessageHandler mqttOutbound() {
  46. // MQTT出站通道适配器的抽象类的实现,用于推送消息。
  47. MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory());
  48. messageHandler.setAsync(true);
  49. messageHandler.setDefaultTopic(defaultTopic);
  50. return messageHandler;
  51. }
  52. // 消息推送通道
  53. @Bean
  54. public MessageChannel mqttOutboundChannel() {
  55. return new DirectChannel();
  56. }
  57. }

 消息推送接口类

  1. import org.springframework.integration.annotation.MessagingGateway;
  2. import org.springframework.integration.mqtt.support.MqttHeaders;
  3. import org.springframework.messaging.handler.annotation.Header;
  4. @Component
  5. @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
  6. public interface MqttSenderService {
  7. /**
  8. * 消息推送
  9. *
  10. * @param data 发送的消息内容
  11. * @param topic 主题
  12. */
  13. void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);
  14. /**
  15. * 消息推送
  16. *
  17. * @param data 发送的消息内容
  18. * @param topic 主题
  19. * @param qos 消息服务质量
  20. */
  21. void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos);
  22. }

推送消息 

  1. @Autowired
  2. private MqttSenderService mqttSenderService;
  3. mqttSenderService.sendToMqtt("121212121210099", "/public/TEST/windows");

异常问题

mqtt客户端频繁中断:Lost connection: 已断开连接; retry

问题原因:

原因1:造成这种情况的原因是ClientId相同。2个服务端使用相同的ClientId连接mqtt服务器。
解决方案:让消费端的ClientId为随机字符串。这样ClientId就不会重复。

原因2:在回调函数内进行业务处理遇到异常并没有捕获

解决方案:在可能出现异常的语句块,进行try-catch捕获

MQTT 客户端工具

Mosquito CLI
MQTTX
MQTT.fx
MQTT Box
mqtt-spy
MQTT Lens

参考:物联网 - MQTT 客户端工具 - 个人文章 - SegmentFault 思否

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/188649
推荐阅读
相关标签
  

闽ICP备14008679号