赞
踩
我们将从客户端向服务端发起 MQTT 连接请求开始,到连接中断直到会话过期为止的消息收发序列称之为会话。会话是服务端和客户端的一个连接,进行消息交互前必须先建立会话。
MQTT v3.1.1会话的生命周期由CONNECT报文里的Clean Session标志位和控制:
MQTT v3.1.1 没有规定持久会话应该在什么时候过期,如果仅从协议层面理解的话,这个持久会话应该永久存在。但在实际场景中这并不现实,因为它会非常占用服务端的资源,所以服务端通常不会遵循协议来实现,而是向用户提供一个全局配置来限制会话过期时间。
而到了 MQTT 5.0,这个问题得到了妥善的解决,Clean Session 字段被拆分成了 Clean Start 字段与 Session Expiry Interval 字段。Clean Start 字段指定是否需要全新的会话,Session Expiry Interval 字段指定会话过期时间,它们在连接时指定,但 Session Expiry Interval 字段可以在客户端断开连接时被更新。因此我们可以很轻易地实现客户端网络连接异常断开时会话被保留,客户端正常下线时会话则随着连接关闭而终结的功能。
客户端如何知道这是被恢复的会话?
显而易见的是,当客户端以期望从先前建立的会话恢复状态的方式发起连接,它需要知道服务端是否存在相应的会话,才能决定在连接建立后是否需要重复一遍订阅操作。关于这一点,MQTT 协议从 v3.1.1 开始,就为 CONNACK 报文设计了 Session Present 字段,用于表示当前连接使用的是否是一个全新会话,客户端可以根据这个字段的值进行判断。
Client ID是用来识别不同会话的唯一标识。认证时使用的用户名(User Name)和密码(Password)则是完全独立的。两个会话可以使用相同的用户名,也可以使用不同的用户名。
注:如果两个会话使用相同的Client ID,那么后一个会话CONNECT时会导致前一个会话的网络连接断开。
Client ID可以设为空,表明本次会话是匿名的临时访问,网络断开时会话立即结束。匿名访问只允许Clean Session = 1。
不稳定的网络及有限的硬件资源是物联网应用需要面对的两大难题,MQTT 客户端与服务器的连接可能随时会因为网络波动及资源限制而异常断开。为了解决网络连接断开对通信造成的影响,MQTT 协议提供了持久会话功能。
MQTT 客户端在发起到服务器的连接时,可以设置是否创建一个持久会话。持久会话会保存一些重要的数据,以使会话能在多个网络连接中继续。持久会话主要有以下三个作用:
避免因网络中断导致需要反复订阅带来的额外开销。
避免错过离线期间的消息。
确保 QoS 1 和 QoS 2 的消息质量保证不被网络中断影响。
在协议规范中,QoS 1 和 QoS 2 消息首先会在客户端与 Broker 存储起来,在最终确认抵达订阅端后才会被删除,此过程需要 Broker 将状态与客户端相关联,这称为会话状态。除了消息存储外,订阅信息(客户端订阅的主题列表)也是会话状态的一部分。
客户端中的会话状态包括:
服务器中的会话状态包括:
发布者发布消息时,如果 Retained 标记被设置为 true,则该消息即是 MQTT 中的保留消息(Retained Message)。MQTT 服务器会为每个主题存储最新一条保留消息,以方便消息发布后才上线的客户端在订阅主题时仍可以接收到该消息。
如下图,当客户端订阅主题时,如果服务端存在该主题匹配的保留消息,则该保留消息将被立即发送给该客户端。
注意:保留消息虽然存储在服务端中,但它并不属于会话的一部分。也就是说,即便发布这个保留消息的会话终结,保留消息也不会被删除。不管“clean_session”标志如何设置,保留消息都被保留。
在发布保留消息时,MQTT设备需要将PUBLISH报文中retainFlag设置为true
每个主题下只能存在一份保留消息,因此如果已经存在相同主题的保留消息,则该保留消息被替换。
’retain’标志设置为0的消息不会替换相同主题的保留消息,只有保留的消息替换保留的消息
服务器只会为每个主题保存最新一条保留消息,保留消息的保存时间与服务器的设置有关。若服务器设置保留消息存储在内存,则 MQTT 服务器重启后消息即会丢失;若存储在磁盘,则服务器重启后保留消息仍然存在。
保留消息虽然存储在服务端中,但它并不属于会话的一部分。也就是说,即便发布这个保留消息的会话已结束,保留消息也不会被删除。删除保留消息有以下几种方式:
当客户端订阅了有保留消息的主题后,即会收到该主题的保留消息,可通过消息中的保留标志位判断是否是保留消息。需要注意的是,在保留消息发布前订阅主题,将不会收到保留消息。需要待保留消息发布后,重新订阅该主题,才会收到保留消息。
如下图,我们先订阅主题 sensor/t2
,然后向该主题发布一条保留消息,该订阅会立即收到一条消息,但是该消息并不是保留消息。当我们删除该订阅,再次重新订阅 sensor/t2
主题时,立即收到了刚刚发布的保留消息。
ref:
解释:publish不成功,一直发布,直到断开连接
MQTTAsync_createWithOptions
而不是MQTTAsync_create
来创建客户端对象。将MQTTAsync_createOptions::sendWhileDisconnected
设置为非零,并设置MQTTAsync_createOptions::maxBufferedMessages
(默认值100)。MQTTAsync_getPendingTokens
来返回等待发送的消息的id,或者返回发送过程尚未完成的消息的id。Publish While Disconnected
This feature was not originally available because with persistence enabled, messages could be stored locally without ever knowing if they could be sent. The client application could have created the client with an erroneous broker address or port for instance.
To enable messages to be published when the application is disconnected MQTTAsync_createWithOptions must be used instead of MQTTAsync_create to create the client object. The ::createOptions field sendWhileDisconnected must be set to non-zero, and the maxBufferedMessages field set as required - the default being 100.
MQTTAsync_getPendingTokens can be called to return the ids of the messages waiting to be sent, or for which the sending process has not completed.
- /** Options for the ::MQTTAsync_createWithOptions call */
- typedef struct
- {
- /** The eyecatcher for this structure. must be MQCO. */
- char struct_id[4];
- /** The version number of this structure. Must be 0, 1, 2 or 3
- * 0 means no MQTTVersion
- * 1 means no allowDisconnectedSendAtAnyTime, deleteOldestMessages, restoreMessages
- * 2 means no persistQoS0
- */
- int struct_version;
- /** Whether to allow messages to be sent when the client library is not connected. */
- int sendWhileDisconnected;
- /** The maximum number of messages allowed to be buffered. This is intended to be used to
- * limit the number of messages queued while the client is not connected. It also applies
- * when the client is connected, however, so has to be greater than 0. */
- int maxBufferedMessages;
- /** Whether the MQTT version is 3.1, 3.1.1, or 5. To use V5, this must be set.
- * MQTT V5 has to be chosen here, because during the create call the message persistence
- * is initialized, and we want to know whether the format of any persisted messages
- * is appropriate for the MQTT version we are going to connect with. Selecting 3.1 or
- * 3.1.1 and attempting to read 5.0 persisted messages will result in an error on create. */
- int MQTTVersion;
- /**
- * Allow sending of messages while disconnected before a first successful connect.
- */
- int allowDisconnectedSendAtAnyTime;
- /*
- * When the maximum number of buffered messages is reached, delete the oldest rather than the newest.
- */
- int deleteOldestMessages;
- /*
- * Restore messages from persistence on create - or clear it.
- */
- int restoreMessages;
- /*
- * Persist QoS0 publish commands - an option to not persist them.
- */
- int persistQoS0;
- } MQTTAsync_createOptions;
-
- #define MQTTAsync_createOptions_initializer { {'M', 'Q', 'C', 'O'}, 2, 0, 100, MQTTVERSION_DEFAULT, 0, 0, 1, 1}
-
- #define MQTTAsync_createOptions_initializer5 { {'M', 'Q', 'C', 'O'}, 2, 0, 100, MQTTVERSION_5, 0, 0, 1, 1}
-
-
- LIBMQTT_API int MQTTAsync_createWithOptions(MQTTAsync* handle, const char* serverURI, const char* clientId,
- int persistence_type, void* persistence_context, MQTTAsync_createOptions* options);
ref:
Paho Asynchronous MQTT C Client Library: Publish While Disconnected
paho.mqtt.c/MQTTAsync.h at 0995176412616ae8f1cba51c8d609fe4b69687ea · eclipse/paho.mqtt.c · GitHub
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。