赞
踩
1.实现MQTT客户端,可以订阅主题进行处理,也可以进行主题消息发布。
2.建立缓存队列进行MQTT消息的处理。
本案例在centos虚拟机中用QT CREATOR运行,所以没有单独的makefile,利用了QT CREATOR提供的qmake。
CPU类型为X86。
项目文件列表如下,日志模块可以参考博文 嵌入式系统简易日志模块搭建
log.c和log.h的代码就不再贴了。需要的话可以去下整个工程,工程下载链接放在文章最后。
不算日志模块的话整个工程的代码量也就千行左右。
验证的时候需要装个mosquitto—mqtt的服务端。MQTT的原理可以去百度下。默认端口一般是1883.
# qmake project for the plain-C MQTT client (console app, no Qt modules).
TEMPLATE = app
CONFIG += console
CONFIG -= app_bundle
CONFIG -= qt

# The last entry of each list must NOT end with a backslash; otherwise the
# following assignment is swallowed into the list as extra values.
SOURCES += \
    main.c \
    log/log.c \
    mqtt/mqttClientMgr.c \
    queue/queue.c

HEADERS += \
    log/log.h \
    mqtt/mqttClientMgr.h \
    include/list.h \
    queue/queue.h

INCLUDEPATH += $$PWD/log \
    $$PWD/mqtt \
    $$PWD/include \
    $$PWD/queue

LIBS += -pthread
LIBS += -ldl
LIBS += -lpaho-mqtt3c
LIBS += -ltinyxml2
LIBS += /usr/lib64/libsqlite3.so.0.8.6
#LIBS += /home/ymj/mqttClient/bin/lib_arm/libpaho-mqtt3c.so.1.0
#LIBS += -L/home/ymj/mqttClient/bin/lib_arm -ltinyxml -lpaho-mqtt3c
#LIBS += /home/ymj/mqttClient/bin/lib_arm/libsqlite3.so.0.8.6
LIBS += -lrt

DESTDIR=$$PWD/bin
QMAKE_LFLAGS += -Wl,-Bsymbolic
QMAKE_LFLAGS += -Wl,-rpath=./lib
链的库有点多,除了paho还有数据库sqlite3和tinyxml2,之后也许会抽空写下这俩东东的使用方法。
自己运行的时候电脑里没有这俩库的话可以把这两行删掉,只要有paho库就可以。
文章最后会提供整个工程的下载,里面有这三个第三方库的arm和x86版本。
#include <stdio.h>
#include <string.h>
#include <dirent.h>
#include <stdlib.h>
#include <unistd.h>
#include "log.h"
#include "mqttClientMgr.h"
#define WORK_DIR "../"
int changeWorkDir(void)
{
char dir[128] = {0};
if(chdir(WORK_DIR) == -1)
{
LS_LOG(LOG_ERROR, "Could not chdir to \"%s\": aborting\n", WORK_DIR);
return LS_ERR;
}
getcwd(dir, sizeof(dir));
LS_LOG(LOG_INFO, "---workdir:%s---\n", dir);
return LS_OK;
}
/*C语言调用paho实现MQTT客户端
*/
int main()
{
//日志模块
logDataInit();
logManage();
//修改工作目录
changeWorkDir();
LS_LOG(LOG_INFO, "---mqttClient start---\n");
void *thread_reval = NULL;
//MQTT客户端
mqttClientInit();
threadWaitDone(&thread_reval);
if(thread_reval != NULL)
{
LS_LOG(LOG_DEBUG, "thread_reval : %s is not null\n", (char *)thread_reval);
}
//不会运行至此
LS_LOG(LOG_INFO, "---robot end---\n");
return 0;
}
没啥说的,mqttClientInit()接口为本文的重点内容。
threadWaitDone(&thread_reval)这玩意保证了编出来的进程会一直运行,不会一下就执行完return了。
调用的头文件可能有点多,继续往后走。
由于需要用到链表和队列,下面先把这两个小工具的代码贴出来。
/* LIST: intrusive linked-list helpers */
/***************************************************************************
* Service interface definitions
*****************************************************************************/
/* Link node of a circular doubly linked list. It carries no payload; it is
 * embedded in a larger structure that is recovered with list_entry(). */
struct list_node
{
struct list_node *next; /**< point to next node. */
struct list_node *prev; /**< point to prev node. */
};
typedef struct list_node list_t; /**< Type for lists. */
/* Link node of a NULL-terminated singly linked list. */
struct slist_node
{
struct slist_node *next; /**< point to next node. */
};
typedef struct slist_node slist_t;
/*****************************************************************************
* * Return a pointer to the structure of type `type` that contains the
* * field `member` located at address `ptr`.
* * The offset of `member` is computed from a null `type` pointer and
* * subtracted from `ptr` as a byte offset.
*****************************************************************************/
#define container_of(ptr, type, member) \
((type *)((char *)(ptr) - (unsigned long)(&((type *)0)->member)))
/***************************************************************************
* * Static initialisation of a list head.
* * LIST_HEAD_INIT(name) is an initialiser that makes both links point at
* * `name` itself (an empty list); LIST_HEAD(name) defines such a variable.
*****************************************************************************/
#define LIST_HEAD_INIT(name) { &(name), &(name) }
#define LIST_HEAD(name) \
struct list_node name = LIST_HEAD_INIT(name)
/*****************************************************************************
* * Initialise a list head as an empty list: both links point at the head.
*****************************************************************************/
static inline void list_init(list_t *l)
{
    l->prev = l;
    l->next = l->prev;
}
/*****************************************************************************
* * Insert node n directly after node l.
* * When l is the list head, n becomes the first element of the list.
*****************************************************************************/
static inline void list_insert_after(list_t *l, list_t *n)
{
    list_t *successor = l->next;

    successor->prev = n;
    n->next = successor;
    l->next = n;
    n->prev = l;
}
/*****************************************************************************
* * Insert node n directly before node l.
* * When l is the list head, n becomes the last element of the list.
*****************************************************************************/
static inline void list_insert_before(list_t *l, list_t *n)
{
    list_t *predecessor = l->prev;

    predecessor->next = n;
    n->prev = predecessor;
    l->prev = n;
    n->next = l;
}
/*****************************************************************************
* * Unlink node n from its list and leave it pointing at itself.
*****************************************************************************/
static inline void list_remove(list_t *n)
{
    /* bridge the neighbours over n */
    n->next->prev = n->prev;
    n->prev->next = n->next;

    /* turn n into a self-contained empty list */
    n->prev = n;
    n->next = n;
}
/*****************************************************************************
* * Tell whether the list is empty.
* * Returns 1 when the head's next link points back at the head, else 0.
*****************************************************************************/
static inline int list_isempty(const list_t *l)
{
    const list_t *first = l->next;

    return first == l;
}
/*****************************************************************************
* * Count the nodes of the list (the head itself is not counted).
* * Walks the whole list, so the cost grows with the number of nodes.
*****************************************************************************/
static inline unsigned int list_len(const list_t *l)
{
    unsigned int count = 0;
    const list_t *cursor;

    for (cursor = l->next; cursor != l; cursor = cursor->next)
    {
        count++;
    }
    return count;
}
/*****************************************************************************
* * Get the structure that embeds a list node.
* * node: address of the embedded list node; type: enclosing structure type;
* * member: name of the list-node field inside that structure.
*****************************************************************************/
#define list_entry(node, type, member) \
container_of(node, type, member)
/*****************************************************************************
* * Iterate over the nodes of a list.
* * pos: loop cursor (list node pointer); head: list head pointer.
* * The cursor is advanced through pos->next after each pass, so the current
* * node must not be unlinked inside the loop body.
*****************************************************************************/
#define list_for_each(pos, head) \
for (pos = (head)->next; pos != (head); pos = pos->next)
/*****************************************************************************
* * Iterate over the nodes of a list, tolerating removal of the current node.
* * pos: loop cursor; n: scratch pointer that holds the next node before the
* * body runs; head: list head pointer.
*****************************************************************************/
#define list_for_each_safe(pos, n, head) \
for (pos = (head)->next, n = pos->next; pos != (head); \
pos = n, n = pos->next)
/*****************************************************************************
* * Iterate over the structures that embed the nodes of a list.
* * pos: loop cursor of the enclosing structure type; head: list head
* * pointer; member: name of the list-node field in the structure.
* * Uses typeof, a compiler extension before C23.
*****************************************************************************/
#define list_for_each_entry(pos, head, member) \
for (pos = list_entry((head)->next, typeof(*pos), member); \
&pos->member != (head); \
pos = list_entry(pos->member.next, typeof(*pos), member))
/*****************************************************************************
* * Iterate over the structures that embed the nodes of a list, tolerating
* * removal of the current entry.
* * pos: loop cursor; n: scratch pointer of the same type holding the next
* * entry; head: list head pointer; member: name of the list-node field.
* * Uses typeof, a compiler extension before C23.
*****************************************************************************/
#define list_for_each_entry_safe(pos, n, head, member) \
for (pos = list_entry((head)->next, typeof(*pos), member), \
n = list_entry(pos->member.next, typeof(*pos), member); \
&pos->member != (head); \
pos = n, n = list_entry(n->member.next, typeof(*n), member))
/****************************************************************************
* * list_first_entry - get the structure embedding the first node of a list
* * @ptr: the list head to take the element from.
* * @type: the type of the struct this is embedded in.
* * @member: the name of the list_struct within the struct.
* *
* * Note: the list is expected to be not empty; on an empty list the result
* * is computed from the head itself and is not a valid entry.
****************************************************************************/
#define list_first_entry(ptr, type, member) \
list_entry((ptr)->next, type, member)
/***************************************************************************
* * @brief Initialise a singly linked list head as an empty list.
* *
* * @param l the single list to be initialized
***************************************************************************/
static inline void slist_init(slist_t *l)
{
    slist_t *const none = NULL;

    l->next = none;
}
/* Append node n at the tail of the singly linked list headed by l. */
static inline void slist_append(slist_t *l, slist_t *n)
{
    slist_t *tail = l;

    /* walk to the last node */
    for (; tail->next != NULL; tail = tail->next)
    {
    }

    tail->next = n;
    n->next = NULL;
}
/* Insert node n directly after node l. */
static inline void slist_insert(slist_t *l, slist_t *n)
{
    slist_t *successor = l->next;

    n->next = successor;
    l->next = n;
}
/* Count the nodes that follow head l (the head itself is not counted). */
static inline unsigned int slist_len(const slist_t *l)
{
    unsigned int count = 0;
    const slist_t *cursor;

    for (cursor = l->next; cursor != NULL; cursor = cursor->next)
    {
        count++;
    }
    return count;
}
/* Unlink node n from the list headed by l and return l.
 * The list is left unchanged when n is not found; n->next is not reset. */
static inline slist_t *slist_remove(slist_t *l, slist_t *n)
{
    slist_t *prev;

    for (prev = l; prev->next != NULL; prev = prev->next)
    {
        if (prev->next == n)
        {
            /* bypass n */
            prev->next = n->next;
            break;
        }
    }
    return l;
}
/* Return the first node after head l, or NULL when the list is empty. */
static inline slist_t *slist_first(slist_t *l)
{
    slist_t *first = l->next;

    return first;
}
/* Return the last node of the list; this is l itself when nothing follows it. */
static inline slist_t *slist_tail(slist_t *l)
{
    slist_t *cursor = l;

    while (cursor->next != NULL)
    {
        cursor = cursor->next;
    }
    return cursor;
}
/* Return the node that follows n, or NULL when n is the last node. */
static inline slist_t *slist_next(slist_t *n)
{
    slist_t *successor = n->next;

    return successor;
}
/* Fixed-capacity ring buffer of equally sized elements.
 * front == rear means empty; the queue counts as full when advancing rear
 * would reach front, so one slot always stays unused. */
typedef struct QUEUE_ST
{
char* data; /* storage: ElemNum slots of ElemSize bytes each */
int ElemNum; /* number of slots */
int ElemSize; /* size of one slot in bytes */
int front; /* index of the next slot to dequeue */
int rear; /* index of the next slot to enqueue into */
}QueueSt;
/***************************************************************************
function: QueueCreate
input: ElementSize - size of one element in bytes; ElementNum - slot count
output: pointer to the new queue, NULL on allocation failure
Description: common queue creation function
****************************************************************************/
QueueSt *QueueCreate(int ElementSize,int ElementNum);
/***************************************************************************
function: IsQueueEmpty
input: queue - queue to inspect
output: TRUE when empty, FALSE otherwise, LS_ERR when queue is NULL
Description: check whether the queue is empty
****************************************************************************/
int IsQueueEmpty(QueueSt *queue);
/***************************************************************************
function: EnQueue
input: queue - target queue; InData - element to copy in; size - bytes to copy
output: LS_OK on success, LS_ERR on NULL input, full queue or oversized element
Description: enqueue operation
****************************************************************************/
int EnQueue(QueueSt *queue, void* InData,int size);
/***************************************************************************
function: ExQueue
input: queue - source queue; OutData - receives the element; size - bytes to copy
output: LS_OK on success, LS_ERR on NULL input, empty queue or oversized request
Description: dequeue operation
****************************************************************************/
int ExQueue(QueueSt *queue, void *OutData,int size);
#include <limits.h>
#include <stdlib.h>
#include <string.h>
#include "queue.h"
#include "log.h"
/***************************************************************************
function: QueueCreate
input: ElementSize - size of one element in bytes (> 0)
       ElementNum  - number of slots (> 0); one slot always stays unused, so
                     the queue holds at most ElementNum - 1 elements
output: pointer to the new queue, or NULL on invalid arguments or
        allocation failure
Description: create a queue
****************************************************************************/
QueueSt *QueueCreate(int ElementSize,int ElementNum)
{
    QueueSt *queue = NULL;

    /* Non-positive values are meaningless, and ElementNum == 0 would make the
     * "% ElemNum" in IsQueueFull/EnQueue/ExQueue divide by zero. The product
     * must fit in int because slot offsets are computed in int arithmetic. */
    if (ElementSize <= 0 || ElementNum <= 0 || ElementSize > INT_MAX / ElementNum)
    {
        LS_LOG(LOG_ERROR, "invalid queue size!\n");
        return NULL;
    }

    queue = (QueueSt *)malloc(sizeof(QueueSt));
    if (NULL == queue)
    {
        LS_LOG(LOG_ERROR, "malloc fail!\n");
        return NULL;
    }
    memset((char *)queue, 0, sizeof(QueueSt));

    queue->data = (char *)malloc((size_t)ElementSize * (size_t)ElementNum);
    if (NULL == queue->data)
    {
        LS_LOG(LOG_ERROR, "malloc fail !\n");
        free(queue); /* do not leak the descriptor */
        return NULL;
    }

    queue->ElemNum = ElementNum;
    queue->ElemSize = ElementSize;
    queue->rear = 0;
    queue->front = 0;
    return queue;
}
/***************************************************************************
function: IsQueueEmpty
input: queue - queue to inspect
output: TRUE when the queue is empty, FALSE otherwise, LS_ERR when queue
        is NULL
Description: check whether the queue is empty
****************************************************************************/
int IsQueueEmpty(QueueSt *queue)
{
    if (queue == NULL)
    {
        LS_LOG(LOG_ERROR, "queue is NULL!\n");
        return LS_ERR;
    }

    /* empty exactly when both indices coincide */
    if (queue->front == queue->rear)
    {
        return TRUE;
    }
    return FALSE;
}
/***************************************************************************
function: ExQueue
input: queue   - source queue
       OutData - buffer that receives the element
       size    - number of bytes to copy, 0 <= size <= ElemSize
output: LS_OK on success; LS_ERR on NULL input, empty queue or invalid size
Description: dequeue operation; copies the front element and advances front
****************************************************************************/
int ExQueue(QueueSt *queue, void *OutData, int size)
{
    if (NULL == queue || NULL == OutData)
    {
        LS_LOG(LOG_ERROR, "input para is NULL!\n");
        return LS_ERR;
    }
    if (TRUE == IsQueueEmpty(queue))
    {
        LS_LOG(LOG_INFO, "queue empty!\n");
        return LS_ERR;
    }
    /* a negative size would turn into a huge length once converted for memcpy */
    if (size < 0 || size > queue->ElemSize)
    {
        LS_LOG(LOG_ERROR, "error:size > ElemSize!\n");
        return LS_ERR;
    }

    memcpy(OutData, queue->data + queue->front * queue->ElemSize, (size_t)size);
    queue->front = (queue->front + 1) % queue->ElemNum; /* advance front, wrapping at ElemNum */
    return LS_OK;
}
/***************************************************************************
function: IsQueueFull
input: queue - queue to inspect
output: TRUE when the queue is full, FALSE otherwise, LS_ERR when queue
        is NULL
Description: check whether the queue is full
****************************************************************************/
int IsQueueFull(QueueSt *queue)
{
    int nextRear;

    if (queue == NULL)
    {
        LS_LOG(LOG_ERROR,"queue is NULL!\n");
        return LS_ERR;
    }

    /* full when advancing rear by one slot would land on front */
    nextRear = (queue->rear + 1) % queue->ElemNum;
    if (nextRear == queue->front)
    {
        return TRUE;
    }
    return FALSE;
}
/***************************************************************************
function: EnQueue
input: queue  - target queue
       InData - element to copy into the queue
       size   - number of bytes to copy, 0 <= size <= ElemSize
output: LS_OK on success; LS_ERR on NULL input, full queue or invalid size
Description: enqueue operation; copies the element into the rear slot and
             advances rear
****************************************************************************/
int EnQueue(QueueSt *queue, void* InData, int size)
{
    if (NULL == queue || NULL == InData)
    {
        LS_LOG(LOG_ERROR, "input para is NULL!\n");
        return LS_ERR;
    }
    if (TRUE == IsQueueFull(queue))
    {
        LS_LOG(LOG_ERROR, "queue is full!\n");
        return LS_ERR;
    }
    /* a negative size would turn into a huge length once converted for memcpy */
    if (size < 0 || size > queue->ElemSize)
    {
        LS_LOG(LOG_ERROR, "error:size > ElemSize!\n");
        return LS_ERR;
    }

    memcpy(queue->data + queue->rear * queue->ElemSize, (char *)InData, (size_t)size);
    queue->rear = (queue->rear + 1) % queue->ElemNum; /* advance rear, wrapping at ElemNum */
    return LS_OK;
}
接下来就是本文的重头戏了。
#include <pthread.h>
#include <semaphore.h>
#include "MQTTClient.h"
#include "list.h"
#include "queue.h"
/* Client id printed in the subscribe log message. */
#define CLIENTID "ExampleClientSub"
/* Topic passed to MQTTClient_unsubscribe() in MqttClientRelease(). */
#define TOPIC "MQTT Examples"
#define PAYLOAD "Hello World!"
/* QoS used for every subscribe and publish. */
#define QOS 0
#define TIMEOUT 10000L
/* Payload that makes msgarrvd() clear the connect flag. */
#define DISCONNECT "out"
/********** subscribed topics **********/
/* Number of entries in the SubTopics array. */
#define SUB_TOPIC_NUM 1
#define TOPIC_SUB_IOT_REG "v1/127001/response/iotregister"
/******** published topics *********/
/* Number of entries in the PubTopics array. */
#define PUB_TOPIC_NUM 1
/* NOTE(review): presumably the PubTopics index of TOPIC_PUB_REG_DEVICE - confirm. */
#define TOPIC_PUB_REG_DEVICE_ID 0
#define TOPIC_PUB_REG_DEVICE "v1/127001/sensor/add"
/* Bookkeeping record for a sent message; MqttSendData() links it into
 * MqttPubMgrSt.PubDataList through the embedded list node. */
typedef struct SEND_RECORD_ST
{
long long mid; /* NOTE(review): presumably the message id - confirm */
char type[64]; /* NOTE(review): presumably a message type name - confirm */
list_t list; /* link into the send-record list */
}SendRecordSt;
/* One queued outgoing message. */
typedef struct MQTT_PUB_DATA_ST
{
int TopicId; /* index into the PubTopics array */
int len; /* payload length in bytes */
char* data; /* heap copy of the payload; freed by the publishing thread */
}MqttPubDataSt;
/* One queued incoming message. */
typedef struct MQTT_SUB_DATA_ST
{
int clientId;// distinguishes different mqtt clients
int len; /* payload length in bytes */
char *topic; /* topic name the message arrived on */
char *data; /* heap copy of the payload; freed by the subscribe-queue thread */
}MqttSubDataSt;
/* State of the publish path. */
typedef struct MQTT_PUB_MGR_ST
{
MqttPubDataSt stMqttPubData; /* scratch slot the publishing thread dequeues into */
QueueSt *pMqttPubQueue; /* queue of MqttPubDataSt elements */
list_t PubDataList; /* list of SendRecordSt records added by MqttSendData() */
pthread_mutex_t lock;// access mutex
sem_t QueueWaitsem; /* posted by MqttSendData() and connlost() to wake the publishing thread */
sem_t DeliverOksem; /* not used by the code in this file */
}MqttPubMgrSt;
/* State of the subscribe path. */
typedef struct MQTT_SUB_MGR_ST
{
MqttSubDataSt stMqttSubData; /* scratch slot the subscribe-queue thread dequeues into */
QueueSt *pMqttSubQueue; /* queue of MqttSubDataSt elements */
pthread_mutex_t lock;// access mutex
sem_t QueueWaitsem; /* posted by MqttTransData() after a successful enqueue */
}MqttSubMgrSt;
/* Top-level state of the MQTT client module. The Add/Activate/Update retry
 * members are only initialised by mqttClientInit() in the code shown here. */
typedef struct MQTT_MGR_ST
{
MQTTClient MqttClient; /* paho client handle */
int MqttConnectFlag; /* TRUE while connected; cleared by connlost() and by a DISCONNECT payload */
int StartBusinessFlag;
long long MessageId;
pthread_mutex_t lock;// access mutex
MqttPubMgrSt stPubMgr; /* publish path */
MqttSubMgrSt stSubMgr; /* subscribe path */
list_t AddFailList;
int ReAddCnt;
sem_t ReAddSem;
pthread_mutex_t Addlock;// access mutex
list_t ActivateFailList;
int ReActivateCnt;
sem_t ReActivateSem;
pthread_mutex_t Activatelock;// access mutex
list_t UpdateFailList;
int ReUpdateCnt;
sem_t ReUpdateSem;
pthread_mutex_t Updatelock;// access mutex
}MqttMgrSt;
/* Block until the MQTT client thread terminates; its exit value is stored through reval. */
void threadWaitDone(void **reval);
/* Initialise the module state and start the client and subscribe-queue threads.
 * Returns LS_OK on success, LS_ERR on failure. */
int mqttClientInit(void);
新增主题或者删除主题的订阅或者发布需要修改SUB_TOPIC_NUM和PUB_TOPIC_NUM ,不然会数组越界挂掉,看到mqttClientMgr.c就知道原因了。
加了一些锁和信号量。
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include "mqttClientMgr.h"
#include "log.h"
/* Module state shared by the client thread, the subscribe-queue thread and the paho callbacks. */
static MqttMgrSt stMqttMgr;
/* Token of the last confirmed delivery, written by delivered(). */
static volatile MQTTClient_deliveryToken deliveredtoken;
/* Thread running MqttClientThread (joined by threadWaitDone). */
static pthread_t MqttClientThreadId;
/* Thread running MqttProcSubInfoThread. */
static pthread_t ProcSubQueueThreadId;
/* Slot counts of the publish and subscribe queues. */
#define PUB_QUEUE_ELEM_NUM 200
#define SUB_QUEUE_ELEM_NUM 500
/* Size of the buffer that holds the "tcp://host:port" broker address. */
#define MAX_BROKER_ADDR_LEN 32
/* Guards the subscribe and publish calls to the broker. */
#define MQTT_PLATFORM_ENABLE (1)
/* Broker endpoint and credentials used when connecting. */
#define BROKER_IP "127.0.0.1"
#define BROKER_PORT "1883"
#define CLIENT_ID "anyhow"
#define USER_NAME "mosquitto"
#define PWD "ls@123"
/* Topics subscribed after each successful connect; must hold SUB_TOPIC_NUM entries. */
static char * SubTopics[SUB_TOPIC_NUM]={TOPIC_SUB_IOT_REG};
/* Topics indexed by MqttPubDataSt.TopicId; must hold PUB_TOPIC_NUM entries. */
static char * PubTopics[PUB_TOPIC_NUM]={TOPIC_PUB_REG_DEVICE};
void threadWaitDone(void **reval)
{
pthread_join(MqttClientThreadId, reval);
}
/**
 * Initialise a thread attribute object for a joinable thread.
 *
 * @param thread_attr attribute object to initialise (pthread_attr_init is
 *                    called on it, so it must not already be initialised)
 * @param priority    value stored in sched_param.sched_priority
 * @param policy      scheduling policy, e.g. SCHED_OTHER
 * @param stack_size  requested stack size in bytes; values below 16 KiB are
 *                    raised to 16 KiB
 *
 * Return codes of the individual pthread_attr_* setters are not checked; a
 * setting the platform rejects simply keeps its default.
 */
void SetThreadAttr(pthread_attr_t *thread_attr, unsigned int priority, int policy, size_t stack_size)
{
    const size_t min_stack_size = 0x4000; /* 16 KiB lower bound, in bytes */
    struct sched_param param;

    pthread_attr_init(thread_attr); /* the attribute object must be initialised first */
    pthread_attr_setscope(thread_attr, PTHREAD_SCOPE_PROCESS);
    pthread_attr_setdetachstate(thread_attr, PTHREAD_CREATE_JOINABLE);
    pthread_attr_setschedpolicy(thread_attr, policy);

    pthread_attr_getschedparam(thread_attr, &param);
    param.sched_priority = (int)priority;
    pthread_attr_setschedparam(thread_attr, &param);

    /* Honour the caller's request; only clamp it from below. */
    if (stack_size < min_stack_size)
    {
        stack_size = min_stack_size;
    }
    pthread_attr_setstacksize(thread_attr, stack_size);
}
/* Thin wrapper around pthread_create().
 * Returns the pthread_create() result: 0 on success, an error number otherwise. */
int CreateThread(pthread_t *thread_id, const pthread_attr_t *attr,void *(*thread_fun) (void *), void *thread_arg)
{
    return pthread_create(thread_id, attr, thread_fun, thread_arg);
}
/***************************************************************************
function: GetBrokerAddr
input: BrokerAddr - buffer that receives the address
       len        - size of BrokerAddr in bytes
output: LS_OK on success; LS_ERR when BrokerAddr is NULL or the buffer is
        too small for the address plus its terminating NUL
Description: build the MQTT broker address "tcp://<BROKER_IP>:<BROKER_PORT>"
             from the compile-time configuration
****************************************************************************/
static int GetBrokerAddr(char *BrokerAddr,int len)
{
    static const char scheme[] = "tcp://"; /* format: protocol://host:port */
    size_t needed = 0;

    if (NULL == BrokerAddr)
    {
        LS_LOG(LOG_ERROR, "GetBrokerAddr: BrokerAddr is NULL.\n");
        return LS_ERR;
    }

    /* scheme + host + ':' + port + terminating NUL; checked once up front so
     * none of the copies below can run past the buffer. */
    needed = strlen(scheme) + strlen(BROKER_IP) + 1 + strlen(BROKER_PORT) + 1;
    if (len <= 0 || (size_t)len < needed)
    {
        LS_LOG(LOG_ERROR, "GetBrokerAddr: BrokerAddr buf is not enough!\n");
        return LS_ERR;
    }

    /* strcpy terminates the string itself, so the result does not depend on
     * the caller having zeroed the buffer. */
    strcpy(BrokerAddr, scheme);
    strcat(BrokerAddr, BROKER_IP);
    strcat(BrokerAddr, ":");
    strcat(BrokerAddr, BROKER_PORT);
    return LS_OK;
}
/***************************************************************************
function: MqttSubscribeAllTopics
input:
output:
Description:客户端初始化的时候完成所有主题的订阅
****************************************************************************/
void MqttSubscribeAllTopics(void)
{
int i = 0;
for(i = 0;i < SUB_TOPIC_NUM; i++)
{
LS_LOG(LOG_INFO, "Subscribing topic %s\nfor client %s using QoS %d\n\n",SubTopics[i], CLIENTID, QOS);
MQTTClient_subscribe(stMqttMgr.MqttClient, SubTopics[i], QOS);
}
}
/***************************************************************************
function: MqttClientRelease
input: client - client handle to shut down
output: none
Description: release an MQTT client: unsubscribe every topic in SubTopics,
             disconnect (waiting up to 10000 ms) and destroy the handle.
****************************************************************************/
void MqttClientRelease(MQTTClient client)
{
    int i = 0;
    MQTTClient released = client;

    if (NULL == client)
    {
        return;
    }

    /* unsubscribe what MqttSubscribeAllTopics() subscribed */
    for (i = 0; i < SUB_TOPIC_NUM; i++)
    {
        MQTTClient_unsubscribe(client, SubTopics[i]);
    }
    MQTTClient_disconnect(client, 10000);
    MQTTClient_destroy(&client);

    /* client is a by-value copy; clear the module-level handle as well so it
     * does not keep pointing at the destroyed client. */
    if (stMqttMgr.MqttClient == released)
    {
        stMqttMgr.MqttClient = NULL;
    }
}
/***************************************************************************
function: connlost
input:
output:
Description:客户端注册的连接失败回调函数
****************************************************************************/
void connlost(void *context, char *cause)
{
LS_LOG(LOG_ERROR, "\nConnection lost\n");
LS_LOG(LOG_ERROR, " cause: %s\n", cause);
stMqttMgr.MqttConnectFlag = 0;
sem_post(&stMqttMgr.stPubMgr.QueueWaitsem);
}
/***************************************************************************
function: MqttTransData
input: tdata - received message descriptor to enqueue (copied by value)
output: result of EnQueue(): LS_OK on success, LS_ERR otherwise
Description: put a received message into the subscribe queue under the
             subscribe lock and, on success, wake the processing thread
****************************************************************************/
int MqttTransData(MqttSubDataSt *tdata)
{
    MqttSubMgrSt *subMgr = &stMqttMgr.stSubMgr;
    int status;

    pthread_mutex_lock(&subMgr->lock);
    status = EnQueue(subMgr->pMqttSubQueue, (char *)tdata, sizeof(MqttSubDataSt));
    pthread_mutex_unlock(&subMgr->lock);

    if (status != LS_OK)
    {
        return status;
    }

    sem_post(&subMgr->QueueWaitsem);
    return status;
}
/***************************************************************************
function: msgarrvd
input: context   - callback context (unused)
       topicName - topic the message arrived on (released here)
       topicLen  - topic length reported by the library (unused)
       message   - received message (released here)
output: always 1
Description: message-arrived callback registered with the client. Copies the
             payload and the topic name into one heap block and queues it for
             the subscribe-queue thread, which frees the block through the
             data pointer.
****************************************************************************/
int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{
    MqttSubDataSt stMqttSubData;
    char *pData = NULL;
    size_t payloadLen = 0;
    size_t topicSize = 0;

    (void)context;
    (void)topicLen;

    LS_LOG(LOG_DEBUG, "Message arrived on topic: %s\n", topicName);
    memset(&stMqttSubData, 0, sizeof(MqttSubDataSt));
    stMqttSubData.clientId = 0;

    if (NULL != message->payload && message->payloadlen > 0)
    {
        payloadLen = (size_t)message->payloadlen;
    }

    /* The payload is a byte block of payloadlen bytes and is not guaranteed
     * to be NUL-terminated, so compare by length instead of strcmp(). */
    if (payloadLen == strlen(DISCONNECT) && memcmp(message->payload, DISCONNECT, payloadLen) == 0)
    {
        LS_LOG(LOG_WARN, " DISCONNECT\n out!!\n");
        stMqttMgr.MqttConnectFlag = FALSE;
    }

    /* One allocation holds "<payload>\0<topic>\0". The consumer frees it via
     * .data, which releases the topic copy too; topicName itself is freed
     * below and must not be stored in the queue. */
    topicSize = strlen(topicName) + 1;
    pData = malloc(payloadLen + 1 + topicSize);
    if (NULL == pData)
    {
        LS_LOG(LOG_ERROR, "malloc failed!!\n");
        goto fail;
    }
    if (payloadLen > 0)
    {
        memcpy(pData, message->payload, payloadLen);
    }
    pData[payloadLen] = '\0';
    memcpy(pData + payloadLen + 1, topicName, topicSize);

    /* log the terminated copy; printing the raw payload with %s reads past its end */
    LS_LOG(LOG_DEBUG, "message payload: %s\n", pData);

    stMqttSubData.data = pData;
    stMqttSubData.len = (int)payloadLen;
    stMqttSubData.topic = pData + payloadLen + 1;
    if (LS_OK != MqttTransData(&stMqttSubData)) /* on success the queue thread frees pData */
    {
        LS_LOG(LOG_WARN, "MqttTransData failed!!\n");
        free(pData);
    }

fail:
    MQTTClient_freeMessage(&message);
    MQTTClient_free(topicName);
    return 1;
}
/***************************************************************************
function: delivered
input: context - callback context (unused)
       dt      - token of the message whose delivery was confirmed
output: none
Description: delivery-complete callback registered with the client; logs the
             token and records it in deliveredtoken
****************************************************************************/
void delivered(void *context, MQTTClient_deliveryToken dt)
{
    (void)context;

    LS_LOG(LOG_INFO, "Message with token value %d delivery confirmed\n", dt);
    deliveredtoken = dt;
}
/***************************************************************************
function: MqttClientThread()
input:
output:
Description:MQTT客户端处理线程
****************************************************************************/
void *MqttClientThread()
{
int rc = 0;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
MQTTClient_message pubmsg = MQTTClient_message_initializer;
MQTTClient_deliveryToken token = 0;
char BrokerAddr[MAX_BROKER_ADDR_LEN];
memset(BrokerAddr, 0, sizeof(BrokerAddr));
if(LS_OK != GetBrokerAddr(BrokerAddr,MAX_BROKER_ADDR_LEN))
{
LS_LOG(LOG_ERROR, "GetBrokerAddr Err.\n");
return NULL;
}
LS_LOG(LOG_INFO, "BrokerAddr: %s\n", BrokerAddr);
MQTTClient_create(&stMqttMgr.MqttClient, BrokerAddr, (char*)CLIENT_ID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
if(NULL == stMqttMgr.MqttClient)
{
LS_LOG(LOG_ERROR,"MQTTClient_create g_MottClient fail!\n");
return NULL;
}
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.username = (char*)USER_NAME;
conn_opts.password = (char*)PWD;
MQTTClient_setCallbacks(stMqttMgr.MqttClient, NULL, connlost, msgarrvd, delivered);
ReLink:
do
{
LS_LOG(LOG_INFO, "start to connect MQTT server...\n");
if ((rc = MQTTClient_connect(stMqttMgr.MqttClient, &conn_opts)) == MQTTCLIENT_SUCCESS)
{
LS_LOG(LOG_INFO, "connect success!\n");
stMqttMgr.MqttConnectFlag = TRUE;
break;
}
else
{
LS_LOG(LOG_ERROR, "Failed to connect, return code %d\n", rc);
sleep(20); //时间可配置
}
}while(1);
//完成所有主题的订阅初始化
if(MQTT_PLATFORM_ENABLE)
{
MqttSubscribeAllTopics();
}
//客户端连接正常情况下进行信息发布
while(TRUE == stMqttMgr.MqttConnectFlag) //发布消息
{
sem_wait(&stMqttMgr.stPubMgr.QueueWaitsem);
//LS_LOG(LOG_INFO,"rev QueueWaitsem !!\n");
pthread_mutex_lock(&stMqttMgr.stPubMgr.lock);
do
{
if(LS_OK == ExQueue(stMqttMgr.stPubMgr.pMqttPubQueue, (void*)&stMqttMgr.stPubMgr.stMqttPubData, sizeof(MqttPubDataSt)))
{
pubmsg.payload = stMqttMgr.stPubMgr.stMqttPubData.data; //添加一个成员用以决定发布TOPIC 记得释放free
pubmsg.payloadlen = stMqttMgr.stPubMgr.stMqttPubData.len;
pubmsg.qos = QOS;
pubmsg.retained = 0;
deliveredtoken = 0;
if(MQTT_PLATFORM_ENABLE)
{
rc = MQTTClient_publishMessage(stMqttMgr.MqttClient, PubTopics[stMqttMgr.stPubMgr.stMqttPubData.TopicId], &pubmsg, &token);
if(strlen(pubmsg.payload) < 1024)
{
LS_LOG(LOG_INFO, "MQTT Publish message %s\n""on topic %s token %d \n",
(char*)pubmsg.payload,
PubTopics[stMqttMgr.stPubMgr.stMqttPubData.TopicId],
token);
}
}
if(rc != 0)
{
LS_LOG(LOG_ERROR, "MQTT Publish message Failed, rc=%d !!\n", rc);
}
free(stMqttMgr.stPubMgr.stMqttPubData.data); //重要,异地malloc,此地释放
}
}
while(FALSE == IsQueueEmpty(stMqttMgr.stPubMgr.pMqttPubQueue));
pthread_mutex_unlock(&stMqttMgr.stPubMgr.lock);
}
//客户端连接断开后进行重连处理
goto ReLink;
//本应用不会运行至此
MqttClientRelease(stMqttMgr.MqttClient);
}
/***************************************************************************
function: MqttProcessIotResponse
input: tSubData - dequeued message (topic, payload, length)
output: none
Description: application hook for handling a message received from the IoT
             platform; currently a placeholder with no processing
****************************************************************************/
void MqttProcessIotResponse(MqttSubDataSt *tSubData)
{
    /* process the received data here */
    (void)tSubData;
}
/***************************************************************************
function: MqttProcSubInfoThread
input:
output:
Description:MQTT处理订阅消息队列线程
****************************************************************************/
void *MqttProcSubInfoThread(void *param)
{
LS_LOG(LOG_INFO, "MqttProcSubInfoThread start !\n");
while(1)
{
sem_wait(&stMqttMgr.stSubMgr.QueueWaitsem);
pthread_mutex_lock(&stMqttMgr.stSubMgr.lock);
do
{
if(LS_OK == ExQueue(stMqttMgr.stSubMgr.pMqttSubQueue, (void*)&stMqttMgr.stSubMgr.stMqttSubData, sizeof(MqttSubDataSt)))
{
MqttProcessIotResponse(&stMqttMgr.stSubMgr.stMqttSubData);
free(stMqttMgr.stSubMgr.stMqttSubData.data);
}
}
while(FALSE == IsQueueEmpty(stMqttMgr.stSubMgr.pMqttSubQueue));
pthread_mutex_unlock(&stMqttMgr.stSubMgr.lock);
}
}
/**
 * Initialise the MQTT client module: reset the module state, initialise its
 * locks, semaphores and lists, create the publish and subscribe queues and
 * start the client thread and the subscribe-queue thread.
 *
 * @return LS_OK on success, LS_ERR when a queue or a thread cannot be created.
 */
int mqttClientInit(void)
{
    int ret = -1;
    pthread_attr_t Thread_Attr;

    memset((char *)&stMqttMgr, 0, sizeof(MqttMgrSt));
    stMqttMgr.MqttConnectFlag = FALSE;
    stMqttMgr.StartBusinessFlag = FALSE;
    stMqttMgr.MessageId = 1000000000000000LL;
    pthread_mutex_init(&stMqttMgr.lock, NULL);

    /* publish path */
    sem_init(&stMqttMgr.stPubMgr.QueueWaitsem, 0, 0);
    sem_init(&stMqttMgr.stPubMgr.DeliverOksem, 0, 0); /* was left uninitialised */
    pthread_mutex_init(&stMqttMgr.stPubMgr.lock, NULL);
    list_init(&stMqttMgr.stPubMgr.PubDataList);

    /* subscribe path */
    sem_init(&stMqttMgr.stSubMgr.QueueWaitsem, 0, 0);
    pthread_mutex_init(&stMqttMgr.stSubMgr.lock, NULL);

    /* retry bookkeeping */
    list_init(&stMqttMgr.AddFailList);
    stMqttMgr.ReAddCnt = 0;
    sem_init(&stMqttMgr.ReAddSem, 0, 0);
    pthread_mutex_init(&stMqttMgr.Addlock, NULL);
    list_init(&stMqttMgr.ActivateFailList);
    stMqttMgr.ReActivateCnt = 0;
    sem_init(&stMqttMgr.ReActivateSem, 0, 0);
    pthread_mutex_init(&stMqttMgr.Activatelock, NULL);
    list_init(&stMqttMgr.UpdateFailList);
    stMqttMgr.ReUpdateCnt = 0;
    sem_init(&stMqttMgr.ReUpdateSem, 0, 0);
    pthread_mutex_init(&stMqttMgr.Updatelock, NULL);

    stMqttMgr.stPubMgr.pMqttPubQueue = QueueCreate(sizeof(MqttPubDataSt), PUB_QUEUE_ELEM_NUM);
    if (NULL == stMqttMgr.stPubMgr.pMqttPubQueue)
    {
        LS_LOG(LOG_ERROR, "Create MQTT Pub Queue failed!\n");
        return LS_ERR;
    }
    stMqttMgr.stSubMgr.pMqttSubQueue = QueueCreate(sizeof(MqttSubDataSt), SUB_QUEUE_ELEM_NUM);
    if (NULL == stMqttMgr.stSubMgr.pMqttSubQueue)
    {
        LS_LOG(LOG_ERROR, "Create MQTT Sub Queue failed!\n");
        return LS_ERR;
    }

    /* SetThreadAttr() calls pthread_attr_init(), so the attribute object is
     * destroyed after each thread creation before being initialised again. */
    SetThreadAttr(&Thread_Attr, 20, SCHED_OTHER, 0x100000);
    ret = CreateThread(&MqttClientThreadId, &Thread_Attr, MqttClientThread, NULL);
    pthread_attr_destroy(&Thread_Attr);
    if (ret != 0)
    {
        LS_LOG(LOG_ERROR, "Create MQTT client thread failed!\n");
        return LS_ERR;
    }

    /* NOTE(review): 32 KiB is requested for the subscribe-queue thread -
     * confirm this covers the stack needs of LS_LOG and the message handler. */
    SetThreadAttr(&Thread_Attr, 20, SCHED_OTHER, 0x8000);
    ret = CreateThread(&ProcSubQueueThreadId, &Thread_Attr, MqttProcSubInfoThread, NULL);
    pthread_attr_destroy(&Thread_Attr);
    if (ret != 0)
    {
        LS_LOG(LOG_ERROR, "Create MQTT Process Sub Info thread failed!\n");
        return LS_ERR;
    }
    return LS_OK;
}
/***************************************************************************
function: MqttSendData
input: data        - payload to publish (copied; may be NULL for an empty
                     message)
       len         - payload length in bytes
       TopicId     - index into PubTopics, 0 <= TopicId < PUB_TOPIC_NUM
       pSendRecord - optional record to link into the send-record list
output: none
Description: queue a message for publishing by the MQTT client thread
return: LS_OK - queued; LS_ERR - failed (the caller needs to send again)
****************************************************************************/
int MqttSendData(char *data, int len, int TopicId, SendRecordSt *pSendRecord)
{
    int ret = LS_ERR;
    MqttPubDataSt StPubData;
    char *pdata = NULL;

    /* TopicId is later used as an index into PubTopics */
    if (TopicId < 0 || TopicId >= PUB_TOPIC_NUM)
    {
        LS_LOG(LOG_ERROR, "MqttSendData: invalid TopicId %d\n", TopicId);
        return LS_ERR;
    }

    memset(&StPubData, 0, sizeof(MqttPubDataSt));
    if (NULL != data && len > 0)
    {
        pdata = (char *)malloc((size_t)len);
        if (NULL == pdata)
        {
            LS_LOG(LOG_INFO, "malloc failed!!\n");
            return LS_ERR;
        }
        memcpy(pdata, data, (size_t)len);
    }

    StPubData.TopicId = TopicId;
    StPubData.data = pdata;
    /* never queue a length without a matching buffer */
    StPubData.len = (NULL != pdata) ? len : 0;

    pthread_mutex_lock(&stMqttMgr.stPubMgr.lock);
    ret = EnQueue(stMqttMgr.stPubMgr.pMqttPubQueue, (char *)&StPubData, sizeof(MqttPubDataSt));
    /* link the record only for messages that were actually queued, so a
     * failed send leaves no entry behind in the list */
    if (LS_OK == ret && NULL != pSendRecord)
    {
        list_insert_before(&stMqttMgr.stPubMgr.PubDataList, &pSendRecord->list);
    }
    pthread_mutex_unlock(&stMqttMgr.stPubMgr.lock);

    if (LS_OK != ret)
    {
        LS_LOG(LOG_INFO, "EnQueue error!!\n");
        free(pdata);
        return LS_ERR;
    }

    sem_post(&stMqttMgr.stPubMgr.QueueWaitsem);
    return LS_OK;
}
新增或删除主题的订阅和发布需要修改这俩全局数组
static char * SubTopics[SUB_TOPIC_NUM]={TOPIC_SUB_IOT_REG};
static char * PubTopics[PUB_TOPIC_NUM]={TOPIC_PUB_REG_DEVICE};
MqttSendData接口提供了MQTT消息发布的接口,就是先把MQTT消息放进发布队列,然后在MqttClientThread线程里出队列,进行发布。
从mqttClientInit开始,完成了锁、信号量和消息队列的初始化。消息队列有两个,一个用来处理发布,一个用来处理订阅;同时建立两个线程,一个是MQTT客户端线程(负责连接、订阅和发布),一个是订阅到的MQTT消息的处理线程。
msgarrvd接口里就可以对收到的消息进行处理,本文的处理还是把MQTT消息进队列,MqttProcSubInfoThread线程出队列,各位可以在MqttProcessIotResponse接口里对订阅到的MQTT消息进行处理。
到此为止,整个工程算是贴完了,下面就是见证奇迹的时刻。
ssh端用tail -f /home/ymj/mqttClient/log/mqttlog/mqttlog.log命令动态查看日志。
使用软件自带的工具运行代码。
结果显示
[20220908-16:45:46][INFO][main.c][changeWorkDir][22]—workdir:/home/ymj/mqttClient—
[20220908-16:45:46][INFO][main.c][main][39]—mqttClient start—
[20220908-16:45:46][INFO][mqttClientMgr.c][MqttClientThread][255]BrokerAddr: tcp://127.0.0.1:1883
[20220908-16:45:46][INFO][mqttClientMgr.c][MqttClientThread][275]start to connect MQTT server…
[20220908-16:45:46][INFO][mqttClientMgr.c][MqttProcSubInfoThread][363]MqttProcSubInfoThread start !
[20220908-16:45:46][ERROR][mqttClientMgr.c][MqttClientThread][284]Failed to connect, return code -1
[20220908-16:46:06][INFO][mqttClientMgr.c][MqttClientThread][275]start to connect MQTT server…
[20220908-16:46:07][ERROR][mqttClientMgr.c][MqttClientThread][284]Failed to connect, return code -1
连接MQTT服务端失败,为啥呢?因为没有起服务端的broker,哈哈。
运行mosquitto,默认端口为1883,也可以自己指定端口,方法自己百度去。
日志显示连接broker成功,并打印出订阅的主题
宿主机打开MQTT客户端工具mqtt.fx,这个软件也可以从网上直接搜到
我的虚拟机ip是192.168.8.235,所以配置连接如下
配置完成后点击OK,退回来,connect。
填入代码里订阅的主题和要发布的内容
点击发布
程序成功接收到,验证完成,我擦,后面的乱码是啥,不管了,你们自己优化去。
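2022.9.19更新:乱码原因找到了,才想起来就更新下。当时认为是MQTT的那个消息结构体有两个连续的char*字段导致,可以修改成固定长度的char字符串数组或者分开定义字段。补充:从代码看,更直接的原因是MQTT的payload只是长度为payloadlen的一段字节,并不保证以'\0'结尾,而msgarrvd里用%s直接打印message->payload、用strcmp比较,都会读到payload之后的内存,于是打印出乱码;按payloadlen拷贝一份并在末尾补'\0'之后再打印即可。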
没记错的话应该是这个结构体:
typedef struct MQTT_SUB_DATA_ST
{
int clientId;//用于区分不同的mqtt客户端
int len;
char *topic;
char *data;
}MqttSubDataSt;
程序的MQTT发布也自己写代码验证去。调用代码中提供的接口,搭好环境验证即可。
项目工程如下
https://download.csdn.net/download/qqq1112345/86512208
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。