赞
踩
由于工作保密性质,这里没有贴开发代码,自己空闲之余写的demo可以看看。
import time import datetime import os import threading import pickle from gevent import monkey from gevent.pywsgi import WSGIServer from geventwebsocket.handler import WebSocketHandler monkey.patch_all() import configparser import json import logging from logging import handlers from paho.mqtt import client as mqtt_client from influxdb_client import WritePrecision, InfluxDBClient, Point from influxdb_client.client.write_api import SYNCHRONOUS from flask import Flask app = Flask(__name__) logger = logging.getLogger('parseadapter') logger.setLevel(level=logging.INFO) # 设置log格式 formatter = logging.Formatter('%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s') # 控制台打印 stream_handler = logging.StreamHandler() stream_handler.setLevel(logging.INFO) stream_handler.setFormatter(formatter) logger.addHandler(stream_handler) # 输出到.log文件 size_rotating_file_handler = handlers.RotatingFileHandler(filename='MyMQTTClient.log', maxBytes=10240, mode='a', backupCount=3) size_rotating_file_handler.setLevel(logging.INFO) size_rotating_file_handler.setFormatter(formatter) logger.addHandler(size_rotating_file_handler) class MyMQTTClient(object): def __init__(self,config_filename='MyMQTTClient.conf'): super(MyMQTTClient.__init__) self.__broker = self.getConfig(config_filename, 'MQTT', 'broker') # 代理服务器 self.__port = int(self.getConfig(config_filename, 'MQTT', 'port')) # 端口 self.__keepalive = int(self.getConfig(config_filename, 'MQTT', 'keepalive')) # 心跳 self.__username = self.getConfig(config_filename, 'MQTT', 'username') # 用户名 self.__psword = self.getConfig(config_filename, 'MQTT', 'password') # 密码 current_time = int(round(time.time() * 1000)) client_id = f'test-mqtt-{current_time}' # 客户端唯一 self.client = mqtt_client.Client(client_id) self.client.username_pw_set(self.__username, self.__psword) self.client.on_connect = self.on_connect self.client.on_publish = self.on_publish self.client.on_subscribe = self.on_subscribe self.client.on_unsubscribe = self.on_unsubscribe self.client.on_message = self.on_message self.client.on_disconnect = self.on_disconnect self.client.connect(self.__broker, self.__port, self.__keepalive) # 读取配置文件 静态方法,可以当作写在类中的函数,但与类没有关系,可访问类的属性 @staticmethod def getConfig(filename, section, option): """ :param filename 文件名称 :param section: 服务 :param option: 配置参数 :return:返回配置信息 """ # 获取当前目录路径 proDir = os.path.split(os.path.realpath(__file__))[0] # logger.info(proDir) # 拼接路径获取完整路径 configPath = os.path.join(proDir, filename) # logger.info(configPath) # 创建ConfigParser对象 conf = configparser.ConfigParser() # 读取文件内容 conf.read(configPath) # logger.info(conf.sections()) config = conf.get(section, option) return config # 定义发布消息 def publish(self,topic,data): ''' topic:主题,字符串格式; data:json字符串或者其他字符串,int,float,其他类型暂不支持 qos:等级,0:最多一次,1:最少一次,2:只一次 ''' self.client.loop_start() data = json.dum try: result = self.client.publish(topic, data, qos=0) # 仅支持None,string,int,float类型的数据 # result: [0, 1] status = result[0] if status == 0: logger.info(f"Send data to topic {topic}") else: logger.error(f"Failed to send message to topic {topic}") except Exception as e: logger.error("发布失败,请检查 :%s" % e) # 发布成功后回调 def on_publish(self): logger.info("发布成功") # 订阅主题 def subscribe(self,topic): # 订阅多个主题:[(topic01,1),(topic02,1),(topic03,1)] self.client.subscribe([(topic,1)]) # 接收服务端消息后,回调 def on_message(self, client, userdata, msg): logger.info(f"接收到消息{msg}") # 可对接收的消息进行处理 # 订阅成功回调 def on_subscribe(self, client, userdata, mid, granted_qos): logger.info("订阅成功") # 取消订阅 def unsubscribe(self,topic): self.client.unsubscribe(topic) # 取消订阅 成功后回调 def on_unsubscribe(self, client, userdata, mid): logger.info("取消成功") # 断开连接 def disconnect(self): self.client.loop_stop() # 停止线程 self.client.disconnect() # 断开连接 def get_client(): newclient = MyMQTTClient() logger.info(f"创建客户端对象,{newclient}") return newclient @app.route('/subscribe/<topics_str>', methods=['GET']) def recv_topic(topics_str): publisher = newclient sub_thread = threading.Thread(target=main_subscribe, args=(publisher, topics_str,)) sub_thread.start() return f"订阅主题:{topics_str},等待接收数据" # 订阅 def main_subscribe(publisher, topics_str): publisher.subscribe(topics_str) # 取消订阅 def main_unsubscribe(unsubscriber, topics_str): unsubscriber.unsubscribe(topics_str) # 取消订阅 @app.route('/unsubscribe/<topics_str>', methods=['GET']) def unsubscribe(topics_str): unsubscriber = newclient unsub_thread = threading.Thread(target=main_unsubscribe, args=(unsubscriber, topics_str,)) unsub_thread.start() return f"取消订阅:{topics_str}" if __name__ == "__main__": # 规定统一客户端 用于发布/订阅/取消订阅 订阅/取消订阅为同一个客户端 订阅和发布也可为同一个客户端,也可为两个客户端 newclient = get_client() http_server = WSGIServer(('127.0.0.1', 8080), app, handler_class=WebSocketHandler) http_server.serve_forever()
- [MQTT]
- broker = 127.0.0.1
- port = 1883
- keepalive = 60
- username = admin
- password = 0
-
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。