当前位置:   article > 正文

paho源码解析--发布_paho-mqtt5.0源码

paho-mqtt5.0源码

MQTTAsync_createWithOptions:

  1. Socket_setWriteContinueCallback(MQTTAsync_writeContinue);
  2. Socket_setWriteCompleteCallback(MQTTAsync_writeComplete);
  3. Socket_setWriteAvailableCallback(MQTTProtocol_writeAvailable);

MQTTAsync_connect

  1. 检查连接参数的合法性
  2. 启动发送和接收线程
  3. 设置连接相关的各种参数
  4. 添加连接指令到命令队列中
  1. int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options)
  2. {
  3. /*
  4. Abridged excerpt: rc, conn, m and the exit label are not declared in this listing. Start the send thread and the receive thread unless each is already STARTING or RUNNING.
  5. */
  6. if (sendThread_state != STARTING && sendThread_state != RUNNING)
  7. {
  8. sendThread_state = STARTING;
  9. Thread_start(MQTTAsync_sendThread, NULL); /* the send thread is started with a NULL argument */
  10. }
  11. if (receiveThread_state != STARTING && receiveThread_state != RUNNING)
  12. {
  13. receiveThread_state = STARTING;
  14. Thread_start(MQTTAsync_receiveThread, handle); /* the receive thread is given the client handle */
  15. }
  16. /* Allocate a zero-filled queued command for the connect request and hand it to MQTTAsync_addCommand */
  17. if ((conn = malloc(sizeof(MQTTAsync_queuedCommand))) == NULL)
  18. {
  19. rc = PAHO_MEMORY_ERROR;
  20. goto exit;
  21. }
  22. memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
  23. conn->client = m;
  24. if (options) /* copy the caller's callbacks and context only when options were supplied */
  25. {
  26. conn->command.onSuccess = options->onSuccess;
  27. conn->command.onFailure = options->onFailure;
  28. conn->command.onSuccess5 = options->onSuccess5;
  29. conn->command.onFailure5 = options->onFailure5;
  30. conn->command.context = options->context;
  31. }
  32. conn->command.type = CONNECT;
  33. conn->command.details.conn.currentURI = 0;
  34. rc = MQTTAsync_addCommand(conn, sizeof(conn)); /* NOTE(review): sizeof(conn) is the size of the pointer, not of MQTTAsync_queuedCommand - confirm this is what MQTTAsync_addCommand expects */
  35. }

命令队列(MQTTAsync_commands)是一个全局的双向链表,相关定义如下:

  1. // Two global list pointers are maintained, both initialised to NULL
  2. List* MQTTAsync_handles = NULL;
  3. List* MQTTAsync_commands = NULL;
  4. /**
  5. * Structure to hold all data for one list element: links to the previous and next elements plus a pointer to the content
  6. */
  7. typedef struct ListElementStruct
  8. {
  9. struct ListElementStruct *prev, /**< pointer to previous list element */
  10. *next; /**< pointer to next list element */
  11. void* content; /**< pointer to element content */
  12. } ListElement;
  13. /**
  14. * Structure to hold all data for one list: first, last and current element pointers, the item count and the heap storage used
  15. */
  16. typedef struct
  17. {
  18. ListElement *first, /**< first element in the list */
  19. *last, /**< last element in the list */
  20. *current; /**< current element in the list, for iteration */
  21. int count; /**< no of items */
  22. size_t size; /**< heap storage used */
  23. } List;
  1. enum msgTypes /* message type codes: CONNECT is 1 and each following enumerator is one greater, so AUTH is 15 */
  2. {
  3. CONNECT = 1, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL,
  4. PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK,
  5. PINGREQ, PINGRESP, DISCONNECT, AUTH
  6. };
  7. typedef struct /* one command: its type, result callbacks, token, context, start time, properties and type-specific details */
  8. {
  9. int type;// holds a value of the msgTypes enumeration
  10. MQTTAsync_onSuccess* onSuccess;
  11. MQTTAsync_onFailure* onFailure;
  12. MQTTAsync_onSuccess5* onSuccess5;
  13. MQTTAsync_onFailure5* onFailure5;
  14. MQTTAsync_token token;
  15. void* context;
  16. START_TIME_TYPE start_time;
  17. MQTTProperties properties;
  18. union /* the members share storage; presumably the active one is selected by type - TODO confirm */
  19. {
  20. struct
  21. {
  22. int count;
  23. char** topics;
  24. int* qoss;
  25. MQTTSubscribe_options opts;
  26. MQTTSubscribe_options* optlist;
  27. } sub;
  28. struct
  29. {
  30. int count;
  31. char** topics;
  32. } unsub;
  33. struct
  34. {
  35. char* destinationName;
  36. int payloadlen;
  37. void* payload;
  38. int qos;
  39. int retained;
  40. } pub;
  41. struct
  42. {
  43. int internal;
  44. int timeout;
  45. enum MQTTReasonCodes reasonCode;
  46. } dis;
  47. struct
  48. {
  49. int currentURI;
  50. int MQTTVersion; /**< current MQTT version being used to connect */
  51. } conn;
  52. } details;
  53. } MQTTAsync_command;
  54. typedef struct MQTTAsync_struct /* per-client state: server URI(s), callbacks with their contexts, stored connect/disconnect commands, offline-buffering and automatic-reconnect settings, MQTT V5 properties */
  55. {
  56. char* serverURI;
  57. int ssl;
  58. int websocket;
  59. Clients* c;
  60. /* "Global", to the client, callback definitions */
  61. MQTTAsync_connectionLost* cl;
  62. MQTTAsync_messageArrived* ma;
  63. MQTTAsync_deliveryComplete* dc;
  64. void* clContext; /* the context to be associated with the conn lost callback*/
  65. void* maContext; /* the context to be associated with the msg arrived callback*/
  66. void* dcContext; /* the context to be associated with the deliv complete callback*/
  67. MQTTAsync_connected* connected;
  68. void* connected_context; /* the context to be associated with the connected callback*/
  69. MQTTAsync_disconnected* disconnected;
  70. void* disconnected_context; /* the context to be associated with the disconnected callback*/
  71. MQTTAsync_updateConnectOptions* updateConnectOptions;
  72. void* updateConnectOptions_context;
  73. /* Each time connect is called, we store the options that were used. These are reused in
  74. any call to reconnect, or an automatic reconnect attempt */
  75. MQTTAsync_command connect; /* Connect operation properties */
  76. MQTTAsync_command disconnect; /* Disconnect operation properties */
  77. MQTTAsync_command* pending_write; /* Is there a socket write pending? */
  78. List* responses;
  79. unsigned int command_seqno;
  80. MQTTPacket* pack;
  81. /* added for offline buffering */
  82. MQTTAsync_createOptions* createOptions;
  83. int shouldBeConnected;
  84. int noBufferedMessages; /* the current number of buffered (publish) messages for this client */
  85. /* added for automatic reconnect */
  86. int automaticReconnect;
  87. int minRetryInterval;
  88. int maxRetryInterval;
  89. int serverURIcount;
  90. char** serverURIs;
  91. int connectTimeout;
  92. int currentInterval;
  93. int currentIntervalBase;
  94. START_TIME_TYPE lastConnectionFailedTime;
  95. int retrying;
  96. int reconnectNow;
  97. /* MQTT V5 properties */
  98. MQTTProperties* connectProps;
  99. MQTTProperties* willProps;
  100. } MQTTAsyncs;
  101. typedef struct /* a command paired with the client (MQTTAsyncs) it is for, plus restore bookkeeping */
  102. {
  103. MQTTAsync_command command;
  104. MQTTAsyncs* client;
  105. unsigned int seqno; /* only used on restore */
  106. int not_restored;
  107. char* key; /* if not_restored, this holds the key */
  108. } MQTTAsync_queuedCommand;

MQTTAsync_sendThread

  1. 修改发送线程工作状态
  2. 如果工作队列中有指令,处理工作队列中的指令(MQTTAsync_processCommand)
  3. 等待send_cond条件变量被触发;若1秒内未被触发,则等待超时返回
  4. 检查超时
    发送线程的工作状态(sendThread_state)是一个全局变量:paho库支持多个client,但所有client共用同一个发送线程。

发送线程的主要执行是在MQTTAsync_processCommand中,主要如下几点:

  • 从队列中寻找可执行的指令
  • 将找到的指令从命令队列中摘除(解除其链表节点的链接)
  • 执行指令
  • 指令执行结果的处理

ref:

paho mqtt c 源码分析-1 - 简书

物联网 -Paho MQTT C Cient的实现和详解_阿进的写字台的博客-CSDN博客

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/176395
推荐阅读
  

闽ICP备14008679号