当前位置:   article > 正文

【python】MQTT通信_/python/mqttls0tls1crudjtibq0

/python/mqttls0tls1crudjtibq0

一、MQTT协议

MQTT协议几个概念解释一下,对MQTT协议熟悉的请跳过这部分

1、遗言消息

① 一旦连接到MQTT服务器,遗言消息就会被服务器托管,本客户端凡是非正常断开连接 服务器就会将本遗言发送给订阅该遗言消息的客户端,告知对方本客户端离线

② retain=False 新的client订阅是不会收到遗嘱消息的

2、服务质量(QoS)

  0:至多一次,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。

  1:至少一次,确保消息到达,但消息重复可能会发生。

  2:只有一次,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。

3、连接返回代码

0: 连接成功

1: 连接失败 - 不正确的协议版本

2: 连接失败 - 无效的客户端标识符

3: 连接失败 - 服务器不可用

4: 连接失败 - 错误的用户名或密码

4、消息主题

① 大小写敏感

② # 符号是用于匹配主题中任意层级的通配符

5、Client ID可以自定义,但要注意不能重复

6、MQTT只能传输字符串数据

二、下载MQTT服务器端EMQX

1、EMQX下载链接: 免费下载、试用 EMQ 产品

2、选择开源版(当然如果条件允许也可以选择企业版),选择对应系统

 3、下载完成,解压gzip文件,进入解压目录,进入命令行,输入

./bin/emqx start

三、下载MQTT服务器端MQTTX(也可以不下载)

1、下载链接: MQTT 桌面客户端 - MQTT X | EMQ

2、使用MQTTX客户端:

① 获取IP地址:打开命令行,输入systeminfo

②打开MQTTX客户端,新建连接

名称不能与已建的连接重名

Client ID是自动填写的,可以不用管

服务器地址填写第一步获取到的IP地址

其他信息暂时不用填,点击右上角的连接,可以看到返回已连接 

 

四、Python代码

1、安装python MQTT第三方库

pip install paho.mqtt

2、简单的发布和订阅

        2.1 publish_basic.py

  1. # publish_basic.py
  2. import time
  3. import json
  4. import random
  5. from paho.mqtt import client as mqtt_client
  6. broker = '192.168.***.***' # mqtt代理服务器地址
  7. port = 1883
  8. keepalive = 60 # 与代理通信之间允许的最长时间段(以秒为单位)
  9. topic = "test/send" # 消息主题
  10. client_id = f'python-mqtt-send-{random.randint(0, 1000)}' # 客户端id不能重复
  11. def connect_mqtt():
  12. '''连接mqtt代理服务器'''
  13. def on_connect(client, userdata, flags, rc):
  14. '''连接回调函数'''
  15. if rc == 0:
  16. print("Connected to MQTT successfully!")
  17. else:
  18. print("Failed to connect, return code {0}".format(rc))
  19. # 连接mqtt代理服务器,并获取连接引用
  20. client = mqtt_client.Client(client_id)
  21. client.on_connect = on_connect
  22. client.connect(broker, port, keepalive)
  23. return client
  24. def publish(client):
  25. '''发布消息'''
  26. while True:
  27. '''每隔4秒发布一次信息'''
  28. time.sleep(4)
  29. # mqtt只能传输字符串数据
  30. info = {
  31. 'msg1': 'Hello kristina',
  32. 'msg2': 'msg {0}'.format(random.randint(0, 1000))
  33. }
  34. msg = json.dumps(info)
  35. # 默认retain=False,一个Topic只能有一个retained消息,后设置的会覆盖前面的消息
  36. result = client.publish(topic=topic, payload=msg, qos=0, retain=True)
  37. # 删除retained消息
  38. # result = client.publish(topic=topic, payload=None, qos=0, retain=True)
  39. if result[0] == 0:
  40. print("Send {0} to topic {1}".format(msg, topic))
  41. else:
  42. print("Failed to send message {0} to topic {1}".format(msg, topic))
  43. def run():
  44. '''运行发布者'''
  45. client = connect_mqtt()
  46. # 运行一个线程来自动调用loop()处理网络事件, 非阻塞
  47. client.loop_start()
  48. publish(client)
  49. if __name__ == '__main__':
  50. run()

        2.2 subscribe_basic.py

  1. # subscribe_basic.py
  2. import random
  3. from paho.mqtt import client as mqtt_client
  4. import json
  5. from utils_logger import get_logger
  6. broker = '192.168.***.***' # mqtt代理服务器地址
  7. port = 1883
  8. keepalive = 60 # 与代理通信之间允许的最长时间段(以秒为单位)
  9. topic = "test/subcribe"
  10. client_id = f'python-mqtt-subscribe-{random.randint(0, 1000)}' # 可自定义,但要注意客户端id不能重复
  11. def connect_mqtt():
  12. '''连接mqtt代理服务器'''
  13. def on_connect(client, userdata, flags, rc):
  14. '''连接回调函数'''
  15. # 响应状态码为0表示连接成功
  16. if rc == 0:
  17. print("Connected to MQTT successfully!")
  18. else:
  19. print("Failed to connect, return code {0}".format(rc))
  20. client = mqtt_client.Client(client_id)
  21. client.on_connect = on_connect
  22. client.connect(broker, port, keepalive)
  23. return client
  24. def subscribe(client: mqtt_client):
  25. '''订阅主题并接收消息'''
  26. def on_message(client, userdata, msg):
  27. '''订阅消息回调函数'''
  28. data = json.loads(msg.payload) # data = 字典 #payload = json数据
  29. print("Received message from topic {0} ".format(msg.topic))
  30. print("The message have {0} information".format(len(data)))
  31. print("The information is '{0}'".format(data))
  32. # 订阅指定消息主题
  33. client.subscribe(topic=topic, qos=0)
  34. # 将回调函数指派给客户端实例
  35. client.on_message = on_message
  36. def run():
  37. # 运行订阅者
  38. client = connect_mqtt()
  39. subscribe(client)
  40. # 运行一个线程来自动调用loop()处理网络事件, 阻塞模式
  41. client.loop_forever()# 保持 loop()调用
  42. if __name__ == '__main__':
  43. run()

3、进阶版发布和订阅(添加写入log,遗言主题以及上线下线消息)

        3.1 publish.py

  1. # publish.py
  2. import time
  3. import json
  4. import random
  5. from paho.mqtt import client as mqtt_client
  6. from utils_logger import get_logger
  7. broker = '192.168.***.***' # mqtt代理服务器地址
  8. port = 1883
  9. keepalive = 60 # 与代理通信之间允许的最长时间段(以秒为单位)
  10. topic = "test/send" # 消息主题
  11. will_topic = "test/will" # 遗言消息主题
  12. client_id = f'python-mqtt-send-{random.randint(0, 1000)}' # 客户端id不能重复
  13. # 遗言消息内容
  14. will_msg = {
  15. "ID": f"{client_id}",
  16. "stat": "Offline"
  17. }
  18. # 上线消息内容
  19. online_msg = {
  20. "ID": f"{client_id}",
  21. "stat": "online"
  22. }
  23. logfolder = 'logs'
  24. logfile = f'mqtt_publish.log'
  25. publish_logger = get_logger(logfolder, logfile)
  26. def connect_mqtt():
  27. '''连接mqtt代理服务器'''
  28. def on_connect(client, userdata, flags, rc):
  29. '''连接回调函数'''
  30. if rc == 0:
  31. # 上线后清除retained消息
  32. result = client.publish(topic=topic, payload=json.dumps(online_msg), qos=0, retain=False)
  33. publish_logger.info("Connected to MQTT successfully!")
  34. if result[0] == 0:
  35. publish_logger.info("Send {0} to topic {1}".format(online_msg, topic))
  36. else:
  37. publish_logger.error("Failed to send message {0} to topic {1}".format(online_msg, topic))
  38. else:
  39. publish_logger.error("Failed to connect, return code {0}".format(rc))
  40. # 连接mqtt代理服务器,并获取连接引用
  41. client = mqtt_client.Client(client_id)
  42. client.will_set(topic=will_topic, payload=json.dumps(will_msg), qos=0, retain=False)
  43. client.on_connect = on_connect
  44. client.connect(broker, port, keepalive)
  45. return client
  46. def publish(client):
  47. '''发布消息'''
  48. while True:
  49. '''每隔4秒发布一次信息'''
  50. time.sleep(4)
  51. # mqtt只能传输字符串数据
  52. info = {
  53. 'msg1': 'Hello kristina',
  54. 'msg2': 'msg {0}'.format(random.randint(0, 1000))
  55. }
  56. msg = json.dumps(info)
  57. # 默认retain=False,一个Topic只能有一个retained消息,后设置的会覆盖前面的消息
  58. result = client.publish(topic=topic, payload=msg, qos=0, retain=True)
  59. # 删除retained消息
  60. # result = client.publish(topic=topic, payload=None, qos=0, retain=True)
  61. if result[0] == 0:
  62. publish_logger.info("Send {0} to topic {1}".format(msg, topic))
  63. else:
  64. publish_logger.error("Failed to send message {0} to topic {1}".format(msg, topic))
  65. def run():
  66. '''运行发布者'''
  67. client = connect_mqtt()
  68. # 运行一个线程来自动调用loop()处理网络事件, 非阻塞
  69. client.loop_start()
  70. publish(client)
  71. if __name__ == '__main__':
  72. run()

        3.2 subscribe.py

  1. # subcribe.py
  2. import random
  3. from paho.mqtt import client as mqtt_client
  4. import json
  5. import datetime
  6. from utils_logger import get_logger
  7. # systeminfo
  8. broker = '192.168.***.***' # mqtt代理服务器地址
  9. port = 1883
  10. keepalive = 60 # 与代理通信之间允许的最长时间段(以秒为单位)
  11. topic = "test/subcribe"
  12. logfolder = 'logs'
  13. logfile = f'mqtt_subscribe.log'
  14. subcribe_logger = get_logger(logfolder, logfile)
  15. client_id = f'python-mqtt-subscribe-{random.randint(0, 1000)}' # 可自定义,但要注意客户端id不能重复
  16. def connect_mqtt():
  17. '''连接mqtt代理服务器'''
  18. def on_connect(client, userdata, flags, rc):
  19. '''连接回调函数'''
  20. # 响应状态码为0表示连接成功
  21. if rc == 0:
  22. subcribe_logger.info("Connected to MQTT successfully!")
  23. else:
  24. subcribe_logger.error("Failed to connect, return code {0}".format(rc))
  25. client = mqtt_client.Client(client_id)
  26. client.on_connect = on_connect
  27. client.connect(broker, port, keepalive)
  28. return client
  29. def subscribe(client: mqtt_client):
  30. '''订阅主题并接收消息'''
  31. def on_message(client, userdata, msg):
  32. '''订阅消息回调函数'''
  33. data = json.loads(msg.payload) # data = 字典 #payload = json数据
  34. subcribe_logger.info("Received message from topic {0} ".format(msg.topic)) # msg.payload.decode()
  35. subcribe_logger.info("The message have {0} information".format(len(data)))
  36. subcribe_logger.info("The information is '{0}'".format(data))
  37. # 订阅指定消息主题
  38. client.subscribe(topic=topic, qos=0)
  39. # 将回调函数指派给客户端实例
  40. client.on_message = on_message
  41. def run():
  42. # 运行订阅者
  43. client = connect_mqtt()
  44. subscribe(client)
  45. # 运行一个线程来自动调用loop()处理网络事件, 阻塞模式
  46. client.loop_forever() # 保持 loop()调用
  47. if __name__ == '__main__':
  48. run()

        3.3 效果图

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/200760
推荐阅读
相关标签
  

闽ICP备14008679号