赞
踩
MQTT协议几个概念解释一下,对MQTT协议熟悉的请跳过这部分
① 一旦连接到MQTT服务器,遗言消息就会被服务器托管,本客户端凡是非正常断开连接 服务器就会将本遗言发送给订阅该遗言消息的客户端,告知对方本客户端离线
② retain=False 新的client订阅是不会收到遗嘱消息的
0:至多一次,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
1:至少一次,确保消息到达,但消息重复可能会发生。
2:只有一次,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
0: 连接成功
1: 连接失败 - 不正确的协议版本
2: 连接失败 - 无效的客户端标识符
3: 连接失败 - 服务器不可用
4: 连接失败 - 错误的用户名或密码
① 大小写敏感
② # 符号是用于匹配主题中任意层级的通配符
1、EMQX下载链接: 免费下载、试用 EMQ 产品
2、选择开源版(当然如果条件允许也可以选择企业版),选择对应系统
3、下载完成,解压gzip文件,进入解压目录,进入命令行,输入
./bin/emqx start
1、下载链接: MQTT 桌面客户端 - MQTT X | EMQ
2、使用MQTTX客户端:
① 获取IP地址:打开命令行,输入systeminfo
②打开MQTTX客户端,新建连接
名称不能与已建的连接重名
Client ID是自动填写的,可以不用管
服务器地址填写第一步获取到的IP地址
其他信息暂时不用填,点击右上角的连接,可以看到返回已连接
1、安装python MQTT第三方库
pip install paho.mqtt
2、简单的发布和订阅
2.1 publish_basic.py
- # publish_basic.py
- import time
- import json
- import random
- from paho.mqtt import client as mqtt_client
-
- broker = '192.168.***.***' # mqtt代理服务器地址
- port = 1883
- keepalive = 60 # 与代理通信之间允许的最长时间段(以秒为单位)
- topic = "test/send" # 消息主题
- client_id = f'python-mqtt-send-{random.randint(0, 1000)}' # 客户端id不能重复
-
- def connect_mqtt():
- '''连接mqtt代理服务器'''
- def on_connect(client, userdata, flags, rc):
- '''连接回调函数'''
- if rc == 0:
- print("Connected to MQTT successfully!")
-
- else:
- print("Failed to connect, return code {0}".format(rc))
- # 连接mqtt代理服务器,并获取连接引用
- client = mqtt_client.Client(client_id)
- client.on_connect = on_connect
- client.connect(broker, port, keepalive)
- return client
-
- def publish(client):
- '''发布消息'''
- while True:
- '''每隔4秒发布一次信息'''
- time.sleep(4)
- # mqtt只能传输字符串数据
- info = {
- 'msg1': 'Hello kristina',
- 'msg2': 'msg {0}'.format(random.randint(0, 1000))
- }
- msg = json.dumps(info)
- # 默认retain=False,一个Topic只能有一个retained消息,后设置的会覆盖前面的消息
- result = client.publish(topic=topic, payload=msg, qos=0, retain=True)
- # 删除retained消息
- # result = client.publish(topic=topic, payload=None, qos=0, retain=True)
- if result[0] == 0:
- print("Send {0} to topic {1}".format(msg, topic))
- else:
- print("Failed to send message {0} to topic {1}".format(msg, topic))
-
- def run():
- '''运行发布者'''
- client = connect_mqtt()
- # 运行一个线程来自动调用loop()处理网络事件, 非阻塞
- client.loop_start()
- publish(client)
-
- if __name__ == '__main__':
- run()
2.2 subscribe_basic.py
- # subscribe_basic.py
- import random
- from paho.mqtt import client as mqtt_client
- import json
- from utils_logger import get_logger
-
- broker = '192.168.***.***' # mqtt代理服务器地址
- port = 1883
- keepalive = 60 # 与代理通信之间允许的最长时间段(以秒为单位)
- topic = "test/subcribe"
- client_id = f'python-mqtt-subscribe-{random.randint(0, 1000)}' # 可自定义,但要注意客户端id不能重复
-
- def connect_mqtt():
- '''连接mqtt代理服务器'''
- def on_connect(client, userdata, flags, rc):
- '''连接回调函数'''
- # 响应状态码为0表示连接成功
- if rc == 0:
- print("Connected to MQTT successfully!")
- else:
- print("Failed to connect, return code {0}".format(rc))
-
- client = mqtt_client.Client(client_id)
- client.on_connect = on_connect
- client.connect(broker, port, keepalive)
- return client
-
- def subscribe(client: mqtt_client):
- '''订阅主题并接收消息'''
- def on_message(client, userdata, msg):
- '''订阅消息回调函数'''
- data = json.loads(msg.payload) # data = 字典 #payload = json数据
- print("Received message from topic {0} ".format(msg.topic))
- print("The message have {0} information".format(len(data)))
- print("The information is '{0}'".format(data))
-
- # 订阅指定消息主题
- client.subscribe(topic=topic, qos=0)
- # 将回调函数指派给客户端实例
- client.on_message = on_message
-
- def run():
- # 运行订阅者
- client = connect_mqtt()
- subscribe(client)
- # 运行一个线程来自动调用loop()处理网络事件, 阻塞模式
- client.loop_forever()# 保持 loop()调用
-
-
- if __name__ == '__main__':
- run()
3、进阶版发布和订阅(添加写入log,遗言主题以及上线下线消息)
3.1 publish.py
- # publish.py
- import time
- import json
- import random
- from paho.mqtt import client as mqtt_client
- from utils_logger import get_logger
-
-
- broker = '192.168.***.***' # mqtt代理服务器地址
- port = 1883
- keepalive = 60 # 与代理通信之间允许的最长时间段(以秒为单位)
- topic = "test/send" # 消息主题
- will_topic = "test/will" # 遗言消息主题
- client_id = f'python-mqtt-send-{random.randint(0, 1000)}' # 客户端id不能重复
- # 遗言消息内容
- will_msg = {
- "ID": f"{client_id}",
- "stat": "Offline"
- }
- # 上线消息内容
- online_msg = {
- "ID": f"{client_id}",
- "stat": "online"
- }
- logfolder = 'logs'
- logfile = f'mqtt_publish.log'
- publish_logger = get_logger(logfolder, logfile)
-
- def connect_mqtt():
- '''连接mqtt代理服务器'''
- def on_connect(client, userdata, flags, rc):
- '''连接回调函数'''
- if rc == 0:
- # 上线后清除retained消息
- result = client.publish(topic=topic, payload=json.dumps(online_msg), qos=0, retain=False)
- publish_logger.info("Connected to MQTT successfully!")
- if result[0] == 0:
- publish_logger.info("Send {0} to topic {1}".format(online_msg, topic))
- else:
- publish_logger.error("Failed to send message {0} to topic {1}".format(online_msg, topic))
-
- else:
- publish_logger.error("Failed to connect, return code {0}".format(rc))
- # 连接mqtt代理服务器,并获取连接引用
- client = mqtt_client.Client(client_id)
- client.will_set(topic=will_topic, payload=json.dumps(will_msg), qos=0, retain=False)
- client.on_connect = on_connect
- client.connect(broker, port, keepalive)
- return client
-
- def publish(client):
- '''发布消息'''
- while True:
- '''每隔4秒发布一次信息'''
- time.sleep(4)
- # mqtt只能传输字符串数据
- info = {
- 'msg1': 'Hello kristina',
- 'msg2': 'msg {0}'.format(random.randint(0, 1000))
- }
- msg = json.dumps(info)
- # 默认retain=False,一个Topic只能有一个retained消息,后设置的会覆盖前面的消息
- result = client.publish(topic=topic, payload=msg, qos=0, retain=True)
- # 删除retained消息
- # result = client.publish(topic=topic, payload=None, qos=0, retain=True)
- if result[0] == 0:
- publish_logger.info("Send {0} to topic {1}".format(msg, topic))
- else:
- publish_logger.error("Failed to send message {0} to topic {1}".format(msg, topic))
-
- def run():
- '''运行发布者'''
- client = connect_mqtt()
- # 运行一个线程来自动调用loop()处理网络事件, 非阻塞
- client.loop_start()
- publish(client)
-
- if __name__ == '__main__':
- run()
3.2 subscribe.py
- # subcribe.py
- import random
- from paho.mqtt import client as mqtt_client
- import json
- import datetime
- from utils_logger import get_logger
- # systeminfo
- broker = '192.168.***.***' # mqtt代理服务器地址
- port = 1883
- keepalive = 60 # 与代理通信之间允许的最长时间段(以秒为单位)
- topic = "test/subcribe"
-
- logfolder = 'logs'
- logfile = f'mqtt_subscribe.log'
- subcribe_logger = get_logger(logfolder, logfile)
- client_id = f'python-mqtt-subscribe-{random.randint(0, 1000)}' # 可自定义,但要注意客户端id不能重复
-
- def connect_mqtt():
- '''连接mqtt代理服务器'''
- def on_connect(client, userdata, flags, rc):
- '''连接回调函数'''
- # 响应状态码为0表示连接成功
- if rc == 0:
- subcribe_logger.info("Connected to MQTT successfully!")
- else:
- subcribe_logger.error("Failed to connect, return code {0}".format(rc))
-
- client = mqtt_client.Client(client_id)
- client.on_connect = on_connect
- client.connect(broker, port, keepalive)
- return client
-
- def subscribe(client: mqtt_client):
- '''订阅主题并接收消息'''
- def on_message(client, userdata, msg):
- '''订阅消息回调函数'''
- data = json.loads(msg.payload) # data = 字典 #payload = json数据
- subcribe_logger.info("Received message from topic {0} ".format(msg.topic)) # msg.payload.decode()
- subcribe_logger.info("The message have {0} information".format(len(data)))
- subcribe_logger.info("The information is '{0}'".format(data))
-
- # 订阅指定消息主题
- client.subscribe(topic=topic, qos=0)
- # 将回调函数指派给客户端实例
- client.on_message = on_message
-
- def run():
- # 运行订阅者
- client = connect_mqtt()
- subscribe(client)
- # 运行一个线程来自动调用loop()处理网络事件, 阻塞模式
- client.loop_forever() # 保持 loop()调用
-
-
- if __name__ == '__main__':
- run()
3.3 效果图
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。