此篇文章继上一篇物联网协议之MQTT源码分析(一)而写的第二篇MQTT发布消息以及接收Broker消息的源码分析,想看MQTT连接的小伙伴可以去看我上一篇哦。
MQTT发布消息
MQTT发布消息是由MqttAndroidClient类的publish函数执行的,我们来看看这个函数:
- // MqttAndroidClient类:
- @Override
- public IMqttDeliveryToken publish(String topic, byte[] payload, int qos,
- boolean retained, Object userContext,
- IMqttActionListener callback)
- throws MqttException, MqttPersistenceException {
- // 将消息内容、qos消息等级、retained消息是否保留封装成MqttMessage
- MqttMessage message = new MqttMessage(payload);
- message.setQos(qos);
- message.setRetained(retained);
- // 每一条消息都有自己的token
- MqttDeliveryTokenAndroid token = new MqttDeliveryTokenAndroid(
- this, userContext, callback, message);
- String activityToken = storeToken(token);
- IMqttDeliveryToken internalToken = mqttService.publish(clientHandle,
- topic, payload, qos, retained, null, activityToken);
- token.setDelegate(internalToken);
- return token;
- }
- 复制代码
从上面代码可以看出,发布消息需要topic消息主题、payload消息内容、callback回调监听等,经由mqttService.publish继续执行发布操作:
- // MqttService类:MQTT唯一组件
- public IMqttDeliveryToken publish(String clientHandle, String topic,
- byte[] payload, int qos, boolean retained,
- String invocationContext, String activityToken)
- throws MqttPersistenceException, MqttException {
- MqttConnection client = getConnection(clientHandle);
- return client.publish(topic, payload, qos, retained, invocationContext,
- activityToken);
- }
- 复制代码
MqttConnection在上一篇中讲解过,MQTT的连接会初始化一个MqttConnection,并保存在一个Map集合connections中,并通过getConnection(clientHandle)方法获取。很明显我们要接着看client.publish函数啦:
- // MqttConnection类:
- public IMqttDeliveryToken publish(String topic, byte[] payload, int qos,
- boolean retained, String invocationContext,
- String activityToken) {
- // 用于发布消息,是否发布成功的回调
- final Bundle resultBundle = new Bundle();
- resultBundle.putString(MqttServiceConstants.CALLBACK_ACTION,
- MqttServiceConstants.SEND_ACTION);
- resultBundle.putString(MqttServiceConstants.CALLBACK_ACTIVITY_TOKEN,
- activityToken);
- resultBundle.putString(
- MqttServiceConstants.CALLBACK_INVOCATION_CONTEXT,
- invocationContext);
-
- IMqttDeliveryToken sendToken = null;
-
- if ((myClient != null) && (myClient.isConnected())) {
- // 携带resultBundle数据,用于监听回调发布消息是否成功
- IMqttActionListener listener = new MqttConnectionListener(
- resultBundle);
- try {
- MqttMessage message = new MqttMessage(payload);
- message.setQos(qos);
- message.setRetained(retained);
- sendToken = myClient.publish(topic, payload, qos, retained,
- invocationContext, listener);
- storeSendDetails(topic, message, sendToken, invocationContext,
- activityToken);
- } catch (Exception e) {
- handleException(resultBundle, e);
- }
- } else {
- resultBundle.putString(MqttServiceConstants.CALLBACK_ERROR_MESSAGE,
- NOT_CONNECTED);
- service.traceError(MqttServiceConstants.SEND_ACTION, NOT_CONNECTED);
- service.callbackToActivity(clientHandle, Status.ERROR, resultBundle);
- }
-
- return sendToken;
- }
- 复制代码
这段代码中很明显可以看出发布的操作又交给了myClient.publish方法,那myClient是谁呢?上一篇文章中讲过myClient是MqttAsyncClient,是在MQTT连接时在MqttConnection类的connect方法中初始化的,详情请看上一篇。
- // MqttAsyncClient类:
- public IMqttDeliveryToken publish(String topic, byte[] payload, int qos
- , boolean retained,Object userContext,
- IMqttActionListener callback) throws MqttException,MqttPersistenceException {
- MqttMessage message = new MqttMessage(payload);
- message.setQos(qos);
- message.setRetained(retained);
- return this.publish(topic, message, userContext, callback);
- }
-
- public IMqttDeliveryToken publish(String topic, MqttMessage message
- , Object userContext,
- IMqttActionListener callback) throws MqttException,MqttPersistenceException {
- final String methodName = "publish";
- // @TRACE 111=< topic={0} message={1}userContext={1} callback={2}
- log.fine(CLASS_NAME, methodName, "111", new Object[]{topic, userContext, callback});
-
- // Checks if a topic is valid when publishing a message.
- MqttTopic.validate(topic, false/* wildcards NOT allowed */);
-
- MqttDeliveryToken token = new MqttDeliveryToken(getClientId());
- token.setActionCallback(callback);
- token.setUserContext(userContext);
- token.setMessage(message);
- token.internalTok.setTopics(new String[]{topic});
-
- MqttPublish pubMsg = new MqttPublish(topic, message);
- comms.sendNoWait(pubMsg, token);
-
- // @TRACE 112=<
- log.fine(CLASS_NAME, methodName, "112");
-
- return token;
- }
- 复制代码
从这段代码中可以看到,现在把把topic和message封装成了MqttPublish类型的消息,并继续由comms.sendNoWait执行,comms是ClientComms,ClientComms是在初始化MqttAsyncClient的构造方法中初始化的,详情看上一篇。
- // ClientComms类:
- public void sendNoWait(MqttWireMessage message, MqttToken token) throws MqttException {
- final String methodName = "sendNoWait";
- // 判断状态或者消息类型
- if (isConnected() ||
- (!isConnected() && message instanceof MqttConnect) ||
- (isDisconnecting() && message instanceof MqttDisconnect)) {
- if (disconnectedMessageBuffer != null && disconnectedMessageBuffer.getMessageCount() != 0) {
- //@TRACE 507=Client Connected, Offline Buffer available, but not empty. Adding
- // message to buffer. message={0}
- log.fine(CLASS_NAME, methodName, "507", new Object[]{message.getKey()});
- if (disconnectedMessageBuffer.isPersistBuffer()) {
- this.clientState.persistBufferedMessage(message);
- }
- disconnectedMessageBuffer.putMessage(message, token);
- } else {
- // 现在不是disconnect因此,逻辑走这里
- this.internalSend(message, token);
- }
- } else if (disconnectedMessageBuffer != null) {
- //@TRACE 508=Offline Buffer available. Adding message to buffer. message={0}
- log.fine(CLASS_NAME, methodName, "508", new Object[]{message.getKey()});
- if (disconnectedMessageBuffer.isPersistBuffer()) {
- this.clientState.persistBufferedMessage(message);
- }
- disconnectedMessageBuffer.putMessage(message, token);
- } else {
- //@TRACE 208=failed: not connected
- log.fine(CLASS_NAME, methodName, "208");
- throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
- }
- }
-
- void internalSend(MqttWireMessage message, MqttToken token) throws MqttException {
- final String methodName = "internalSend";
- ...
- try {
- // Persist if needed and send the message
- this.clientState.send(message, token);
- } catch (MqttException e) {
- // 注意此处代码***
- if (message instanceof MqttPublish) {
- this.clientState.undo((MqttPublish) message);
- }
- throw e;
- }
- }
- 复制代码
comms.sendNoWait方法中又调用了本类中的internalSend方法,并且在internalSend方法中又调用了clientState.send(message, token)方法继续发布。ClientState对象是在ClientComms初始化的构造方法中初始化的。此处需要注意一下catch里的代码,下面会具体说明。
- // ClientState类:
- public void send(MqttWireMessage message, MqttToken token) throws MqttException {
- final String methodName = "send";
- ...
-
- if (message instanceof MqttPublish) {
- synchronized (queueLock) {
- /**
- * 注意这里:actualInFlight实际飞行中>maxInflight最大飞行中
- * maxInflight:是我们在自己代码中通过连接选项MqttConnectOptions.setMaxInflight();设置的,默认大小为10
- */
- if (actualInFlight >= this.maxInflight) {
- //@TRACE 613= sending {0} msgs at max inflight window
- log.fine(CLASS_NAME, methodName, "613",
- new Object[]{new Integer(actualInFlight)});
-
- throw new MqttException(MqttException.REASON_CODE_MAX_INFLIGHT);
- }
-
- MqttMessage innerMessage = ((MqttPublish) message).getMessage();
- //@TRACE 628=pending publish key={0} qos={1} message={2}
- log.fine(CLASS_NAME, methodName, "628",
- new Object[]{new Integer(message.getMessageId()),
- new Integer(innerMessage.getQos()), message});
- /**
- * 根据自己设置的qos等级,来决定是否需要恢复消息
- * 这里需要说明一下qos等级区别:
- * qos==0,至多发送一次,不进行重试,Broker不会返回确认消息。
- * qos==1,至少发送一次,确保消息到达Broker,Broker需要返回确认消息PUBACK
- * qos==2,Broker肯定会收到消息,且只收到一次,qos==1可能会发送重复消息
- */
- switch (innerMessage.getQos()) {
- case 2:
- outboundQoS2.put(new Integer(message.getMessageId()), message);
- persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
- break;
- case 1:
- outboundQoS1.put(new Integer(message.getMessageId()), message);
- persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
- break;
- }
- tokenStore.saveToken(token, message);
- pendingMessages.addElement(message);
- queueLock.notifyAll();
- }
- } else {
- ...
- }
- }
- 复制代码
这段代码中我们发现了一个可能需要我们自己设置的属性maxInflight,如果实际发送中的消息大于maxInflight约束的最大的话就会抛出MqttException异常,那么这个异常catch里是怎么处理的呢,这就要往回看一步代码啦,上面已经提示过需要注意ClientComms类中internalSend方法中的catch里的代码:
- if (message instanceof MqttPublish) {
- this.clientState.undo((MqttPublish) message);
- }
- 复制代码
可以很明确的看出若消息类型是MqttPublish,则执行clientState.undo((MqttPublish) message)方法,我们前面说过消息已经在MqttAsyncClient类的publish方法中把topic和message封装成了MqttPublish类型的消息,因此此处会执行undo方法:
- // ClientState类:
- protected void undo(MqttPublish message) throws MqttPersistenceException {
- final String methodName = "undo";
- synchronized (queueLock) {
- //@TRACE 618=key={0} QoS={1}
- log.fine(CLASS_NAME, methodName, "618",
- new Object[]{new Integer(message.getMessageId()),
- new Integer(message.getMessage().getQos())});
-
- if (message.getMessage().getQos() == 1) {
- outboundQoS1.remove(new Integer(message.getMessageId()));
- } else {
- outboundQoS2.remove(new Integer(message.getMessageId()));
- }
- pendingMessages.removeElement(message);
- persistence.remove(getSendPersistenceKey(message));
- tokenStore.removeToken(message);
- if (message.getMessage().getQos() > 0) {
- //Free this message Id so it can be used again
- releaseMessageId(message.getMessageId());
- message.setMessageId(0);
- }
-
- checkQuiesceLock();
- }
- }
- 复制代码
代码已经很明显了,就是把大于maxInflight这部分消息remove移除掉,因此在实际操作中要注意自己的Mqtt消息的发布会不会在短时间内达到maxInflight默认的10的峰值,若能达到,则需要手动设置一个适合自己项目的范围阀值啦。
我们继续说clientState.send(message, token)方法里的逻辑,代码中注释中也说明了Mqtt会根据qos等级来决定消息到达机制
qos等级
- qos==0,至多发送一次,不进行重试,Broker不会返回确认消息,消息可能会丢失。
- qos==1,至少发送一次,确保消息到达Broker,Broker需要返回确认消息PUBACK,可能会发送重复消息
- qos==2,Broker肯定会收到消息,且只收到一次
根据qos等级,若qos等于1和2,则讲消息分别加入Hashtable类型的outboundQoS1和outboundQoS2中,已在后续逻辑中确保消息发送成功并到达。
注:qos等级优先级没有maxInflight高,从代码中可以看出,会先判断maxInflight再区分qos等级
代码的最后讲消息添加进Vector类型的pendingMessages里,在上一篇中我们可以了解到MQTT的发射器是轮询检查pendingMessages里是否存在数据,若存在则通过socket的OutputStream发送出去。并且会通过接收器接收从Broker发送回来的数据。
监听Broker返回的消息之数据
发送我们就不看源码啦,接收我们再看一下源码,通过源码看一看数据是怎么回到我们自己的回调里的:
- // CommsReceiver类中:
- public void run() {
- recThread = Thread.currentThread();
- recThread.setName(threadName);
- final String methodName = "run";
- MqttToken token = null;
-
- try {
- runningSemaphore.acquire();
- } catch (InterruptedException e) {
- running = false;
- return;
- }
-
- while (running && (in != null)) {
- try {
- //@TRACE 852=network read message
- log.fine(CLASS_NAME, methodName, "852");
- receiving = in.available() > 0;
- MqttWireMessage message = in.readMqttWireMessage();
- receiving = false;
-
- // 消息是否属于Mqtt确认类型
- if (message instanceof MqttAck) {
- token = tokenStore.getToken(message);
- // token一般不会为空,前面已经保存过
- if (token != null) {
- synchronized (token) {
- // ...
- clientState.notifyReceivedAck((MqttAck) message);
- }
- }
- ...
- } finally {
- receiving = false;
- runningSemaphore.release();
- }
- }
- }
- 复制代码
从代码中可以看出,Broker返回来的数据交给了clientState.notifyReceivedAck方法:
- // ClientState类:
- protected void notifyReceivedAck(MqttAck ack) throws MqttException {
- final String methodName = "notifyReceivedAck";
- ...
-
- MqttToken token = tokenStore.getToken(ack);
- MqttException mex = null;
-
- if (token == null) {
- ...
- } else if (ack instanceof MqttPubRec) {
- // qos==2 是返回
- MqttPubRel rel = new MqttPubRel((MqttPubRec) ack);
- this.send(rel, token);
- } else if (ack instanceof MqttPubAck || ack instanceof MqttPubComp) {
- // qos==1/2 消息移除前通知的结果
- notifyResult(ack, token, mex);
- // Do not remove publish / delivery token at this stage
- // do this when the persistence is removed later
- } else if (ack instanceof MqttPingResp) {
- // 连接心跳数据消息
- ...
- } else if (ack instanceof MqttConnack) {
- // MQTT连接消息
- ...
- } else {
- notifyResult(ack, token, mex);
- releaseMessageId(ack.getMessageId());
- tokenStore.removeToken(ack);
- }
-
- checkQuiesceLock();
- }
- 复制代码
从上面注释可知,发布的消息qos==0,返回结果是直接走else,而qos==1/2,确认消息也最终会走到notifyResult(ack, token, mex)方法中:
- protected void notifyResult(MqttWireMessage ack, MqttToken token, MqttException ex) {
- final String methodName = "notifyResult";
- // 取消阻止等待令牌的任何线程,并保存ack
- token.internalTok.markComplete(ack, ex);
- // 通知此令牌已收到响应消息,设置已完成状态,并通过isComplete()获取状态
- token.internalTok.notifyComplete();
-
- // 让用户知道异步操作已完成,然后删除令牌
- if (ack != null && ack instanceof MqttAck && !(ack instanceof MqttPubRec)) {
- //@TRACE 648=key{0}, msg={1}, excep={2}
- log.fine(CLASS_NAME, methodName, "648", new Object[]{token.internalTok.getKey(), ack,ex});
- // CommsCallback类
- callback.asyncOperationComplete(token);
- }
- // 有些情况下,由于操作失败,因此没有确认
- if (ack == null) {
- //@TRACE 649=key={0},excep={1}
- log.fine(CLASS_NAME, methodName, "649", new Object[]{token.internalTok.getKey(), ex});
- callback.asyncOperationComplete(token);
- }
- }
-
- // Token类:
- protected void markComplete(MqttWireMessage msg, MqttException ex) {
- final String methodName = "markComplete";
- //@TRACE 404=>key={0} response={1} excep={2}
- log.fine(CLASS_NAME, methodName, "404", new Object[]{getKey(), msg, ex});
-
- synchronized (responseLock) {
- // ACK means that everything was OK, so mark the message for garbage collection.
- if (msg instanceof MqttAck) {
- this.message = null;
- }
- this.pendingComplete = true;
- // 将消息保存在response成员变量中,并通过getWireMessage()方法获取消息msg
- this.response = msg;
- this.exception = ex;
- }
- }
- // Token类:
- protected void notifyComplete() {
- ...
- synchronized (responseLock) {
- ...
- if (exception == null && pendingComplete) {
- // 设置已完成,并通过isComplete()获取状态
- completed = true;
- pendingComplete = false;
- } else {
- pendingComplete = false;
- }
-
- responseLock.notifyAll();
- }
- ...
- }
- 复制代码
此时已将MqttWireMessage消息保存到token中,异步操作已完成,调用回调监听CommsCallback里的asyncOperationComplete方法:
- // CommsCallback类:
- public void asyncOperationComplete(MqttToken token) {
- final String methodName = "asyncOperationComplete";
-
- if (running) {
- // invoke callbacks on callback thread
- completeQueue.addElement(token);
- synchronized (workAvailable) {
- // @TRACE 715=new workAvailable. key={0}
- log.fine(CLASS_NAME, methodName, "715", new Object[]{token.internalTok.getKey()});
- workAvailable.notifyAll();
- }
- } else {
- // invoke async callback on invokers thread
- try {
- handleActionComplete(token);
- } catch (Throwable ex) {
- // Users code could throw an Error or Exception e.g. in the case
- // of class NoClassDefFoundError
- // @TRACE 719=callback threw ex:
- log.fine(CLASS_NAME, methodName, "719", null, ex);
-
- // Shutdown likely already in progress but no harm to confirm
- clientComms.shutdownConnection(null, new MqttException(ex));
- }
- }
- }
- 复制代码
CommsCallback是Mqtt连接就已经开始一直运行,因此running为true,所以现在已经将token添加进了completeQueue完成队列中,CommsCallback跟发射器一样,一直轮询等待数据,因此此时completeQueue已有数据,此时CommsCallback的run函数则会有接下来的操作:
- // CommsCallback类:
- public void run() {
- ...
- while (running) {
- try {
- ...
- if (running) {
- // Check for deliveryComplete callbacks...
- MqttToken token = null;
- synchronized (completeQueue) {
- // completeQueue不为空
- if (!completeQueue.isEmpty()) {
- // 获取第一个token
- token = (MqttToken) completeQueue.elementAt(0);
- completeQueue.removeElementAt(0);
- }
- }
- if (null != token) {
- // token不为null,执行handleActionComplete
- handleActionComplete(token);
- }
- ...
- }
-
- if (quiescing) {
- clientState.checkQuiesceLock();
- }
-
- } catch (Throwable ex) {
- ...
- } finally {
- ...
- }
- }
- }
-
- private void handleActionComplete(MqttToken token)
- throws MqttException {
- final String methodName = "handleActionComplete";
- synchronized (token) {
- // 由上面已经,isComplete()已设置为true
- if (token.isComplete()) {
- // Finish by doing any post processing such as delete
- // from persistent store but only do so if the action
- // is complete
- clientState.notifyComplete(token);
- }
- // 取消阻止任何服务员,如果待完成,现在设置完成
- token.internalTok.notifyComplete();
- if (!token.internalTok.isNotified()) {
- ...
- // 现在调用异步操作完成回调
- fireActionEvent(token);
- }
- ...
- }
- }
- 复制代码
run中调用了handleActionComplete函数,接着后调用了clientState.notifyComplete()方法和fireActionEvent(token)方法,先看notifyComplete():
- // ClientState类:
- protected void notifyComplete(MqttToken token) throws MqttException {
-
- final String methodName = "notifyComplete";
- // 获取保存到Token中的Broker返回的消息,上面有说明
- MqttWireMessage message = token.internalTok.getWireMessage();
-
- if (message != null && message instanceof MqttAck) {
- ...
- MqttAck ack = (MqttAck) message;
-
- if (ack instanceof MqttPubAck) {
- // qos==1,用户通知现在从持久性中删除
- persistence.remove(getSendPersistenceKey(message));
- persistence.remove(getSendBufferedPersistenceKey(message));
- outboundQoS1.remove(new Integer(ack.getMessageId()));
- decrementInFlight();
- releaseMessageId(message.getMessageId());
- tokenStore.removeToken(message);
- // @TRACE 650=removed Qos 1 publish. key={0}
- log.fine(CLASS_NAME, methodName, "650",
- new Object[]{new Integer(ack.getMessageId())});
- } else if (ack instanceof MqttPubComp) {
- ...
- }
-
- checkQuiesceLock();
- }
- }
- 复制代码
再来看fireActionEvent(token)方法:
- // CommsCallback类:
- public void fireActionEvent(MqttToken token) {
- final String methodName = "fireActionEvent";
-
- if (token != null) {
- IMqttActionListener asyncCB = token.getActionCallback();
- if (asyncCB != null) {
- if (token.getException() == null) {
- ...
- asyncCB.onSuccess(token);
- } else {
- ...
- asyncCB.onFailure(token, token.getException());
- }
- }
- }
- }
- 复制代码
从这段代码中终于能看到回调onSuccess和onFailure的方法啦,那asyncCB是谁呢?
- // MqttToken类:
- public IMqttActionListener getActionCallback() {
- return internalTok.getActionCallback();
- }
- // Token类:
- public IMqttActionListener getActionCallback() {
- return callback;
- }
- 复制代码
看到这,一脸懵逼,这到底是谁呢,其实我们可以直接看这个回调设置方法,看看是从哪设置进来的就可以啦:
- // Token类:
- public void setActionCallback(IMqttActionListener listener) {
- this.callback = listener;
- }
- // MqttToken类:
- public void setActionCallback(IMqttActionListener listener) {
- internalTok.setActionCallback(listener);
- }
- // ConnectActionListener类:
- public void connect() throws MqttPersistenceException {
- // 初始化MqttToken
- MqttToken token = new MqttToken(client.getClientId());
- // 将此类设置成回调类
- token.setActionCallback(this);
- token.setUserContext(this);
-
- ...
- }
- 复制代码
其实早在MQTT连接时,就已经将此callback设置好,因此asyncCB就是ConnectActionListener,所以此时就已经走到了ConnectActionListener类里的onSuccess和onFailure的方法中,我们只挑一个onSuccess看一看:
- // ConnectActionListener类:
- public void onSuccess(IMqttToken token) {
- if (originalMqttVersion == MqttConnectOptions.MQTT_VERSION_DEFAULT) {
- options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_DEFAULT);
- }
- // 此时将Broker的数据保存进了userToken里
- userToken.internalTok.markComplete(token.getResponse(), null);
- userToken.internalTok.notifyComplete();
- userToken.internalTok.setClient(this.client);
-
- comms.notifyConnect();
-
- if (userCallback != null) {
- userToken.setUserContext(userContext);
- userCallback.onSuccess(userToken);
- }
-
- if (mqttCallbackExtended != null) {
- String serverURI =
- comms.getNetworkModules()[comms.getNetworkModuleIndex()].getServerURI();
- mqttCallbackExtended.connectComplete(reconnect, serverURI);
- }
-
- }
- 复制代码
这里的userCallback又是谁呢?上一篇其实说过的,userCallback其实就是MqttConnection.connect函数中IMqttActionListener listener,所以此时又来到了MqttConnection类里connect方法里的listener监听回调内:
- // MqttConnection类:
- public void connect(MqttConnectOptions options, String invocationContext,
- String activityToken) {
- ...
- service.traceDebug(TAG, "Connecting {" + serverURI + "} as {" + clientId + "}");
- final Bundle resultBundle = new Bundle();
- resultBundle.putString(MqttServiceConstants.CALLBACK_ACTIVITY_TOKEN,
- activityToken);
- resultBundle.putString(
- MqttServiceConstants.CALLBACK_INVOCATION_CONTEXT,
- invocationContext);
- resultBundle.putString(MqttServiceConstants.CALLBACK_ACTION,
- MqttServiceConstants.CONNECT_ACTION);
- try {
- ...
- // 此时逻辑已经来到这里
- IMqttActionListener listener = new MqttConnectionListener(
- resultBundle) {
-
- @Override
- public void onSuccess(IMqttToken asyncActionToken) {
- // 执行如下代码:
- doAfterConnectSuccess(resultBundle);
- service.traceDebug(TAG, "connect success!");
- }
-
- @Override
- public void onFailure(IMqttToken asyncActionToken,
- Throwable exception) {
- resultBundle.putString(
- MqttServiceConstants.CALLBACK_ERROR_MESSAGE,
- exception.getLocalizedMessage());
- resultBundle.putSerializable(
- MqttServiceConstants.CALLBACK_EXCEPTION, exception);
- service.traceError(TAG,
- "connect fail, call connect to reconnect.reason:"
- + exception.getMessage());
-
- doAfterConnectFail(resultBundle);
-
- }
- };
-
- if (myClient != null) {
- if (isConnecting) {
- ...
- } else {
- service.traceDebug(TAG, "myClient != null and the client is not connected");
- service.traceDebug(TAG, "Do Real connect!");
- setConnectingState(true);
- myClient.connect(connectOptions, invocationContext, listener);
- }
- }
-
- // if myClient is null, then create a new connection
- else {
- ...
- myClient.connect(connectOptions, invocationContext, listener);
- }
- } catch (Exception e) {
- ...
- }
- }
- 复制代码
由这段代码以及注释可以知道,现在以及执行到了MqttConnection类里的doAfterConnectSuccess方法里:
- // MqttConnection类:
- private void doAfterConnectSuccess(final Bundle resultBundle) {
- // 获取唤醒锁
- acquireWakeLock();
- service.callbackToActivity(clientHandle, Status.OK, resultBundle);
- deliverBacklog();
- setConnectingState(false);
- disconnected = false;
- // 释放唤醒锁
- releaseWakeLock();
- }
-
- private void deliverBacklog() {
- Iterator<StoredMessage> backlog = service.messageStore
- .getAllArrivedMessages(clientHandle);
- while (backlog.hasNext()) {
- StoredMessage msgArrived = backlog.next();
- Bundle resultBundle = messageToBundle(msgArrived.getMessageId(),
- msgArrived.getTopic(), msgArrived.getMessage());
- // 关注下这个action,下面会用到
- resultBundle.putString(MqttServiceConstants.CALLBACK_ACTION,
- MqttServiceConstants.MESSAGE_ARRIVED_ACTION);
- service.callbackToActivity(clientHandle, Status.OK, resultBundle);
- }
- }
- 复制代码
可以看到这个函数中调用了几个方法中的其中两个service.callbackToActivity(clientHandle, Status.OK, resultBundle);和deliverBacklog();,deliverBacklog()方法最后也是调用的service.callbackToActivity方法。所以直接看service.callbackToActivity:
- // MqttService类:
- void callbackToActivity(String clientHandle, Status status,
- Bundle dataBundle) {
- // 发送广播
- Intent callbackIntent = new Intent(
- MqttServiceConstants.CALLBACK_TO_ACTIVITY);
- if (clientHandle != null) {
- callbackIntent.putExtra(
- MqttServiceConstants.CALLBACK_CLIENT_HANDLE, clientHandle);
- }
- callbackIntent.putExtra(MqttServiceConstants.CALLBACK_STATUS, status);
- if (dataBundle != null) {
- callbackIntent.putExtras(dataBundle);
- }
- LocalBroadcastManager.getInstance(this).sendBroadcast(callbackIntent);
- }
- 复制代码
service.callbackToActivity方法其实就是发送广播,那谁来接收广播呢?其实接收广播的就在最开始的MqttAndroidClient,MqttAndroidClient继承自BroadcastReceiver,所以说MqttAndroidClient本身就是一个广播接收者,所以我们来看它的onReceive方法:
- // MqttAndroidClient类:
- @Override
- public void onReceive(Context context, Intent intent) {
- Bundle data = intent.getExtras();
-
- String handleFromIntent = data
- .getString(MqttServiceConstants.CALLBACK_CLIENT_HANDLE);
-
- if ((handleFromIntent == null)
- || (!handleFromIntent.equals(clientHandle))) {
- return;
- }
-
- String action = data.getString(MqttServiceConstants.CALLBACK_ACTION);
- // 判断消息的action类型
- if (MqttServiceConstants.CONNECT_ACTION.equals(action)) {
- connectAction(data);
- } else if (MqttServiceConstants.CONNECT_EXTENDED_ACTION.equals(action)) {
- connectExtendedAction(data);
- } else if (MqttServiceConstants.MESSAGE_ARRIVED_ACTION.equals(action)) {
- messageArrivedAction(data);
- } else if (MqttServiceConstants.SUBSCRIBE_ACTION.equals(action)) {
- subscribeAction(data);
- } else if (MqttServiceConstants.UNSUBSCRIBE_ACTION.equals(action)) {
- unSubscribeAction(data);
- } else if (MqttServiceConstants.SEND_ACTION.equals(action)) {
- // 发布成功与否的回调
- sendAction(data);
- } else if (MqttServiceConstants.MESSAGE_DELIVERED_ACTION.equals(action)) {
- messageDeliveredAction(data);
- } else if (MqttServiceConstants.ON_CONNECTION_LOST_ACTION
- .equals(action)) {
- connectionLostAction(data);
- } else if (MqttServiceConstants.DISCONNECT_ACTION.equals(action)) {
- disconnected(data);
- } else if (MqttServiceConstants.TRACE_ACTION.equals(action)) {
- traceAction(data);
- } else {
- mqttService.traceError(MqttService.TAG, "Callback action doesn't exist.");
- }
- }
- 复制代码
从代码和注释以及上面的deliverBacklog方法中可以知道,我们现在需要关注的action为MESSAGE_ARRIVED_ACTION,所以就可以调用方法messageArrivedAction(data):
- // MqttAndroidClient类:
- private void messageArrivedAction(Bundle data) {
- if (callback != null) {
- String messageId = data
- .getString(MqttServiceConstants.CALLBACK_MESSAGE_ID);
- String destinationName = data
- .getString(MqttServiceConstants.CALLBACK_DESTINATION_NAME);
-
- ParcelableMqttMessage message = data
- .getParcelable(MqttServiceConstants.CALLBACK_MESSAGE_PARCEL);
- try {
- if (messageAck == Ack.AUTO_ACK) {
- callback.messageArrived(destinationName, message);
- mqttService.acknowledgeMessageArrival(clientHandle, messageId);
- } else {
- message.messageId = messageId;
- callback.messageArrived(destinationName, message);
- }
-
- // let the service discard the saved message details
- } catch (Exception e) {
- // Swallow the exception
- }
- }
- }
-
- @Override
- public void setCallback(MqttCallback callback) {
- this.callback = callback;
- }
- 复制代码
在messageArrivedAction方法中可以看到,我们最后调用了callback回调了messageArrived方法,那么 callback通过上面下部分代码可以知道,其实这个callback就是我们上一篇文章中所说的我们初始化MqttAndroidClient后,通过方法setCallback设置的我们自己定义的实现MqttCallback接口的回调类。
监听Broker返回的消息之发布消息成功与否
再看下sendAction(data)方法:
- private void sendAction(Bundle data) {
- IMqttToken token = getMqttToken(data);
- // remove on delivery
- simpleAction(token, data);
- }
-
- private void simpleAction(IMqttToken token, Bundle data) {
- if (token != null) {
- Status status = (Status) data
- .getSerializable(MqttServiceConstants.CALLBACK_STATUS);
- if (status == Status.OK) {
- // 如果发布成功回调此方法
- ((MqttTokenAndroid) token).notifyComplete();
- } else {
- Exception exceptionThrown =
- (Exception) data.getSerializable(MqttServiceConstants.CALLBACK_EXCEPTION);
- // 发布失败回调
- ((MqttTokenAndroid) token).notifyFailure(exceptionThrown);
- }
- } else {
- if (mqttService != null) {
- mqttService.traceError(MqttService.TAG, "simpleAction : token is null");
- }
- }
- }
- 复制代码
接下来再看一看发布成功回调的MqttTokenAndroid的notifyComplete函数:
- // MqttTokenAndroid类:
- void notifyComplete() {
- synchronized (waitObject) {
- isComplete = true;
- waitObject.notifyAll();
- if (listener != null) {
- listener.onSuccess(this);
- }
- }
- }
- 复制代码
这里又调用了listener.onSuccess(this)方法,那么这个listener是谁?其实listener就是我们调用MqttAndroidClient类的publish发布的最后一个参数,即我们自定义的监听发布消息是否发布成功的回调类。上面在MqttConnection类的publish方法中封装过MqttServiceConstants.SEND_ACTION的Bundle数据,而此数据是被MqttConnection类里的MqttConnectionListener携带。所以MqttConnectionListener里的onSuccess被调用时就会调用service.callbackToActivity,继而到sendBroadcast发送广播,最后调用sendAction方法,回调自定义的IMqttActionListener的实现类。而MqttConnectionListener里的onSuccess是在CommsCallback类里的fireActionEvent方法中,往上走就到CommsCallback类的了handleActionComplete和run()函数。
现在看是不是有点懵毕竟上面有两个 监听Broker返回的消息,一个是用来监听Broker发给客户端数据的监听,另一个是客户端发布消息是否发布成功的监听而已。两者都是使用MqttActionListener,不过前者在MqttActionListener监听回调里最后调用的是自定义的MqttCallback回调而已。并且两者监听的位置不一样,前者是在 MqttConnection类的connect时就已确认下来的,对于一个MQTT连接只会有一个,所以这个是一直用来监听数据的;而后者监听发布消息是否成功是每个publish都需要传入的,并在MqttConnection类里的publish初始化。这么讲是不是就清晰一些啦。
哈哈,到此MQTT的publish发布以及接收Broker数据的源码分析也看完啦。
(注:若有什么地方阐述有误,敬请指正。)