当前位置:   article > 正文

C语言基于paho实现MQTT客户端实战案例_paho mqtt c开发

paho mqtt c开发

目标

1.实现MQTT客户端,可以订阅主题进行处理,也可以进行主题消息发布。
2.建立缓存队列进行MQTT消息的处理。

说明

本案例在centos虚拟机中用QT CREATOR运行,所以没有单独的makefile,利用了QT CREATOR提供的qmake。
CPU类型为X86。
项目文件列表如下,日志模块可以参考博文 嵌入式系统简易日志模块搭建
在这里插入图片描述
log.c和log.h的代码就不再贴了。需要的话可以去下整个工程,工程下载链接放在文章最后。
不算日志模块的话整个工程的代码量也就千行左右。
验证的时候需要装个mosquitto—mqtt的服务端。MQTT的原理可以去百度下。默认端口一般是1883.

项目代码

make文件 mqttClient.pro

TEMPLATE = app
CONFIG += console
CONFIG -= app_bundle
CONFIG -= qt

SOURCES += \
        main.c \
        log/log.c \
        mqtt/mqttClientMgr.c \
        queue/queue.c \

HEADERS += \
        log/log.h \
        mqtt/mqttClientMgr.h \
        include/list.h \
        queue/queue.h \

INCLUDEPATH += $$PWD/log \
               $$PWD/mqtt \
               $$PWD/include \
               $$PWD/queue \

LIBS += -pthread
LIBS += -ldl

LIBS += -lpaho-mqtt3c
LIBS += -ltinyxml2
LIBS += /usr/lib64/libsqlite3.so.0.8.6

#LIBS += /home/ymj/mqttClient/bin/lib_arm/libpaho-mqtt3c.so.1.0
#LIBS += -L/home/ymj/mqttClient/bin/lib_arm -ltinyxml -lpaho-mqtt3c
#LIBS += /home/ymj/mqttClient/bin/lib_arm/libsqlite3.so.0.8.6

LIBS += -lrt

DESTDIR=$$PWD/bin

QMAKE_LFLAGS += -Wl,-Bsymbolic
QMAKE_LFLAGS += -Wl,-rpath=./lib

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

链的库有点多,除了paho还有数据库sqlite3和tinyxml2,之后也许会抽空写下这俩东东的使用方法。
自己运行的时候电脑里没有这俩库的话可以把这两行删掉,只要有paho库就可以。
文章最后会提供整个工程的下载,里面有这三个第三方库的arm和x86版本。

项目入口 main.c

#include <stdio.h>
#include <string.h>
#include <dirent.h>
#include <stdlib.h>
#include <unistd.h>

#include "log.h"
#include "mqttClientMgr.h"

#define  WORK_DIR "../"

int changeWorkDir(void)
{
    char dir[128] = {0};

    if(chdir(WORK_DIR) == -1)
    {
        LS_LOG(LOG_ERROR, "Could not chdir to \"%s\": aborting\n", WORK_DIR);
        return LS_ERR;
    }
    getcwd(dir, sizeof(dir));
    LS_LOG(LOG_INFO, "---workdir:%s---\n", dir);

    return LS_OK;
}

/*C语言调用paho实现MQTT客户端
*/
int main()
{
    //日志模块
    logDataInit();
    logManage();

    //修改工作目录
    changeWorkDir();

    LS_LOG(LOG_INFO, "---mqttClient start---\n");
    void *thread_reval = NULL;

    //MQTT客户端
    mqttClientInit();

    threadWaitDone(&thread_reval);

    if(thread_reval != NULL)
    {
        LS_LOG(LOG_DEBUG, "thread_reval : %s is not null\n", (char *)thread_reval);
    }
    //不会运行至此
    LS_LOG(LOG_INFO, "---robot end---\n");
    
    return 0;

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56

没啥说的,mqttClientInit()接口为本文的重点内容。
threadWaitDone(&thread_reval)这玩意保证了编出来的进程会一直运行,不会一下就执行完return了。
调用的头文件可能有点多,继续往后走。
由于需要用到链表和队列,下面先把这两个小工具的代码贴出来。

链表 list.h

/*LIST*/
/***************************************************************************
 *                                    服务接口定义
 *****************************************************************************/

struct list_node
{
    struct list_node *next;                          /**< point to next node. */
    struct list_node *prev;                          /**< point to prev node. */
};
typedef struct list_node list_t;                  /**< Type for lists. */


struct slist_node
{
    struct slist_node *next;                         /**< point to next node. */
};
typedef struct slist_node slist_t;

/*****************************************************************************
 *  * 根据结构体成员地址返回结构体指针
 *****************************************************************************/
#define container_of(ptr, type, member) \
    ((type *)((char *)(ptr) - (unsigned long)(&((type *)0)->member)))


/***************************************************************************
 *  * 初始化一个链表对象
 *****************************************************************************/
#define LIST_HEAD_INIT(name) { &(name), &(name) }

#define LIST_HEAD(name) \
    struct list_node name = LIST_HEAD_INIT(name)
/*****************************************************************************
 *  * 初始化链表
 *****************************************************************************/
static inline void list_init(list_t *l)
{
    l->next = l->prev = l;
}

/*****************************************************************************
 *  * 在链表尾处插入节点
 *****************************************************************************/
static inline void list_insert_after(list_t *l, list_t *n)
{
    l->next->prev = n;
    n->next = l->next;

    l->next = n;
    n->prev = l;
}

/*****************************************************************************
 *  * 在链表头处插入节点
 *****************************************************************************/
static inline void list_insert_before(list_t *l, list_t *n)
{
    l->prev->next = n;
    n->prev = l->prev;

    l->prev = n;
    n->next = l;
}

/*****************************************************************************
 *  * 移除节点
 *****************************************************************************/
static inline void list_remove(list_t *n)
{
    n->next->prev = n->prev;
    n->prev->next = n->next;

    n->next = n->prev = n;
}

/*****************************************************************************
 *  * 判断链表是否为空
 *****************************************************************************/
static inline int list_isempty(const list_t *l)
{
    return l->next == l;
}

/*****************************************************************************
 *  * 获取联表节点个数
 *****************************************************************************/
static inline unsigned int list_len(const list_t *l)
{
    unsigned int len = 0;
    const list_t *p = l;
    while (p->next != l)
    {
        p = p->next;
        len ++;
    }

    return len;
}

/*****************************************************************************
 *  * 获取节点所属结构体指针
 *****************************************************************************/
#define list_entry(node, type, member) \
    container_of(node, type, member)

/*****************************************************************************
 *  * 遍历链表节点
 *****************************************************************************/
#define list_for_each(pos, head) \
    for (pos = (head)->next; pos != (head); pos = pos->next)

/*****************************************************************************
 *  * 安全遍历链表节点
 *****************************************************************************/
#define list_for_each_safe(pos, n, head) \
    for (pos = (head)->next, n = pos->next; pos != (head); \
        pos = n, n = pos->next)

/*****************************************************************************
 *  * 遍历链表节点所属结构体
 *****************************************************************************/
#define list_for_each_entry(pos, head, member) \
    for (pos = list_entry((head)->next, typeof(*pos), member); \
         &pos->member != (head); \
         pos = list_entry(pos->member.next, typeof(*pos), member))

/*****************************************************************************
 *  * 安全遍历链表节点所属结构体
 *****************************************************************************/
#define list_for_each_entry_safe(pos, n, head, member) \
    for (pos = list_entry((head)->next, typeof(*pos), member), \
         n = list_entry(pos->member.next, typeof(*pos), member); \
         &pos->member != (head); \
         pos = n, n = list_entry(n->member.next, typeof(*n), member))

/****************************************************************************
 *  * rt_list_first_entry - 获取链表中的第一个节点
 *  * @ptr:    the list head to take the element from.
 *  * @type:   the type of the struct this is embedded in.
 *  * @member: the name of the list_struct within the struct.
 *  *
 *  * 注意:that list is expected to be not empty.
 ****************************************************************************/
#define list_first_entry(ptr, type, member) \
    list_entry((ptr)->next, type, member)


/***************************************************************************
 *  * @brief 单链表初始化
 *  *
 *  * @param l the single list to be initialized
 ***************************************************************************/
static inline void slist_init(slist_t *l)
{
    l->next = NULL;
}

static inline void slist_append(slist_t *l, slist_t *n)
{
    struct slist_node *node;

    node = l;
    while (node->next) node = node->next;

    /* append the node to the tail */
    node->next = n;
    n->next = NULL;
}

static inline void slist_insert(slist_t *l, slist_t *n)
{
    n->next = l->next;
    l->next = n;
}

static inline unsigned int slist_len(const slist_t *l)
{
    unsigned int len = 0;
    const slist_t *list = l->next;
    while (list != NULL)
    {
        list = list->next;
        len ++;
    }

    return len;
}

static inline slist_t *slist_remove(slist_t *l, slist_t *n)
{
    /* remove slist head */
    struct slist_node *node = l;
    while (node->next && node->next != n) node = node->next;

    /* remove node */
    if (node->next != (slist_t *)0) node->next = node->next->next;

    return l;
}

static inline slist_t *slist_first(slist_t *l)
{
    return l->next;
}

static inline slist_t *slist_tail(slist_t *l)
{
    while (l->next) l = l->next;

    return l;
}

static inline slist_t *slist_next(slist_t *n)
{
    return n->next;
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218

消息队列 queue.h


typedef struct QUEUE_ST
{
    char* data;
    int ElemNum;
    int ElemSize;
    int front;
    int rear;
}QueueSt;

/***************************************************************************
function: QueueCreate
input:
output:
Description:提供统一队列创建函数
****************************************************************************/
QueueSt *QueueCreate(int ElementSize,int ElementNum);

/***************************************************************************
function: IsQueueEmpty
input:
output:
Description:判断队列是否为空
****************************************************************************/
int IsQueueEmpty(QueueSt *queue);

/***************************************************************************
function: EnQueue
input:
output:
Description:入队操作
****************************************************************************/
int EnQueue(QueueSt *queue, void* InData,int size);

/***************************************************************************
function: ExQueue
input:
output:
Description:出队操作
****************************************************************************/
int ExQueue(QueueSt *queue, void *OutData,int size);

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

消息队列 queue.c

#include <string.h>

#include "queue.h"
#include "log.h"

/***************************************************************************
function: MqttQueueCreate
input:
output:
Description:创建队列
****************************************************************************/
QueueSt *QueueCreate(int ElementSize,int ElementNum)
{
   QueueSt *queue = (QueueSt *)malloc(sizeof(QueueSt));
   if(NULL == queue)
   {
     LS_LOG(LOG_ERROR, "malloc fail!\n");
     return NULL;
   }

   memset((char *)queue, 0, sizeof(QueueSt));

   queue->data =(char*)malloc((unsigned long)(ElementSize * ElementNum));
   if(NULL == queue->data)
   {
     LS_LOG(LOG_ERROR, "malloc fail !\n");
     return NULL;
   }

   queue->ElemNum  = ElementNum;
   queue->ElemSize = ElementSize;

   queue->rear  = 0;
   queue->front = 0;

   return queue;
}

/***************************************************************************
function: IsQueueEmpty
input:
output:
Description:判断队列是否为空
****************************************************************************/
int IsQueueEmpty(QueueSt *queue)
{
   if(NULL == queue)
   {
      LS_LOG(LOG_ERROR, "queue is NULL!\n");
      return LS_ERR;
   }

   return (queue->front == queue->rear ? TRUE : FALSE); //空为TRUE,否则为FALSE
}
/***************************************************************************
function: ExQueue
input:
output:
Description:出队操作
****************************************************************************/
int ExQueue(QueueSt *queue, void *OutData, int size)
{
   if(NULL == queue || NULL == OutData)
   {
        LS_LOG(LOG_ERROR, "input para is NULL!\n");
        return LS_ERR;
   }

   if(TRUE == IsQueueEmpty(queue))
   {
        LS_LOG(LOG_INFO, "queue empty!\n");
        return LS_ERR;
   }

   if(size > queue->ElemSize)
   {
       LS_LOG(LOG_ERROR, "error:size > ElemSize!\n");
       return LS_ERR;
   }

   if (OutData != NULL)
   {
       memcpy(OutData,queue->data + queue->front * queue->ElemSize, (unsigned long)size);
   }
   else
   {
       LS_LOG(LOG_ERROR, "OutData is NULL!\n");
       return LS_ERR;
   }

   queue->front = (queue->front + 1) % queue->ElemNum; //++队首 % N

   return LS_OK;

}

/***************************************************************************
function: IsQueueFull
input:
output:
Description:判断队列是否为满
****************************************************************************/
int IsQueueFull(QueueSt *queue)
{
   if(NULL == queue)
   {
      LS_LOG(LOG_ERROR,"queue is NULL!\n");
      return LS_ERR;
   }

   return ((queue->rear + 1) % queue->ElemNum == queue->front ? TRUE : FALSE); //空为TRUE,否则为FALSE
}

/***************************************************************************
function: EnQueue
input:
output:
Description:入队操作
****************************************************************************/
int EnQueue(QueueSt *queue, void* InData, int size)
{
   if(NULL == queue || NULL == InData)
   {
        LS_LOG(LOG_ERROR, "input para is NULL!\n");
        return LS_ERR;
   }

   if(TRUE == IsQueueFull(queue))
   {
        LS_LOG(LOG_ERROR, "queue is full!\n");
        return LS_ERR;
   }

   if(size > queue->ElemSize)
   {
       LS_LOG(LOG_ERROR, "error:size > ElemSize!\n");
       return LS_ERR;
   }

   memcpy(queue->data + queue->rear * queue->ElemSize, (char *)InData, (unsigned long)size);
   queue->rear = (queue->rear + 1) % queue->ElemNum; //++队尾 % N

   return LS_OK;
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145

接下来就是本文的重头戏了。

MQTT客户端模块 mqttClientMgr.h

#include <pthread.h>
#include <semaphore.h>

#include "MQTTClient.h"
#include "list.h"
#include "queue.h"

#define CLIENTID    "ExampleClientSub"
#define TOPIC       "MQTT Examples"
#define PAYLOAD     "Hello World!"
#define QOS         0
#define TIMEOUT     10000L
#define DISCONNECT	"out"

/**********订阅主题**********/
#define SUB_TOPIC_NUM   1

#define  TOPIC_SUB_IOT_REG               "v1/127001/response/iotregister"

/********发布主题*********/
#define PUB_TOPIC_NUM   1

#define  TOPIC_PUB_REG_DEVICE_ID         0
#define  TOPIC_PUB_REG_DEVICE            "v1/127001/sensor/add"


typedef struct SEND_RECORD_ST
{
  long long mid;
  char type[64];
  list_t list;
}SendRecordSt;

typedef struct MQTT_PUB_DATA_ST
{
    int TopicId;
    int len;
    char* data;
}MqttPubDataSt;

typedef struct MQTT_SUB_DATA_ST
{
    int clientId;//用于区分不同的mqtt客户端
    int len;
    char *topic;
    char *data;
}MqttSubDataSt;

typedef struct MQTT_PUB_MGR_ST
{
    MqttPubDataSt stMqttPubData;
    QueueSt *pMqttPubQueue;
    list_t   PubDataList;
    pthread_mutex_t  lock;//访问互斥锁
    sem_t QueueWaitsem;
    sem_t DeliverOksem;
}MqttPubMgrSt;

typedef struct MQTT_SUB_MGR_ST
{
    MqttSubDataSt stMqttSubData;
    QueueSt *pMqttSubQueue;
    pthread_mutex_t  lock;//访问互斥锁
    sem_t QueueWaitsem;
}MqttSubMgrSt;

typedef struct MQTT_MGR_ST
{
    MQTTClient MqttClient;
    int  MqttConnectFlag;
    int  StartBusinessFlag;
    long long  MessageId;
    pthread_mutex_t  lock;//访问互斥锁

    MqttPubMgrSt stPubMgr;

    MqttSubMgrSt stSubMgr;

    list_t AddFailList;
    int ReAddCnt;
    sem_t ReAddSem;
    pthread_mutex_t  Addlock;//访问互斥锁

    list_t ActivateFailList;
    int ReActivateCnt;
    sem_t ReActivateSem;
    pthread_mutex_t  Activatelock;//访问互斥锁

    list_t UpdateFailList;
    int ReUpdateCnt;
    sem_t ReUpdateSem;
    pthread_mutex_t  Updatelock;//访问互斥锁
}MqttMgrSt;


void threadWaitDone(void **reval);
int mqttClientInit(void);

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98

新增主题或者删除主题的订阅或者发布需要修改SUB_TOPIC_NUM和PUB_TOPIC_NUM ,不然会数组越界挂掉,看到mqttClientMgr.c就知道原因了。
加了一些锁和信号量。

MQTT客户端模块 mqttClientMgr.c

#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#include "mqttClientMgr.h"
#include "log.h"

static MqttMgrSt stMqttMgr;
static volatile MQTTClient_deliveryToken deliveredtoken;
static pthread_t MqttClientThreadId;
static pthread_t ProcSubQueueThreadId;

#define  PUB_QUEUE_ELEM_NUM      200
#define  SUB_QUEUE_ELEM_NUM      500
#define  MAX_BROKER_ADDR_LEN     32
#define MQTT_PLATFORM_ENABLE     (1)

#define BROKER_IP     "127.0.0.1"
#define BROKER_PORT   "1883"
#define CLIENT_ID     "anyhow"
#define USER_NAME     "mosquitto"
#define PWD           "ls@123"

static char * SubTopics[SUB_TOPIC_NUM]={TOPIC_SUB_IOT_REG};
static char * PubTopics[PUB_TOPIC_NUM]={TOPIC_PUB_REG_DEVICE};


void threadWaitDone(void **reval)
{
    pthread_join(MqttClientThreadId, reval);
}


void SetThreadAttr(pthread_attr_t *thread_attr, unsigned int priority, int policy, size_t stack_size)
{
    size_t StackSize = 0;

    pthread_attr_init(thread_attr);//首先需要对属性变量进行初始化
    pthread_attr_setscope(thread_attr,PTHREAD_SCOPE_PROCESS);
    pthread_attr_setdetachstate(thread_attr,PTHREAD_CREATE_JOINABLE);
    pthread_attr_setschedpolicy(thread_attr,policy);
    struct sched_param param;
    pthread_attr_getschedparam(thread_attr, &param);
    param.sched_priority = (int)priority;
    pthread_attr_setschedparam(thread_attr, &param);
    if(stack_size < 0x4000) //以字节为单位  最小16k
    {
      StackSize = 0x4000;
    }
    pthread_attr_setstacksize(thread_attr,StackSize);
}

int CreateThread(pthread_t *thread_id, const pthread_attr_t *attr,void *(*thread_fun) (void *), void *thread_arg)
{
     int ret = -1;

     ret = pthread_create(thread_id,attr,thread_fun,thread_arg);

     return(ret);
}

/***************************************************************************
function: GetBrokerAddrinput:
output:
Description:通过配置构建MQTT broker地址
****************************************************************************/
static int GetBrokerAddr(char *BrokerAddr,int len)
{
    unsigned int size = 0;
    char BrokerPortStr[6]; //格式:protocol://host:port

    if(NULL == BrokerAddr)
    {
        LS_LOG(LOG_ERROR, "GetBrokerAddr: BrokerAddr is NULL.\n");
        return LS_ERR;
    }
    memset(BrokerPortStr, 0, sizeof(BrokerPortStr));
    memcpy(BrokerAddr, "tcp://", strlen("tcp://"));
    size = (unsigned int)len - strlen("tcp://");
    if(strlen((char*)BROKER_IP) > size)
    {
        LS_LOG(LOG_ERROR, "GetBrokerAddr: BrokerAddr buf is not enough!\n");
        return LS_ERR;
    }
    strcat(BrokerAddr, (char*)BROKER_IP);
    size = size - strlen((char*)BROKER_IP);
    if(size < 1)
    {
        LS_LOG(LOG_ERROR, "GetBrokerAddr: BrokerAddr buf is not enough!\n");
        return LS_ERR;
    }
    strcat(BrokerAddr, ":");
    strcat(BrokerAddr, BROKER_PORT);
    /*size = size - 1;
    TransIntToStr(BROKER_PORT,BrokerPortStr);
    if(size < (strlen(BROKER_PORT) + 1))
    {
        LS_LOG(LOG_ERROR,"GetBrokerAddr: BrokerAddr buf is not enough !\n");
        return LS_ERR;
    }
    strcat(BrokerAddr,BrokerPortStr);*/
    return LS_OK;
}

/***************************************************************************
function: MqttSubscribeAllTopics
input:
output:
Description:客户端初始化的时候完成所有主题的订阅
****************************************************************************/
void MqttSubscribeAllTopics(void)
{
    int i = 0;

    for(i = 0;i < SUB_TOPIC_NUM; i++)
    {
        LS_LOG(LOG_INFO, "Subscribing topic %s\nfor client %s using QoS %d\n\n",SubTopics[i], CLIENTID, QOS);
        MQTTClient_subscribe(stMqttMgr.MqttClient, SubTopics[i], QOS);
    }
}

/***************************************************************************
function: MqttClientRelease
input:
output:
Description:MQTT客户端释放
****************************************************************************/
void MqttClientRelease(MQTTClient client)
{
    MQTTClient_unsubscribe(client, TOPIC);
    MQTTClient_disconnect(client, 10000);
    MQTTClient_destroy(&client);
}

/***************************************************************************
function: connlost
input:
output:
Description:客户端注册的连接失败回调函数
****************************************************************************/
void connlost(void *context, char *cause)
{
    LS_LOG(LOG_ERROR, "\nConnection lost\n");
    LS_LOG(LOG_ERROR, " cause: %s\n", cause);
    stMqttMgr.MqttConnectFlag = 0;
    sem_post(&stMqttMgr.stPubMgr.QueueWaitsem);
}

/***************************************************************************
function: MqttTransData
input:
output:
Description:MqttTransData
****************************************************************************/
int MqttTransData(MqttSubDataSt *tdata)
{
   int ret = LS_ERR;

   pthread_mutex_lock(&stMqttMgr.stSubMgr.lock);
   ret= EnQueue(stMqttMgr.stSubMgr.pMqttSubQueue, (char *)tdata, sizeof(MqttSubDataSt));
   pthread_mutex_unlock(&stMqttMgr.stSubMgr.lock);

   if(LS_OK == ret)
   {
       sem_post(&stMqttMgr.stSubMgr.QueueWaitsem);
   }

   return ret;
}

/***************************************************************************
function: msgarrvd
input:
output:
Description:客户端注册的收到订阅消息回调函数
****************************************************************************/
int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{
    char* payloadptr = NULL;
    MqttSubDataSt stMqttSubData;
    char* pData = NULL;

    LS_LOG(LOG_DEBUG, "Message arrived on topic: %s\n", topicName);
    memset(&stMqttSubData, 0, sizeof(MqttSubDataSt));

    stMqttSubData.clientId = 0;

    payloadptr = message->payload;
    if(strcmp(payloadptr, DISCONNECT) == 0)
    {
        LS_LOG(LOG_WARN, " DISCONNECT\n out!!\n");
        stMqttMgr.MqttConnectFlag = FALSE;
    }

    LS_LOG(LOG_DEBUG, "message payload: %s\n", (char *)message->payload);

    pData = malloc((unsigned long)message->payloadlen);  //由队列处理线程free pData
    if(NULL == pData)
    {
        LS_LOG(LOG_ERROR, "malloc failed!!\n");
        goto fail;
    }
    memset(pData, 0, (unsigned long)message->payloadlen);
    memcpy(pData, payloadptr, (unsigned long)message->payloadlen);

    stMqttSubData.data = pData;
    stMqttSubData.len = message->payloadlen;
    stMqttSubData.topic = topicName;
    if(LS_OK != MqttTransData(&stMqttSubData))//成功,由队列处理线程free pData
    {
        LS_LOG(LOG_WARN, "MqttTransData failed!!\n");
        free(stMqttSubData.data);
    }
fail:
    MQTTClient_freeMessage(&message);
    MQTTClient_free(topicName);
    return 1;
}

/***************************************************************************
function: delivered
input:
output:
Description:客户端注册的消息delivered回调函数
****************************************************************************/
void delivered(void *context, MQTTClient_deliveryToken dt)
{
    LS_LOG(LOG_INFO, "Message with token value %d delivery confirmed\n", dt);
    deliveredtoken = dt;
}

/***************************************************************************
function: MqttClientThread()
input:
output:
Description:MQTT客户端处理线程
****************************************************************************/
void *MqttClientThread()
{
    int rc = 0;
    MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
    MQTTClient_message pubmsg = MQTTClient_message_initializer;
    MQTTClient_deliveryToken token = 0;

    char BrokerAddr[MAX_BROKER_ADDR_LEN];
    memset(BrokerAddr, 0, sizeof(BrokerAddr));
    if(LS_OK != GetBrokerAddr(BrokerAddr,MAX_BROKER_ADDR_LEN))
    {
        LS_LOG(LOG_ERROR, "GetBrokerAddr Err.\n");
        return NULL;
    }

    LS_LOG(LOG_INFO, "BrokerAddr: %s\n", BrokerAddr);

    MQTTClient_create(&stMqttMgr.MqttClient, BrokerAddr, (char*)CLIENT_ID, MQTTCLIENT_PERSISTENCE_NONE, NULL);

    if(NULL == stMqttMgr.MqttClient)
    {
        LS_LOG(LOG_ERROR,"MQTTClient_create g_MottClient fail!\n");
        return NULL;
    }

    conn_opts.keepAliveInterval = 20;
    conn_opts.cleansession      = 1;
    conn_opts.username          = (char*)USER_NAME;
    conn_opts.password          = (char*)PWD;

    MQTTClient_setCallbacks(stMqttMgr.MqttClient, NULL, connlost, msgarrvd, delivered);

ReLink:
    do
    {
        LS_LOG(LOG_INFO, "start to connect MQTT server...\n");
        if ((rc = MQTTClient_connect(stMqttMgr.MqttClient, &conn_opts)) == MQTTCLIENT_SUCCESS)
        {
            LS_LOG(LOG_INFO, "connect success!\n");
            stMqttMgr.MqttConnectFlag = TRUE;
            break;
        }
        else
        {
            LS_LOG(LOG_ERROR, "Failed to connect, return code %d\n", rc);
            sleep(20); //时间可配置
        }
    }while(1);

    //完成所有主题的订阅初始化
    if(MQTT_PLATFORM_ENABLE)
    {
        MqttSubscribeAllTopics();
    }

    //客户端连接正常情况下进行信息发布
    while(TRUE == stMqttMgr.MqttConnectFlag) //发布消息
    {
        sem_wait(&stMqttMgr.stPubMgr.QueueWaitsem);
        //LS_LOG(LOG_INFO,"rev QueueWaitsem !!\n");

        pthread_mutex_lock(&stMqttMgr.stPubMgr.lock);
        do
        {
            if(LS_OK == ExQueue(stMqttMgr.stPubMgr.pMqttPubQueue, (void*)&stMqttMgr.stPubMgr.stMqttPubData, sizeof(MqttPubDataSt)))
            {
                pubmsg.payload    = stMqttMgr.stPubMgr.stMqttPubData.data; //添加一个成员用以决定发布TOPIC 记得释放free
                pubmsg.payloadlen = stMqttMgr.stPubMgr.stMqttPubData.len;
                pubmsg.qos        = QOS;
                pubmsg.retained   = 0;
                deliveredtoken    = 0;

                if(MQTT_PLATFORM_ENABLE)
                {
                    rc = MQTTClient_publishMessage(stMqttMgr.MqttClient, PubTopics[stMqttMgr.stPubMgr.stMqttPubData.TopicId], &pubmsg, &token);
                    if(strlen(pubmsg.payload) < 1024)
                    {
                        LS_LOG(LOG_INFO, "MQTT Publish message %s\n""on topic %s token %d \n",
                              (char*)pubmsg.payload,
                              PubTopics[stMqttMgr.stPubMgr.stMqttPubData.TopicId],
                              token);
                    }
                }

                if(rc != 0)
                {
                    LS_LOG(LOG_ERROR, "MQTT Publish message Failed, rc=%d !!\n", rc);
                }

                free(stMqttMgr.stPubMgr.stMqttPubData.data); //重要,异地malloc,此地释放
            }
        }
        while(FALSE == IsQueueEmpty(stMqttMgr.stPubMgr.pMqttPubQueue));
        pthread_mutex_unlock(&stMqttMgr.stPubMgr.lock);
    }

    //客户端连接断开后进行重连处理
    goto ReLink;

    //本应用不会运行至此
    MqttClientRelease(stMqttMgr.MqttClient);
}

/***************************************************************************
function: MqttProcessIotResponse()
input:
output:
Description:APP处理Iot平台回应
****************************************************************************/
void MqttProcessIotResponse(MqttSubDataSt *tSubData)
{
    //此处进行数据处理

}

/***************************************************************************
function: MqttProcSubInfoThread
input:
output:
Description:MQTT处理订阅消息队列线程
****************************************************************************/
void *MqttProcSubInfoThread(void *param)
{
   LS_LOG(LOG_INFO, "MqttProcSubInfoThread start !\n");

   while(1)
   {
      sem_wait(&stMqttMgr.stSubMgr.QueueWaitsem);
      pthread_mutex_lock(&stMqttMgr.stSubMgr.lock);
      do
      {
          if(LS_OK == ExQueue(stMqttMgr.stSubMgr.pMqttSubQueue, (void*)&stMqttMgr.stSubMgr.stMqttSubData, sizeof(MqttSubDataSt)))
          {
              MqttProcessIotResponse(&stMqttMgr.stSubMgr.stMqttSubData);
              free(stMqttMgr.stSubMgr.stMqttSubData.data);
          }
      }
      while(FALSE == IsQueueEmpty(stMqttMgr.stSubMgr.pMqttSubQueue));
      pthread_mutex_unlock(&stMqttMgr.stSubMgr.lock);
   }
}

int mqttClientInit(void)
{
    int ret = -1;
    pthread_attr_t Thread_Attr;

    memset((char *)&stMqttMgr,0,sizeof(MqttMgrSt));
    stMqttMgr.MqttConnectFlag = FALSE;
    stMqttMgr.StartBusinessFlag = FALSE;
    stMqttMgr.MessageId = 1000000000000000;
    pthread_mutex_init(&stMqttMgr.lock,NULL);

    sem_init(&stMqttMgr.stPubMgr.QueueWaitsem,0,0);
    pthread_mutex_init(&stMqttMgr.stPubMgr.lock,NULL);
    list_init(&stMqttMgr.stPubMgr.PubDataList);

    sem_init(&stMqttMgr.stSubMgr.QueueWaitsem,0,0);
    pthread_mutex_init(&stMqttMgr.stSubMgr.lock,NULL);

    list_init(&stMqttMgr.AddFailList);
    stMqttMgr.ReAddCnt = 0;
    sem_init(&stMqttMgr.ReAddSem,0,0);
    pthread_mutex_init(&stMqttMgr.Addlock,NULL);

    list_init(&stMqttMgr.ActivateFailList);
    stMqttMgr.ReActivateCnt = 0;
    sem_init(&stMqttMgr.ReActivateSem,0,0);
    pthread_mutex_init(&stMqttMgr.Activatelock,NULL);

    list_init(&stMqttMgr.UpdateFailList);
    stMqttMgr.ReUpdateCnt = 0;
    sem_init(&stMqttMgr.ReUpdateSem,0,0);
    pthread_mutex_init(&stMqttMgr.Updatelock,NULL);

    stMqttMgr.stPubMgr.pMqttPubQueue = QueueCreate(sizeof(MqttPubDataSt), PUB_QUEUE_ELEM_NUM);
    if(NULL == stMqttMgr.stPubMgr.pMqttPubQueue)
    {
        LS_LOG(LOG_ERROR, "Create MQTT Pub Queue failed!\n");
        return LS_ERR;
    }
    stMqttMgr.stSubMgr.pMqttSubQueue = QueueCreate(sizeof(MqttSubDataSt), SUB_QUEUE_ELEM_NUM);
    if(NULL == stMqttMgr.stSubMgr.pMqttSubQueue)
    {
        LS_LOG(LOG_ERROR, "Create MQTT Sub Queue failed!\n");
        return LS_ERR;
    }

    SetThreadAttr(&Thread_Attr, 20, SCHED_OTHER, 0x100000);
    ret = CreateThread(&MqttClientThreadId, &Thread_Attr, MqttClientThread, NULL);

    if(ret != 0)
    {
        LS_LOG(LOG_ERROR, "Create MQTT client thread failed!\n");
        return LS_ERR;
    }

    SetThreadAttr(&Thread_Attr, 20, SCHED_OTHER, 0x8000);

    ret = CreateThread(&ProcSubQueueThreadId, &Thread_Attr, MqttProcSubInfoThread, NULL);

    if(ret != 0)
    {
        LS_LOG(LOG_ERROR, "Create MQTT Process Sub Info thread failed!\n");
        return LS_ERR;
    }

    return LS_OK;
}

/***************************************************************************
function: MqttSendData
input:  MqttPubDataSt类型指针的数据
output:
Description:MQTT发送数据
return: LS_OK-成功  LS_ERR-失败(需要重新发送)
****************************************************************************/
int MqttSendData(char *data, int len, int TopicId, SendRecordSt *pSendRecord)
{
    int ret      = LS_ERR;
    MqttPubDataSt StPubData;
    char *pdata  = NULL;

    memset(&StPubData, 0, sizeof(MqttPubDataSt));

    if(NULL != data && len > 0)
    {
        pdata = (char *)malloc((unsigned long)len);
        if(NULL == pdata)
        {
            LS_LOG(LOG_INFO, "malloc failed!!\n");
            return LS_ERR;
        }
        memset(pdata, 0, (unsigned long)len);
        memcpy(pdata, data, (unsigned long)len);
    }
    StPubData.TopicId = TopicId;
    StPubData.data    = pdata;
    StPubData.len     = len;

    pthread_mutex_lock(&stMqttMgr.stPubMgr.lock);

    ret= EnQueue(stMqttMgr.stPubMgr.pMqttPubQueue, (char *)&StPubData, sizeof(MqttPubDataSt));

    if(NULL != pSendRecord)  //加入发送管理链表
    {
        list_insert_before(&stMqttMgr.stPubMgr.PubDataList, &pSendRecord->list);
    }

    pthread_mutex_unlock(&stMqttMgr.stPubMgr.lock);

    if(LS_OK != ret)
    {
        LS_LOG(LOG_INFO, "EnQueue error!!\n");
        free(pdata);
        return LS_ERR;
    }
    sem_post(&stMqttMgr.stPubMgr.QueueWaitsem);
    return LS_OK;
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230
  • 231
  • 232
  • 233
  • 234
  • 235
  • 236
  • 237
  • 238
  • 239
  • 240
  • 241
  • 242
  • 243
  • 244
  • 245
  • 246
  • 247
  • 248
  • 249
  • 250
  • 251
  • 252
  • 253
  • 254
  • 255
  • 256
  • 257
  • 258
  • 259
  • 260
  • 261
  • 262
  • 263
  • 264
  • 265
  • 266
  • 267
  • 268
  • 269
  • 270
  • 271
  • 272
  • 273
  • 274
  • 275
  • 276
  • 277
  • 278
  • 279
  • 280
  • 281
  • 282
  • 283
  • 284
  • 285
  • 286
  • 287
  • 288
  • 289
  • 290
  • 291
  • 292
  • 293
  • 294
  • 295
  • 296
  • 297
  • 298
  • 299
  • 300
  • 301
  • 302
  • 303
  • 304
  • 305
  • 306
  • 307
  • 308
  • 309
  • 310
  • 311
  • 312
  • 313
  • 314
  • 315
  • 316
  • 317
  • 318
  • 319
  • 320
  • 321
  • 322
  • 323
  • 324
  • 325
  • 326
  • 327
  • 328
  • 329
  • 330
  • 331
  • 332
  • 333
  • 334
  • 335
  • 336
  • 337
  • 338
  • 339
  • 340
  • 341
  • 342
  • 343
  • 344
  • 345
  • 346
  • 347
  • 348
  • 349
  • 350
  • 351
  • 352
  • 353
  • 354
  • 355
  • 356
  • 357
  • 358
  • 359
  • 360
  • 361
  • 362
  • 363
  • 364
  • 365
  • 366
  • 367
  • 368
  • 369
  • 370
  • 371
  • 372
  • 373
  • 374
  • 375
  • 376
  • 377
  • 378
  • 379
  • 380
  • 381
  • 382
  • 383
  • 384
  • 385
  • 386
  • 387
  • 388
  • 389
  • 390
  • 391
  • 392
  • 393
  • 394
  • 395
  • 396
  • 397
  • 398
  • 399
  • 400
  • 401
  • 402
  • 403
  • 404
  • 405
  • 406
  • 407
  • 408
  • 409
  • 410
  • 411
  • 412
  • 413
  • 414
  • 415
  • 416
  • 417
  • 418
  • 419
  • 420
  • 421
  • 422
  • 423
  • 424
  • 425
  • 426
  • 427
  • 428
  • 429
  • 430
  • 431
  • 432
  • 433
  • 434
  • 435
  • 436
  • 437
  • 438
  • 439
  • 440
  • 441
  • 442
  • 443
  • 444
  • 445
  • 446
  • 447
  • 448
  • 449
  • 450
  • 451
  • 452
  • 453
  • 454
  • 455
  • 456
  • 457
  • 458
  • 459
  • 460
  • 461
  • 462
  • 463
  • 464
  • 465
  • 466
  • 467
  • 468
  • 469
  • 470
  • 471
  • 472
  • 473
  • 474
  • 475
  • 476
  • 477
  • 478
  • 479
  • 480
  • 481
  • 482
  • 483
  • 484
  • 485
  • 486
  • 487
  • 488
  • 489
  • 490
  • 491
  • 492
  • 493
  • 494
  • 495
  • 496
  • 497
  • 498
  • 499
  • 500

新增或删除主题的订阅和发布需要修改这俩全局数组
static char * SubTopics[SUB_TOPIC_NUM]={TOPIC_SUB_IOT_REG};
static char * PubTopics[PUB_TOPIC_NUM]={TOPIC_PUB_REG_DEVICE};

MqttSendData接口提供了MQTT消息发布的接口,就是先把MQTT消息进队列,然后在MqttProcSubInfoThread线程出队列,进行发布。
mqttClientInit开始,做了锁,信号俩个,消息队列的初始化,消息队列有俩,一个用来处理发布,一个用来处理订阅;建立俩线程,一个是MQTT客户端的建立线程,一个是订阅的MQTT消息处理线程。
msgarrvd接口里就可以对收到的消息进行处理,本文的处理还是把MQTT消息进队列,MqttProcSubInfoThread线程出队列,各位可以在MqttProcessIotResponse接口里对订阅到的MQTT消息进行处理。

到此为止,整个工程算是贴完了,下面就是见证奇迹的时刻。

项目验证

ssh端用tail -f /home/ymj/mqttClient/log/mqttlog/mqttlog.log命令动态查看日志。
使用软件自带的工具运行代码。

在这里插入图片描述

结果显示
[20220908-16:45:46][INFO][main.c][changeWorkDir][22]—workdir:/home/ymj/mqttClient—
[20220908-16:45:46][INFO][main.c][main][39]—mqttClient start—
[20220908-16:45:46][INFO][mqttClientMgr.c][MqttClientThread][255]BrokerAddr: tcp://127.0.0.1:1883
[20220908-16:45:46][INFO][mqttClientMgr.c][MqttClientThread][275]start to connect MQTT server…
[20220908-16:45:46][INFO][mqttClientMgr.c][MqttProcSubInfoThread][363]MqttProcSubInfoThread start !
[20220908-16:45:46][ERROR][mqttClientMgr.c][MqttClientThread][284]Failed to connect, return code -1
[20220908-16:46:06][INFO][mqttClientMgr.c][MqttClientThread][275]start to connect MQTT server…
[20220908-16:46:07][ERROR][mqttClientMgr.c][MqttClientThread][284]Failed to connect, return code -1

连接MQTT服务端失败,为啥呢?因为没有起服务端的broker,哈哈。
运行mosquitto,默认端口为1883,也可以自己指定端口,方法自己百度去。

在这里插入图片描述
日志显示连接broker成功,并打印出订阅的主题
在这里插入图片描述
宿主机打开MQTT客户端工具mqtt.fx,这个软件也可以从网上直接搜到
我的虚拟机ip是192.168.8.235,所以配置连接如下
在这里插入图片描述
在这里插入图片描述
配置完成后点击OK,退回来,connect。
填入代码里订阅的主题和要发布的内容
在这里插入图片描述
点击发布
在这里插入图片描述
程序成功接收到,验证完成,我擦,后面的乱码是啥,不管了,你们自己优化去。
3032.9.19:乱码原因找到了,才想起来就更新下,因为MQTT的那个消息结构体有两个连续的char*字段导致,可以修改成固定长度的char字符串数组或者分开定义字段即可。
没记错的话应该是这个结构体:
typedef struct MQTT_SUB_DATA_ST
{
int clientId;//用于区分不同的mqtt客户端
int len;
char *topic;
char *data;

}MqttSubDataSt;

程序的MQTT发布也自己写代码验证去。调用代码中提供的接口,搭好环境验证即可。

附录

项目工程如下
https://download.csdn.net/download/qqq1112345/86512208

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/176345
推荐阅读
相关标签
  

闽ICP备14008679号