赞
踩
物联网下,物理设备内存CPU有限、4G网络不可靠、网络带宽小等,目前有emqttd、mosquitto、activemq等支持mqtt协议。对于点对点的消息传递直接使用一般的通信方式不使用消息队列就ok的,但是最近出了个需求需要消息广播准备使用发布订阅来实现。rabbitmq是将mqtt协议转换为amqp协议来处理。
消息类型比较简单,请求报文也比较简单。
- CONNECT 1 C->S 客户端请求与服务端建立连接
- CONNACK 2 S->C 服务端确认连接建立
- PUBLISH 3 CóS 发布消息
- PUBACK 4 CóS 收到发布消息确认
- PUBREC 5 CóS 发布消息收到
- PUBREL 6 CóS 发布消息释放
- PUBCOMP 7 CóS 发布消息完成
- SUBSCRIBE 8 C->S 订阅请求
- SUBACK 9 S->C 订阅确认
- UNSUBSCRIBE 10 C->S 取消订阅
- UNSUBACK 11 S->C 取消订阅确认
- PING 12 C->S 客户端发送PING(连接保活)命令
- PINGRSP 13 S->C PING命令回复
- DISCONNECT 14 C->S 断开连接
- #开启WEB管理
- rabbitmq-plugins enable rabbitmq_management
- #开启MQTT插件
- rabbitmq-plugins enable rabbitmq_mqtt
启用的是1883端口:
程序都是网上找的,先写了个简单的测试。依赖文件:
- <dependency>
- <groupId>org.eclipse.paho</groupId>
- <artifactId>mqtt-client</artifactId>
- <version>0.4.0</version>
- </dependency>
publish端:
- //发布客户端
- public class publishClient {
-
- //mqtt服务器地址
- public static final String HOST = "tcp://114.116.48.130:1883";
- //主题
- public static final String TOPIC = "service_login";
- //mqtt 客户机ID
- private static final String clientid = "server";
- private MqttClient client;//客户端实例
- private MqttTopic topic11;//主题实例
- private String userName = "*****"; //非必须
- private String passWord = "*****"; //非必须
- private MqttMessage message;
- //初始化客户端实例
- public publishClient() throws MqttException {
- //MemoryPersistence设置clientid的保存形式,默认为以内存保存
- client = new MqttClient(HOST, clientid, new MemoryPersistence());
- connect();
- }
- //连接服务器
- private void connect() {
- //连接配置
- MqttConnectOptions options = new MqttConnectOptions();
- options.setCleanSession(true);//不保存,每次重启新client
- options.setUserName(userName);
- options.setPassword(passWord.toCharArray());
- // 设置超时时间
- options.setConnectionTimeout(10);
- // 设置会话心跳时间
- options.setKeepAliveInterval(20);
- try {
- //设置发布回调
- client.setCallback(new publishCallback());
- client.connect(options);
- topic11 = client.getTopic(TOPIC);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- //发布
- public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException,
- MqttException {
- MqttDeliveryToken token = topic.publish(message);
- token.waitForCompletion();
- System.out.println("message is published completely! "+ token.isComplete());
- }
- //测试类
- public static void main(String[] args) throws MqttException, InterruptedException {
- //发布客户端
- publishClient server = new publishClient();
- //每隔10s发一条
- for (;;){
- server.message = new MqttMessage();
- server.message.setQos(1);//保证消息能到达一次
- server.message.setRetained(true);//消息保留
- server.message.setPayload("{'key':'value'}".getBytes());//消息内容
- server.publish(server.topic11 , server.message);//发布
- Thread.sleep(10000);
- }
- }
- }
subscribe端:
- //订阅客户端
- public class subscribeClient {
- //mqtt服务器ip
- public static final String HOST = "tcp://114.116.48.130:1883";
- //主题
- public static final String TOPIC1 = "service_login";
- //mqtt 客户机ID
- private String clientid = "client";
- private MqttClient client;
- private MqttConnectOptions options;
- private String userName = "*****";
- private String passWord = "*****";
- @SuppressWarnings("unused")
- private ScheduledExecutorService scheduler;
- public subscribeClient(String clientid){
- this.clientid = clientid;
- }
- private void start() {
- try {
- // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
- client = new MqttClient(HOST, clientid, new MemoryPersistence());
- // MQTT的连接设置
- options = new MqttConnectOptions();
- // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
- options.setCleanSession(true);
- // 设置连接的用户名
- options.setUserName(userName);
- // 设置连接的密码
- options.setPassword(passWord.toCharArray());
- // 设置超时时间 单位为秒
- options.setConnectionTimeout(10);
- // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
- options.setKeepAliveInterval(20);
- // 设置回调
- client.setCallback(new subcribeCallback());
- MqttTopic topic = client.getTopic(TOPIC1);
- //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
- // 遗嘱
- options.setWill(topic, "close".getBytes(), 2, true);
- client.connect(options);
- //订阅消息
- int[] Qos = {1};
- String[] topic1 = {TOPIC1};
- client.subscribe(topic1, Qos);
-
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- public static void main(String[] args) throws MqttException {
- //一个设备一个队列
- new subscribeClient("client_1").start();
- new subscribeClient("client_2").start();
- new subscribeClient("client_3").start();
- }
- }
callback回调:
- public class publishCallback implements MqttCallback {
-
- //在断开连接时调用
- @Override
- public void connectionLost(Throwable cause) {
- // 连接丢失后,一般在这里面进行重连
- System.out.println("连接断开,可以做重连");
- }
-
- //接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用
- @Override
- public void deliveryComplete(IMqttDeliveryToken token) {
- System.out.println("deliveryComplete---------" + token.isComplete());
- }
-
- //接收已经预订的发布
- @Override
- public void messageArrived(String topic, MqttMessage message) throws Exception {
- // subscribe后得到的消息会执行到这里面
- System.out.println("接收消息主题 : " + topic);
- System.out.println("接收消息Qos : " + message.getQos());
- System.out.println("接收消息内容 : " + new String(message.getPayload()));
- }
- }
首先QoS取值决定了消息质量,消息推送端分别设置不同的消息级别在建立虚拟通道是有差异,先启动推送端十秒一条消息
0:尽力发送,如果遇到传递失败,TCP传输层保证不会重试,出错会丢失消息
1:消费者没有接收确认或者确认消息本身丢失,消息发送者会再次发送,可能造成消息重复
2:保证消息接收者成功接收一次,造成并发性能下降以及消息传递时延增加
关于rabbitmq的web窗口参数:
- Publish:producter pub消息的速率。
- Publisher confirm:broker确认pub消息的速率。
- Deliver(manual ack):customer手动确认的速率。
- Deliver( auto ack):customer自动确认的速率。
- Consumer ack:customer正在确认的速率。
- Redelivered:正在传递'redelivered'标志集的消息的速率。
- Get (manual ack):响应basic.get而要求确认的消息的传输速率。
- Get (auto ack):响应于basic.get而发送不需要确认的消息的速率。
- Return:将basic.return发送给producter的速率。
- Disk read:queue从磁盘读取消息的速率。
- Disk write:queue从磁盘写入消息的速率。
server.message.setQos(0):
server.message.setQos(1):
server.message.setQos(2):
启动会报IO异常,待解决。Qos=2报错,已断开连接 (32109) - java.io.EOFException,查阅资料发现是rabbitmq自身的bug,据说插件升级到mqtt-3.1.1可以解决,还没有尝试。
本身confirm方式就是用来确保消息成功推送到broker中,这里正好rabbitmqq默认使用confirm实现mqtt协议的QoS=1。rabbitmq的ack来自于发布确认,但是消费者还没有启动所以队列也没有创建,消息从broker传递到队列中之后(不管消息有没有被消费)都会由broker返回确认,启动消费者,查看队列如下:
rabbitmq实现的mqtt协议在发布订阅模式下,每个消费者都会创建一个队列,创建队列由订阅主题时触发client.subscribe(topic1, Qos),对于rabbitmq来说会收到broker返回的消息发布成功确认消息:
每个队列都只有一个消费者,如下:
由于rabbitmq是将mqtt协议转化为amqp协议,在mqtt协议里面是没有交换机、队列概念的,所以这里整个mqtt服务器是利用同一个topic交换机实现的,查看交换机如下:
这里交换机消息进出一比三,交换机绑定三个队列正好对应:
如果更换topic路由键,比如login主题增加一个logout主题,交换机中就会新建一个主题,再启动三个消费者去订阅,结果如下:
Retained :可以让新订阅的客户端得到发布方的最新的状态值,而不必要等待发送,此操作属于持久化操作,消费端重启服务依然可以收到。mqtt协议消息类型publish有一个redain标记位,broker会储存该topic最后一条消息,新上线的客户端会收到这一条消息,这个消息是本地持久化即使推送端重启。
CleanSession:是否清除客户端session,清除会使用新身份登入。如果不清除,即使是客户端下线,我这边关掉消费者,mqtt服务器会保留客户端信息如下,点进去发会发现这个队列没有消费者。如果清除的话这里是没有这条记录的。
ConnectionTimeout:超时时间。
KeepAliveInterval:会话心跳时间。
我这边推送端server.message.setRetained(false)设置消息不保留,消费端设置options.setCleanSession(false)客户端身份不清除,按道理消费者重复上线是不会收到保留消息的,可我这里没生效原因不明,重启依旧收到上一条消息的保留值。所以对publish请求抓包,查看一下publish推送请求如下:
查看subscribe请求抓包如下:
推送的时候retain是false,订阅到的消息retain是true,应该是rabbitmq给我改了, 估摸是和rabbitmq的消息持久化有关,我现在在web窗口手动publish一条消息(设置非持久化)再对subscribe端抓包发现retain为0,且重启subscribe端是没有重复获取这条消息的,结果如下:
在web窗口手动向topic交换机publish消息走的是qmqp协议并没有通过rabbitmq自带的mqtt插件所以会造成这种差别,查看rabbitmq-mqtt源码:
- #非持久化
- delivery_mode(?QOS_0) -> 1;
- #持久化
- delivery_mode(?QOS_1) -> 2.
发现qos=1的情况默认持久化消息,所以用rabbitmq-mqtt插件会让qos=1的publicsh消息设置retain=false无效,再次设置qos=0然后测试重启果然没有收到这个消息。
publish端设置qos=0,subcribe端设置cleanSession=false,启动推送端5秒一条消息,启动消费端之后又停止,查看队列中的未消费消息:
这种情况应该是效率最好的。 对于单队列来说,让生产和消费速率平衡之后测试速率的峰值可以很方便得到QPS,可是我们这里业务场景是生产者速率并不会太高,但是因为要对n多设备进行消息广播所以rabbitmq中在线的队列数量会比我们设备还多,目前还不知道用rabbitmq-mqtt实现广播消息性能怎么样。现在while死循环创建客户端取订阅主题,如图:
看到默认的socket连接上线是829个,队列达到500的时候就已经占用了一大半了。
中国移动提供了开放的物联网平台,支持多种协议,mqtt就是其中一种。所有服务端交互使用https命令,然后硬件再用对应的协议比如mqtt进行连接。不用自己搭建、不用担心负载、使用简单,就是中国移动要平台维护,几个月一次,每次几分钟,一般的应用可能受不了这5分钟的服务不可用。
搭建也比较简单。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。