赞
踩
概要设计
带着问题阅读
架构图
基础概念
topic通配符
在MQTT协议里,过滤规则==Topic
主题名(Topic Name)用于识别消息应该被发布到哪一个会话,服务端发送给订阅客户端的 Publish 报文的主题名必须匹配该订阅的主题过滤器。Topic是不要预先创建的,发布者发送消息到某个主题、或者订阅者订阅某个主题的时候,Broker 就会自动创建这个主题。
Topic有层级结构,并且支持通配符+和#:
“+” 是匹配单层的通配符。比如 news/+ 可以匹配 news/sports,news/+/basketball 可匹配到 news/sports/basketball。
“#” 是一到多层的通配符。比如 news/# 可以匹配 news、 news/sports、news/sports/basketball 以及 news/sports/basketball/x 等等。
物联平台
设备接入EMQX
设备注册
认证配置
创建device表
SQL CREATE TABLE `device` ( `id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT, `product_code` VARCHAR(32) NOT NULL DEFAULT '' COMMENT '产品编号' , `device_code` VARCHAR(32) NOT NULL COMMENT '设备编号(系统生成)' , `device_name` VARCHAR(64) NOT NULL COMMENT '设备名称' , `username` VARCHAR(128) NOT NULL COMMENT '设备连接broker时的用户名' , `password` VARCHAR(32) NOT NULL COMMENT '设备连接broker时的密码' , `status` VARCHAR(20) NOT NULL DEFAULT 'active' COMMENT '设备状态' , PRIMARY KEY (`id`) USING BTREE ) COMMENT='设备表' COLLATE='utf8mb4_bin' ENGINE=InnoDB AUTO_INCREMENT=9 ; |
配置EMQX
其中SQL:
SQL SELECT password FROM device where username = ${username} and status = 'active' LIMIT 1 |
从表device中根据mqtt client username查询status='active'的记录;
如果不存在则认证失败,拒绝client连接;
如果存在则比较表中password和mqtt client password是否相同,如果相同,则认证通过。
备注:
可使用MQTTX客户端工具连接broker
如连接成功则说明设备认证通过
设备数据转发
本章节描述设备发布到MQTT的数据如何传递到物联平台
采用emqx webhook方案
物联平台提供http接口
POST方法的http接口,例如
Java @PostMapping("/hook") public String hook(@RequestBody JSONObject event) { log.info("receive new event:{}", event); hook.dispatcher(event); return "success"; } |
并根据参数不同,分别处理,主要有连接事件、断开事件、发布事件
EMQX配置webhook
EMQX配置转发规则
创建规则按钮:
SQL: SELECT * FROM "$events/client_connected"
服务端收到的requst body示例:
JSON { "conn_props": {}, "peername": "10.182.63.59:57024", "metadata": { "rule_id": "client_connected" }, "clientid": "mqttx_463f1a5b", "is_bridge": false, "keepalive": 60, "proto_ver": 5, "proto_name": "MQTT", "connected_at": 1663553836180, "receive_maximum": 32, "sockname": "10.206.98.18:1883", "mountpoint": "undefined", "node": "emqx@127.0.0.1", "expiry_interval": 0, "event": "client.connected", "username": "test/9cbbb8cc-92ec-4a72-8a2e-3d0244628430", "timestamp": 1663553836180, "clean_start": false } |
SQL: SELECT * FROM "$events/client_disconnected"
服务端收到的requst body示例:
JSON { "reason": "normal", "peername": "10.182.63.59:50173", "metadata": { "rule_id": "client_disconnected" }, "clientid": "mqttx_463f1a5b", "proto_ver": 5, "proto_name": "MQTT", "sockname": "10.206.98.18:1883", "disconn_props": {}, "node": "emqx@127.0.0.1", "event": "client.disconnected", "disconnected_at": 1663553735295, "username": "test/9cbbb8cc-92ec-4a72-8a2e-3d0244628430", "timestamp": 1663553735296 } |
SQL: SELECT * FROM "#"
服务端收到的requst body示例:
JSON { "metadata": { "rule_id": "client_publish" }, "peerhost": "10.182.63.59", "clientid": "mqttx_463f1a5b", "flags": { "retain": false, "dup": false }, "node": "emqx@127.0.0.1", "qos": 0, "payload": "{\r\n \"cpu\":8\r\n}", "pub_props": {}, "publish_received_at": 1663553920287, "topic": "/client/publish", "id": "0005E8FE547BC2FBF4432A0066F50003", "event": "message.publish", "username": "test/9cbbb8cc-92ec-4a72-8a2e-3d0244628430", "timestamp": 1663553920287 } |
优点
配置简单、方便
缺点
如果服务端异常,则会丢失数据。当服务端重启时,mqtt和服务端的连接会断开,当服务端重启完成后,连接也不会重新建立,需要手动执行Enable。
解决方案:使用nginx做负载均衡,既能提供稳定的服务端,又可以部署多个服务端实例,防止单点故障。
备注:作为client订阅mqtt系统内部topic,实现相对复杂,暂不采用
设备上线/下线
设备连接信息保存在表device和表device_connection
当webhook接收到设备连接事件时,修改device为在线状态、创建或修改连接信息
当webhook接收到设备断开事件时,修改device为离线状态、修改连接信息
备注:
为什么记录device_connection?
表device描述的是逻辑上的设备,可能有很多物联设备使用相同的设备信息连接broker
设备禁用/恢复
设备是否禁用保存在表device,使用status字段表示,有active和suspended,即启用和禁用两种状态
设备禁用
使用对应的username和password将无法连接到mqtt,这是因为emqx中配置的client 认证中的SQL中过滤了status='active'的记录。
通过emqx提供的rest api接口来实现:
访问rest api 接口需要提前创建app和app secret key用于访问认证:
设备恢复
更新device表status字段为active
设备删除
设备授权
本章节描述设备授权,授权指的是设备发布权限和订阅权限,避免设备随机发布等情况的发生。
参考官方文档:MySQL
创建设备授权表(device_acl)
SQL CREATE TABLE `device_acl` ( `id` INT(11) UNSIGNED NOT NULL AUTO_INCREMENT, `username` VARCHAR(128) NOT NULL COMMENT 'client的username', `permission` VARCHAR(20) NOT NULL COMMENT '用于指定操作权限,可选值有 allow 和 deny' , `action` VARCHAR(20) NOT NULL COMMENT '用于指定适用于哪些操作,可选值有 publish、subscribe 和 all' , `topic` VARCHAR(256) NOT NULL COMMENT '用于指定当前规则适用的主题,可以使用主题过滤器和 主题占位符', PRIMARY KEY (`id`) USING BTREE, INDEX `username_idx` (`username`) USING BTREE ) COMMENT='设备授权表' COLLATE='utf8mb4_general_ci' ENGINE=InnoDB AUTO_INCREMENT=9 ; |
EMQX授权配置
生产环境要开启缓存功能
SELECT action, permission, topic FROM device_acl where username = ${username}
物联平台实现授权功能
注册设备时,新增授权
删除设备时,取消授权
上行数据
作为物联网平台首先要做的就是可以接收并处理设备上传的数据,称之为
上行数据。
topic
设备发布的主题名格式为: upload_data/${productCode}/${deviceCode}/${messageId}
示例:upload_data/computer/c1/0000001
物联平台处理设备发布事件
当event是【message.publish】时,物联平台解析payload中的设备属性、存储属性值到device_property表。
下行数据
一般有两种订阅:
给单车开锁的指令。
大多数情况下,设备在收到指令后都应该回复指令执行的结果,比如文件有没有下载完毕、继电器有没有打开等。是否回复以及如何回复是业务系统和设备之间的约定,物联网平台只负责将下发指令到设备,同时将设备的回复再传送回业务系统。
指令下发
设备订阅
利用EMQX的自动订阅功能。当MQTT client连接到Broker时,EMQX会按照预先定义好的规则自动为client订阅主题,设备不需要主动subscribe,增加和减少主题也不需要改动设备的代码。
参考官方文档: 自动订阅
配置如下:
Nginx # 自动订阅 auto_subscribe { topics = [ { topic = "cmd/${username}/+/+/+/#" qos = 1 rh = 0 rap = 0 nl = 0 } ] } |
${username} 即 ${productCode}/${deviceCode}
例如:username(test/954862b4-5ba9-436f-a39e-f939aeacc3c3)连接broker后,会自动订阅topic(test/954862b4-5ba9-436f-a39e-f939aeacc3c3/+/+/+/#)
常见问题
如果clientId包含斜杠,会被误认为URI,导致异常。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。