赞
踩
提要:本人在物联网开发中,需要使用MQTT技术,以此记录,也可让新手系统快速的入门应用。本文应用为双向订阅发布通信。
MQTT简介、基础、认识有很多相关文章,本文不再赘述,但对于没有方向的同学,可以参考我的学习路线,推荐MQTT官网了解相关原理知识–MQTT官网,再看**官网推荐相关系列文章,即可使任何人都可以快速掌握MQTT,而无需阅读整个MQTT规范。它解释了MQTT概念的核心,其功能和其他基本信息。**
个人总结:
(https://www.emqx.cn/downloads#broker),
1.1 找到对应系统,下载对应版本。比如Windows,以下以此为例
或者Ubuntu对应下图操作即可
1.2 将 emqx-windows-4.2.9.zip解压,本文解压至桌面C:\Users\Administrator\Desktop\emqx
1.3 在windows命令行,进入你解压的 emqx\bin 目录下
1.4 在emqx\bin目录下,输入启动服务命令 emqx.cmd start
,查询端口是否正在监听netstat -ano | findstr "1883"
,出现下图即成功
1.5 启动服务器后,可以使用网页查看相应状态,包括主题。发送的数据等,浏览器打开http://127.0.0.1:18083,使用默认管理账号admin/public即可登录查看,在设置里还能改成中文界面。
pip install paho-mqtt
此代码为双向通信,即一端即为发布者又为订阅者,根据主题不同来区分。如果不需要双向通信,可以不写对应回调函数,以及对应推送或者订阅方法。
根据个人需求更改发布者数据:topic主题,订阅者订阅此主题,即可接到发布者发布的数据;sensor_data即为发送的数据,可以为任意格式,根据个人需求更改
# 发布MQTT信息
sensor_data = "ni hao ......from topic-demo"
client.publish(topic="demo", payload=sensor_data, qos=2)
根据个人需求更改订阅者数据:topic主题,此为test主题,订阅者订阅此主题,即可接到发布者发布的数据;
# 订阅主题 实现双向通信中接收功能,qs质量等级为2
client.subscribe(("test", 2))
import paho.mqtt.client as mqtt import time # 当代理响应订阅请求时被调用。 def on_connect(client, userdata, flags, rc): if rc == 0: print("连接成功") print("Connected with result code " + str(rc)) # 当代理响应订阅请求时被调用 def on_subscribe(client, userdata, mid, granted_qos): print("Subscribed: " + str(mid) + " " + str(granted_qos)) # 当使用使用publish()发送的消息已经传输到代理时被调用。 def on_publish(client, obj, mid): print("OnPublish, mid: " + str(mid)) # 当收到关于客户订阅的主题的消息时调用。 message是一个描述所有消息参数的MQTTMessage。 def on_message(client, userdata, msg): print(msg.topic + " " + str(msg.payload)) # 当客户端有日志信息时调用 def on_log(client, obj, level, string): print("Log:" + string) # 实例化 client = mqtt.Client() # client.username_pw_set("admin", "password") # 回调函数 client.on_connect = on_connect client.on_subscribe = on_subscribe client.on_message = on_message client.on_log = on_log # host为启动的broker地址 举例本机启动的ip 端口默认1883 client.connect(host="127.0.0.1", port=1883, keepalive=6000) # 订阅频道 time.sleep(1) # 多个主题采用此方式 # client.subscribe([("demo", 0), ("test", 2)]) # 订阅主题 实现双向通信中接收功能,qs质量等级为2 client.subscribe(("test", 2)) client.loop_start() i = 0 while True: try: # 发布MQTT信息 sensor_data = "ni hao ......from topic-demo" # 消息将会发送给代理,并随后从代理发送到订阅匹配主题的任何客户端。 # publish(topic, payload=None, qos=0, retain=False) # topic:该消息发布的主题 # payload:要发送的实际消息。如果没有给出,或设置为无,则将使用零长度消息。 传递int或float将导致有效负载转换为表示该数字的字符串。 如果你想发送一个真正的int / float,使用struct.pack()来创建你需要的负载 # qos:服务的质量级别 对于Qos级别为1和2的消息,这意味着已经完成了与代理的握手。 对于Qos级别为0的消息,这只意味着消息离开了客户端。 # retain:如果设置为True,则该消息将被设置为该主题的“最后已知良好” / 保留的消息 client.publish(topic="demo", payload=sensor_data, qos=2) time.sleep(5) # i += 1 except KeyboardInterrupt: print("EXIT") # 这是网络循环的阻塞形式,直到客户端调用disconnect()时才会返回。它会自动处理重新连接。 client.disconnect()
import paho.mqtt.client as mqtt import time # 当代理响应订阅请求时被调用。 def on_connect(client, userdata, flags, rc): if rc == 0: print("连接成功") print("Connected with result code " + str(rc)) # 当代理响应订阅请求时被调用 def on_subscribe(client, userdata, mid, granted_qos): print("Subscribed: " + str(mid) + " " + str(granted_qos)) # 当使用使用publish()发送的消息已经传输到代理时被调用。 def on_publish(client, obj, mid): print("OnPublish, mid: " + str(mid)) # 当收到关于客户订阅的主题的消息时调用。 message是一个描述所有消息参数的MQTTMessage。 def on_message(client, userdata, msg): print(msg.topic + " " + str(msg.payload)) # 当客户端有日志信息时调用 def on_log(client, obj, level, string): print("Log:" + string) # 实例化 client = mqtt.Client() # client.username_pw_set("admin", "password") # 回调函数 client.on_connect = on_connect client.on_subscribe = on_subscribe client.on_message = on_message client.on_log = on_log # host为启动的broker地址 举例本机启动的ip 端口默认1883 client.connect(host="127.0.0.1", port=1883, keepalive=6000) # 订阅频道 time.sleep(1) # 多个主题采用此方式 # client.subscribe([("demo", 0), ("test", 2)]) # 订阅主题 实现双向通信中接收功能,qs质量等级为2 client.subscribe(("demo", 2)) client.loop_start() i = 0 while True: try: # 发布MQTT信息 sensor_data = "hello word......from topic-test" # 消息将会发送给代理,并随后从代理发送到订阅匹配主题的任何客户端。 # publish(topic, payload=None, qos=0, retain=False) # topic:该消息发布的主题 # payload:要发送的实际消息。如果没有给出,或设置为无,则将使用零长度消息。 传递int或float将导致有效负载转换为表示该数字的字符串。 如果你想发送一个真正的int / float,使用struct.pack()来创建你需要的负载 # qos:服务的质量级别 对于Qos级别为1和2的消息,这意味着已经完成了与代理的握手。 对于Qos级别为0的消息,这只意味着消息离开了客户端。 # retain:如果设置为True,则该消息将被设置为该主题的“最后已知良好” / 保留的消息 client.publish(topic="test", payload=sensor_data, qos=2) time.sleep(5) # i += 1 except KeyboardInterrupt: print("EXIT") # 这是网络循环的阻塞形式,直到客户端调用disconnect()时才会返回。它会自动处理重新连接。 client.disconnect()
A端成功运行后如下图
B端成功运行后如下图
如果启动失败,如下图的话,基本属于没有启动Broker,按照二准备条件里面的1走一遍即可,再不行就把防火墙全部关闭。
本demo仅在局域网学习使用,真实项目中把Broker部署在公网服务器上,订阅者和发布者不需要知道彼此,只需要找到公网服务器上的Broker,Broker根据主题,进行消息转发,完成订阅发布通信,注意MQTT设置账号密码,保证不被他人获取数据,保证数据安全性。
欢迎各位交流,如有错误,欢迎指正,Figting!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。