当前位置:   article > 正文

MQTT.JS学习笔记(不包含5.0版本)_mqtt.js streambuilder

mqtt.js streambuilder

MQTT.JS学习笔记(不包含5.0版本)

MQTT中的QoS等级

MQTT设计了一套保证消息稳定传输的机制,包括消息应答、存储和重传。在这套机制下,提供了三种不同层次QoS(Quality of Service):

  • QoS0,At most once,至多一次;
  • QoS1,At least once,至少一次;
  • QoS2,Exactly once,确保只有一次。

QoS 是消息的发送方(Sender)和接受方(Receiver)之间达成的一个协议:

  • QoS0 代表,Sender 发送的一条消息,Receiver 最多能收到一次,也就是说 Sender 尽力向 Receiver 发送消息,如果发送失败,也就算了;

  • QoS1 代表,Sender 发送的一条消息,Receiver 至少能收到一次,也就是说 Sender 向 Receiver 发送消息,如果发送失败,会继续重试,
    直到 Receiver 收到消息为止,但是因为重传的原因,Receiver 有可能会收到重复的消息;

  • QoS2 代表,Sender 发送的一条消息,Receiver 确保能收到而且只收到一次,也就是说 Sender 尽力向 Receiver 发送消息,
    如果发送失败,会继续重试,直到 Receiver 收到消息为止,同时保证 Receiver 不会因为消息重传而收到重复的消息。

注意:
QoS是Sender和Receiver之间的协议,而不是Publisher和Subscriber之间的协议。换句话说,Publisher发布了一条QoS1的消息,只能保证Broker能至少收到一次这个消息;
而对于Subscriber能否至少收到一次这个消息,还要取决于Subscriber在Subscibe的时候和Broker协商的QoS等级。

API:

  • mqtt.connect() 连接到mqtt服务器
  • mqtt.Client() 创建一个客户端
  • mqtt.Client#publish() 发送消息
  • mqtt.Client#subscribe()订阅一个或者多个主题
  • mqtt.Client#unsubscribe() 取消订阅
  • mqtt.Client#end() 关闭客户端
  • mqtt.Client#removeOutgoingMessage() 清空发送缓存
  • mqtt.Client#reconnect() 重新连接
  • mqtt.Client#handleMessage() 处理消息
  • mqtt.Client#connected 已经连接
  • mqtt.Client#reconnecting 正在重连
  • mqtt.Client#getLastMessageId() 获取上一个已发送的信息的id
  • mqtt.Store() 创建存储对象
  • mqtt.Store#put() 存储
  • mqtt.Store#del() 删除
  • mqtt.Store#createStream() 创建包含内存中所有数据包的流
  • mqtt.Store#close() 关闭存储

1、mqtt.connect([url], options)

连接到给定url和选项指定的代理并返回客户端对象mqtt.Client。
URL可以位于以下协议上:“mqtt”、“mqtts”、“tcp”、“tls”、“ws”、“wss”。URL也可以是由返回的对象URL.parse文件(),在这种情况下,两个对象被合并,即,可以通过URL和connect选项传递单个对象。
还可以使用以下内容指定服务器选项:[{host:‘localhost’,port:1883}。。。],在这种情况下,数组在每个连接处迭代。
有关所有MQTT相关选项,请参阅客户端对象的构造函数中options的选项。

2、mqtt.Client(streamBuilder, options)

Client类通过任意传输方法(TCP、TLS、WebSocket、ecc)将客户机连接到MQTT代理。
客户端自动处理以下内容:

  • 常规服务器ping
  • QoS流
  • 自动重新连接
  • 连接前开始发布

构造MQTT客户端(client)的options中可选参数:

  • wsOptions:是WebSocket连接选项。默认值为{}。这是专门针对WebSocket的。有关可能的选项,请参阅:https://github.com/websockets/ws/blob/master/doc/ws.md。
  • keepalive: 60秒,设置为0禁用(心跳包发送周期)
  • reschedulePings :发送数据包后重新安排ping消息(默认为true)(个人理解是发送消息后,重新开始心跳计时)
  • clientId:客户端ID ,默认为’mqttjs_’ + Math.random().toString(16).substr(2, 8)
  • protocolId: 协议ID,默认为’MQTT’
  • protocolVersion:协议版本,默认为 4
  • clean:默认为true,设置为false可在离线时接收QoS 1和2消息
  • reconnectPeriod:重连周期,默认为1000毫秒,两次重新连接之间的间隔。通过设置为0禁用自动重新连接。
  • connectTimeout:连接超时时长,30*1000毫秒,接收CONNACK之前的等待时间(CONNACK是连接成功后服务器返回的一个数据包)
  • username:用户名,可选
  • password:密码,可选
  • incomingStore:接收缓存
  • outgoingStore:发送缓存
  • queueQoSZero:如果连接断开,对传出的QoS为零的消息进行排队(默认为true)
  • authPacket:身份验证数据包对象的设置
  • will:当客户端严重断开连接时,由代理自动发送的消息。格式为
    •   topic:发布消息的主题
      
      • 1
    •   payload:要发布的消息
      
      • 1
    •   qos:消息等级
      
      • 1
    •   retain:保留消息标识
      
      • 1
  • transformWsUrl: 可选(url,options,client)=>仅适用于ws/wss协议的url函数。可用于实现签名URL,这些URL在重新连接时可能已过期。
  • resubscribe:如果连接断开并重新连接,订阅的主题将自动重新订阅(默认为true)

客户端状态回调函数

connect: function (connack) {} 连接成功回调
当client在连接时设置 Clean Session=1,即清除MQTT服务器中保存的会话,则 CONNACK 中 Session Present Flag 始终为 0,即这个会话是新的会话。
当client在连接时设置 Clean Session=0,则有两种情况,若服务器端存在这个clientid之前保留的额回话,则CONNACK 中的 Session Present Flag 为 1,否则为0.

reconnect: function () {} 重连回调
close: function () {} 断开连接回调
offline : function () {} 客户端离线回调
error: function(error){} 当客户端无法连接时发出(即connack rc!=0)或发生解析错误时。
以下TLS错误将作为错误事件发出:

ECONNREFUSED 连接被拒绝
ECONNRESET 连接重置
EADDRINUSE 地址正在使用
ENOTFOUND 找不到

end: function(){} 这个回调会在client.end()方法执行成功之后
message : function (topic, message, packet) {} 会在接收到订阅消息时进行回调
packetsend : function (packet) {} 当客户端发送数据包时进行回调
packetreceive: function (packet) {} 当客户端收到数据包是进行回调

3、mqtt.Client#publish(topic, message, [options], [callback])

向一个主题发送信息,参数列表:

  • topic: 发送主题, String
  • message 要发送的消息, Buffer or String
  • options 发送选项(可选), including:
    • qos QoS 等级, Number, default 0
    • retain 保留消息标志, Boolean, default false
    • dup 重复标志, Boolean, default false
  • callback - function (err) 在QoS处理完成时触发,如果QoS为0,则在下一次勾选时触发。如果客户端正在断开连接,则会发生错误。(可选)

4、mqtt.Client#subscribe(topic/topic array/topic object, [options], [callback])

订阅一个或者多个主题

  • topic是要订阅的字符串主题或要订阅的主题数组。它也可以是一个对象,它有主题名作为对象键,QoS作为值,
    比如{test1’:{QoS:0},‘test2’:{QoS:1}。支持MQTT主题通配符(±表示单层,#-表示多层)

  • options:订阅选项(可选),可选项:
    qos: QoS 等级,订阅等级,默认为0

  • callback :function (err, granted)

    • err:订阅错误,或者客户端断开连接的时候发生的错误
    • granted:结构为{topic, qos}的一个数组

5、mqtt.Client#unsubscribe(topic/topic array, [options], [callback])

取消订阅

  • topic :同上
  • options:取消订阅选项(可选):
    • properties:object
      • userProperties:允许用户属性多次出现以表示多个键值对
  • callback:function (err),当取消订阅的同时客户端正在断开连接则会发生错误

6 mqtt.Client#end([force], [options], [callback])

关闭客户端

  • force:是否立即关闭,设置为false代表客户端等待正在传递的消息被确认,确认后再进行断开操作,如果设置为true,则不进行等待,立即关闭
  • options:关闭客户端选项,可选参数:
    • reasonCode:断开连接原因。number类型
    • properties:
      • sessionExpiryInterval:以秒为单位表示会话到期间隔
      • reasonString:断开连接原因,string类型
      • userProperties:允许用户属性多次出现以表示多个键值对
      • serverReference :客户端可以用来标识要使用的另一个服务器的字符串
  • callback:客户端关闭后进行回调

7、mqtt.Client#removeOutgoingMessage(mId)

从outgoing缓存中删除邮件。如果消息被删除,则调用传出回调时将出现错误(“Message removed”)。
调用此函数后,messageId将被释放并可重用。

  • mId: outgoing缓存中消息的id

8、mqtt.Client#reconnect()

使用相同的连接选项重新连接

9、mqtt.Client#handleMessage(packet, callback)

使用背压支持处理消息,一次一个。随意重写,但始终调用回调,否则客户端将挂起。

10、mqtt.Client#connected

如果客户端已连接,则设置为true。否则为假。

11、mqtt.Client#getLastMessageId()

获取上一个已发送的信息的id

12、mqtt.Client#reconnecting

当客户端正在尝试重连时为true,否则为false

#1 3、 mqtt.Store(options)
在内存中实现的消息存储

  • options:存储选项
    • clean:当调用close方法时,清除缓存的信息,默认为true

14、mqtt.Store#put(packet, callback)

将数据包添加到存储中,数据包就是任何包含messageid属性的内容,数据包存储后会调用回调函数

15、mqtt.Store#createStream()

创建包含内存中所有数据包的流

16、mqtt.Store#del(packet, callback)

从内存中移除数据包,当移除成功后,会调用回调函数

17、mqtt.Store#close(cb)

关闭存储

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/824681
推荐阅读
相关标签
  

闽ICP备14008679号