赞
踩
RabbitMQ 3.12 中发布的原生 MQTT 为物联网用例提供了显著的可扩展性和性能改进。
RabbitMQ 3.13 将支持 MQTT 5.0,因此将成为我们使 RabbitMQ 成为领先的 MQTT 代理之一的下一个重要步骤。
这篇博文解释了如何在 RabbitMQ 中使用新的 MQTT 5.0 功能。
MQTT 是物联网 (IoT) 的标准协议。
物联网远程设备在连接到代理时网络质量可能较差。 因此,MQTT是轻量级的:MQTT协议头很小,可以节省网络带宽。
由于物联网设备可能经常断开连接并重新连接(想象一下一辆汽车驶过隧道),MQTT 也很高效:与其他消息传递协议相比,客户端通过更短的握手进行连接和身份验证。
MQTT协议已经存在了很多年。 如下表所示,最新的 MQTT 协议版本为 5.0。
MQTT 版本 | CONNECT 数据包中的协议版本 | MQTT 规范发布年份 | 自 Year 以来的 RabbitMQ 支持(版本) |
---|---|---|---|
3.1 | 3 | 2010 | 2012 (3.0) |
3.1.1 | 4 | 2014 | 2014 (3.3) |
5.0 | 5 | 2019 | 2024 (3.13) |
值得一提的是,面向用户的协议版本和“内部”协议版本(也称为协议级别)是有区别的。 后者在 CONNECT 数据包中从客户端发送到服务器。 由于面向用户的协议版本 3.1.1 映射到内部协议版本 4,为了避免进一步的混淆,MQTT 委员会决定跳过面向用户的版本 4.0,以便面向用户的版本 5.0 映射到内部协议版本 5。
附录 C. MQTT v5.0 中的新功能摘要提供了 MQTT 5.0 新功能的完整列表。
由于您在 Web 上找到了很棒的 MQTT 5.0 资源,包括说明性图表和使用模式,因此这篇博文仅关注 RabbitMQ 的细节。 本节介绍 PR #7263 中实现的最重要功能。 对于每个功能,我们提供了一个如何将其与 RabbitMQ 一起使用的示例,或者概述了如何在 RabbitMQ 中实现它的高级描述。
要自己运行示例,请启动 RabbitMQ 服务器 3.13, 例如,使用以下 Docker 镜像标记:
docker run -it --rm --name rabbitmq -p 1883:1883 -p 15672:15672 -p 15692:15692 rabbitmq:3.13.0-management
在另一个终端窗口中,启用 MQTT 插件:
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_mqtt
由于 MQTT 插件是动态启用的,因此 MQTT 插件定义的功能标志被禁用。 启用所有功能标志,包括功能标志:mqtt_v5
docker exec rabbitmq rabbitmqctl enable_feature_flag all
现在,列出功能标志应显示所有功能标志都已启用:
docker exec rabbitmq rabbitmqctl list_feature_flags --formatter=pretty_table
以下示例使用 MQTTX CLI V1.9.4。 我们使用 CLI 而不是图形 UI,以便您可以通过复制粘贴命令轻松运行示例。
所有新功能也适用于 RabbitMQ Web MQTT 插件。
可以为发布到代理的每条消息设置以秒为单位的到期间隔。 如果在该过期时间间隔内未使用邮件,则该邮件将被丢弃或死信。
为 主题 创建订阅。 这将在 RabbitMQ 中创建一个队列。 通过键入终端断开客户端连接。 由于我们使用 600 秒的会话到期间隔,因此此队列将再存在 10 分钟。t/1``Ctrl+C
mqttx sub --client-id sub-1 --topic t/1 --session-expiry-interval 600 --qos 1
… Connecting...
✔ Connected
… Subscribing to t/1...
✔ Subscribed to t/1
^C
将消息发布到同一主题,消息过期间隔为 30 秒:
mqttx pub --topic t/1 --message m1 --message-expiry-interval 30 --qos 1
… Connecting...
✔ Connected
… Message publishing...
✔ Message published
在接下来的 30 秒内,列出队列:
docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_queues name type messages
┌─────────────────────────────┬─────────┬──────────┐
│ name │ type │ messages │
├─────────────────────────────┼─────────┼──────────┤
│ mqtt-subscription-sub-1qos1 │ classic │ 1 │
└─────────────────────────────┴─────────┴──────────┘
等待 30 秒,然后再次列出队列:
docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_queues
┌─────────────────────────────┬─────────┬──────────┐
│ name │ type │ messages │
├─────────────────────────────┼─────────┼──────────┤
│ mqtt-subscription-sub-1qos1 │ classic │ 0 │
└─────────────────────────────┴─────────┴──────────┘
该消息已过期,因为客户端尚未连接到代理以使用该消息。 如果设置了死字策略,则邮件将死信发送到交易所。 在我们的例子中,死字被禁用。 查询 Prometheus 端点可证明经典队列中有 1 条消息已过期。sub-1
curl --silent localhost:15692/metrics | grep rabbitmq_global_messages_dead_lettered_expired_total
# TYPE rabbitmq_global_messages_dead_lettered_expired_total counter
# HELP rabbitmq_global_messages_dead_lettered_expired_total Total number of messages dead-lettered due to message TTL exceeded
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_classic_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_classic_queue",dead_letter_strategy="disabled"} 1
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="disabled"} 0
另一个有趣的功能是以下要求:
服务器发送到客户端的 PUBLISH 数据包必须包含设置为接收值减去应用程序消息在服务器中等待的时间的消息到期间隔。
向代理发送第二条消息,消息到期间隔为 60 秒:
mqttx pub --topic t/1 --message m2 --message-expiry-interval 60 --qos 1
等待 20 秒,然后重新连接订阅客户端:
mqttx sub --client-id sub-1 --topic t/1 --no-clean --session-expiry-interval 0 --qos 1 --output-mode clean
{
"topic": "t/1",
"payload": "m2",
"packet": {
...
"properties": {
"messageExpiryInterval": 40
}
}
}
根据 MQTT 5.0 协议规范的规定,客户端接收第二条消息,消息到期间隔设置为 40 秒: 代理接收的 60 秒减去消息在代理中等待的 20 秒。
MQTT 5.0 消息到期是在 RabbitMQ 中使用每条消息 TTL 实现的,类似于 AMQP 0.9.1 发布者中的字段。expiration
客户端可以在 SUBSCRIBE 数据包中设置订阅标识符。 如果客户端因该订阅而收到消息,则代理会将该订阅标识符包含在 PUBLISH 数据包中。
订阅标识符的用例列在 SUBSCRIBE 操作部分。
从同一客户端向服务器发送 3 个单独的 SUBSCRIBE 数据包,每个数据包具有不同的主题过滤器和不同的订阅标识符:
mqttx sub --client-id sub-2 --topic t/1 --subscription-identifier 1 --session-expiry-interval 600
^C
mqttx sub --client-id sub-2 --topic t/2 --subscription-identifier 2 --session-expiry-interval 600 --no-clean
^C
mqttx sub --client-id sub-2 --topic "t/#" --subscription-identifier 3 --session-expiry-interval 0 --no-clean --output-mode clean
在第二个终端窗口中,我们看到从同一队列到同一主题交换的 3 个绑定,每个绑定都具有不同的路由键:
docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_bindings \
source_name source_kind destination_name destination_kind routing_key
┌─────────────┬─────────────┬─────────────────────────────┬──────────────────┬─────────────────────────────┐
│ source_name │ source_kind │ destination_name │ destination_kind │ routing_key │
├─────────────┼─────────────┼─────────────────────────────┼──────────────────┼─────────────────────────────┤
│ │ exchange │ mqtt-subscription-sub-2qos0 │ queue │ mqtt-subscription-sub-2qos0 │
├─────────────┼─────────────┼─────────────────────────────┼──────────────────┼─────────────────────────────┤
│ amq.topic │ exchange │ mqtt-subscription-sub-2qos0 │ queue │ t.# │
├─────────────┼─────────────┼─────────────────────────────┼──────────────────┼─────────────────────────────┤
│ amq.topic │ exchange │ mqtt-subscription-sub-2qos0 │ queue │ t.1 │
├─────────────┼─────────────┼─────────────────────────────┼──────────────────┼─────────────────────────────┤
│ amq.topic │ exchange │ mqtt-subscription-sub-2qos0 │ queue │ t.2 │
└─────────────┴─────────────┴─────────────────────────────┴──────────────────┴─────────────────────────────┘
第一个条目是与默认交换的隐式绑定。
每个具有 MQTT 主题筛选器的 MQTT 订阅对应一个带有绑定键的 AMQP 0.9.1 绑定。 准确地说,表列的名称错误:应该改为调用它。 MQTT 中的主题级分隔符是 “” 字符,而 AMQP 0.9.1 主题交换中的主题级分隔符是 “” 字符。routing_key``binding_key``/``.
再次在第二个终端窗口中,向主题发送消息:t/1
mqttx pub --topic t/1 --message m1
(订阅客户端的)第一个终端窗口接收以下 PUBLISH 数据包:
{
"topic": "t/1",
"payload": "m1",
"packet": {
...
"properties": {
"subscriptionIdentifier": [
1,
3
]
}
}
}
它包含订阅标识符 1 和 3,因为 topic filters 和 match topic .t/1``t/#``t/1
同样,如果向主题发送第二条消息,订阅客户端将收到包含订阅标识符 2 和 3 的 PUBLISH 数据包。t/2
订阅标识符是 MQTT 会话状态的一部分。 因此,在客户端断开连接时,订阅标识符必须保留在服务器的数据库中,直到 MQTT 会话结束。 RabbitMQ 将订阅标识符存储在绑定参数中:
docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_bindings routing_key arguments
┌─────────────────────────────┬───────────────────────────────────────────────────────────────────────────────────┐
│ routing_key │ arguments │
├─────────────────────────────┼───────────────────────────────────────────────────────────────────────────────────┤
│ mqtt-subscription-sub-2qos0 │ │
├─────────────────────────────┼───────────────────────────────────────────────────────────────────────────────────┤
│ t.# │ {mqtt_subscription_opts,0,false,false,0,3}{<<"x-binding-key">>,longstr,<<"t.#">>} │
├─────────────────────────────┼───────────────────────────────────────────────────────────────────────────────────┤
│ t.1 │ {mqtt_subscription_opts,0,false,false,0,1}{<<"x-binding-key">>,longstr,<<"t.1">>} │
├─────────────────────────────┼───────────────────────────────────────────────────────────────────────────────────┤
│ t.2 │ {mqtt_subscription_opts,0,false,false,0,2}{<<"x-binding-key">>,longstr,<<"t.2">>} │
└─────────────────────────────┴───────────────────────────────────────────────────────────────────────────────────┘
绑定参数的确切结构并不重要,并且可能会在将来的 RabbitMQ 版本中更改。 但是,可以在绑定参数中看到整数 1、2 和 3,这些参数对应于订阅标识符。
当主题交换路由消息时,发布 Erlang 进程会将所有匹配的绑定键包含在消息中。 订阅 MQTT 客户端的 Erlang 进程将匹配的绑定密钥与其知道的 MQTT 主题过滤器进行比较,并将订阅标识符包含在发送到 MQTT 客户端的 PUBLISH 数据包中。
发布 Erlang 进程可以是 MQTT 连接进程,也可以是 AMQP 0.9.1 通道进程。 一如既往,RabbitMQ 在跨协议互操作性方面表现出色:当 AMQP 0.9.1(或 STOMP 或 AMQP 1.0)客户端向主题交换发送消息时, 正确的订阅标识符将包含在发送到 MQTT 客户端的 PUBLISH 数据包中。
MQTT 5.0 提供了 3 个新的订阅选项:
所有订阅选项均由 RabbitMQ 实现。 在这里,我们只关注“保留处理”选项:
此选项指定在建立订阅时是否发送保留的消息。
这些值为:
0 = 在订阅
时发送保留消息 1 = 仅在订阅当前不存在
时在订阅时发送保留消息 2 = 在订阅时不发送保留的消息
发送保留的消息:
mqttx pub --topic mytopic --message m --retain
保留处理值 0 将接收保留的消息,而值 2 不会:
mqttx sub --topic mytopic --retain-handling 0
… Connecting...
✔ Connected
… Subscribing to mytopic...
✔ Subscribed to mytopic
payload: m
retain: true
^C
mqttx sub --topic mytopic --retain-handling 2
… Connecting...
✔ Connected
… Subscribing to mytopic...
✔ Subscribed to mytopic
数据包 CONNACK、PUBACK、SUBACK、UNSUBACK 和 DISCONNECT 包含原因码。
一个实现示例是,如果消息未路由到任何队列,则 RabbitMQ 将在 PUBACK 数据包中使用原因代码进行回复。 MQTT 5.0 原因代码在概念上对应于 AMQP 0.9.1 中的强制消息属性和处理程序。No matching subscribers``No matching subscribers``BasicReturn
大多数 MQTT 数据包可以包含用户属性。 用户属性的含义不是由 MQTT 规范定义的。
PUBLISH 数据包中的用户属性由客户端应用程序定义,并由服务器原封不动地转发。
在第一个终端窗口中订阅:
mqttx sub --topic t/5
在第二个终端窗口中发布包含用户属性的消息:
mqttx pub --topic t/5 --message m --user-properties "key1: value1"
第一个终端窗口将接收用户属性,原封不动:
payload: m
userProperties: [ { key: 'key1', value: 'value1' } ]
MQTT 5.0 PUBLISH 数据包中的用户属性类似于 AMQP 0.9.1 中的消息属性。headers
使用用户属性进行连接:
mqttx conn --client-id myclient --user-properties "connecting-from: London"
在浏览器中打开管理 UI http://localhost:15672/#/connections(用户名和密码都是 ),然后单击 MQTT 连接:guest
RabbitMQ 将在管理 UI 中显示 CONNECT 数据包中的用户属性。
发布者可以指定 MIME 内容类型。 它还可以设置有效负载格式指示器,指示有效负载是由 UTF-8 编码的字符数据还是未指定的二进制数据组成。
在第一个终端窗口中,订阅一个主题:
mqttx sub --topic t/6 --output-mode clean
在第二个终端窗口中,发送一条带有内容类型和有效负载格式指示符的消息:
mqttx pub --topic t/6 --message "my UTF-8 encoded data 声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/400443
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。