赞
踩
前言 本人曾在简书上发表过本文,由于简书对公式支持不好,现已搬迁至CSDN,将简书上的文章在CSDN上集中发布。
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是基于“订阅/发布”模式的轻量级通信协议,该协议基于TCP/IP,能以极低的带宽为海量(百万级)跨域设备提供可靠的消息服务,因此在物联网、小型移动终端、边缘计算方面有广泛应用。
所谓可靠的消息传输,体现为可配置消息的服务质量(QoS),有三种服务质量可选:
MQTT协议中存在三种身份:消息总线(Broker)、发布者(Publish)和订阅者(Subscribe),其中消息总线属于服务器,后两者都属于客户端。发布者和订阅者可以是各种物联网设备和小型终端,消息发布者可以同时也是消息订阅者,如下图所示。
MQTT有一个底层覆盖网:它将建立客户端到服务器的连接,提供两者之间有序、无损、基于字节流的双向传输。当应用数据通过MQTT网络发送时,MQTT会把与之相关的服务质量(QoS)和主题名(Topic)相关连。
客户端与消息总线建立连接后就是一个会话(Session),它们之间有周期性的状态交互,且可跨越多个连续的网络连接。
MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:
订阅消息时,可以在订阅表达式中使用通配符筛选器对主题进行筛选,可同时订阅所匹配的多个主题。
MQTT协议中主要有以下5个方法:
EMQX(Erlang/Enterprise/Elastic MQTT Broker),是基于Erlang语言开发的开源物联网MQTT消息总线。其是一款由前华为员工开发的开源软件,官网地址在此,配套有丰富的文档和使用说明。我们可以使用docker来快速部署,官网也有简洁明了的安装说明,这里不再冗述。
docker部署后,绑定了1883端口和18083端口。其中1883端口是服务总线对外提供服务的端口,客户端通过该端口与消息总线通讯。18083端口是网页版后台监控端口,打开localhost:18083,默认的管理员名为Admin,密码为public。监控界面可以看到所有连接到总线的客户节点、注册的主题、传递消息的统计数据等都一目了然,还提供了测试工具,可以测试连接消息总线并订阅主题,也可向总线发送主题消息测试客户端是否能收到订阅内容。
Docker部署后,系统已具备基本使用条件。关于如何使用插件、配置安全认证等,在官方文档有详细步骤说明,可按图索翼深入研究,这里不再展开,以后有需要再行探索。
Paho是Eclipse基金会支持的MQTT客户端开源实现,官网地址在此。其提供了如下语言库:java、Python、JavaScript、GoLang、C/C++/C#、Rust、Android Service等。
这里,我选择用Python基于Paho快速搭建一个最简单的MQTT客户端。
pip install paho-mqtt
import paho.mqtt.client as mqtt
client=mqtt.Client()
client.connect('127.0.0.1',1883,600)
client.loop_start()
while True:
topic=input('Enter the topic name: ')
message=input('Enter the message to send: ')
client.publish(topic,payload=message,qos=0)
import paho.mqtt.client as mqtt
def on_message(client,userdata,msg):
print(msg.topic+" "+str(msg.payload))
client=mqtt.Client()
client.on_message=on_message
client.connect('127.0.0.1',1883,600)
topic=input('Enter the topic you want to subscribe: ')
client.subscribe(topic,qos=0)
client.loop_forever()
先运行subscribe.py,进行持续监听。再另起一个终端,进入虚拟环境,并运行publish.py,输入对应的主题和消息内容,之前的客户端就能收到消息了。
也可以到EMQX的监控后台中,通过测试工具发送消息和订阅消息,也可在监控视图中查看到当前连接的客户端、消息统计数据等。
Paho for python的官方使用手册在此,其中列出了详细说明。此处列出其中的常用方法,并简要说明。
while True:
client.loop()
client.connect("mqtt.eclipse.org")
client.loop_start()
while True:
temperature = sensor.blocking_read()
client.publish("paho/temperature", temperature)
0:连接成功
1:连接拒绝-不正确的协议版本
2:连接拒绝-无效客户标识符
3:连接拒绝-服务器不可用
4:连接拒绝-错误用户名或密码
5:连接拒绝-未授权
6-255:当前未使用。
def on_connect(client, userdata, flags, rc):
print("Connection returned result: "+connack_string(rc))
mqttc.on_connect = on_connect
...
def on_disconnect(client, userdata, rc):
if rc != 0:
print("Unexpected disconnection.")
client.on_disconnect = on_disconnect
def on_message(client, userdata, message):
print("Received message '" + str(message.payload) + "' on topic '"
+ message.topic + "' with QoS " + str(message.qos))
client.on_message = on_message
python中struct是用来处理结构数据的,可将结构数据转换为字节流,再将字节流恢复为结构数据。在转换过程中,需要使用一个格式化字符串,该字符串中的格式需与所处理的结构数据格式一一对应。
其中格式字符串由一个或多个格式字符组成,格式字符参照如下:
字节流存在大小端问题,在格式字符串首位,有一个可选字符来决定是大端还是小端,参考如下。默认为@,即使用本机字符顺序。
这里有个示例:
import struct
buffer1 = struct.pack( "ihb" , 1 , 2 , 3 )
buffer2 = struct.unpack( "ihb" , buffer )
data = [ 1 , 2 , 3 ]
buffer3 = struct.pack( "!ihb" , *data)
buffer4 = struct.unpack( "!ihb" , buffer3 )
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。