当前位置:   article > 正文

IOT实战-基于MQTTX搭建物联网平台_mqtt物联网平台

mqtt物联网平台

概要设计

带着问题阅读

  1. 怎么对设备鉴权?设备是否合法?设备能否发布到目标topic?设备能否订阅目标topic
  1. 设备数据如何从broker传递到物联平台?
  1. 物联平台如何下发指令给设备?
  1. 定时触发的规则如何实现动态配置?
  1. 规则中的条件如何检验是否满足?

架构图

基础概念

topic通配符

在MQTT协议里,过滤规则==Topic

主题名(Topic Name)用于识别消息应该被发布到哪一个会话,服务端发送给订阅客户端的 Publish 报文的主题名必须匹配该订阅的主题过滤器。Topic是不要预先创建的,发布者发送消息到某个主题、或者订阅者订阅某个主题的时候,Broker 就会自动创建这个主题。

Topic有层级结构,并且支持通配符+和#:

“+” 是匹配单层的通配符。比如 news/+ 可以匹配 news/sports,news/+/basketball 可匹配到 news/sports/basketball。

“#” 是一到多层的通配符。比如 news/# 可以匹配 news、 news/sports、news/sports/basketball 以及 news/sports/basketball/x 等等。

物联平台

设备接入EMQX

设备注册

  1. 物联平台提供要设备注册接口,注册成功后写入device表,包含设备接入MQTT的username和password
  1. 设备携带username和password通过sdk连接emqx
  1. emqx执行设备认证:检查device表中存在匹配的username和password则认证通过,反之拒绝

认证配置

创建device

SQL

CREATE TABLE `device` (

        `id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT,

        `product_code` VARCHAR(32) NOT NULL DEFAULT '' COMMENT '产品编号' ,

        `device_code` VARCHAR(32) NOT NULL COMMENT '设备编号(系统生成)' ,

        `device_name` VARCHAR(64) NOT NULL COMMENT '设备名称' ,

        `username` VARCHAR(128) NOT NULL COMMENT '设备连接broker时的用户名' ,

        `password` VARCHAR(32) NOT NULL COMMENT '设备连接broker时的密码' ,

        `status` VARCHAR(20) NOT NULL DEFAULT 'active' COMMENT '设备状态' ,

        PRIMARY KEY (`id`) USING BTREE

)

COMMENT='设备表'

COLLATE='utf8mb4_bin'

ENGINE=InnoDB

AUTO_INCREMENT=9

;

配置EMQX

  1. 创建认证

其中SQL:

SQL

SELECT password FROM device

where username = ${username} and status = 'active' LIMIT 1

从表device中根据mqtt client username查询status='active'的记录;

如果不存在则认证失败,拒绝client连接;

如果存在则比较表中password和mqtt client password是否相同,如果相同,则认证通过。

备注:

可使用MQTTX客户端工具连接broker

如连接成功则说明设备认证通过

设备数据转发

本章节描述设备发布到MQTT的数据如何传递到物联平台

采用emqx webhook方案

物联平台提供http接口

POST方法的http接口,例如

Java

@PostMapping("/hook")

public String hook(@RequestBody JSONObject event) {

    log.info("receive new event:{}", event);

    hook.dispatcher(event);

    return "success";

}

并根据参数不同,分别处理,主要有连接事件、断开事件、发布事件

EMQX配置webhook

  1. 新建【DataBridge】

  1. 选择webhook,点击Next

  1. 配置webhook,自定义webhook名称(例如iot-platform),POST方法,注意清空Body的过滤条件。

EMQX配置转发规则

创建规则按钮:

  1. 配置设备连接事件转发规则

SQL: SELECT * FROM "$events/client_connected"

服务端收到的requst body示例:

JSON

{

        "conn_props": {},

        "peername": "10.182.63.59:57024",

        "metadata": {

                "rule_id": "client_connected"

        },

        "clientid": "mqttx_463f1a5b",

        "is_bridge": false,

        "keepalive": 60,

        "proto_ver": 5,

        "proto_name": "MQTT",

        "connected_at": 1663553836180,

        "receive_maximum": 32,

        "sockname": "10.206.98.18:1883",

        "mountpoint": "undefined",

        "node": "emqx@127.0.0.1",

        "expiry_interval": 0,

        "event": "client.connected",

        "username": "test/9cbbb8cc-92ec-4a72-8a2e-3d0244628430",

        "timestamp": 1663553836180,

        "clean_start": false

}

  1. 配置设备断开事件转发规则

SQL: SELECT * FROM "$events/client_disconnected"

服务端收到的requst body示例:

JSON

{

        "reason": "normal",

        "peername": "10.182.63.59:50173",

        "metadata": {

                "rule_id": "client_disconnected"

        },

        "clientid": "mqttx_463f1a5b",

        "proto_ver": 5,

        "proto_name": "MQTT",

        "sockname": "10.206.98.18:1883",

        "disconn_props": {},

        "node": "emqx@127.0.0.1",

        "event": "client.disconnected",

        "disconnected_at": 1663553735295,

        "username": "test/9cbbb8cc-92ec-4a72-8a2e-3d0244628430",

        "timestamp": 1663553735296

}

  1. 配置设备发布事件转发规则

SQL: SELECT * FROM "#"

服务端收到的requst body示例:

JSON

{

        "metadata": {

                "rule_id": "client_publish"

        },

        "peerhost": "10.182.63.59",

        "clientid": "mqttx_463f1a5b",

        "flags": {

                "retain": false,

                "dup": false

        },

        "node": "emqx@127.0.0.1",

        "qos": 0,

        "payload": "{\r\n    \"cpu\":8\r\n}",

        "pub_props": {},

        "publish_received_at": 1663553920287,

        "topic": "/client/publish",

        "id": "0005E8FE547BC2FBF4432A0066F50003",

        "event": "message.publish",

        "username": "test/9cbbb8cc-92ec-4a72-8a2e-3d0244628430",

        "timestamp": 1663553920287

}

优点

配置简单、方便

缺点

如果服务端异常,则会丢失数据。当服务端重启时,mqtt和服务端的连接会断开,当服务端重启完成后,连接也不会重新建立,需要手动执行Enable。

解决方案:使用nginx做负载均衡,既能提供稳定的服务端,又可以部署多个服务端实例,防止单点故障。

备注:作为client订阅mqtt系统内部topic,实现相对复杂,暂不采用

设备上线/下线

设备连接信息保存在表device和表device_connection

当webhook接收到设备连接事件时,修改device为在线状态、创建或修改连接信息

当webhook接收到设备断开事件时,修改device为离线状态、修改连接信息

备注:

为什么记录device_connection?

表device描述的是逻辑上的设备,可能有很多物联设备使用相同的设备信息连接broker

设备禁用/恢复

设备是否禁用保存在表device,使用status字段表示,有active和suspended,即启用和禁用两种状态

设备禁用

  1. 更新device表status字段为suspended

使用对应的username和password将无法连接到mqtt,这是因为emqx中配置的client 认证中的SQL中过滤了status='active'的记录。

  1. 断开设备连接

通过emqx提供的rest api接口来实现:

访问rest api 接口需要提前创建app和app secret key用于访问认证:

设备恢复

更新device表status字段为active

设备删除

  1. 删除设备记录
  1. 删除设备连接记录
  1. 断开设备与mqtt的连接(参考【设备禁用】断开设备连接)

设备授权

本章节描述设备授权,授权指的是设备发布权限和订阅权限,避免设备随机发布等情况的发生。

参考官方文档:MySQL

创建设备授权表(device_acl

SQL

CREATE TABLE `device_acl` (

        `id` INT(11) UNSIGNED NOT NULL AUTO_INCREMENT,

        `username` VARCHAR(128) NOT NULL COMMENT 'client的username',

        `permission` VARCHAR(20) NOT NULL COMMENT '用于指定操作权限,可选值有 allow 和 deny' ,

        `action` VARCHAR(20) NOT NULL COMMENT '用于指定适用于哪些操作,可选值有 publish、subscribe 和 all' ,

        `topic` VARCHAR(256) NOT NULL COMMENT '用于指定当前规则适用的主题,可以使用主题过滤器和 主题占位符',

        PRIMARY KEY (`id`) USING BTREE,

        INDEX `username_idx` (`username`) USING BTREE

)

COMMENT='设备授权表'

COLLATE='utf8mb4_general_ci'

ENGINE=InnoDB

AUTO_INCREMENT=9

;

EMQX授权配置

  1. 打开授权配置

  1. 配置授权如下:

生产环境要开启缓存功能

  1. 配置ACL

  1. 选择MySQL

  1. 配置MySQL ACL如下

SELECT action, permission, topic FROM device_acl where username = ${username}

物联平台实现授权功能

注册设备时,新增授权

删除设备时,取消授权

上行数据

作为物联网平台首先要做的就是可以接收并处理设备上传的数据,称之为

上行数据。

topic

设备发布的主题名格式为: upload_data/${productCode}/${deviceCode}/${messageId}

  1. productCode:产品编号
  1. deviceCode:设备编号
  1. messageId:每个消息的唯一ID

示例:upload_data/computer/c1/0000001

物联平台处理设备发布事件

当event是【message.publish】时,物联平台解析payload中的设备属性、存储属性值到device_property表。

下行数据

一般有两种订阅:

  1. 需要同步的数据,比如发送图片给设备
  1. 第二种是指令,平台下发给设备,要求设备完成某种操作,比如共享单车的服务端下发

给单车开锁的指令。

大多数情况下,设备在收到指令后都应该回复指令执行的结果,比如文件有没有下载完毕、继电器有没有打开等。是否回复以及如何回复是业务系统和设备之间的约定,物联网平台只负责将下发指令到设备,同时将设备的回复再传送回业务系统。

指令下发

  1. 物联平台将修改属性的指令传递到设备订阅的topic中(通过调用EMQX rest api来实现)。
  1. 设备接收到topic传来的指令,执行指令。
  1. 指令执行完成后,发布指令完成的信息到特性的topic中,用于通知物联平台指令执行完成。
  1. 物联平台通过webhook接受到指令执行完成的消息后,修改device_property中保存的设备属性值。

设备订阅

利用EMQX的自动订阅功能。当MQTT client连接到Broker时,EMQX会按照预先定义好的规则自动为client订阅主题,设备不需要主动subscribe,增加和减少主题也不需要改动设备的代码。

参考官方文档: 自动订阅

配置如下:

Nginx

# 自动订阅

auto_subscribe {

    topics = [

        {

            topic = "cmd/${username}/+/+/+/#"

            qos   = 1

            rh    = 0

            rap   = 0

            nl    = 0

        }

    ]

}

${username} 即 ${productCode}/${deviceCode}

例如:username(test/954862b4-5ba9-436f-a39e-f939aeacc3c3)连接broker后,会自动订阅topic(test/954862b4-5ba9-436f-a39e-f939aeacc3c3/+/+/+/#)

常见问题

  1. 设备发布数据后,平台未显示相应数据,可能是因为topic获取,请检查产品编码和设备编码是否正确
  1. 设备连接时可自定义clientId,但是不能包含斜杠''/",这是因为平台在断开连接时会调用emqx的rest接口:

如果clientId包含斜杠,会被误认为URI,导致异常。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/544685
推荐阅读
相关标签
  

闽ICP备14008679号