当前位置:   article > 正文

物联网协议之MQTT源码分析(二)

mqttclientt publish(string topic, byte[] message);

此篇文章继上一篇物联网协议之MQTT源码分析(一)而写的第二篇MQTT发布消息以及接收Broker消息的源码分析,想看MQTT连接的小伙伴可以去看我上一篇哦。

juejin.im/post/5cd66c…

MQTT发布消息

MQTT发布消息是由MqttAndroidClient类的publish函数执行的,我们来看看这个函数:

  1. // MqttAndroidClient类:
  2. @Override
  3. public IMqttDeliveryToken publish(String topic, byte[] payload, int qos,
  4. boolean retained, Object userContext,
  5. IMqttActionListener callback)
  6. throws MqttException, MqttPersistenceException {
  7. // 将消息内容、qos消息等级、retained消息是否保留封装成MqttMessage
  8. MqttMessage message = new MqttMessage(payload);
  9. message.setQos(qos);
  10. message.setRetained(retained);
  11. // 每一条消息都有自己的token
  12. MqttDeliveryTokenAndroid token = new MqttDeliveryTokenAndroid(
  13. this, userContext, callback, message);
  14. String activityToken = storeToken(token);
  15. IMqttDeliveryToken internalToken = mqttService.publish(clientHandle,
  16. topic, payload, qos, retained, null, activityToken);
  17. token.setDelegate(internalToken);
  18. return token;
  19. }
  20. 复制代码

从上面代码可以看出,发布消息需要topic消息主题、payload消息内容、callback回调监听等,经由mqttService.publish继续执行发布操作:

  1. // MqttService类:MQTT唯一组件
  2. public IMqttDeliveryToken publish(String clientHandle, String topic,
  3. byte[] payload, int qos, boolean retained,
  4. String invocationContext, String activityToken)
  5. throws MqttPersistenceException, MqttException {
  6. MqttConnection client = getConnection(clientHandle);
  7. return client.publish(topic, payload, qos, retained, invocationContext,
  8. activityToken);
  9. }
  10. 复制代码

MqttConnection在上一篇中讲解过,MQTT的连接会初始化一个MqttConnection,并保存在一个Map集合connections中,并通过getConnection(clientHandle)方法获取。很明显我们要接着看client.publish函数啦:

  1. // MqttConnection类:
  2. public IMqttDeliveryToken publish(String topic, byte[] payload, int qos,
  3. boolean retained, String invocationContext,
  4. String activityToken) {
  5. // 用于发布消息,是否发布成功的回调
  6. final Bundle resultBundle = new Bundle();
  7. resultBundle.putString(MqttServiceConstants.CALLBACK_ACTION,
  8. MqttServiceConstants.SEND_ACTION);
  9. resultBundle.putString(MqttServiceConstants.CALLBACK_ACTIVITY_TOKEN,
  10. activityToken);
  11. resultBundle.putString(
  12. MqttServiceConstants.CALLBACK_INVOCATION_CONTEXT,
  13. invocationContext);
  14. IMqttDeliveryToken sendToken = null;
  15. if ((myClient != null) && (myClient.isConnected())) {
  16. // 携带resultBundle数据,用于监听回调发布消息是否成功
  17. IMqttActionListener listener = new MqttConnectionListener(
  18. resultBundle);
  19. try {
  20. MqttMessage message = new MqttMessage(payload);
  21. message.setQos(qos);
  22. message.setRetained(retained);
  23. sendToken = myClient.publish(topic, payload, qos, retained,
  24. invocationContext, listener);
  25. storeSendDetails(topic, message, sendToken, invocationContext,
  26. activityToken);
  27. } catch (Exception e) {
  28. handleException(resultBundle, e);
  29. }
  30. } else {
  31. resultBundle.putString(MqttServiceConstants.CALLBACK_ERROR_MESSAGE,
  32. NOT_CONNECTED);
  33. service.traceError(MqttServiceConstants.SEND_ACTION, NOT_CONNECTED);
  34. service.callbackToActivity(clientHandle, Status.ERROR, resultBundle);
  35. }
  36. return sendToken;
  37. }
  38. 复制代码

这段代码中很明显可以看出发布的操作又交给了myClient.publish方法,那myClient是谁呢?上一篇文章中讲过myClient是MqttAsyncClient,是在MQTT连接时在MqttConnection类的connect方法中初始化的,详情请看上一篇。

  1. // MqttAsyncClient类:
  2. public IMqttDeliveryToken publish(String topic, byte[] payload, int qos
  3. , boolean retained,Object userContext,
  4. IMqttActionListener callback) throws MqttException,MqttPersistenceException {
  5. MqttMessage message = new MqttMessage(payload);
  6. message.setQos(qos);
  7. message.setRetained(retained);
  8. return this.publish(topic, message, userContext, callback);
  9. }
  10. public IMqttDeliveryToken publish(String topic, MqttMessage message
  11. , Object userContext,
  12. IMqttActionListener callback) throws MqttException,MqttPersistenceException {
  13. final String methodName = "publish";
  14. // @TRACE 111=< topic={0} message={1}userContext={1} callback={2}
  15. log.fine(CLASS_NAME, methodName, "111", new Object[]{topic, userContext, callback});
  16. // Checks if a topic is valid when publishing a message.
  17. MqttTopic.validate(topic, false/* wildcards NOT allowed */);
  18. MqttDeliveryToken token = new MqttDeliveryToken(getClientId());
  19. token.setActionCallback(callback);
  20. token.setUserContext(userContext);
  21. token.setMessage(message);
  22. token.internalTok.setTopics(new String[]{topic});
  23. MqttPublish pubMsg = new MqttPublish(topic, message);
  24. comms.sendNoWait(pubMsg, token);
  25. // @TRACE 112=<
  26. log.fine(CLASS_NAME, methodName, "112");
  27. return token;
  28. }
  29. 复制代码

从这段代码中可以看到,现在把把topic和message封装成了MqttPublish类型的消息,并继续由comms.sendNoWait执行,comms是ClientComms,ClientComms是在初始化MqttAsyncClient的构造方法中初始化的,详情看上一篇。

  1. // ClientComms类:
  2. public void sendNoWait(MqttWireMessage message, MqttToken token) throws MqttException {
  3. final String methodName = "sendNoWait";
  4. // 判断状态或者消息类型
  5. if (isConnected() ||
  6. (!isConnected() && message instanceof MqttConnect) ||
  7. (isDisconnecting() && message instanceof MqttDisconnect)) {
  8. if (disconnectedMessageBuffer != null && disconnectedMessageBuffer.getMessageCount() != 0) {
  9. //@TRACE 507=Client Connected, Offline Buffer available, but not empty. Adding
  10. // message to buffer. message={0}
  11. log.fine(CLASS_NAME, methodName, "507", new Object[]{message.getKey()});
  12. if (disconnectedMessageBuffer.isPersistBuffer()) {
  13. this.clientState.persistBufferedMessage(message);
  14. }
  15. disconnectedMessageBuffer.putMessage(message, token);
  16. } else {
  17. // 现在不是disconnect因此,逻辑走这里
  18. this.internalSend(message, token);
  19. }
  20. } else if (disconnectedMessageBuffer != null) {
  21. //@TRACE 508=Offline Buffer available. Adding message to buffer. message={0}
  22. log.fine(CLASS_NAME, methodName, "508", new Object[]{message.getKey()});
  23. if (disconnectedMessageBuffer.isPersistBuffer()) {
  24. this.clientState.persistBufferedMessage(message);
  25. }
  26. disconnectedMessageBuffer.putMessage(message, token);
  27. } else {
  28. //@TRACE 208=failed: not connected
  29. log.fine(CLASS_NAME, methodName, "208");
  30. throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
  31. }
  32. }
  33. void internalSend(MqttWireMessage message, MqttToken token) throws MqttException {
  34. final String methodName = "internalSend";
  35. ...
  36. try {
  37. // Persist if needed and send the message
  38. this.clientState.send(message, token);
  39. } catch (MqttException e) {
  40. // 注意此处代码***
  41. if (message instanceof MqttPublish) {
  42. this.clientState.undo((MqttPublish) message);
  43. }
  44. throw e;
  45. }
  46. }
  47. 复制代码

comms.sendNoWait方法中又调用了本类中的internalSend方法,并且在internalSend方法中又调用了clientState.send(message, token)方法继续发布。ClientState对象是在ClientComms初始化的构造方法中初始化的。此处需要注意一下catch里的代码,下面会具体说明。

  1. // ClientState类:
  2. public void send(MqttWireMessage message, MqttToken token) throws MqttException {
  3. final String methodName = "send";
  4. ...
  5. if (message instanceof MqttPublish) {
  6. synchronized (queueLock) {
  7. /**
  8. * 注意这里:actualInFlight实际飞行中>maxInflight最大飞行中
  9. * maxInflight:是我们在自己代码中通过连接选项MqttConnectOptions.setMaxInflight();设置的,默认大小为10
  10. */
  11. if (actualInFlight >= this.maxInflight) {
  12. //@TRACE 613= sending {0} msgs at max inflight window
  13. log.fine(CLASS_NAME, methodName, "613",
  14. new Object[]{new Integer(actualInFlight)});
  15. throw new MqttException(MqttException.REASON_CODE_MAX_INFLIGHT);
  16. }
  17. MqttMessage innerMessage = ((MqttPublish) message).getMessage();
  18. //@TRACE 628=pending publish key={0} qos={1} message={2}
  19. log.fine(CLASS_NAME, methodName, "628",
  20. new Object[]{new Integer(message.getMessageId()),
  21. new Integer(innerMessage.getQos()), message});
  22. /**
  23. * 根据自己设置的qos等级,来决定是否需要恢复消息
  24. * 这里需要说明一下qos等级区别:
  25. * qos==0,至多发送一次,不进行重试,Broker不会返回确认消息。
  26. * qos==1,至少发送一次,确保消息到达Broker,Broker需要返回确认消息PUBACK
  27. * qos==2,Broker肯定会收到消息,且只收到一次,qos==1可能会发送重复消息
  28. */
  29. switch (innerMessage.getQos()) {
  30. case 2:
  31. outboundQoS2.put(new Integer(message.getMessageId()), message);
  32. persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
  33. break;
  34. case 1:
  35. outboundQoS1.put(new Integer(message.getMessageId()), message);
  36. persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
  37. break;
  38. }
  39. tokenStore.saveToken(token, message);
  40. pendingMessages.addElement(message);
  41. queueLock.notifyAll();
  42. }
  43. } else {
  44. ...
  45. }
  46. }
  47. 复制代码

这段代码中我们发现了一个可能需要我们自己设置的属性maxInflight,如果实际发送中的消息大于maxInflight约束的最大的话就会抛出MqttException异常,那么这个异常catch里是怎么处理的呢,这就要往回看一步代码啦,上面已经提示过需要注意ClientComms类中internalSend方法中的catch里的代码:

  1. if (message instanceof MqttPublish) {
  2. this.clientState.undo((MqttPublish) message);
  3. }
  4. 复制代码

可以很明确的看出若消息类型是MqttPublish,则执行clientState.undo((MqttPublish) message)方法,我们前面说过消息已经在MqttAsyncClient类的publish方法中把topic和message封装成了MqttPublish类型的消息,因此此处会执行undo方法:

  1. // ClientState类:
  2. protected void undo(MqttPublish message) throws MqttPersistenceException {
  3. final String methodName = "undo";
  4. synchronized (queueLock) {
  5. //@TRACE 618=key={0} QoS={1}
  6. log.fine(CLASS_NAME, methodName, "618",
  7. new Object[]{new Integer(message.getMessageId()),
  8. new Integer(message.getMessage().getQos())});
  9. if (message.getMessage().getQos() == 1) {
  10. outboundQoS1.remove(new Integer(message.getMessageId()));
  11. } else {
  12. outboundQoS2.remove(new Integer(message.getMessageId()));
  13. }
  14. pendingMessages.removeElement(message);
  15. persistence.remove(getSendPersistenceKey(message));
  16. tokenStore.removeToken(message);
  17. if (message.getMessage().getQos() > 0) {
  18. //Free this message Id so it can be used again
  19. releaseMessageId(message.getMessageId());
  20. message.setMessageId(0);
  21. }
  22. checkQuiesceLock();
  23. }
  24. }
  25. 复制代码

代码已经很明显了,就是把大于maxInflight这部分消息remove移除掉,因此在实际操作中要注意自己的Mqtt消息的发布会不会在短时间内达到maxInflight默认的10的峰值,若能达到,则需要手动设置一个适合自己项目的范围阀值啦。

我们继续说clientState.send(message, token)方法里的逻辑,代码中注释中也说明了Mqtt会根据qos等级来决定消息到达机制

qos等级

  • qos==0,至多发送一次,不进行重试,Broker不会返回确认消息,消息可能会丢失。
  • qos==1,至少发送一次,确保消息到达Broker,Broker需要返回确认消息PUBACK,可能会发送重复消息
  • qos==2,Broker肯定会收到消息,且只收到一次

根据qos等级,若qos等于1和2,则讲消息分别加入Hashtable类型的outboundQoS1和outboundQoS2中,已在后续逻辑中确保消息发送成功并到达。

注:qos等级优先级没有maxInflight高,从代码中可以看出,会先判断maxInflight再区分qos等级

代码的最后讲消息添加进Vector类型的pendingMessages里,在上一篇中我们可以了解到MQTT的发射器是轮询检查pendingMessages里是否存在数据,若存在则通过socket的OutputStream发送出去。并且会通过接收器接收从Broker发送回来的数据。

监听Broker返回的消息之数据

发送我们就不看源码啦,接收我们再看一下源码,通过源码看一看数据是怎么回到我们自己的回调里的:

  1. // CommsReceiver类中:
  2. public void run() {
  3. recThread = Thread.currentThread();
  4. recThread.setName(threadName);
  5. final String methodName = "run";
  6. MqttToken token = null;
  7. try {
  8. runningSemaphore.acquire();
  9. } catch (InterruptedException e) {
  10. running = false;
  11. return;
  12. }
  13. while (running && (in != null)) {
  14. try {
  15. //@TRACE 852=network read message
  16. log.fine(CLASS_NAME, methodName, "852");
  17. receiving = in.available() > 0;
  18. MqttWireMessage message = in.readMqttWireMessage();
  19. receiving = false;
  20. // 消息是否属于Mqtt确认类型
  21. if (message instanceof MqttAck) {
  22. token = tokenStore.getToken(message);
  23. // token一般不会为空,前面已经保存过
  24. if (token != null) {
  25. synchronized (token) {
  26. // ...
  27. clientState.notifyReceivedAck((MqttAck) message);
  28. }
  29. }
  30. ...
  31. } finally {
  32. receiving = false;
  33. runningSemaphore.release();
  34. }
  35. }
  36. }
  37. 复制代码

从代码中可以看出,Broker返回来的数据交给了clientState.notifyReceivedAck方法:

  1. // ClientState类:
  2. protected void notifyReceivedAck(MqttAck ack) throws MqttException {
  3. final String methodName = "notifyReceivedAck";
  4. ...
  5. MqttToken token = tokenStore.getToken(ack);
  6. MqttException mex = null;
  7. if (token == null) {
  8. ...
  9. } else if (ack instanceof MqttPubRec) {
  10. // qos==2 是返回
  11. MqttPubRel rel = new MqttPubRel((MqttPubRec) ack);
  12. this.send(rel, token);
  13. } else if (ack instanceof MqttPubAck || ack instanceof MqttPubComp) {
  14. // qos==1/2 消息移除前通知的结果
  15. notifyResult(ack, token, mex);
  16. // Do not remove publish / delivery token at this stage
  17. // do this when the persistence is removed later
  18. } else if (ack instanceof MqttPingResp) {
  19. // 连接心跳数据消息
  20. ...
  21. } else if (ack instanceof MqttConnack) {
  22. // MQTT连接消息
  23. ...
  24. } else {
  25. notifyResult(ack, token, mex);
  26. releaseMessageId(ack.getMessageId());
  27. tokenStore.removeToken(ack);
  28. }
  29. checkQuiesceLock();
  30. }
  31. 复制代码

从上面注释可知,发布的消息qos==0,返回结果是直接走else,而qos==1/2,确认消息也最终会走到notifyResult(ack, token, mex)方法中:

  1. protected void notifyResult(MqttWireMessage ack, MqttToken token, MqttException ex) {
  2. final String methodName = "notifyResult";
  3. // 取消阻止等待令牌的任何线程,并保存ack
  4. token.internalTok.markComplete(ack, ex);
  5. // 通知此令牌已收到响应消息,设置已完成状态,并通过isComplete()获取状态
  6. token.internalTok.notifyComplete();
  7. // 让用户知道异步操作已完成,然后删除令牌
  8. if (ack != null && ack instanceof MqttAck && !(ack instanceof MqttPubRec)) {
  9. //@TRACE 648=key{0}, msg={1}, excep={2}
  10. log.fine(CLASS_NAME, methodName, "648", new Object[]{token.internalTok.getKey(), ack,ex});
  11. // CommsCallback类
  12. callback.asyncOperationComplete(token);
  13. }
  14. // 有些情况下,由于操作失败,因此没有确认
  15. if (ack == null) {
  16. //@TRACE 649=key={0},excep={1}
  17. log.fine(CLASS_NAME, methodName, "649", new Object[]{token.internalTok.getKey(), ex});
  18. callback.asyncOperationComplete(token);
  19. }
  20. }
  21. // Token类:
  22. protected void markComplete(MqttWireMessage msg, MqttException ex) {
  23. final String methodName = "markComplete";
  24. //@TRACE 404=>key={0} response={1} excep={2}
  25. log.fine(CLASS_NAME, methodName, "404", new Object[]{getKey(), msg, ex});
  26. synchronized (responseLock) {
  27. // ACK means that everything was OK, so mark the message for garbage collection.
  28. if (msg instanceof MqttAck) {
  29. this.message = null;
  30. }
  31. this.pendingComplete = true;
  32. // 将消息保存在response成员变量中,并通过getWireMessage()方法获取消息msg
  33. this.response = msg;
  34. this.exception = ex;
  35. }
  36. }
  37. // Token类:
  38. protected void notifyComplete() {
  39. ...
  40. synchronized (responseLock) {
  41. ...
  42. if (exception == null && pendingComplete) {
  43. // 设置已完成,并通过isComplete()获取状态
  44. completed = true;
  45. pendingComplete = false;
  46. } else {
  47. pendingComplete = false;
  48. }
  49. responseLock.notifyAll();
  50. }
  51. ...
  52. }
  53. 复制代码

此时已将MqttWireMessage消息保存到token中,异步操作已完成,调用回调监听CommsCallback里的asyncOperationComplete方法:

  1. // CommsCallback类:
  2. public void asyncOperationComplete(MqttToken token) {
  3. final String methodName = "asyncOperationComplete";
  4. if (running) {
  5. // invoke callbacks on callback thread
  6. completeQueue.addElement(token);
  7. synchronized (workAvailable) {
  8. // @TRACE 715=new workAvailable. key={0}
  9. log.fine(CLASS_NAME, methodName, "715", new Object[]{token.internalTok.getKey()});
  10. workAvailable.notifyAll();
  11. }
  12. } else {
  13. // invoke async callback on invokers thread
  14. try {
  15. handleActionComplete(token);
  16. } catch (Throwable ex) {
  17. // Users code could throw an Error or Exception e.g. in the case
  18. // of class NoClassDefFoundError
  19. // @TRACE 719=callback threw ex:
  20. log.fine(CLASS_NAME, methodName, "719", null, ex);
  21. // Shutdown likely already in progress but no harm to confirm
  22. clientComms.shutdownConnection(null, new MqttException(ex));
  23. }
  24. }
  25. }
  26. 复制代码

CommsCallback是Mqtt连接就已经开始一直运行,因此running为true,所以现在已经将token添加进了completeQueue完成队列中,CommsCallback跟发射器一样,一直轮询等待数据,因此此时completeQueue已有数据,此时CommsCallback的run函数则会有接下来的操作:

  1. // CommsCallback类:
  2. public void run() {
  3. ...
  4. while (running) {
  5. try {
  6. ...
  7. if (running) {
  8. // Check for deliveryComplete callbacks...
  9. MqttToken token = null;
  10. synchronized (completeQueue) {
  11. // completeQueue不为空
  12. if (!completeQueue.isEmpty()) {
  13. // 获取第一个token
  14. token = (MqttToken) completeQueue.elementAt(0);
  15. completeQueue.removeElementAt(0);
  16. }
  17. }
  18. if (null != token) {
  19. // token不为null,执行handleActionComplete
  20. handleActionComplete(token);
  21. }
  22. ...
  23. }
  24. if (quiescing) {
  25. clientState.checkQuiesceLock();
  26. }
  27. } catch (Throwable ex) {
  28. ...
  29. } finally {
  30. ...
  31. }
  32. }
  33. }
  34. private void handleActionComplete(MqttToken token)
  35. throws MqttException {
  36. final String methodName = "handleActionComplete";
  37. synchronized (token) {
  38. // 由上面已经,isComplete()已设置为true
  39. if (token.isComplete()) {
  40. // Finish by doing any post processing such as delete
  41. // from persistent store but only do so if the action
  42. // is complete
  43. clientState.notifyComplete(token);
  44. }
  45. // 取消阻止任何服务员,如果待完成,现在设置完成
  46. token.internalTok.notifyComplete();
  47. if (!token.internalTok.isNotified()) {
  48. ...
  49. // 现在调用异步操作完成回调
  50. fireActionEvent(token);
  51. }
  52. ...
  53. }
  54. }
  55. 复制代码

run中调用了handleActionComplete函数,接着后调用了clientState.notifyComplete()方法和fireActionEvent(token)方法,先看notifyComplete():

  1. // ClientState类:
  2. protected void notifyComplete(MqttToken token) throws MqttException {
  3. final String methodName = "notifyComplete";
  4. // 获取保存到Token中的Broker返回的消息,上面有说明
  5. MqttWireMessage message = token.internalTok.getWireMessage();
  6. if (message != null && message instanceof MqttAck) {
  7. ...
  8. MqttAck ack = (MqttAck) message;
  9. if (ack instanceof MqttPubAck) {
  10. // qos==1,用户通知现在从持久性中删除
  11. persistence.remove(getSendPersistenceKey(message));
  12. persistence.remove(getSendBufferedPersistenceKey(message));
  13. outboundQoS1.remove(new Integer(ack.getMessageId()));
  14. decrementInFlight();
  15. releaseMessageId(message.getMessageId());
  16. tokenStore.removeToken(message);
  17. // @TRACE 650=removed Qos 1 publish. key={0}
  18. log.fine(CLASS_NAME, methodName, "650",
  19. new Object[]{new Integer(ack.getMessageId())});
  20. } else if (ack instanceof MqttPubComp) {
  21. ...
  22. }
  23. checkQuiesceLock();
  24. }
  25. }
  26. 复制代码

再来看fireActionEvent(token)方法:

  1. // CommsCallback类:
  2. public void fireActionEvent(MqttToken token) {
  3. final String methodName = "fireActionEvent";
  4. if (token != null) {
  5. IMqttActionListener asyncCB = token.getActionCallback();
  6. if (asyncCB != null) {
  7. if (token.getException() == null) {
  8. ...
  9. asyncCB.onSuccess(token);
  10. } else {
  11. ...
  12. asyncCB.onFailure(token, token.getException());
  13. }
  14. }
  15. }
  16. }
  17. 复制代码

从这段代码中终于能看到回调onSuccess和onFailure的方法啦,那asyncCB是谁呢?

  1. // MqttToken类:
  2. public IMqttActionListener getActionCallback() {
  3. return internalTok.getActionCallback();
  4. }
  5. // Token类:
  6. public IMqttActionListener getActionCallback() {
  7. return callback;
  8. }
  9. 复制代码

看到这,一脸懵逼,这到底是谁呢,其实我们可以直接看这个回调设置方法,看看是从哪设置进来的就可以啦:

  1. // Token类:
  2. public void setActionCallback(IMqttActionListener listener) {
  3. this.callback = listener;
  4. }
  5. // MqttToken类:
  6. public void setActionCallback(IMqttActionListener listener) {
  7. internalTok.setActionCallback(listener);
  8. }
  9. // ConnectActionListener类:
  10. public void connect() throws MqttPersistenceException {
  11. // 初始化MqttToken
  12. MqttToken token = new MqttToken(client.getClientId());
  13. // 将此类设置成回调类
  14. token.setActionCallback(this);
  15. token.setUserContext(this);
  16. ...
  17. }
  18. 复制代码

其实早在MQTT连接时,就已经将此callback设置好,因此asyncCB就是ConnectActionListener,所以此时就已经走到了ConnectActionListener类里的onSuccess和onFailure的方法中,我们只挑一个onSuccess看一看:

  1. // ConnectActionListener类:
  2. public void onSuccess(IMqttToken token) {
  3. if (originalMqttVersion == MqttConnectOptions.MQTT_VERSION_DEFAULT) {
  4. options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_DEFAULT);
  5. }
  6. // 此时将Broker的数据保存进了userToken里
  7. userToken.internalTok.markComplete(token.getResponse(), null);
  8. userToken.internalTok.notifyComplete();
  9. userToken.internalTok.setClient(this.client);
  10. comms.notifyConnect();
  11. if (userCallback != null) {
  12. userToken.setUserContext(userContext);
  13. userCallback.onSuccess(userToken);
  14. }
  15. if (mqttCallbackExtended != null) {
  16. String serverURI =
  17. comms.getNetworkModules()[comms.getNetworkModuleIndex()].getServerURI();
  18. mqttCallbackExtended.connectComplete(reconnect, serverURI);
  19. }
  20. }
  21. 复制代码

这里的userCallback又是谁呢?上一篇其实说过的,userCallback其实就是MqttConnection.connect函数中IMqttActionListener listener,所以此时又来到了MqttConnection类里connect方法里的listener监听回调内:

  1. // MqttConnection类:
  2. public void connect(MqttConnectOptions options, String invocationContext,
  3. String activityToken) {
  4. ...
  5. service.traceDebug(TAG, "Connecting {" + serverURI + "} as {" + clientId + "}");
  6. final Bundle resultBundle = new Bundle();
  7. resultBundle.putString(MqttServiceConstants.CALLBACK_ACTIVITY_TOKEN,
  8. activityToken);
  9. resultBundle.putString(
  10. MqttServiceConstants.CALLBACK_INVOCATION_CONTEXT,
  11. invocationContext);
  12. resultBundle.putString(MqttServiceConstants.CALLBACK_ACTION,
  13. MqttServiceConstants.CONNECT_ACTION);
  14. try {
  15. ...
  16. // 此时逻辑已经来到这里
  17. IMqttActionListener listener = new MqttConnectionListener(
  18. resultBundle) {
  19. @Override
  20. public void onSuccess(IMqttToken asyncActionToken) {
  21. // 执行如下代码:
  22. doAfterConnectSuccess(resultBundle);
  23. service.traceDebug(TAG, "connect success!");
  24. }
  25. @Override
  26. public void onFailure(IMqttToken asyncActionToken,
  27. Throwable exception) {
  28. resultBundle.putString(
  29. MqttServiceConstants.CALLBACK_ERROR_MESSAGE,
  30. exception.getLocalizedMessage());
  31. resultBundle.putSerializable(
  32. MqttServiceConstants.CALLBACK_EXCEPTION, exception);
  33. service.traceError(TAG,
  34. "connect fail, call connect to reconnect.reason:"
  35. + exception.getMessage());
  36. doAfterConnectFail(resultBundle);
  37. }
  38. };
  39. if (myClient != null) {
  40. if (isConnecting) {
  41. ...
  42. } else {
  43. service.traceDebug(TAG, "myClient != null and the client is not connected");
  44. service.traceDebug(TAG, "Do Real connect!");
  45. setConnectingState(true);
  46. myClient.connect(connectOptions, invocationContext, listener);
  47. }
  48. }
  49. // if myClient is null, then create a new connection
  50. else {
  51. ...
  52. myClient.connect(connectOptions, invocationContext, listener);
  53. }
  54. } catch (Exception e) {
  55. ...
  56. }
  57. }
  58. 复制代码

由这段代码以及注释可以知道,现在以及执行到了MqttConnection类里的doAfterConnectSuccess方法里:

  1. // MqttConnection类:
  2. private void doAfterConnectSuccess(final Bundle resultBundle) {
  3. // 获取唤醒锁
  4. acquireWakeLock();
  5. service.callbackToActivity(clientHandle, Status.OK, resultBundle);
  6. deliverBacklog();
  7. setConnectingState(false);
  8. disconnected = false;
  9. // 释放唤醒锁
  10. releaseWakeLock();
  11. }
  12. private void deliverBacklog() {
  13. Iterator<StoredMessage> backlog = service.messageStore
  14. .getAllArrivedMessages(clientHandle);
  15. while (backlog.hasNext()) {
  16. StoredMessage msgArrived = backlog.next();
  17. Bundle resultBundle = messageToBundle(msgArrived.getMessageId(),
  18. msgArrived.getTopic(), msgArrived.getMessage());
  19. // 关注下这个action,下面会用到
  20. resultBundle.putString(MqttServiceConstants.CALLBACK_ACTION,
  21. MqttServiceConstants.MESSAGE_ARRIVED_ACTION);
  22. service.callbackToActivity(clientHandle, Status.OK, resultBundle);
  23. }
  24. }
  25. 复制代码

可以看到这个函数中调用了几个方法中的其中两个service.callbackToActivity(clientHandle, Status.OK, resultBundle);和deliverBacklog();,deliverBacklog()方法最后也是调用的service.callbackToActivity方法。所以直接看service.callbackToActivity:

  1. // MqttService类:
  2. void callbackToActivity(String clientHandle, Status status,
  3. Bundle dataBundle) {
  4. // 发送广播
  5. Intent callbackIntent = new Intent(
  6. MqttServiceConstants.CALLBACK_TO_ACTIVITY);
  7. if (clientHandle != null) {
  8. callbackIntent.putExtra(
  9. MqttServiceConstants.CALLBACK_CLIENT_HANDLE, clientHandle);
  10. }
  11. callbackIntent.putExtra(MqttServiceConstants.CALLBACK_STATUS, status);
  12. if (dataBundle != null) {
  13. callbackIntent.putExtras(dataBundle);
  14. }
  15. LocalBroadcastManager.getInstance(this).sendBroadcast(callbackIntent);
  16. }
  17. 复制代码

service.callbackToActivity方法其实就是发送广播,那谁来接收广播呢?其实接收广播的就在最开始的MqttAndroidClient,MqttAndroidClient继承自BroadcastReceiver,所以说MqttAndroidClient本身就是一个广播接收者,所以我们来看它的onReceive方法:

  1. // MqttAndroidClient类:
  2. @Override
  3. public void onReceive(Context context, Intent intent) {
  4. Bundle data = intent.getExtras();
  5. String handleFromIntent = data
  6. .getString(MqttServiceConstants.CALLBACK_CLIENT_HANDLE);
  7. if ((handleFromIntent == null)
  8. || (!handleFromIntent.equals(clientHandle))) {
  9. return;
  10. }
  11. String action = data.getString(MqttServiceConstants.CALLBACK_ACTION);
  12. // 判断消息的action类型
  13. if (MqttServiceConstants.CONNECT_ACTION.equals(action)) {
  14. connectAction(data);
  15. } else if (MqttServiceConstants.CONNECT_EXTENDED_ACTION.equals(action)) {
  16. connectExtendedAction(data);
  17. } else if (MqttServiceConstants.MESSAGE_ARRIVED_ACTION.equals(action)) {
  18. messageArrivedAction(data);
  19. } else if (MqttServiceConstants.SUBSCRIBE_ACTION.equals(action)) {
  20. subscribeAction(data);
  21. } else if (MqttServiceConstants.UNSUBSCRIBE_ACTION.equals(action)) {
  22. unSubscribeAction(data);
  23. } else if (MqttServiceConstants.SEND_ACTION.equals(action)) {
  24. // 发布成功与否的回调
  25. sendAction(data);
  26. } else if (MqttServiceConstants.MESSAGE_DELIVERED_ACTION.equals(action)) {
  27. messageDeliveredAction(data);
  28. } else if (MqttServiceConstants.ON_CONNECTION_LOST_ACTION
  29. .equals(action)) {
  30. connectionLostAction(data);
  31. } else if (MqttServiceConstants.DISCONNECT_ACTION.equals(action)) {
  32. disconnected(data);
  33. } else if (MqttServiceConstants.TRACE_ACTION.equals(action)) {
  34. traceAction(data);
  35. } else {
  36. mqttService.traceError(MqttService.TAG, "Callback action doesn't exist.");
  37. }
  38. }
  39. 复制代码

从代码和注释以及上面的deliverBacklog方法中可以知道,我们现在需要关注的action为MESSAGE_ARRIVED_ACTION,所以就可以调用方法messageArrivedAction(data):

  1. // MqttAndroidClient类:
  2. private void messageArrivedAction(Bundle data) {
  3. if (callback != null) {
  4. String messageId = data
  5. .getString(MqttServiceConstants.CALLBACK_MESSAGE_ID);
  6. String destinationName = data
  7. .getString(MqttServiceConstants.CALLBACK_DESTINATION_NAME);
  8. ParcelableMqttMessage message = data
  9. .getParcelable(MqttServiceConstants.CALLBACK_MESSAGE_PARCEL);
  10. try {
  11. if (messageAck == Ack.AUTO_ACK) {
  12. callback.messageArrived(destinationName, message);
  13. mqttService.acknowledgeMessageArrival(clientHandle, messageId);
  14. } else {
  15. message.messageId = messageId;
  16. callback.messageArrived(destinationName, message);
  17. }
  18. // let the service discard the saved message details
  19. } catch (Exception e) {
  20. // Swallow the exception
  21. }
  22. }
  23. }
  24. @Override
  25. public void setCallback(MqttCallback callback) {
  26. this.callback = callback;
  27. }
  28. 复制代码

在messageArrivedAction方法中可以看到,我们最后调用了callback回调了messageArrived方法,那么 callback通过上面下部分代码可以知道,其实这个callback就是我们上一篇文章中所说的我们初始化MqttAndroidClient后,通过方法setCallback设置的我们自己定义的实现MqttCallback接口的回调类。

监听Broker返回的消息之发布消息成功与否

再看下sendAction(data)方法:

  1. private void sendAction(Bundle data) {
  2. IMqttToken token = getMqttToken(data);
  3. // remove on delivery
  4. simpleAction(token, data);
  5. }
  6. private void simpleAction(IMqttToken token, Bundle data) {
  7. if (token != null) {
  8. Status status = (Status) data
  9. .getSerializable(MqttServiceConstants.CALLBACK_STATUS);
  10. if (status == Status.OK) {
  11. // 如果发布成功回调此方法
  12. ((MqttTokenAndroid) token).notifyComplete();
  13. } else {
  14. Exception exceptionThrown =
  15. (Exception) data.getSerializable(MqttServiceConstants.CALLBACK_EXCEPTION);
  16. // 发布失败回调
  17. ((MqttTokenAndroid) token).notifyFailure(exceptionThrown);
  18. }
  19. } else {
  20. if (mqttService != null) {
  21. mqttService.traceError(MqttService.TAG, "simpleAction : token is null");
  22. }
  23. }
  24. }
  25. 复制代码

接下来再看一看发布成功回调的MqttTokenAndroid的notifyComplete函数:

  1. // MqttTokenAndroid类:
  2. void notifyComplete() {
  3. synchronized (waitObject) {
  4. isComplete = true;
  5. waitObject.notifyAll();
  6. if (listener != null) {
  7. listener.onSuccess(this);
  8. }
  9. }
  10. }
  11. 复制代码

这里又调用了listener.onSuccess(this)方法,那么这个listener是谁?其实listener就是我们调用MqttAndroidClient类的publish发布的最后一个参数,即我们自定义的监听发布消息是否发布成功的回调类。上面在MqttConnection类的publish方法中封装过MqttServiceConstants.SEND_ACTION的Bundle数据,而此数据是被MqttConnection类里的MqttConnectionListener携带。所以MqttConnectionListener里的onSuccess被调用时就会调用service.callbackToActivity,继而到sendBroadcast发送广播,最后调用sendAction方法,回调自定义的IMqttActionListener的实现类。而MqttConnectionListener里的onSuccess是在CommsCallback类里的fireActionEvent方法中,往上走就到CommsCallback类的了handleActionComplete和run()函数。

现在看是不是有点懵毕竟上面有两个 监听Broker返回的消息,一个是用来监听Broker发给客户端数据的监听,另一个是客户端发布消息是否发布成功的监听而已。两者都是使用MqttActionListener,不过前者在MqttActionListener监听回调里最后调用的是自定义的MqttCallback回调而已。并且两者监听的位置不一样,前者是在 MqttConnection类的connect时就已确认下来的,对于一个MQTT连接只会有一个,所以这个是一直用来监听数据的;而后者监听发布消息是否成功是每个publish都需要传入的,并在MqttConnection类里的publish初始化。这么讲是不是就清晰一些啦。

哈哈,到此MQTT的publish发布以及接收Broker数据的源码分析也看完啦。

(注:若有什么地方阐述有误,敬请指正。)

转载于:https://juejin.im/post/5cd7bf74e51d45475b17e393

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/188594
推荐阅读
相关标签
  

闽ICP备14008679号