赞
踩
WebHook 是由 emqx_web_hook (opens new window)插件提供的 将 EMQX 中的钩子事件通知到某个 Web 服务 的功能。
WebHook 的内部实现是基于 钩子,但它更靠近顶层一些。
它通过在钩子上的挂载回调函数,获取到 EMQX 中的各种事件,并转发至 emqx_web_hook 中配置的 Web 服务器。
以 客户端成功接入(client.connected) 事件为例,其事件的传递流程如下:
Client | EMQX | emqx_web_hook | HTTP +------------+
=============>| - - - - - - -> - - - - - - - ->===========> | Web Server |
| Broker | | Request +------------+
提示:WebHook 对于事件的处理是单向的,它仅支持将 EMQX 中的事件推送给 Web 服务,并不关心 Web 服务的返回。
借助 Webhook 可以完成设备在线、上下线记录,订阅与消息存储、消息送达确认等诸多业务。
Webhook 的配置文件位于 etc/plugins/emqx_web_hook.conf
,配置项的详细说明可以查看 配置项。
在 etc/plugins/emqx_web_hook.conf
可配置触发规则,其配置的格式如下:
## 格式示例
web.hook.rule.<Event>.<Number> = <Rule>
## 示例值
web.hook.rule.message.publish.1 = {"action": "on_message_publish", "topic": "a/b/c"}
web.hook.rule.message.publish.2 = {"action": "on_message_publish", "topic": "foo/#"}
目前支持以下事件:
名称 | 说明 | 执行时机 |
---|---|---|
client.connect | 处理连接报文 | 服务端收到客户端的连接报文时 |
client.connack | 下发连接应答 | 服务端准备下发连接应答报文时 |
client.connected | 成功接入 | 客户端认证完成并成功接入系统后 |
client.disconnected | 连接断开 | 客户端连接层在准备关闭时 |
client.subscribe | 订阅主题 | 收到订阅报文后,执行 client.check_acl 鉴权前 |
client.unsubscribe | 取消订阅 | 收到取消订阅报文后 |
session.subscribed | 会话订阅主题 | 完成订阅操作后 |
session.unsubscribed | 会话取消订阅 | 完成取消订阅操作后 |
message.publish | 消息发布 | 服务端在发布(路由)消息前 |
message.delivered | 消息投递 | 消息准备投递到客户端前 |
message.acked | 消息回执 | 服务端在收到客户端发回的消息 ACK 后 |
message.dropped | 消息丢弃 | 发布出的消息被丢弃后 |
同一个事件可以配置多个触发规则,配置相同的事件应当依次递增。
触发规则,其值为一个 JSON 字符串,其中可用的 Key 有:
例如,我们只将与 a/b/c
和 foo/#
主题匹配的消息转发到 Web 服务器上,其配置应该为:
web.hook.rule.message.publish.1 = {"action": "on_message_publish", "topic": "a/b/c"}
web.hook.rule.message.publish.2 = {"action": "on_message_publish", "topic": "foo/#"}
这样 Webhook 仅会转发与 a/b/c
和 foo/#
主题匹配的消息,例如 foo/bar
等,而不是转发 a/b/d
或 fo/bar
。
事件触发时 Webhook 会按照配置将每个事件组成一个 HTTP 请求发送到 url
所配置的 Web 服务器上。其请求格式为:
URL: <url> # 来自于配置中的 `url` 字段
Method: POST # 固定为 POST 方法
Body: <JSON> # Body 为 JSON 格式字符串
对于不同的事件,请求 Body 体内容有所不同,下表列举了各个事件中 Body 的参数列表:
Key | 类型 | 说明 |
---|---|---|
action | string | 事件名称 固定为:“client_connect” |
clientid | string | 客户端 ClientId |
username | string | 客户端 Username,不存在时该值为 “undefined” |
ipaddress | string | 客户端源 IP 地址 |
keepalive | integer | 客户端申请的心跳保活时间 |
proto_ver | integer | 协议版本号 |
Key | 类型 | 说明 |
---|---|---|
action | string | 事件名称 固定为:“client_connack” |
clientid | string | 客户端 ClientId |
username | string | 客户端 Username,不存在时该值为 “undefined” |
ipaddress | string | 客户端源 IP 地址 |
keepalive | integer | 客户端申请的心跳保活时间 |
proto_ver | integer | 协议版本号 |
conn_ack | string | “success” 表示成功,其它表示失败的原因 |
Key | 类型 | 说明 |
---|---|---|
action | string | 事件名称 固定为:“client_connected” |
clientid | string | 客户端 ClientId |
username | string | 客户端 Username,不存在时该值为 “undefined” |
ipaddress | string | 客户端源 IP 地址 |
keepalive | integer | 客户端申请的心跳保活时间 |
proto_ver | integer | 协议版本号 |
connected_at | integer | 时间戳(秒) |
Key | 类型 | 说明 |
---|---|---|
action | string | 事件名称 固定为:“client_disconnected” |
clientid | string | 客户端 ClientId |
username | string | 客户端 Username,不存在时该值为 “undefined” |
reason | string | 错误原因 |
Key | 类型 | 说明 |
---|---|---|
action | string | 事件名称 固定为:“client_subscribe” |
clientid | string | 客户端 ClientId |
username | string | 客户端 Username,不存在时该值为 “undefined” |
topic | string | 将订阅的主题 |
opts | json | 订阅参数 |
Key | 类型 | 说明 |
---|---|---|
qos | enum | QoS 等级,可取 0 1 2 |
Key | 类型 | 说明 |
---|---|---|
action | string | 事件名称 固定为:“client_unsubscribe” |
clientid | string | 客户端 ClientId |
username | string | 客户端 Username,不存在时该值为 “undefined” |
topic | string | 取消订阅的主题 |
session.subscribed:同 client.subscribe
,action 为 session_subscribed
session.unsubscribed:同 client.unsubscribe
,action 为 session_unsubscribe
session.terminated: 同 client.disconnected
,action 为 session_terminated
Key | 类型 | 说明 |
---|---|---|
action | string | 事件名称 固定为:“message_publish” |
from_client_id | string | 发布端 ClientId |
from_username | string | 发布端 Username,不存在时该值为 “undefined” |
topic | string | 取消订阅的主题 |
qos | enum | QoS 等级,可取 0 1 2 |
retain | bool | 是否为 Retain 消息 |
payload | string | 消息 Payload |
ts | integer | 消息的时间戳(毫秒) |
Key | 类型 | 说明 |
---|---|---|
action | string | 事件名称 固定为:“message_delivered” |
clientid | string | 接收端 ClientId |
username | string | 接收端 Username,不存在时该值为 “undefined” |
from_client_id | string | 发布端 ClientId |
from_username | string | 发布端 Username,不存在时该值为 “undefined” |
topic | string | 取消订阅的主题 |
qos | enum | QoS 等级,可取 0 1 2 |
retain | bool | 是否为 Retain 消息 |
payload | string | 消息 Payload |
ts | integer | 消息时间戳(毫秒) |
Key | 类型 | 说明 |
---|---|---|
action | string | 事件名称 固定为:“message_acked” |
clientid | string | 接收端 ClientId |
from_client_id | string | 发布端 ClientId |
from_username | string | 发布端 Username,不存在时该值为 “undefined” |
topic | string | 取消订阅的主题 |
qos | enum | QoS 等级,可取 0 1 2 |
retain | bool | 是否为 Retain 消息 |
payload | string | 消息 Payload |
ts | integer | 消息时间戳(毫秒) |
# 事件需要转发的目的服务器地址
web.hook.api.url = http://127.0.0.1:8991/mqtt/webhook
# 触发规则
web.hook.rule.client.connected.1 = {"action": "on_client_connected"}
web.hook.rule.client.disconnected.1 = {"action": "on_client_disconnected"}
# 以下可以不开启,测试用用
web.hook.rule.client.subscribe.1 = {"action": "on_client_subscribe"}
web.hook.rule.message.publish.2 = {"action": "on_message_publish","topic":"img/#"}
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.HashMap; import java.util.Map; @RequestMapping("/mqtt") @RestController public class Controller_5 { private final static Logger logger = LoggerFactory.getLogger(Controller_5.class); private Map<String,Boolean> ztList=new HashMap<>(); @PostMapping("/webhook") public void webhook(@RequestBody() Map<String,Object> params) { logger.info("参数列表 {}",params); /** * 注意 action,clientid,事件名 的名称不能修改,否则匹配不上 */ String action = (String)params.get("action"); String clientid = (String)params.get("clientid"); if(action.equals("client_connected")) { logger.info("client:{} 上线",clientid); ztList.put(clientid,true); } if(action.equals("client_disconnected")) { logger.info("client:{} 下线",clientid); ztList.put(clientid,false); } } @RequestMapping("/ztList") public Map<String,Boolean> getZtList() { return ztList; } }
系统需要知道所有客户端当前的连接状态,方便在后台管理系统中进行直观展示
通过EMQX 的webhook将客户端的连接断开等事件通知到我们自建的服务上,通过事件类型获取客户端的连接状态,然后将客户端的连接状态进行存储,并且提供HTTP API供后台系统查询所有客户端的状态。
package com.itheima.controller.mqtt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.bind.annotation.*; import java.util.HashMap; import java.util.Map; /** * */ @RestController @RequestMapping("/mqtt") public class WebHookController { private static final Logger log = LoggerFactory.getLogger(WebHookController.class); private Map<String,Boolean> clientStatus = new HashMap<>(); @PostMapping("/webhook") public void hook(@RequestBody Map<String,Object> params){ log.info("emqx 触发 webhook,请求体数据={}",params); String action = (String) params.get("action"); String clientId = (String) params.get("clientid"); if(action.equals("client_connected")){ log.info("客户端{}接入本系统",clientId); clientStatus.put(clientId,true); } if(action.equals("client_disconnected")){ log.info("客户端{}下线",clientId); clientStatus.put(clientId,false); } } @GetMapping("/allStatus") public Map getStatus(){ return this.clientStatus; } }
hook方法用来接收EMQ X传入过来的请求,将客户端Id的连接状态记录到map中,getAllStatus方法用来返回所有客户端状态。
然后通过客户端连接/断开EMQ X之后,通过访问 all 接口就能得到这些客户端得状态了。
当然了,在实际的项目中肯定就不会这么简单,我们会将这些客户端的状态存入类似redis这样的分布式缓存中,方便整个系统进行存取随时获取客户端状态。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。