赞
踩
继续上一篇关于MQTT的话题,本篇呢,我们将脱离paho的库环境,进行MQTT搭建。
在paho.mqtt库中,简单翻阅一下,可以看到他的核心库的路径为 paho.mqtt.c-1.3.0\src
经过笔者的手撕源码,排除掉了许多冗余的部分,就只剩下列文件
将这些文件,笔者单独放到了“mqtt文件夹”内,做一个库封装起来。
头文件太多太繁琐,因此需要个文件把它们集成起来。
mqtt_client.h:
- #ifndef __MQTT_CLIENT_H__
- #define __MQTT_CLIENT_H__
-
- #include "MQTTClient.h"
-
- #ifdef __cplusplus
- extern "C" {
- #endif
-
- /*
-  * Return codes used by this wrapper. Zero is success; every error code
-  * defined here is negative.
-  * NOTE(review): the values look like they mirror paho's MQTTCLIENT_* codes -
-  * confirm against MQTTClient.h before relying on that.
-  */
- #define MQTT_SUCCESS 0
- #define MQTT_FAILURE -1
- #define MQTT_DISCONNECTED -3
- #define MQTT_MAX_MESSAGES_INFLIGHT -4
- #define MQTT_BAD_UTF8_STRING -5
- #define MQTT_NULL_PARAMETER -6
- #define MQTT_TOPICNAME_TRUNCATED -7
- #define MQTT_BAD_STRUCTURE -8
- #define MQTT_BAD_QOS -9
- /* Quality-of-service levels accepted by the subscribe/publish functions. */
- #define QOS_AT_MOST_ONCE 0
- #define QOS_AT_LEAST_ONCE 1
- #define QOS_EXACTLY_ONCE 2
- /* Port number the examples pass to mqtt_new(). */
- #define MQTT_PORT 1883
- /* Initial value of mqtt_client.timeout, in milliseconds. */
- #define MQTT_DEFAULT_TIME_OUT 3000
-
-
- /* Opaque-style handle for one MQTT client; the layout is given below. */
- typedef struct _mqtt_client mqtt_client;
-
- /*
-  * User callback invoked for each message delivered through the callback API.
-  * Receives the client, the topic string, the payload pointer and the payload
-  * length in bytes. Its return value is handed back to the paho library
-  * unchanged by the wrapper.
-  */
- typedef int CALLBACK_MESSAGE_ARRIVED(mqtt_client *m, char *topic, char *data, int length);
-
- /* Layout of the MQTT client object. */
- struct _mqtt_client {
- MQTTClient client; /* underlying paho client handle */
- //int Qos; //Quality of service
- int timeout; /* publish completion timeout (milliseconds); <= 0 skips the wait */
- CALLBACK_MESSAGE_ARRIVED *on_message_arrived; /* user handler, may be NULL */
-
- /* Fields below describe the message most recently fetched by mqtt_receive(). */
- int received_message_id; /* msgid of the received message */
- char * received_topic; /* topic string of the received message */
- char * received_message; /* payload pointer (points into received_msg) */
- int received_message_len; /* payload length in bytes */
- int received_topic_len; /* topic length as reported by MQTTClient_receive */
-
- MQTTClient_message * received_msg; /* paho message object backing the fields above */
- };
-
- /*
-  * Create a client object for the server `host` (the examples pass
-  * "host:port") with the given client id and no persistence.
-  * Returns NULL on failure; when the paho create call fails, errno holds
-  * its return code.
-  */
- mqtt_client * mqtt_new(char * host, int port, char *client_id);
-
- /* Destroy a client created by mqtt_new(). Returns 0, or -1 if m is NULL.
-  * The client must not be used afterwards. */
- int mqtt_delete(mqtt_client *m);
-
- /* Connect to the server. username/password are the broker credentials,
-  * NULL when the broker needs none. Returns the MQTTClient_connect result,
-  * or -1 if m is NULL. */
- int mqtt_connect(mqtt_client * m, char *username, char *password);
-
- /* Disconnect from the server. Returns the MQTTClient_disconnect result,
-  * or -1 if m is NULL. */
- int mqtt_disconnect(mqtt_client * m);
-
- /* Returns the MQTTClient_isConnected result, or 0 if m is NULL. */
- int mqtt_is_connected(mqtt_client *m);
-
- /* Thin wrapper around MQTTClient_yield(). */
- void mqtt_yield(void);
-
- /* Set the publish completion timeout in milliseconds.
-  * Returns MQTT_SUCCESS, or -1 if m is NULL. */
- int mqtt_set_timeout(mqtt_client *m, int timeout);
-
- /* Register `function` as the message handler and install the wrapper's
-  * callbacks on the paho client. Returns the MQTTClient_setCallbacks result,
-  * or -1 if m is NULL. */
- int mqtt_set_callback_message_arrived(mqtt_client *m, CALLBACK_MESSAGE_ARRIVED * function);
-
- /* Subscribe to `topic` at the given QoS. Returns the MQTTClient_subscribe
-  * result, or -1 if m is NULL. */
- int mqtt_subscribe(mqtt_client *m, char *topic, int Qos);
-
- /* Unsubscribe from `topic`. Returns the MQTTClient_unsubscribe result,
-  * or -1 if m is NULL. */
- int mqtt_unsubscribe(mqtt_client *m, char *topic);
-
- /* Publish `length` bytes from `data` to `topic` (not retained). When the
-  * client's timeout is > 0 the call waits for delivery to complete.
-  * Returns the delivery token on success, otherwise the non-success
-  * MQTTClient return code (or -1 if m is NULL). */
- int mqtt_publish_data(mqtt_client * m, char *topic, void *data, int length, int Qos);
-
- /* Publish the NUL-terminated string `message`; see mqtt_publish_data(). */
- int mqtt_publish(mqtt_client * m, char *topic, char *message, int Qos);
-
- /* Wait up to `timeout` for one message and store it in the received_*
-  * fields of m. Returns 0 when a message was stored, -1 when m is NULL or
-  * no message arrived, otherwise the MQTTClient_receive error code. */
- int mqtt_receive(mqtt_client *m, unsigned long timeout);
-
- /* Thin wrapper around MQTTClient_sleep(). */
- void mqtt_sleep(int milliseconds);
-
-
- #ifdef __cplusplus
- }
- #endif
-
- #endif /* __MQTT_CLIENT_H__ */

mqtt_client.c:
- #include <errno.h>
- #include <memory.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include "mqtt_client.h"
- #include "MQTTClientPersistence.h"
-
- /*
-  * Build a "host:port" server URI when `host` carries no explicit port.
-  *
-  * Returns a malloc'ed string the caller must free, or NULL when `host`
-  * should be used unchanged: host is NULL, port is outside 1..65535, the
-  * host already names a port, the URI contains a path, or allocation failed.
-  */
- static char *mqtt_build_server_uri(const char *host, int port)
- {
-     const char *scheme_end;
-     const char *authority;
-     const char *last_colon;
-     size_t size;
-     char *uri;
-
-     if (host == NULL || port <= 0 || port > 65535)
-         return NULL;
-
-     /* Skip an optional "scheme://" prefix so its ':' is not taken for a port. */
-     scheme_end = strstr(host, "://");
-     authority = (scheme_end != NULL) ? scheme_end + 3 : host;
-
-     /* With a path component, appending ":port" would be wrong; leave it alone. */
-     if (strchr(authority, '/') != NULL)
-         return NULL;
-
-     last_colon = strrchr(authority, ':');
-     if (last_colon != NULL && authority[0] == '[') {
-         /* Bracketed IPv6 literal: colons inside "[...]" belong to the address. */
-         const char *bracket = strrchr(authority, ']');
-         if (bracket != NULL && last_colon < bracket)
-             last_colon = NULL;
-     }
-     if (last_colon != NULL)
-         return NULL; /* host already names a port; it takes precedence */
-
-     size = strlen(host) + sizeof(":65535");
-     uri = malloc(size);
-     if (uri != NULL)
-         snprintf(uri, size, "%s:%d", host, port);
-     return uri;
- }
-
- /*
-  * Allocate a client object and create the underlying paho client without
-  * persistence.
-  *
-  * host      - server address; may already include ":port" (as the examples do)
-  * port      - used only when `host` has no port of its own
-  * client_id - MQTT client identifier
-  *
-  * Returns the new client, or NULL on failure. When MQTTClient_create fails,
-  * errno is set to its return code.
-  */
- mqtt_client * mqtt_new(char * host, int port, char *client_id)
- {
-     mqtt_client *m;
-     char *uri;
-     int rc;
-
-     /* calloc zeroes every field, so callback and received_* start as NULL/0. */
-     m = calloc(1, sizeof *m);
-     if (m == NULL)
-         return NULL;
-
-     uri = mqtt_build_server_uri(host, port);
-     rc = MQTTClient_create(&(m->client), uri != NULL ? uri : host, client_id, MQTTCLIENT_PERSISTENCE_NONE, NULL);
-     /* NOTE(review): assumes MQTTClient_create keeps its own copy of the
-      * server URI - confirm against the paho sources in use. */
-     free(uri);
-
-     if (rc != MQTTCLIENT_SUCCESS) {
-         free(m);
-         errno = rc; /* set after free() so nothing can overwrite it */
-         return NULL;
-     }
-
-     m->timeout = MQTT_DEFAULT_TIME_OUT;
-     return m;
- }
-
- /*
-  * Connect the client to its server with a clean session.
-  *
-  * username, password - broker credentials; pass NULL when the broker does
-  *                      not require authentication.
-  *
-  * Returns the MQTTClient_connect result, or -1 if m is NULL.
-  */
- int mqtt_connect(mqtt_client * m, char *username, char *password)
- {
-     MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
-
-     if (!m)
-         return -1;
-
-     conn_opts.keepAliveInterval = 20;
-     conn_opts.cleansession = 1;
-     /* Forward the credentials; previously they were accepted but ignored. */
-     conn_opts.username = username;
-     conn_opts.password = password;
-
-     return MQTTClient_connect(m->client, &conn_opts);
- }
-
- /*
-  * Disconnect from the server, passing 10000 as the timeout argument of
-  * MQTTClient_disconnect.
-  * Returns that call's result, or -1 if m is NULL.
-  */
- int mqtt_disconnect(mqtt_client * m)
- {
-     if (m == NULL) {
-         return -1;
-     }
-
-     return MQTTClient_disconnect(m->client, 10000);
- }
-
- /*
-  * Report the connection state.
-  * Returns the MQTTClient_isConnected result; a NULL client counts as
-  * not connected (0).
-  */
- int mqtt_is_connected(mqtt_client *m)
- {
-     return (m != NULL) ? MQTTClient_isConnected(m->client) : 0;
- }
-
- /* Forward to MQTTClient_yield(); takes no arguments and returns nothing. */
- void mqtt_yield(void)
- {
-     MQTTClient_yield();
-     return;
- }
-
- /*
-  * Store the timeout (milliseconds) that mqtt_publish_data() uses when
-  * waiting for delivery to complete.
-  * Returns MQTT_SUCCESS, or -1 if m is NULL.
-  */
- int mqtt_set_timeout(mqtt_client *m, int timeout)
- {
-     if (m == NULL) {
-         return -1;
-     }
-
-     m->timeout = timeout;
-
-     return MQTT_SUCCESS;
- }
-
- /*
-  * paho "message arrived" callback: forwards the message to the user handler
-  * registered on the client passed as `context`.
-  *
-  * paho reports topicLen as 0 for ordinary topics (it is non-zero only when
-  * the topic contains embedded NUL characters), so the topic must NOT be
-  * terminated at topicName[topicLen]: that would overwrite the first
-  * character and hand the user an empty topic. topicName is used as
-  * delivered.
-  *
-  * Returns the user handler's return value. When there is no client or no
-  * handler, the message and topic are released here and 1 is returned,
-  * because a non-zero return tells paho the application took ownership.
-  */
- static int internal_callback_message_arrived(void *context, char *topicName, int topicLen, MQTTClient_message *message)
- {
-     mqtt_client *m = (mqtt_client *)context;
-
-     (void)topicLen; /* see the note above: not a usable terminator index */
-
-     if (m == NULL || m->on_message_arrived == NULL) {
-         /* Nobody will consume the message; free it instead of leaking it. */
-         MQTTClient_freeMessage(&message);
-         MQTTClient_free(topicName);
-         return 1;
-     }
-
-     /* NOTE(review): when the handler returns non-zero, paho expects the
-      * application to free `message` and `topicName`; the handler only sees
-      * the payload pointer, so ownership on that path should be confirmed. */
-     return m->on_message_arrived(m, topicName, message->payload, message->payloadlen);
- }
-
- /*
-  * paho "connection lost" callback. Intentionally a no-op: both arguments
-  * are ignored.
-  */
- static void internal_callback_connectionLost(void *context, char *cause)
- {
-     (void)context;
-     (void)cause;
- }
-
- /*
-  * paho "delivery complete" callback. Intentionally a no-op.
-  * Declared static like the other internal callbacks: it is only referenced
-  * from this file and is not part of the public header.
-  */
- static void internal_callback_delivery_complete(void *context, MQTTClient_deliveryToken dt)
- {
-     (void)context;
-     (void)dt;
- }
-
- /*
-  * Remember `function` as the user's message handler and install the three
-  * internal callbacks on the paho client, with the wrapper object itself as
-  * the callback context.
-  * Returns the MQTTClient_setCallbacks result, or -1 if m is NULL.
-  */
- int mqtt_set_callback_message_arrived(mqtt_client *m, CALLBACK_MESSAGE_ARRIVED * function)
- {
-     if (m == NULL) {
-         return -1;
-     }
-
-     m->on_message_arrived = function;
-
-     return MQTTClient_setCallbacks(m->client,
-                                    m,
-                                    internal_callback_connectionLost,
-                                    internal_callback_message_arrived,
-                                    internal_callback_delivery_complete);
- }
-
- /*
-  * Subscribe the client to `topic` at quality-of-service level `Qos`.
-  * Returns the MQTTClient_subscribe result, or -1 if m is NULL.
-  */
- int mqtt_subscribe(mqtt_client *m, char *topic, int Qos)
- {
-     if (m == NULL) {
-         return -1;
-     }
-
-     return MQTTClient_subscribe(m->client, topic, Qos);
- }
-
- /*
-  * Remove the client's subscription to `topic`.
-  * Returns the MQTTClient_unsubscribe result, or -1 if m is NULL.
-  */
- int mqtt_unsubscribe(mqtt_client *m, char *topic)
- {
-     if (m == NULL) {
-         return -1;
-     }
-
-     return MQTTClient_unsubscribe(m->client, topic);
- }
-
- /*
-  * Destroy a client created by mqtt_new() and release everything it owns:
-  * a message/topic still held from mqtt_receive(), the paho client, and the
-  * mqtt_client structure itself (previously leaked).
-  *
-  * The pointer `m` is invalid after this call.
-  * Returns 0, or -1 if m is NULL.
-  */
- int mqtt_delete(mqtt_client *m)
- {
-     if (!m)
-         return -1;
-
-     /* Memory handed out by MQTTClient_receive must go back through the
-      * paho free functions. */
-     if (m->received_msg != NULL)
-         MQTTClient_freeMessage(&(m->received_msg));
-     if (m->received_topic != NULL) {
-         MQTTClient_free(m->received_topic);
-         m->received_topic = NULL;
-     }
-
-     MQTTClient_destroy(&(m->client));
-     free(m);
-     return 0;
- }
-
- /*
-  * Publish `length` bytes starting at `data` to `topic` with the given QoS
-  * (never retained). If the client's timeout is positive, block until
-  * delivery completes or that timeout expires.
-  *
-  * Returns the delivery token on success; otherwise the non-success return
-  * code of the failing MQTTClient call, or -1 if m is NULL.
-  */
- int mqtt_publish_data(mqtt_client * m, char *topic, void *data, int length, int Qos)
- {
-     MQTTClient_message outgoing = MQTTClient_message_initializer;
-     MQTTClient_deliveryToken delivery = -1;
-     int status;
-
-     if (m == NULL) {
-         return -1;
-     }
-
-     outgoing.payload = data;
-     outgoing.payloadlen = length;
-     outgoing.qos = Qos;
-     outgoing.retained = 0;
-
-     status = MQTTClient_publishMessage(m->client, topic, &outgoing, &delivery);
-
-     /* Only wait for completion when publishing worked and a timeout is set. */
-     if (status == MQTTCLIENT_SUCCESS && m->timeout > 0) {
-         status = MQTTClient_waitForCompletion(m->client, delivery, m->timeout);
-     }
-
-     return (status == MQTTCLIENT_SUCCESS) ? delivery : status;
- }
-
- /*
-  * Publish the NUL-terminated string `message` to `topic`; the terminator is
-  * not sent.
-  *
-  * Returns MQTT_NULL_PARAMETER if `message` is NULL (strlen on NULL is
-  * undefined behaviour); otherwise whatever mqtt_publish_data() returns.
-  */
- int mqtt_publish(mqtt_client * m, char *topic, char *message, int Qos)
- {
-     if (message == NULL)
-         return MQTT_NULL_PARAMETER;
-
-     return mqtt_publish_data(m, topic, message, (int)strlen(message), Qos);
- }
-
- /*
-  * Release the message and topic kept from the previous mqtt_receive() call
-  * and reset all received_* fields.
-  *
-  * The original only cleared the pointers, so every received message and
-  * topic was leaked. Both buffers are allocated by the paho library and are
-  * therefore returned through MQTTClient_freeMessage / MQTTClient_free rather
-  * than plain free().
-  *
-  * Consequence for callers: received_topic and received_message are valid
-  * only until the next mqtt_receive() on the same client.
-  */
- static void mqtt_clear_received(mqtt_client *m)
- {
-     if (!m)
-         return;
-
-     if (m->received_msg != NULL)
-         MQTTClient_freeMessage(&(m->received_msg)); /* frees payload and struct */
-     if (m->received_topic != NULL)
-         MQTTClient_free(m->received_topic);
-
-     m->received_msg = NULL;
-     m->received_message = NULL; /* pointed into the message just freed */
-     m->received_message_len = 0;
-     m->received_message_id = 0;
-     m->received_topic = NULL;
-     m->received_topic_len = 0;
- }
-
- /*
-  * Wait up to `timeout` for one message and expose it through the
-  * received_* fields of the client.
-  *
-  * Returns MQTTCLIENT_SUCCESS (0) when a message was stored (a "topic name
-  * truncated" result from paho is also reported as success), -1 when m is
-  * NULL or no message arrived, and otherwise the MQTTClient_receive error
-  * code.
-  */
- int mqtt_receive(mqtt_client *m, unsigned long timeout)
- {
-     int status;
-
-     if (m == NULL) {
-         return -1;
-     }
-
-     mqtt_clear_received(m);
-
-     status = MQTTClient_receive(m->client, &(m->received_topic),
-                                 &(m->received_topic_len), &(m->received_msg), timeout);
-
-     if (status != MQTTCLIENT_SUCCESS && status != MQTTCLIENT_TOPICNAME_TRUNCATED) {
-         return status;
-     }
-
-     /* The call succeeded but nothing arrived within the timeout. */
-     if (m->received_msg == NULL) {
-         return -1;
-     }
-
-     /* Make sure the topic is terminated at its reported length. */
-     if (m->received_topic[m->received_topic_len] != 0) {
-         m->received_topic[m->received_topic_len] = 0;
-     }
-
-     m->received_message = m->received_msg->payload;
-     m->received_message_len = m->received_msg->payloadlen;
-     m->received_message_id = m->received_msg->msgid;
-
-     return MQTTCLIENT_SUCCESS;
- }
-
- /* Forward to MQTTClient_sleep() with the given number of milliseconds. */
- void mqtt_sleep(int milliseconds)
- {
-     MQTTClient_sleep(milliseconds);
-     return;
- }
-

函数命名都是根据功能来的,所以笔者也没多打注释了。
把这两个文件也放到mqtt文件夹内。
废话不多说,先上代码为敬:
- #include <stdio.h>
- #include <stdlib.h>
- #include <errno.h>
- #include <signal.h>
- #include "mqtt/mqtt_client.h"
-
- /*
-  * Loop flag for main(). It is written from a signal handler, so it must be
-  * a volatile sig_atomic_t: that is the only object type the C standard
-  * guarantees can be safely assigned from a handler.
-  */
- volatile sig_atomic_t running = 1;
-
- /*
-  * Signal handler: ask the main loop to stop. SIGINT is reset to its default
-  * action so that a second Ctrl-C terminates the process immediately.
-  */
- void stop_running(int sig)
- {
-     (void)sig;
-     signal(SIGINT, SIG_DFL);
-     running = 0;
- }
-
-
- /*
-  * Subscriber example: connect to the test broker, subscribe to one topic and
-  * print every message until SIGINT/SIGTERM is received.
-  * Returns 0 on a clean shutdown and 1 when setup fails.
-  */
- int main(int argc, char ** argv) {
-
-     mqtt_client *m;                   /* mqtt_client object */
-     int ret;                          /* return value of the wrapper calls */
-     char *host = "localhost:1883";    /* test server */
-     char *topic = "MQTT/test";        /* topic to subscribe to */
-     char *client_id = "fs1704";       /* client id; arbitrary for the test server */
-     char *username = NULL;            /* user name for authentication; none for the test server */
-     char *password = NULL;            /* password for authentication; none for the test server */
-     int Qos;                          /* Quality of Service */
-
-     (void)argc;
-     (void)argv;
-
-     /* create new mqtt client object */
-     m = mqtt_new(host, MQTT_PORT, client_id);
-     if ( m == NULL ) {
-         printf("mqtt client create failure, return code = %d\n", errno);
-         return 1;
-     } else {
-         printf("mqtt client created\n");
-     }
-
-     /* connect to server */
-     ret = mqtt_connect(m, username, password);
-     if (ret != MQTT_SUCCESS ) {
-         printf("mqtt client connect failure, return code = %d\n", ret);
-         mqtt_delete(m); /* do not leak the client on the error path */
-         return 1;
-     } else {
-         printf("mqtt client connect\n");
-     }
-
-     /* subscribe */
-     Qos = QOS_EXACTLY_ONCE;
-     ret = mqtt_subscribe(m, topic, Qos);
-     printf("mqtt client subscribe %s, return code = %d\n", topic, ret);
-     if (ret != MQTT_SUCCESS) {
-         /* Without a subscription the receive loop would wait forever. */
-         mqtt_disconnect(m);
-         mqtt_delete(m);
-         return 1;
-     }
-
-     signal(SIGINT, stop_running);
-     signal(SIGTERM, stop_running);
-
-     printf("wait for message of topic: %s ...\n", topic);
-
-     /* loop: waiting for messages */
-     while (running) {
-         int timeout = 200;
-         if ( mqtt_receive(m, timeout) == MQTT_SUCCESS ) {
-             /* The payload is raw bytes without a terminating NUL, so its
-              * length must bound the print. */
-             printf("received Topic=%s, Message=%.*s\n", m->received_topic,
-                    m->received_message_len, m->received_message);
-         }
-         mqtt_sleep(200); /* sleep a while */
-     }
-
-     mqtt_disconnect(m);
-     printf("mqtt client disconnect\n");
-     mqtt_delete(m); /* delete mqtt client object */
-     return 0;
- }

可以看到,订阅消息是由mqtt_subscribe()函数完成,要订阅多个消息,只需要反复调用这个函数就行了。
要对收到的消息做处理,就在 while(running) 循环里用 strcmp 判断主题(m->received_topic),再按需处理消息内容就行了,这个比较简单,就自己根据情况倒腾吧。
- #include <stdio.h>
- #include <stdlib.h>
- #include <errno.h>
- #include "mqtt/mqtt_client.h"
-
- /*
-  * Publisher example: connect to the test broker, publish one message and
-  * disconnect.
-  * Returns 0 when the message was published, 1 on any failure.
-  */
- int main(int argc, char ** argv) {
-     mqtt_client *m;                   /* mqtt_client object */
-     int ret;                          /* return value of the wrapper calls */
-     char *host = "localhost:1883";    /* test server */
-     char *topic = "MQTT/test";        /* topic to publish to */
-     char *client_id = "fs1704";       /* client id; arbitrary for the test server */
-     char *username = NULL;            /* user name for authentication; none for the test server */
-     char *password = NULL;            /* password for authentication; none for the test server */
-     int Qos;                          /* Quality of Service */
-
-     (void)argc;
-     (void)argv;
-
-     /* create new mqtt client object */
-     m = mqtt_new(host, MQTT_PORT, client_id);
-     if ( m == NULL ) {
-         printf("mqtt client create failure, return code = %d\n", errno);
-         return 1;
-     } else {
-         printf("mqtt client created\n");
-     }
-
-     /* connect to server */
-     ret = mqtt_connect(m, username, password);
-     if (ret != MQTT_SUCCESS ) {
-         printf("mqtt client connect failure, return code = %d\n", ret);
-         mqtt_delete(m); /* do not leak the client on the error path */
-         return 1;
-     } else {
-         printf("mqtt client connect\n");
-     }
-
-     /* publish message */
-     Qos = QOS_EXACTLY_ONCE;
-     ret = mqtt_publish(m, topic, "This is MQTT publicer", Qos);
-     printf("mqtt client publish, return code = %d\n", ret);
-
-     mqtt_disconnect(m);
-     mqtt_delete(m); /* delete mqtt client object */
-
-     /* Every error code in this wrapper is negative; anything else is the
-      * delivery token of a successful publish. */
-     return (ret < 0) ? 1 : 0;
- }

可以看到发布话题是通过mqtt_publish函数实现的。具体内容看上面封装的函数库吧。
笔者通过Jetson Nano亲测成功,因此,笔者更希望读者们能自行完成这些操作,这样可能会对自身学习更好一点。
咱也就不放图了,看源码应该就能想到会产生怎样的画面了吧。
笔者辛辛苦苦地手撕paho.MQTT库,是因为项目需要用到它。但测试完成后,准备把这个库移植到项目里时才发现:我的项目环境是C++,而这个版本的库是C语言的。C语言下不能调用带有C++语法(如命名空间)的函数。因此,笔者踏上了用C++来封装这个库的道路。
本篇文章,最终代码见(5积分,买不了吃亏,买不了上当,绝对值了):https://download.csdn.net/download/qq_25662827/77129677
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。