当前位置:   article > 正文

RabbitMQ 实现 Mqtt 协议_rabbitmq mqtt

rabbitmq mqtt

物联网下,物理设备内存CPU有限、4G网络不可靠、网络带宽小等,目前有emqttd、mosquitto、activemq等支持mqtt协议。对于点对点的消息传递直接使用一般的通信方式不使用消息队列就ok的,但是最近出了个需求需要消息广播准备使用发布订阅来实现。rabbitmq是将mqtt协议转换为amqp协议来处理。

  • 1.消息类型

消息类型比较简单,请求报文也比较简单。

  1. CONNECT 1 C->S 客户端请求与服务端建立连接
  2. CONNACK 2 S->C 服务端确认连接建立
  3. PUBLISH 3 CóS 发布消息
  4. PUBACK 4 CóS 收到发布消息确认
  5. PUBREC 5 CóS 发布消息收到
  6. PUBREL 6 CóS 发布消息释放
  7. PUBCOMP 7 CóS 发布消息完成
  8. SUBSCRIBE 8 C->S 订阅请求
  9. SUBACK 9 S->C 订阅确认
  10. UNSUBSCRIBE 10 C->S 取消订阅
  11. UNSUBACK 11 S->C 取消订阅确认
  12. PING 12 C->S 客户端发送PING(连接保活)命令
  13. PINGRSP 13 S->C PING命令回复
  14. DISCONNECT 14 C->S 断开连接
  • 2.rabbitmq开启mqtt
  1. #开启WEB管理
  2. rabbitmq-plugins enable rabbitmq_management
  3. #开启MQTT插件
  4. rabbitmq-plugins enable rabbitmq_mqtt

启用的是1883端口:

mqtt

  • 3.java客户端

程序都是网上找的,先写了个简单的测试。依赖文件:

  1. <dependency>
  2. <groupId>org.eclipse.paho</groupId>
  3. <artifactId>mqtt-client</artifactId>
  4. <version>0.4.0</version>
  5. </dependency>

publish端:

  1. //发布客户端
  2. public class publishClient {
  3. //mqtt服务器地址
  4. public static final String HOST = "tcp://114.116.48.130:1883";
  5. //主题
  6. public static final String TOPIC = "service_login";
  7. //mqtt 客户机ID
  8. private static final String clientid = "server";
  9. private MqttClient client;//客户端实例
  10. private MqttTopic topic11;//主题实例
  11. private String userName = "*****"; //非必须
  12. private String passWord = "*****"; //非必须
  13. private MqttMessage message;
  14. //初始化客户端实例
  15. public publishClient() throws MqttException {
  16. //MemoryPersistence设置clientid的保存形式,默认为以内存保存
  17. client = new MqttClient(HOST, clientid, new MemoryPersistence());
  18. connect();
  19. }
  20. //连接服务器
  21. private void connect() {
  22. //连接配置
  23. MqttConnectOptions options = new MqttConnectOptions();
  24. options.setCleanSession(true);//不保存,每次重启新client
  25. options.setUserName(userName);
  26. options.setPassword(passWord.toCharArray());
  27. // 设置超时时间
  28. options.setConnectionTimeout(10);
  29. // 设置会话心跳时间
  30. options.setKeepAliveInterval(20);
  31. try {
  32. //设置发布回调
  33. client.setCallback(new publishCallback());
  34. client.connect(options);
  35. topic11 = client.getTopic(TOPIC);
  36. } catch (Exception e) {
  37. e.printStackTrace();
  38. }
  39. }
  40. //发布
  41. public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException,
  42. MqttException {
  43. MqttDeliveryToken token = topic.publish(message);
  44. token.waitForCompletion();
  45. System.out.println("message is published completely! "+ token.isComplete());
  46. }
  47. //测试类
  48. public static void main(String[] args) throws MqttException, InterruptedException {
  49. //发布客户端
  50. publishClient server = new publishClient();
  51. //每隔10s发一条
  52. for (;;){
  53. server.message = new MqttMessage();
  54. server.message.setQos(1);//保证消息能到达一次
  55. server.message.setRetained(true);//消息保留
  56. server.message.setPayload("{'key':'value'}".getBytes());//消息内容
  57. server.publish(server.topic11 , server.message);//发布
  58. Thread.sleep(10000);
  59. }
  60. }
  61. }

subscribe端:

  1. //订阅客户端
  2. public class subscribeClient {
  3. //mqtt服务器ip
  4. public static final String HOST = "tcp://114.116.48.130:1883";
  5. //主题
  6. public static final String TOPIC1 = "service_login";
  7. //mqtt 客户机ID
  8. private String clientid = "client";
  9. private MqttClient client;
  10. private MqttConnectOptions options;
  11. private String userName = "*****";
  12. private String passWord = "*****";
  13. @SuppressWarnings("unused")
  14. private ScheduledExecutorService scheduler;
  15. public subscribeClient(String clientid){
  16. this.clientid = clientid;
  17. }
  18. private void start() {
  19. try {
  20. // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
  21. client = new MqttClient(HOST, clientid, new MemoryPersistence());
  22. // MQTT的连接设置
  23. options = new MqttConnectOptions();
  24. // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
  25. options.setCleanSession(true);
  26. // 设置连接的用户名
  27. options.setUserName(userName);
  28. // 设置连接的密码
  29. options.setPassword(passWord.toCharArray());
  30. // 设置超时时间 单位为秒
  31. options.setConnectionTimeout(10);
  32. // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
  33. options.setKeepAliveInterval(20);
  34. // 设置回调
  35. client.setCallback(new subcribeCallback());
  36. MqttTopic topic = client.getTopic(TOPIC1);
  37. //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
  38. // 遗嘱
  39. options.setWill(topic, "close".getBytes(), 2, true);
  40. client.connect(options);
  41. //订阅消息
  42. int[] Qos = {1};
  43. String[] topic1 = {TOPIC1};
  44. client.subscribe(topic1, Qos);
  45. } catch (Exception e) {
  46. e.printStackTrace();
  47. }
  48. }
  49. public static void main(String[] args) throws MqttException {
  50. //一个设备一个队列
  51. new subscribeClient("client_1").start();
  52. new subscribeClient("client_2").start();
  53. new subscribeClient("client_3").start();
  54. }
  55. }

 callback回调:

  1. public class publishCallback implements MqttCallback {
  2. //在断开连接时调用
  3. @Override
  4. public void connectionLost(Throwable cause) {
  5. // 连接丢失后,一般在这里面进行重连
  6. System.out.println("连接断开,可以做重连");
  7. }
  8. //接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用
  9. @Override
  10. public void deliveryComplete(IMqttDeliveryToken token) {
  11. System.out.println("deliveryComplete---------" + token.isComplete());
  12. }
  13. //接收已经预订的发布
  14. @Override
  15. public void messageArrived(String topic, MqttMessage message) throws Exception {
  16. // subscribe后得到的消息会执行到这里面
  17. System.out.println("接收消息主题 : " + topic);
  18. System.out.println("接收消息Qos : " + message.getQos());
  19. System.out.println("接收消息内容 : " + new String(message.getPayload()));
  20. }
  21. }
  • 4.测试

首先QoS取值决定了消息质量,消息推送端分别设置不同的消息级别在建立虚拟通道是有差异,先启动推送端十秒一条消息

0:尽力发送,如果遇到传递失败,TCP传输层保证不会重试,出错会丢失消息
1:消费者没有接收确认或者确认消息本身丢失,消息发送者会再次发送,可能造成消息重复
2:保证消息接收者成功接收一次,造成并发性能下降以及消息传递时延增加

关于rabbitmq的web窗口参数:

  1. Publish:producter pub消息的速率。
  2. Publisher confirm:broker确认pub消息的速率。
  3. Deliver(manual ack):customer手动确认的速率。
  4. Deliver( auto ack):customer自动确认的速率。
  5. Consumer ack:customer正在确认的速率。
  6. Redelivered:正在传递'redelivered'标志集的消息的速率。
  7. Get (manual ack):响应basic.get而要求确认的消息的传输速率。
  8. Get (auto ack):响应于basic.get而发送不需要确认的消息的速率。
  9. Return:将basic.return发送给producter的速率。
  10. Disk read:queue从磁盘读取消息的速率。
  11. Disk write:queue从磁盘写入消息的速率。

server.message.setQos(0):

级别0

server.message.setQos(1):

级别1

server.message.setQos(2):

启动会报IO异常,待解决。Qos=2报错,已断开连接 (32109) - java.io.EOFException,查阅资料发现是rabbitmq自身的bug,据说插件升级到mqtt-3.1.1可以解决,还没有尝试。

本身confirm方式就是用来确保消息成功推送到broker中,这里正好rabbitmqq默认使用confirm实现mqtt协议的QoS=1。rabbitmq的ack来自于发布确认,但是消费者还没有启动所以队列也没有创建,消息从broker传递到队列中之后(不管消息有没有被消费)都会由broker返回确认,启动消费者,查看队列如下:

rabbitmq实现的mqtt协议在发布订阅模式下,每个消费者都会创建一个队列,创建队列由订阅主题时触发client.subscribe(topic1, Qos),对于rabbitmq来说会收到broker返回的消息发布成功确认消息:

每个队列都只有一个消费者,如下:

由于rabbitmq是将mqtt协议转化为amqp协议,在mqtt协议里面是没有交换机、队列概念的,所以这里整个mqtt服务器是利用同一个topic交换机实现的,查看交换机如下:

这里交换机消息进出一比三,交换机绑定三个队列正好对应:

如果更换topic路由键,比如login主题增加一个logout主题,交换机中就会新建一个主题,再启动三个消费者去订阅,结果如下:

  • 5.org.eclipse.paho.client.mqttv3使用细节

Retained :可以让新订阅的客户端得到发布方的最新的状态值,而不必要等待发送,此操作属于持久化操作,消费端重启服务依然可以收到。mqtt协议消息类型publish有一个redain标记位,broker会储存该topic最后一条消息,新上线的客户端会收到这一条消息,这个消息是本地持久化即使推送端重启。

CleanSession:是否清除客户端session,清除会使用新身份登入。如果不清除,即使是客户端下线,我这边关掉消费者,mqtt服务器会保留客户端信息如下,点进去发会发现这个队列没有消费者。如果清除的话这里是没有这条记录的。

ConnectionTimeout:超时时间。

KeepAliveInterval:会话心跳时间。

我这边推送端server.message.setRetained(false)设置消息不保留,消费端设置options.setCleanSession(false)客户端身份不清除,按道理消费者重复上线是不会收到保留消息的,可我这里没生效原因不明,重启依旧收到上一条消息的保留值。所以对publish请求抓包,查看一下publish推送请求如下:

查看subscribe请求抓包如下:

推送的时候retain是false,订阅到的消息retain是true,应该是rabbitmq给我改了, 估摸是和rabbitmq的消息持久化有关,我现在在web窗口手动publish一条消息(设置非持久化)再对subscribe端抓包发现retain为0,且重启subscribe端是没有重复获取这条消息的,结果如下:

在web窗口手动向topic交换机publish消息走的是qmqp协议并没有通过rabbitmq自带的mqtt插件所以会造成这种差别,查看rabbitmq-mqtt源码

  1. #非持久化
  2. delivery_mode(?QOS_0) -> 1;
  3. #持久化
  4. delivery_mode(?QOS_1) -> 2.

发现qos=1的情况默认持久化消息,所以用rabbitmq-mqtt插件会让qos=1的publicsh消息设置retain=false无效,再次设置qos=0然后测试重启果然没有收到这个消息。

  • 6.关于QPS

publish端设置qos=0,subcribe端设置cleanSession=false,启动推送端5秒一条消息,启动消费端之后又停止,查看队列中的未消费消息:

这种情况应该是效率最好的。 对于单队列来说,让生产和消费速率平衡之后测试速率的峰值可以很方便得到QPS,可是我们这里业务场景是生产者速率并不会太高,但是因为要对n多设备进行消息广播所以rabbitmq中在线的队列数量会比我们设备还多,目前还不知道用rabbitmq-mqtt实现广播消息性能怎么样。现在while死循环创建客户端取订阅主题,如图:

看到默认的socket连接上线是829个,队列达到500的时候就已经占用了一大半了。 

  • 7.关于开放平台mqtt服务器

中国移动提供了开放的物联网平台,支持多种协议,mqtt就是其中一种。所有服务端交互使用https命令,然后硬件再用对应的协议比如mqtt进行连接。不用自己搭建、不用担心负载、使用简单,就是中国移动要平台维护,几个月一次,每次几分钟,一般的应用可能受不了这5分钟的服务不可用。

中国移动oneNet物联网开放平台

  • 8.mosquitto

搭建也比较简单。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/788168
推荐阅读
相关标签
  

闽ICP备14008679号