当前位置:   article > 正文

python操作MQTT 实现发布/订阅消息_python的创建线程实现mqtt的订阅和发布

python的创建线程实现mqtt的订阅和发布

关于MQTT,可以查看官网或者参考网上资料:MQTT协议,终于有人讲清楚了 - 知乎 (zhihu.com)

这里主要讲一些代码实战,需要用到的技术有python-phao,configparser,logging,以及flask,多线程相关。

由于工作保密性质,这里没有贴开发代码,自己空闲之余写的demo可以看看。

  1. import time
  2. import datetime
  3. import os
  4. import threading
  5. import pickle
  6. from gevent import monkey
  7. from gevent.pywsgi import WSGIServer
  8. from geventwebsocket.handler import WebSocketHandler
  9. monkey.patch_all()
  10. import configparser
  11. import json
  12. import logging
  13. from logging import handlers
  14. from paho.mqtt import client as mqtt_client
  15. from influxdb_client import WritePrecision, InfluxDBClient, Point
  16. from influxdb_client.client.write_api import SYNCHRONOUS
  17. from flask import Flask
  18. app = Flask(__name__)
  19. logger = logging.getLogger('parseadapter')
  20. logger.setLevel(level=logging.INFO)
  21. # 设置log格式
  22. formatter = logging.Formatter('%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s')
  23. # 控制台打印
  24. stream_handler = logging.StreamHandler()
  25. stream_handler.setLevel(logging.INFO)
  26. stream_handler.setFormatter(formatter)
  27. logger.addHandler(stream_handler)
  28. # 输出到.log文件
  29. size_rotating_file_handler = handlers.RotatingFileHandler(filename='MyMQTTClient.log', maxBytes=10240, mode='a',
  30. backupCount=3)
  31. size_rotating_file_handler.setLevel(logging.INFO)
  32. size_rotating_file_handler.setFormatter(formatter)
  33. logger.addHandler(size_rotating_file_handler)
  34. class MyMQTTClient(object):
  35. def __init__(self,config_filename='MyMQTTClient.conf'):
  36. super(MyMQTTClient.__init__)
  37. self.__broker = self.getConfig(config_filename, 'MQTT', 'broker') # 代理服务器
  38. self.__port = int(self.getConfig(config_filename, 'MQTT', 'port')) # 端口
  39. self.__keepalive = int(self.getConfig(config_filename, 'MQTT', 'keepalive')) # 心跳
  40. self.__username = self.getConfig(config_filename, 'MQTT', 'username') # 用户名
  41. self.__psword = self.getConfig(config_filename, 'MQTT', 'password') # 密码
  42. current_time = int(round(time.time() * 1000))
  43. client_id = f'test-mqtt-{current_time}' # 客户端唯一
  44. self.client = mqtt_client.Client(client_id)
  45. self.client.username_pw_set(self.__username, self.__psword)
  46. self.client.on_connect = self.on_connect
  47. self.client.on_publish = self.on_publish
  48. self.client.on_subscribe = self.on_subscribe
  49. self.client.on_unsubscribe = self.on_unsubscribe
  50. self.client.on_message = self.on_message
  51. self.client.on_disconnect = self.on_disconnect
  52. self.client.connect(self.__broker, self.__port, self.__keepalive)
  53. # 读取配置文件 静态方法,可以当作写在类中的函数,但与类没有关系,可访问类的属性
  54. @staticmethod
  55. def getConfig(filename, section, option):
  56. """
  57. :param filename 文件名称
  58. :param section: 服务
  59. :param option: 配置参数
  60. :return:返回配置信息
  61. """
  62. # 获取当前目录路径
  63. proDir = os.path.split(os.path.realpath(__file__))[0]
  64. # logger.info(proDir)
  65. # 拼接路径获取完整路径
  66. configPath = os.path.join(proDir, filename)
  67. # logger.info(configPath)
  68. # 创建ConfigParser对象
  69. conf = configparser.ConfigParser()
  70. # 读取文件内容
  71. conf.read(configPath)
  72. # logger.info(conf.sections())
  73. config = conf.get(section, option)
  74. return config
  75. # 定义发布消息
  76. def publish(self,topic,data):
  77. '''
  78. topic:主题,字符串格式;
  79. data:json字符串或者其他字符串,int,float,其他类型暂不支持
  80. qos:等级,0:最多一次,1:最少一次,2:只一次
  81. '''
  82. self.client.loop_start()
  83. data = json.dum
  84. try:
  85. result = self.client.publish(topic, data, qos=0) # 仅支持None,string,int,float类型的数据
  86. # result: [0, 1]
  87. status = result[0]
  88. if status == 0:
  89. logger.info(f"Send data to topic {topic}")
  90. else:
  91. logger.error(f"Failed to send message to topic {topic}")
  92. except Exception as e:
  93. logger.error("发布失败,请检查 :%s" % e)
  94. # 发布成功后回调
  95. def on_publish(self):
  96. logger.info("发布成功")
  97. # 订阅主题
  98. def subscribe(self,topic):
  99. # 订阅多个主题:[(topic01,1),(topic02,1),(topic03,1)]
  100. self.client.subscribe([(topic,1)])
  101. # 接收服务端消息后,回调
  102. def on_message(self, client, userdata, msg):
  103. logger.info(f"接收到消息{msg}")
  104. # 可对接收的消息进行处理
  105. # 订阅成功回调
  106. def on_subscribe(self, client, userdata, mid, granted_qos):
  107. logger.info("订阅成功")
  108. # 取消订阅
  109. def unsubscribe(self,topic):
  110. self.client.unsubscribe(topic)
  111. # 取消订阅 成功后回调
  112. def on_unsubscribe(self, client, userdata, mid):
  113. logger.info("取消成功")
  114. # 断开连接
  115. def disconnect(self):
  116. self.client.loop_stop() # 停止线程
  117. self.client.disconnect() # 断开连接
  118. def get_client():
  119. newclient = MyMQTTClient()
  120. logger.info(f"创建客户端对象,{newclient}")
  121. return newclient
  122. @app.route('/subscribe/<topics_str>', methods=['GET'])
  123. def recv_topic(topics_str):
  124. publisher = newclient
  125. sub_thread = threading.Thread(target=main_subscribe, args=(publisher, topics_str,))
  126. sub_thread.start()
  127. return f"订阅主题:{topics_str},等待接收数据"
  128. # 订阅
  129. def main_subscribe(publisher, topics_str):
  130. publisher.subscribe(topics_str)
  131. # 取消订阅
  132. def main_unsubscribe(unsubscriber, topics_str):
  133. unsubscriber.unsubscribe(topics_str)
  134. # 取消订阅
  135. @app.route('/unsubscribe/<topics_str>', methods=['GET'])
  136. def unsubscribe(topics_str):
  137. unsubscriber = newclient
  138. unsub_thread = threading.Thread(target=main_unsubscribe, args=(unsubscriber, topics_str,))
  139. unsub_thread.start()
  140. return f"取消订阅:{topics_str}"
  141. if __name__ == "__main__":
  142. # 规定统一客户端 用于发布/订阅/取消订阅 订阅/取消订阅为同一个客户端 订阅和发布也可为同一个客户端,也可为两个客户端
  143. newclient = get_client()
  144. http_server = WSGIServer(('127.0.0.1', 8080), app, handler_class=WebSocketHandler)
  145. http_server.serve_forever()

配置文件.conf,这个很好用,格式大概如下:

  1. [MQTT]
  2. broker = 127.0.0.1
  3. port = 1883
  4. keepalive = 60
  5. username = admin
  6. password = 0
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/485777
推荐阅读
相关标签
  

闽ICP备14008679号