赞
踩
MQTT属于是物联网的通信协议,在MQTT协议中有两大角色:客户端(发布者/订阅者),服务端(Mqtt broker);针对客户端和服务端需要有遵循该协议的的具体实现,EMQ/EMQX就是MQTT Broker的一种实现。
EMQX 基于 Erlang/OTP 平台开发的 MQTT 消息服务器,是开源社区中最流行的 MQTT 消息服务器。EMQ X 是开源百万级分布式 MQTT 消息服务器(MQTT Messaging Broker),用于支持各种接入标准 MQTT协议的设备,实现从设备端到服务器端的消息传递,以及从服务器端到设备端的设备控制消息转发。从而实现物联网设备的数据采集,和对设备的操作和控制。
到目前为止,比较流行的 MQTT Broker 有几个:
使用 C 语言实现的 MQTT Broker。Eclipse 组织还还包含了大量的 MQTT 客户端项目:
https://www.eclipse.org/paho/#
使用 Erlang 语言开发的 MQTT Broker,支持许多其他 IoT 协议比如 CoAP、LwM2M 等
使用 Node.JS 开发的 MQTT Broker,简单易用。
同样使用 Erlang 开发的 MQTT Broker
从支持 MQTT5.0、稳定性、扩展性、集群能力等方面考虑,EMQX 的表现应该是最好的。
与别的MQTT服务器相比EMQ X 主要有以下的特点:
● 经过100+版本的迭代,EMQ X 目前为开源社区中最流行的 MQTT 消息中间件,在各种客户严格的生产环境上经受了严苛的考验;
● EMQ X 支持丰富的物联网协议,包括 MQTT、MQTT-SN、CoAP、 LwM2M、LoRaWAN 和 WebSocket等;
● 优化的架构设计,支持超大规模的设备连接。企业版单机能支持百万的 MQTT 连接;集群能支持千万级别的 MQTT 连接;
● 易于安装和使用;
● 灵活的扩展性,支持企业的一些定制场景;
● 中国本地的技术支持服务,通过微信、QQ等线上渠道快速响应客户需求;
● 基于 Apache 2.0 协议许可,完全开源。EMQ X 的代码都放在 Github 中,用户可以查看所有源代码。
● EMQ X 3.0 支持 MQTT 5.0 协议,是开源社区中第一个支持 5.0协议规范的消息服务器,并且完全兼容
● MQTT V3.1 和 V3.1.1 协议。除了 MQTT 协议之外,EMQ X 还支持别的一些物联网协议
● 单机支持百万连接,集群支持千万级连接;毫秒级消息转发。EMQ X 中应用了多种技术以实现上述功
能
● 利用 Erlang/OTP 平台的软实时、高并发和容错(电信领域久经考验的语言)
● 全异步架构
● 连接、会话、路由、集群的分层设计
● 消息平面和控制平面的分离等
● 扩展模块和插件,EMQ X 提供了灵活的扩展机制,可以实现私有协议、认证鉴权、数据持久化、桥接发和管理控制台等的扩展
● 桥接:EMQ X 可以跟别的消息系统进行对接,比如 EMQ X Enterprise 版本中可以支持将消息转发到
Kafka、RabbitMQ 或者别的 EMQ 节点等
● 共享订阅:共享订阅支持通过负载均衡的方式在多个订阅者之间来分发 MQTT 消息。比如针对物联网等 数据采集场景,会有比较多的设备在发送数据,通过共享订阅的方式可以在订阅端设置多个订阅者来实现这几个订阅者之间的工作负载均衡
典型的物联网平台包括设备硬件、数据采集、数据存储、分析、Web / 移动应用等。EMQX 位于数据采集这一层,分别与硬件和数据存储、分析进行交互,是物联网平台的核心:前端的硬件通过 MQTT 协议与位于数据采集层的 EMQX 交互,通过 EMQX 将数据采集后,通过 EMQX 提供的数据接口,将数据保存到后台的持久化平台中(各种关系型数据库和 NOSQL 数据库),或者流式数据处理框架等,上层应用通过这些数据分析后得到的结果呈现给最终用户。
EMQX 公司主要提供三个产品,可在官网首页产品导航查看每一种产品;主要体现在支持的连接数量、产品功能和商业服务等方面的区别:
EMQX Broker:EMQX 开源版,完整支持 MQTT V3.1.1/V5.0 协议规范,完整支持 TCP、TLS、WebSocket 连接,支持百万级连接和分布式集群架构;LDAP, MySQL, Redis, MongoDB 等扩展插件集成,支持插件模式扩展服务器功能;支持跨 Linux、Windows、macOS 平台安装,支持公有云、私有云、K8S/容器部署
EMQX Enterprise:EMQX 企业版,在开源版基础上,支持物联网主流协议 MQTT、MQTT-SN、 CoAP/LwM2M、HTTP、WebSocket 一站式设备接入;JT-808/GBT-32960 等行业协议支持,基于 TCP/UDP私有协议的旧网设备接入兼容,多重安全机制与认证鉴权;高并发软实时消息路由;强大灵活的内置规则引擎;企业服务与应用集成;多种数据库持久化支持;消息变换桥接转发 Kafka;管理监控中心
EMQX Platform:EMQX 平台版,EMQ X Platform 是面向千万级超大型 IoT 网络和应用,全球首选电 信级物联网终端接入解决方案。千万级大容量;多物联网协议;电信级高可靠;卓越 5G 网络支持;跨云跨IDC 部署;兼容历史系统;完善的咨询服务(从咨询到运维)
完整的 MQTT V3.1/V3.1.1 及 V5.0 协议规范支持
QoS0, QoS1, QoS2 消息支持
持久会话与离线消息支持
Retained 消息支持
Last Will 消息支持
TCP/SSL 连接支持
MQTT/WebSocket/SSL 支持
HTTP 消息发布接口支持
$SYS/# 系统主题支持
客户端在线状态查询与订阅支持
客户端 ID 或 IP 地址认证支持
用户名密码认证支持
LDAP 认证
Redis、MySQL、PostgreSQL、MongoDB、HTTP 认证集成
浏览器 Cookie 认证
基于客户端 ID、IP 地址、用户名的访问控制 (ACL)
多服务器节点集群 (Cluster)
支持 manual、mcast、dns、etcd、k8s 等多种集群发现方式
网络分区自动愈合
消息速率限制
连接速率限制
按分区配置节点
多服务器节点桥接 (Bridge)
MQTT Broker 桥接支持
Stomp 协议支持
MQTT-SN 协议支持
CoAP 协议支持
Stomp/SockJS 支持
延时 Publish ($delay/topic)
Flapping 检测
黑名单支持
共享订阅 ($share/:group/topic)
TLS/PSK 支持
规则引擎
空动作 (调试)
消息重新发布
桥接数据到 MQTT Broker
检查 (调试)
发送数据到 Web 服务
EMQ X 目前支持的操作系统:
Centos6
Centos7
OpenSUSE tumbleweed
Debian 8
Debian 9
Debian 10
Ubuntu 14.04
Ubuntu 16.04
Ubuntu 18.04
macOS 10.13
macOS 10.14
macOS 10.15Windows Server 2019
产品部署建议 Linux 服务器,不推荐 Windows 服务器。
安装的方式有很多种,可供自由选择: Shell脚本安装、包管理器安装、二进制包安装、ZIP压缩包安装、Homebrew安装、Docker运行安装、Helm安装、源码编译安装 。
配置 EMQX Yum 源:
curl -s https://assets.emqx.com/scripts/install-emqx-rpm.sh | sudo bash
安装 EMQX:
sudo yum install emqx -y
启动 EMQX:
sudo systemctl start emqx
通过访问服务端18083端口
地址:http://192.168.200.129:18083
默认用户名:admin,默认密码:public
获取 Docker 镜像:
docker pull emqx/emqx-enterprise:5.5.0
启动 Docker 容器:
docker run -d --name emqx-enterprise -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx-enterprise:5.5.0
仪表盘查看基本信息
身份认证是大多数应用的重要组成部分,MQTT 协议支持用户名密码认证,启用身份认证能有效阻止非法客户端的连接。
EMQ X 中的认证指的是当一个客户端连接到 EMQ X 的时候,通过服务器端的配置来控制客户端连接服务器的权限。
EMQ X 的认证支持包括两个层面:
● MQTT 协议本身在 CONNECT 报文中指定用户名和密码,EMQ X 以插件形式支持基于 Username、ClientID、HTTP、JWT、LDAP 及各类数据库如 MongoDB、MySQL、PostgreSQL、Redis 等多种形式的认证。
● 在传输层上,TLS 可以保证使用客户端证书的客户端到服务器的身份验证,并确保服务器向客户端验证服务器证书。也支持基于 PSK 的 TLS/DTLS 认证。
EMQ X 支持使用内置数据源(文件、内置数据库)、JWT、外部主流数据库和自定义 HTTP API 作为身份认证数据源。
连接数据源、进行认证逻辑通过插件实现的,每个插件对应一种认证方式,使用前需要启用相应的插件。
客户端连接时插件通过检查其 username/clientid 和 password 是否与指定数据源的信息一致来实现对客户端的身份认证。 (v5.0以上默认集成)
EMQ X 支持的认证方式:
内置数据源
● Username 认证
● Cliend ID 认证
使用配置文件与 EMQ X 内置数据库提供认证数据源,通过 HTTP API 进行管理,足够简单轻量。
外部数据库
● LDAP 认证
● MySQL 认证
● PostgreSQL 认证
● Redis 认证
● MongoDB 认证
外部数据库可以存储大量数据,同时方便与外部设备管理系统集成。
其他
● HTTP 认证
● JWT 认证
JWT 认证可以批量签发认证信息,HTTP 认证能够实现复杂的认证鉴权逻辑。 更改插件配置后需要重启插件才能生效,部分认证鉴权插件包含 ACL 功能。
认证结果
任何一种认证方式最终都会返回一个结果:
● 认证成功:经过比对客户端认证成功
● 认证失败:经过比对客户端认证失败,数据源中密码与当前密码不一致
● 忽略认证(ignore):当前认证方式中未查找到认证数据,无法显式判断结果是成功还是失败,交由认证链下一认证方式或匿名认证来判断
匿名认证 (v5.0以上,如果没有开启任何认证器,默认使用匿名认证)
EMQ X 默认配置中启用了匿名认证,任何客户端都能接入 EMQ X。没有启用认证插件或认证插件没有显式允许/拒绝(ignore)连接请求时,EMQ X 将根据匿名认证启用情况决定是否允许客户端连接。
配置匿名认证开关:
# etc/emqx.conf
## Value: true | false
allow_anonymous = true
注意:我们需要进入到容器内部修改该配置,然后重启EMQ X服务
新建客户端
发布端发布消息
订阅端订阅消息
可以订阅多个主题
安装 paho-mqtt:
pip install paho-mqtt
导入 Paho MQTT 客户端:
from paho.mqtt import client as mqtt_client
设置 MQTT Broker 连接地址,端口以及 topic,同时我们调用 Python random.randint
函数随机生成 MQTT 客户端 id。
broker = '192.168.101.130'
port = 1883
topic = 'python/mqtt'
client_id = f'python-mqtt-{random.randint(0, 1000)}'
# 如果 broker 需要鉴权,设置用户名密码
username = 'test'
password = 'test'
编写连接回调函数 on_connect
,该函数将在客户端连接后被调用,在该函数中可以依据 rc
来判断客户端是否连接成功。
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
# Set Connecting Client ID
client = mqtt_client.Client(client_id)
client.username_pw_set(username, password)
client.on_connect = on_connect
client.connect(broker, port)
return client
设置 MQTT Broker 连接地址,端口以及 topic,同时我们调用 Python random.randint
函数随机生成 MQTT 客户端 id。
broker = '192.168.101.130'
port = 1883
topic = 'python/mqtt'
client_id = f'python-mqtt-{random.randint(0, 1000)}'
# 如果 broker 需要鉴权,设置用户名密码
username = 'test'
password = 'test'
设置 CA 证书,如果您使用 Serverless 或者基础版部署,您可以在部署概览中下载 CA 证书文件。如果您使用专业版部署,请参考专业版 TLS/SSL 配置进行证书配置。
编写连接回调函数 on_connect
,该函数将在客户端连接后被调用,在该函数中可以依据 rc
来判断客户端是否连接成功。
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
# Set Connecting Client ID
client = mqtt_client.Client(client_id)
# Set CA certificate
client.tls_set(ca_certs='./server-ca.crt')
client.username_pw_set(username, password)
client.on_connect = on_connect
client.connect(broker, port)
return client
设置将要订阅的主题及对应 QoS 等级。
编写消息回调函数 on_message
,该函数将在客户端从 MQTT Broker 收到消息后被调用,在该函数中我们将打印出订阅的 topic 名称以及接收到的消息内容。
def subscribe(client: mqtt_client):
def on_message(client, userdata, msg):
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
client.subscribe(topic=topic, qos=0)
client.on_message = on_message
通过以下代码取消订阅,此时应指定取消订阅的主题。
def unsubscribe(client: mqtt_client):
client.on_message = None
client.unsubscribe(topic)
发布消息时需要告知 MQTT Broker 目标主题及消息内容。
首先定义一个 while 循环语句,在循环中我们将设置每秒调用 MQTT 客户端 publish
函数向 python/mqtt
主题发送消息。
def publish(client):
msg_count = 0
while True:
time.sleep(1)
msg = f"messages: {msg_count}"
result = client.publish(topic, msg)
# result: [0, 1]
status = result[0]
if status == 0:
print(f"Send `{msg}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}")
msg_count += 1
通过以下代码指定客户端对消息事件进行监听,并在收到消息后执行回调函数,将接收到的消息及其主题打印到控制台。
def on_message(client, userdata, msg):
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
client.on_message = on_message
如客户端希望主动断开连接,可以通过如下代码实现:
def disconnect(client: mqtt_client):
client.loop_stop()
client.disconnect()
消息发布代码
import random import time import paho.mqtt.client as mqtt_client broker = '192.168.101.130' port = 1883 topic = "python/mqtt" # 随机生成带有前缀的客户端ID client_id = f'python-mqtt-{random.randint(0, 100)}' # 如果 broker 需要鉴权,设置用户名密码 username = 'test' password = 'test' def publish(client): msg_count = 0 while True: time.sleep(5) # 发送的消息(message) msg = f"messages: {msg_count}" # 调用库中方法public()进行发布,会返回一个列表 result = client.publish(topic, msg) # 列表的第一个元素返回的是请求是否成功,然后作判断 status = result[0] if status == 0: print(f"Send `{msg}` to topic `{topic}`") else: print(f"Failed to send message to topic {topic}") msg_count += 1 # 连接的回调方法 def connect_mqtt() -> mqtt_client: def on_connect(client, userdata, flags, rc, *args): if rc == 0: print("Connected to MQTT Broker!") else: print("Failed to connect, return code %d\n", rc) client = mqtt_client.Client(mqtt_client.CallbackAPIVersion.VERSION2,client_id) # client.tls_set(ca_certs='./server-ca.crt') client.username_pw_set(username, password) client.on_connect = on_connect client.connect(broker, port) return client def run(): client = connect_mqtt() client.loop_start() publish(client) if __name__ == '__main__': run()
消息订阅代码
import random from paho.mqtt import client as mqtt_client broker = '192.168.101.130' port = 1883 topic = "python/mqtt" # generate client ID with pub prefix randomly client_id = f'python-mqtt-{random.randint(0, 100)}' # 如果 broker 需要鉴权,设置用户名密码 username = 'test' password = 'test' def connect_mqtt() -> mqtt_client: def on_connect(client, userdata, flags, rc, *args): if rc == 0: print("Connected to MQTT Broker!") else: print("Failed to connect, return code %d\n", rc) client = mqtt_client.Client(mqtt_client.CallbackAPIVersion.VERSION2,client_id) # client.tls_set(ca_certs='./server-ca.crt') client.username_pw_set(username, password) client.on_connect = on_connect client.connect(broker, port) return client def subscribe(client: mqtt_client): def on_message(client, userdata, msg): print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic") client.subscribe(topic) client.on_message = on_message def run(): client = connect_mqtt() subscribe(client) client.loop_forever() if __name__ == '__main__': run()
测试验证
消息发布:
消息订阅:
t.on_connect = on_connect
client.connect(broker, port)
return client
def subscribe(client: mqtt_client):
def on_message(client, userdata, msg):
print(f"Received {msg.payload.decode()}
from {msg.topic}
topic")
client.subscribe(topic)
client.on_message = on_message
def run():
client = connect_mqtt()
subscribe(client)
client.loop_forever()
if name == ‘main’:
run()
测试验证
消息发布:
[外链图片转存中…(img-WPrIwHDI-1714120209815)]
消息订阅:
[外链图片转存中…(img-VtTAjSWB-1714120209816)]
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。