当前位置:   article > 正文

物联网通信协议:MQTT协议(qos)——python测试_python mqtt qos

python mqtt qos

一、引言

        本章内容主要是介绍MQTT协议的Qos等级以及利用python程序测试基于MQTT协议客户端通信。MQTT协议是一种基于发布/订阅模式的轻量级消息传输协议,专门针对低带宽和不稳定网络环境的物联网应用而设计,可为联网设备提供实时可靠的消息服务。

        本章内容不对MQTT做过多的介绍,详细的介绍参见文章末尾的参考资料,有关于EMQX的MQTT协议入门、进阶学习资料,非常详细丰富。本章节主要介绍不同qos等级的差异,网络上的资料错综复杂,而且还讲不清楚其中的内容,本章内容做了一个简单的归纳整理。并解决了qos=1时订阅方无法收到消息的问题。

二、MQTT协议之Qos等级

        MQTT(Message Queuing Telemetry Transport)是一种轻量级的消息传输协议,它支持三种不同的 QoS(Quality of Service,服务质量)级别,用于确保消息的可靠性和顺序性:

  1. QoS 0(最多一次):这是最简单的服务质量级别。当使用 QoS 0 发送消息时,消息会尽力投递,但不会进行确认或重传。发送者只会发送一次消息,无论它是否到达目标设备。对于 QoS 0,可能会出现消息丢失、乱序或重复接收的情况这种级别适用于不要求消息可靠性的应用场景,例如传感器数据定期上报等。大多数场景使用qos=0是足够的。 当我们使用 QoS 0 传递消息时,消息的可靠性完全依赖于底层的 TCP 协议QoS 0 消息发送丢失最主要的场景是:出现TCP连接关闭、重置,可能丢失当前处于网络链路或操作系统底层缓冲区中的消息。

  2. QoS 1(至少一次):此级别下,消息会确保至少被传输一次。发送者会等待接收者发送确认消息(PUBACK)作为响应。如果发送者没有收到确认,它将重新发送消息,直到接收到确认为止。这样可以确保消息的可靠传输,但可能会导致重复的消息传递QoS 1 在需要可靠性但允许少量重复的应用场景中常被使用,如传感器数据采集。

  3. QoS 2(恰好一次):这是最高的服务质量级别,提供了恰好一次的传输保证。消息会进行两阶段的握手。发送者首先发送消息,接收者对其进行确认(PUBREC),然后发送者再次确认(PUBREL),最后接收者进行最终的确认(PUBCOMP)。这种级别可以确保消息只被传输一次,也不会出现重复接收的情况。QoS 2 提供了最高的可靠性,但也带来了更多的通信开销和延迟它适用于对消息顺序和可靠性要求很高的应用场景,如金融交易。

     在选择 QoS 级别时,需要根据应用的需求权衡消息传输的可靠性、延迟和通信开销。较低的 QoS 级别提供了更高的吞吐量和较低的延迟,但可能会导致消息丢失或重复;而较高的 QoS 级别可以确保消息可靠性,但会增加延迟和通信开销。

        在实际应用中,需要根据具体需求选择适当的 QoS 级别。        

注意:发布者和订阅者在发布/订阅消息时设置的qos等级是相对不同的概念。如下图:

MQTT QoS 降级

发布者发布消息的qos=2指的是发布者与代理服务器Broker之间的服务质量,订阅者订阅消息设置的qos=1指的是订阅者与代理服务器Broker之间的服务质量并且当订阅消息的qos等级小于发布消息的qos等级时会发生“降级”,即订阅者接收到消息时的服务等级是发布和订阅时较小一方的qos等级【如上图订阅时的qos为1,发布时的qos为2,接收消息的qos将为1】

 三、MQTT协议之python程序测试

这里为了方便测试和学习使用的是EMQX的免费在线 MQTT 服务器:

MQTT 服务器信息

  • Broker:broker.emqx.io
  • TCP 端口:1883
  • WebSocket 端口:8083
  • SSL/TLS 端口:8883
  • WebSocket Secure 端口:8084
  • CA 证书文件:

    broker.emqx.io-ca.crt 

 使用的是库是paho.mqtt,支持以TCPWebSocket的方式接入代理服务器:

订阅者客户端程序:(qos 0)

  1. import paho.mqtt.client as mqtt
  2. broker = 'broker.emqx.io' # 代理服务器
  3. tcp_port = 1883 # TCP 端口
  4. ws_port = 8083 # websocket端口
  5. topic = "mqtt_test_dd"
  6. client_id = 'python-mqtt-test-002' # 任意设置一个接入的client_id(不需要也可以)
  7. # 定义连接的回调函数
  8. def on_connect(client, userdata, flags, rc):
  9. if rc == 0:
  10. print("Connected to MQTT Broker!")
  11. else:
  12. print("Failed to connect, return code %d\n", rc)
  13. # 定义接收处理消息的回调函数
  14. def on_message(client, userdata, msg):
  15. print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
  16. # 创建客户端对象
  17. client = mqtt.Client(client_id) # 默认为TCP接入
  18. client = mqtt.Client(client_id, transport="websockets") # 以websocket接入
  19. # 设置回调函数
  20. client.on_connect = on_connect
  21. client.on_message = on_message
  22. # 连接到MQTT服务器
  23. client.connect(broker, tcp_port, 60)
  24. # 订阅主题
  25. client.subscribe(topic, qos=0)
  26. # 启动监听消息
  27. client.loop_forever()
  28. # 断开连接
  29. client.disconnect()

发布者客户端程序:(qos 0)

  1. import paho.mqtt.client as mqtt
  2. import time
  3. broker = 'broker.emqx.io' # 代理服务器
  4. tcp_port = 1883 # TCP 端口
  5. ws_port = 8083 # websocket端口
  6. topic = "mqtt_test_dd"
  7. client_id = 'python-mqtt-test-001' # 任意设置一个接入的client_id(不需要也可以)
  8. def on_connect(client, userdata, flags, rc):
  9. if rc == 0:
  10. print("Connected to MQTT Broker!")
  11. else:
  12. print("Failed to connect, return code %d\n", rc)
  13. # 创建客户端对象
  14. client = mqtt.Client(client_id)
  15. # 设置回调函数
  16. client.on_connect = on_connect
  17. # 连接到MQTT服务器
  18. client.connect(broker, tcp_port)
  19. # 间隔两秒发布消息
  20. msg_count = 0
  21. while True:
  22. time.sleep(2)
  23. msg = f"messages: {msg_count}"
  24. result = client.publish(topic, msg, qos=0) # (rc, mid) = client.publish(...) rc:是否发布成功 mid:消息序列id
  25. status = result[0]
  26. if status == 0: # 发布成功
  27. print(f"Send `{msg}` to topic `{topic}`")
  28. else:
  29. print(f"Failed to send message to topic {topic}")
  30. msg_count += 1
  31. # 断开连接
  32. client.disconnect()

 程序运行结果:

 这里可以看到,发布者在连接服务器后并没有调用设置好的回调函数打印出:

Connected to MQTT Broker!

实际是这里没有调用loop_start()等监听消息,回调函数无法生效!!

而这也是上述发布者和订阅者的程序将qos等级改为1进行通信时发生的:发送20条消息后,订阅者将无法再接收消息这个问题的解决方法。

  • qos1  /   2,客户端(发送方)需添加client.loop_start()在后台运行一个线程来处理网络通信和保持连接。方能实现正常接收!因为有消息确认机制!
  • 否则会出现订阅方接收完20条消息之后无法接收的情况!!!

添加了监听消息的发布者客户端程序:

  1. import paho.mqtt.client as mqtt
  2. import time
  3. broker = 'broker.emqx.io' # 代理服务器
  4. tcp_port = 1883 # TCP 端口
  5. ws_port = 8083 # websocket端口
  6. topic = "mqtt_test_dd"
  7. client_id = 'python-mqtt-test-001' # 任意设置一个接入的client_id(不需要也可以)
  8. def on_connect(client, userdata, flags, rc):
  9. if rc == 0:
  10. print("Connected to MQTT Broker!")
  11. else:
  12. print("Failed to connect, return code %d\n", rc)
  13. # 创建客户端对象
  14. client = mqtt.Client(client_id)
  15. # 设置回调函数
  16. client.on_connect = on_connect
  17. # 连接到MQTT服务器
  18. client.connect(broker, tcp_port)
  19. # 监听消息(非阻塞)
  20. client.loop_start()
  21. # 间隔两秒发布消息
  22. msg_count = 0
  23. while True:
  24. time.sleep(2)
  25. msg = f"messages: {msg_count}"
  26. result = client.publish(topic, msg, qos=1) # (rc, mid) = client.publish(...) rc:是否发布成功 mid:消息序列id
  27. status = result[0]
  28. if status == 0: # 发布成功
  29. print(f"Send `{msg}` to topic `{topic}`")
  30. else:
  31. print(f"Failed to send message to topic {topic}")
  32. msg_count += 1
  33. # 断开连接
  34. client.disconnect()

运行程序即可实现基于MQTT协议的发布者和订阅者的正常收发通信。

注:该服务器为公共服务器,故在测试时,订阅和发布消息的主题应该尽量避免简单化,防止多个人共用一个主题影响测试。同时有时候无法通信可能是服务器的问题。

四、参考资料 

1.MQTT详细教程:
MQTT 最全教程:从入门到精通 | EMQ (emqx.com)
2. MQTT物联网传输协议消息介绍:

小议 MQTT 物联网传输协议 - UinIO.com 电子技术博客

3.MQTT QoS 0, 1, 2 介绍:【图文解释】

MQTT QoS 0, 1, 2 介绍 | EMQ (emqx.com)

MQTT协议解析之QoS Level - 知乎 (zhihu.com)(图解) 

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号