赞
踩
本章内容主要是介绍MQTT协议的Qos等级以及利用python程序测试基于MQTT协议客户端通信。MQTT协议是一种基于发布/订阅模式的轻量级消息传输协议,专门针对低带宽和不稳定网络环境的物联网应用而设计,可为联网设备提供实时可靠的消息服务。
本章内容不对MQTT做过多的介绍,详细的介绍参见文章末尾的参考资料,有关于EMQX的MQTT协议入门、进阶学习资料,非常详细丰富。本章节主要介绍不同qos等级的差异,网络上的资料错综复杂,而且还讲不清楚其中的内容,本章内容做了一个简单的归纳整理。并解决了qos=1时订阅方无法收到消息的问题。
MQTT(Message Queuing Telemetry Transport)是一种轻量级的消息传输协议,它支持三种不同的 QoS(Quality of Service,服务质量)级别,用于确保消息的可靠性和顺序性:
QoS 0(最多一次):这是最简单的服务质量级别。当使用 QoS 0 发送消息时,消息会尽力投递,但不会进行确认或重传。发送者只会发送一次消息,无论它是否到达目标设备。对于 QoS 0,可能会出现消息丢失、乱序或重复接收的情况。这种级别适用于不要求消息可靠性的应用场景,例如传感器数据定期上报等。大多数场景使用qos=0是足够的。 当我们使用 QoS 0 传递消息时,消息的可靠性完全依赖于底层的 TCP 协议。QoS 0 消息发送丢失最主要的场景是:出现TCP连接关闭、重置,可能丢失当前处于网络链路或操作系统底层缓冲区中的消息。
QoS 1(至少一次):此级别下,消息会确保至少被传输一次。发送者会等待接收者发送确认消息(PUBACK)作为响应。如果发送者没有收到确认,它将重新发送消息,直到接收到确认为止。这样可以确保消息的可靠传输,但可能会导致重复的消息传递。QoS 1 在需要可靠性但允许少量重复的应用场景中常被使用,如传感器数据采集。
QoS 2(恰好一次):这是最高的服务质量级别,提供了恰好一次的传输保证。消息会进行两阶段的握手。发送者首先发送消息,接收者对其进行确认(PUBREC),然后发送者再次确认(PUBREL),最后接收者进行最终的确认(PUBCOMP)。这种级别可以确保消息只被传输一次,也不会出现重复接收的情况。QoS 2 提供了最高的可靠性,但也带来了更多的通信开销和延迟。它适用于对消息顺序和可靠性要求很高的应用场景,如金融交易。
在选择 QoS 级别时,需要根据应用的需求权衡消息传输的可靠性、延迟和通信开销。较低的 QoS 级别提供了更高的吞吐量和较低的延迟,但可能会导致消息丢失或重复;而较高的 QoS 级别可以确保消息可靠性,但会增加延迟和通信开销。
在实际应用中,需要根据具体需求选择适当的 QoS 级别。
注意:发布者和订阅者在发布/订阅消息时设置的qos等级是相对不同的概念。如下图:
发布者发布消息的qos=2指的是发布者与代理服务器Broker之间的服务质量,订阅者订阅消息设置的qos=1指的是订阅者与代理服务器Broker之间的服务质量。并且当订阅消息的qos等级小于发布消息的qos等级时会发生“降级”,即订阅者接收到消息时的服务等级是发布和订阅时较小一方的qos等级。【如上图订阅时的qos为1,发布时的qos为2,接收消息的qos将为1】
这里为了方便测试和学习使用的是EMQX的免费在线 MQTT 服务器:
MQTT 服务器信息
- Broker:broker.emqx.io
- TCP 端口:1883
- WebSocket 端口:8083
- SSL/TLS 端口:8883
- WebSocket Secure 端口:8084
- CA 证书文件:
使用的是库是paho.mqtt,支持以TCP或WebSocket的方式接入代理服务器:
订阅者客户端程序:(qos 0)
import paho.mqtt.client as mqtt broker = 'broker.emqx.io' # 代理服务器 tcp_port = 1883 # TCP 端口 ws_port = 8083 # websocket端口 topic = "mqtt_test_dd" client_id = 'python-mqtt-test-002' # 任意设置一个接入的client_id(不需要也可以) # 定义连接的回调函数 def on_connect(client, userdata, flags, rc): if rc == 0: print("Connected to MQTT Broker!") else: print("Failed to connect, return code %d\n", rc) # 定义接收处理消息的回调函数 def on_message(client, userdata, msg): print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic") # 创建客户端对象 client = mqtt.Client(client_id) # 默认为TCP接入 client = mqtt.Client(client_id, transport="websockets") # 以websocket接入 # 设置回调函数 client.on_connect = on_connect client.on_message = on_message # 连接到MQTT服务器 client.connect(broker, tcp_port, 60) # 订阅主题 client.subscribe(topic, qos=0) # 启动监听消息 client.loop_forever() # 断开连接 client.disconnect()发布者客户端程序:(qos 0)
import paho.mqtt.client as mqtt import time broker = 'broker.emqx.io' # 代理服务器 tcp_port = 1883 # TCP 端口 ws_port = 8083 # websocket端口 topic = "mqtt_test_dd" client_id = 'python-mqtt-test-001' # 任意设置一个接入的client_id(不需要也可以) def on_connect(client, userdata, flags, rc): if rc == 0: print("Connected to MQTT Broker!") else: print("Failed to connect, return code %d\n", rc) # 创建客户端对象 client = mqtt.Client(client_id) # 设置回调函数 client.on_connect = on_connect # 连接到MQTT服务器 client.connect(broker, tcp_port) # 间隔两秒发布消息 msg_count = 0 while True: time.sleep(2) msg = f"messages: {msg_count}" result = client.publish(topic, msg, qos=0) # (rc, mid) = client.publish(...) rc:是否发布成功 mid:消息序列id status = result[0] if status == 0: # 发布成功 print(f"Send `{msg}` to topic `{topic}`") else: print(f"Failed to send message to topic {topic}") msg_count += 1 # 断开连接 client.disconnect()
程序运行结果:
这里可以看到,发布者在连接服务器后并没有调用设置好的回调函数打印出:
Connected to MQTT Broker!
实际是这里没有调用loop_start()等监听消息,回调函数无法生效!!
而这也是上述发布者和订阅者的程序将qos等级改为1进行通信时发生的:发送20条消息后,订阅者将无法再接收消息这个问题的解决方法。
添加了监听消息的发布者客户端程序:
import paho.mqtt.client as mqtt import time broker = 'broker.emqx.io' # 代理服务器 tcp_port = 1883 # TCP 端口 ws_port = 8083 # websocket端口 topic = "mqtt_test_dd" client_id = 'python-mqtt-test-001' # 任意设置一个接入的client_id(不需要也可以) def on_connect(client, userdata, flags, rc): if rc == 0: print("Connected to MQTT Broker!") else: print("Failed to connect, return code %d\n", rc) # 创建客户端对象 client = mqtt.Client(client_id) # 设置回调函数 client.on_connect = on_connect # 连接到MQTT服务器 client.connect(broker, tcp_port) # 监听消息(非阻塞) client.loop_start() # 间隔两秒发布消息 msg_count = 0 while True: time.sleep(2) msg = f"messages: {msg_count}" result = client.publish(topic, msg, qos=1) # (rc, mid) = client.publish(...) rc:是否发布成功 mid:消息序列id status = result[0] if status == 0: # 发布成功 print(f"Send `{msg}` to topic `{topic}`") else: print(f"Failed to send message to topic {topic}") msg_count += 1 # 断开连接 client.disconnect()运行程序即可实现基于MQTT协议的发布者和订阅者的正常收发通信。
注:该服务器为公共服务器,故在测试时,订阅和发布消息的主题应该尽量避免简单化,防止多个人共用一个主题影响测试。同时有时候无法通信可能是服务器的问题。
小议 MQTT 物联网传输协议 - UinIO.com 电子技术博客
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。