赞
踩
MQTT协议是一种基于发布/订阅模式的轻量级物联网消息传输协议,可以用极少代码和带宽为联网设备提供实时可靠消息服务,广泛应用于物联网,移动互联网等。
1.安装yum源
2.安装mqtt服务端
3.启动mqtt服务端
curl -s https://assets.emqx.com/scripts/install-emqx-rpm.sh | sudo bash
sudo yum install emqx
systemctl start emqx
国内下载网址:https://mqttx.app/zh
安装包是exe文件,一路next安装即可
pip install paho-mqtt
MQTT服务端监听的TCP端口是1883 ,界面采用的是聊天工具的形式,简单直观好用
连接MQTT服务端
创建Topic
向指定Topic发送消息并接收
1.实例化客户端,CLIENT_ID是自定义的客户端ID,用来区分连接到服务端的客户端,可以不传入,Client类会自动生成一个ID
mqtt_client.Client(CLIENT_ID)
2.on_connect和on_connect_fail是自定义的建立连接和断开连接时要回调的函数
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connect Success Return Code {}".format(rc))
else:
print("Failed Connect, Return Code {}".format(rc))
def on_connect_fail(client, userdata):
print("Failed Connect")
client = mqtt_client.Client(CLIENT_ID)
client.on_connect = on_connect
client.on_connect_fail = on_connect_fail
3.向指定Topic发送消息
def send_msg(self, client):
for i in range(10):
msg = "第{}次打你丫的".format(i)
res = client.publish(TOPIC, msg)
if res[0] == 0:
print('SEND {} TO {} OVER'.format(msg, TOPIC))
4.订阅指定Topic接收消息
def subscribe(self, client):
def on_message(client, userdata, msg):
print(f"RECEIVED `{msg.payload.decode()}` FROM `{msg.topic}` TOPIC")
client.subscribe(TOPIC)
client.on_message = on_message
5.网络循环的阻塞形式运行客户端
client.loop_forever()
以下是完整代码
# -*- coding: utf-8 -*- # ------------------------------------------------------------------------------- # Name: mqtt_action # Description: # Author: CHEN # Date: 2022/7/13 # ------------------------------------------------------------------------------- from paho.mqtt import client as mqtt_client import random SERVER_IP = '192.168.8.140' PORT = 1883 TOPIC = "testtopic/123" CLIENT_ID = f'python-mqtt-{random.randint(0, 1000)}' class MqttAction: """ 1.连接到MQTT服务端 2.向服务端指定TOPIC发送消息 3.订阅指定TOPIC接收消息 """ def set_connect(self): # @client.connect_callback() def on_connect(client, userdata, flags, rc): if rc == 0: print("Connect Success Return Code {}".format(rc)) else: print("Failed Connect, Return Code {}".format(rc)) def on_connect_fail(client, userdata): print("Failed Connect") client = mqtt_client.Client(CLIENT_ID) client.on_connect = on_connect client.on_connect_fail = on_connect_fail client.connect(SERVER_IP, PORT) self.subscribe(client) return client def run(self): client = self.set_connect() self.send_msg(client) client.loop_forever() def send_msg(self, client): for i in range(10): msg = "第{}次打你丫的".format(i) res = client.publish(TOPIC, msg) if res[0] == 0: print('SEND {} TO {} OVER'.format(msg, TOPIC)) def subscribe(self, client): def on_message(client, userdata, msg): print(f"RECEIVED `{msg.payload.decode()}` FROM `{msg.topic}` TOPIC") client.subscribe(TOPIC) client.on_message = on_message if __name__ == '__main__': MqttAction().run()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。