赞
踩
MQTTAsync_createWithOptions:
- Socket_setWriteContinueCallback(MQTTAsync_writeContinue); /* excerpt from MQTTAsync_createWithOptions: three write-related callbacks are handed to the Socket_set*Callback setters */
- Socket_setWriteCompleteCallback(MQTTAsync_writeComplete);
- Socket_setWriteAvailableCallback(MQTTProtocol_writeAvailable); /* note: this handler is an MQTTProtocol_ function, not an MQTTAsync_ one */
- int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options) /* abridged excerpt: the declarations of conn, rc and m, the exit label and the return are not shown */
- {
- /*
- Start the send and receive threads; each is started only if it is not already STARTING or RUNNING
- */
- if (sendThread_state != STARTING && sendThread_state != RUNNING)
- {
- sendThread_state = STARTING;
- Thread_start(MQTTAsync_sendThread, NULL);
- }
- if (receiveThread_state != STARTING && receiveThread_state != RUNNING)
- {
- receiveThread_state = STARTING;
- Thread_start(MQTTAsync_receiveThread, handle); /* unlike the send thread, the receive thread is passed handle */
- }
-
- /* Add the connect request to the work queue */
- if ((conn = malloc(sizeof(MQTTAsync_queuedCommand))) == NULL)
- {
- rc = PAHO_MEMORY_ERROR;
- goto exit;
- }
- memset(conn, '\0', sizeof(MQTTAsync_queuedCommand)); /* zero-fill, so any field not assigned below stays zeroed */
- conn->client = m; /* NOTE(review): m is not declared in this excerpt; presumably the MQTTAsyncs* behind handle, confirm */
- if (options) /* callbacks and context are copied only when options were supplied */
- {
- conn->command.onSuccess = options->onSuccess;
- conn->command.onFailure = options->onFailure;
- conn->command.onSuccess5 = options->onSuccess5;
- conn->command.onFailure5 = options->onFailure5;
- conn->command.context = options->context;
- }
- conn->command.type = CONNECT;
- conn->command.details.conn.currentURI = 0;
- rc = MQTTAsync_addCommand(conn, sizeof(conn)); /* NOTE(review): sizeof(conn) is the size of the pointer, not of MQTTAsync_queuedCommand; confirm which size MQTTAsync_addCommand expects */
- }
命令队列(MQTTAsync_commands)是一个全局的双向链表,相关定义如下:
- // Two global lists are maintained
-
- List* MQTTAsync_handles = NULL; /* presumably the list of client handles; TODO confirm, its use is not shown here */
- List* MQTTAsync_commands = NULL; /* the command queue */
-
- /**
- * Structure to hold all data for one list element: links to both neighbours (prev/next) plus an untyped content pointer
- */
- typedef struct ListElementStruct
- {
- struct ListElementStruct *prev, /**< pointer to previous list element */
- *next; /**< pointer to next list element */
- void* content; /**< pointer to element content */
- } ListElement;
-
-
- /**
- * Structure to hold all data for one list: first/last element pointers, an iteration cursor, the item count and the heap storage used
- */
- typedef struct
- {
- ListElement *first, /**< first element in the list */
- *last, /**< last element in the list */
- *current; /**< current element in the list, for iteration */
- int count; /**< no of items */
- size_t size; /**< heap storage used */
- } List;
- enum msgTypes /* message/command type codes; stored in MQTTAsync_command.type */
- {
- CONNECT = 1, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL, /* values are consecutive: CONNECT = 1 ... PUBREL = 6 */
- PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK, /* PUBCOMP = 7 ... UNSUBACK = 11 */
- PINGREQ, PINGRESP, DISCONNECT, AUTH /* PINGREQ = 12 ... AUTH = 15; NOTE(review): these appear to mirror the MQTT control packet type numbers, confirm against the spec */
- };
-
- typedef struct /* one command: its type, completion callbacks, context and type-specific details */
- {
- int type;// one of the enum msgTypes values
- MQTTAsync_onSuccess* onSuccess;
- MQTTAsync_onFailure* onFailure;
- MQTTAsync_onSuccess5* onSuccess5;
- MQTTAsync_onFailure5* onFailure5;
- MQTTAsync_token token;
- void* context; /* MQTTAsync_connect copies this from options->context */
- START_TIME_TYPE start_time;
- MQTTProperties properties;
- union /* type-specific details; MQTTAsync_connect sets details.conn when type is CONNECT. Which member goes with the other types is presumably implied by the member names; confirm */
- {
- struct
- {
- int count;
- char** topics;
- int* qoss;
- MQTTSubscribe_options opts;
- MQTTSubscribe_options* optlist;
- } sub;
- struct
- {
- int count;
- char** topics;
- } unsub;
- struct
- {
- char* destinationName;
- int payloadlen;
- void* payload;
- int qos;
- int retained;
- } pub;
- struct
- {
- int internal;
- int timeout;
- enum MQTTReasonCodes reasonCode;
- } dis;
- struct
- {
- int currentURI;
- int MQTTVersion; /**< current MQTT version being used to connect */
- } conn;
- } details;
- } MQTTAsync_command;
-
-
- typedef struct MQTTAsync_struct /* per-client state; MQTTAsync_queuedCommand.client points at one of these */
- {
- char* serverURI;
- int ssl;
- int websocket;
- Clients* c;
-
-
- /* "Global", to the client, callback definitions */
- MQTTAsync_connectionLost* cl;
- MQTTAsync_messageArrived* ma;
- MQTTAsync_deliveryComplete* dc;
- void* clContext; /* the context to be associated with the conn lost callback*/
- void* maContext; /* the context to be associated with the msg arrived callback*/
- void* dcContext; /* the context to be associated with the deliv complete callback*/
-
-
- MQTTAsync_connected* connected;
- void* connected_context; /* the context to be associated with the connected callback*/
-
-
- MQTTAsync_disconnected* disconnected;
- void* disconnected_context; /* the context to be associated with the disconnected callback*/
-
-
- MQTTAsync_updateConnectOptions* updateConnectOptions;
- void* updateConnectOptions_context;
-
-
- /* Each time connect is called, we store the options that were used. These are reused in
- any call to reconnect, or an automatic reconnect attempt */
- MQTTAsync_command connect; /* Connect operation properties */
- MQTTAsync_command disconnect; /* Disconnect operation properties */
- MQTTAsync_command* pending_write; /* Is there a socket write pending? */
-
-
- List* responses;
- unsigned int command_seqno;
-
-
- MQTTPacket* pack;
-
-
- /* added for offline buffering */
- MQTTAsync_createOptions* createOptions;
- int shouldBeConnected;
- int noBufferedMessages; /* the current number of buffered (publish) messages for this client */
-
-
- /* added for automatic reconnect */
- int automaticReconnect;
- int minRetryInterval;
- int maxRetryInterval;
- int serverURIcount;
- char** serverURIs;
- int connectTimeout;
-
-
- int currentInterval;
- int currentIntervalBase;
- START_TIME_TYPE lastConnectionFailedTime;
- int retrying;
- int reconnectNow;
-
-
- /* MQTT V5 properties */
- MQTTProperties* connectProps;
- MQTTProperties* willProps;
-
-
- } MQTTAsyncs;
-
-
- typedef struct /* a queued command: the command itself plus a pointer to its client; MQTTAsync_connect allocates one and passes it to MQTTAsync_addCommand */
- {
- MQTTAsync_command command;
- MQTTAsyncs* client;
- unsigned int seqno; /* only used on restore */
- int not_restored;
- char* key; /* if not_restored, this holds the key */
- } MQTTAsync_queuedCommand;
发送线程的主要逻辑在 MQTTAsync_processCommand 中执行,主要有如下几点:
ref:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。