当前位置:   article > 正文

基于python实现mqtt_python mqtt模块

python mqtt模块

前言

这篇博客更像是笔记吧,写这篇文章的原因是,看别人的博文排版真是一言难尽,还不好copy,为了自己未来使用得方便,还是记录一下吧。


安装mqtt

mqtt库还是有几个的,不过从目前使用体验来说,还是paho-mqtt好用。

安装依赖包psutil和paho-mqtt

  1. pip install psutil
  2. pip install paho-mqtt

消息发布代码

  1. #!/usr/bin/env python
  2. #coding:utf-8
  3. import time
  4. import json
  5. import psutil
  6. import random
  7. from paho.mqtt import client as mqtt_client
  8. broker = 'broker.emqx.io' # mqtt代理服务器地址
  9. port = 1883
  10. keepalive = 60 # 与代理通信之间允许的最长时间段(以秒为单位)
  11. topic = "/python/mqtt" # 消息主题
  12. client_id = f'python-mqtt-pub-{random.randint(0, 1000)}' # 客户端id不能重复
  13. def to_M(n):
  14. '''将B转换为M'''
  15. u = 1024 * 1024
  16. m = round(n / u, 2)
  17. return m
  18. def get_info():
  19. '''获取系统硬件信息:cpu利用率,cpu个数,系统负载,内存信息等'''
  20. cpu_percent = psutil.cpu_percent(interval=1)
  21. cpu_count = psutil.cpu_count()
  22. sys_loadavg = [round(x / psutil.cpu_count() * 100, 2) for x in psutil.getloadavg()]
  23. mem = psutil.virtual_memory()
  24. mem_total, men_free = to_M(mem.total), to_M(mem.free)
  25. mem_percent = mem.percent
  26. info = {
  27. 'cpu_percent': cpu_percent,
  28. 'cpu_count' : cpu_count,
  29. 'sys_loadavg': sys_loadavg,
  30. 'mem_total': mem_total,
  31. 'mem_percent': mem_percent,
  32. 'men_free': men_free
  33. }
  34. # mqtt只能传输字符串数据
  35. return json.dumps(info)
  36. def connect_mqtt():
  37. '''连接mqtt代理服务器'''
  38. def on_connect(client, userdata, flags, rc):
  39. '''连接回调函数'''
  40. # 响应状态码为0表示连接成功
  41. if rc == 0:
  42. print("Connected to MQTT OK!")
  43. else:
  44. print("Failed to connect, return code %d\n", rc)
  45. # 连接mqtt代理服务器,并获取连接引用
  46. client = mqtt_client.Client(client_id)
  47. client.on_connect = on_connect
  48. client.connect(broker, port, keepalive)
  49. return client
  50. def publish(client):
  51. '''发布消息'''
  52. while True:
  53. '''每隔4秒发布一次服务器信息'''
  54. time.sleep(4)
  55. msg = get_info()
  56. result = client.publish(topic, msg)
  57. status = result[0]
  58. if status == 0:
  59. print(f"Send `{msg}` to topic `{topic}`")
  60. else:
  61. print(f"Failed to send message to topic {topic}")
  62. def run():
  63. '''运行发布者'''
  64. client = connect_mqtt()
  65. # 运行一个线程来自动调用loop()处理网络事件, 非阻塞
  66. client.loop_start()
  67. publish(client)
  68. if __name__ == '__main__':
  69. run()

消息订阅代码

  1. #!/usr/bin/env python
  2. #coding:utf-8
  3. import random
  4. from paho.mqtt import client as mqtt_client
  5. broker = 'broker.emqx.io' # mqtt代理服务器地址
  6. port = 1883
  7. keepalive = 60 # 与代理通信之间允许的最长时间段(以秒为单位)
  8. topic = "/python/mqtt" # 消息主题
  9. client_id = f'python-mqtt-sub-{random.randint(0, 1000)}' # 客户端id不能重复
  10. def connect_mqtt():
  11. '''连接mqtt代理服务器'''
  12. def on_connect(client, userdata, flags, rc):
  13. '''连接回调函数'''
  14. # 响应状态码为0表示连接成功
  15. if rc == 0:
  16. print("Connected to MQTT OK!")
  17. else:
  18. print("Failed to connect, return code %d\n", rc)
  19. client = mqtt_client.Client(client_id)
  20. client.on_connect = on_connect
  21. client.connect(broker, port, keepalive )
  22. return client
  23. def subscribe(client: mqtt_client):
  24. '''订阅主题并接收消息'''
  25. def on_message(client, userdata, msg):
  26. '''订阅消息回调函数'''
  27. print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
  28. # 订阅指定消息主题
  29. client.subscribe(topic)
  30. client.on_message = on_message
  31. def run():
  32. # 运行订阅者
  33. client = connect_mqtt()
  34. subscribe(client)
  35. # 运行一个线程来自动调用loop()处理网络事件, 阻塞模式
  36. client.loop_forever()
  37. if __name__ == '__main__':
  38. run()

简评

可以用这两个例子相互测试。还有什么想了解的欢迎评论区留言。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/188537
推荐阅读
相关标签
  

闽ICP备14008679号