当前位置:   article > 正文

Liunx下用C语言实现MQTT的接收与发送(下)_c mqtt

c mqtt

继续上一篇关于MQTT的话题,本篇呢,我们将脱离paho的库环境,进行MQTT搭建。


环境准备

在paho.mqtt库中,简单翻阅一下,可以看到他的核心库的路径为 paho.mqtt.c-1.3.0\src

经过笔者的手撕源码,排除掉了许多冗余的部分,就只剩下列文件

将这些文件,笔者单独放到了“mqtt文件夹”内,做一个库封装起来。


封装

头文件太多太繁琐,因此需要个文件把它们集成起来。

mqtt_client.h:

  1. #ifndef __MQTT_CLIENT_H__
  2. #define __MQTT_CLIENT_H__
  3. #include "MQTTClient.h"
  4. #ifdef __cplusplus
  5. extern "C" {
  6. #endif
  7. #define MQTT_SUCCESS 0
  8. #define MQTT_FAILURE -1
  9. #define MQTT_DISCONNECTED -3
  10. #define MQTT_MAX_MESSAGES_INFLIGHT -4
  11. #define MQTT_BAD_UTF8_STRING -5
  12. #define MQTT_NULL_PARAMETER -6
  13. #define MQTT_TOPICNAME_TRUNCATED -7
  14. #define MQTT_BAD_STRUCTURE -8
  15. #define MQTT_BAD_QOS -9
  16. #define QOS_AT_MOST_ONCE 0
  17. #define QOS_AT_LEAST_ONCE 1
  18. #define QOS_EXACTLY_ONCE 2
  19. #define MQTT_PORT 1883
  20. #define MQTT_DEFAULT_TIME_OUT 3000
  21. /* MQTT client object*/
  22. typedef struct _mqtt_client mqtt_client;
  23. typedef int CALLBACK_MESSAGE_ARRIVED(mqtt_client *m, char *topic, char *data, int length);
  24. /* structure of MQTT client object*/
  25. struct _mqtt_client {
  26. MQTTClient client;
  27. //int Qos; //Quality of service
  28. int timeout; //time out (milliseconds)
  29. CALLBACK_MESSAGE_ARRIVED *on_message_arrived;
  30. int received_message_id;
  31. char * received_topic;
  32. char * received_message;
  33. int received_message_len;
  34. int received_topic_len;
  35. MQTTClient_message * received_msg;
  36. };
  37. mqtt_client * mqtt_new(char * host, int port, char *client_id);
  38. int mqtt_delete(mqtt_client *m);
  39. int mqtt_connect(mqtt_client * m, char *username, char *password);
  40. int mqtt_disconnect(mqtt_client * m);
  41. int mqtt_is_connected(mqtt_client *m);
  42. void mqtt_yield(void);
  43. int mqtt_set_timeout(mqtt_client *m, int timeout);
  44. int mqtt_set_callback_message_arrived(mqtt_client *m, CALLBACK_MESSAGE_ARRIVED * function);
  45. int mqtt_subscribe(mqtt_client *m, char *topic, int Qos);
  46. int mqtt_unsubscribe(mqtt_client *m, char *topic);
  47. int mqtt_publish_data(mqtt_client * m, char *topic, void *data, int length, int Qos);
  48. int mqtt_publish(mqtt_client * m, char *topic, char *message, int Qos);
  49. int mqtt_receive(mqtt_client *m, unsigned long timeout);
  50. void mqtt_sleep(int milliseconds);
  51. #ifdef __cplusplus
  52. }
  53. #endif
  54. #endif /* __MQTT_CLIENT_H__ */

mqtt_client.c:

  1. #include <stdlib.h>
  2. #include <memory.h>
  3. #include <string.h>
  4. #include <errno.h>
  5. #include "mqtt_client.h"
  6. #include "MQTTClientPersistence.h"
  7. mqtt_client * mqtt_new(char * host, int port, char *client_id)
  8. {
  9. mqtt_client * m;
  10. int rc;
  11. m = malloc(sizeof(mqtt_client));
  12. if ( m != NULL) {
  13. memset(m , 0, sizeof(mqtt_client));
  14. rc = MQTTClient_create(&(m->client), host, client_id, MQTTCLIENT_PERSISTENCE_NONE, NULL);
  15. if ( rc == MQTTCLIENT_SUCCESS ) {
  16. m->timeout = MQTT_DEFAULT_TIME_OUT;
  17. m->received_msg = NULL;
  18. //mqtt_set_callback_message_arrived(m, m->on_message_arrived);
  19. } else {
  20. free(m);
  21. errno = rc;
  22. m = NULL;
  23. return NULL;
  24. }
  25. }
  26. return m;
  27. }
  28. int mqtt_connect(mqtt_client * m, char *username, char *password)
  29. {
  30. int rc;
  31. MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
  32. if (!m) return -1;
  33. conn_opts.keepAliveInterval = 20;
  34. conn_opts.cleansession = 1;
  35. rc = MQTTClient_connect(m->client, &conn_opts);
  36. return rc;
  37. }
  38. int mqtt_disconnect(mqtt_client * m)
  39. {
  40. if (!m) return -1;
  41. return MQTTClient_disconnect(m->client, 10000);
  42. }
  43. int mqtt_is_connected(mqtt_client *m)
  44. {
  45. if (!m) return 0;
  46. return MQTTClient_isConnected(m->client);
  47. }
  48. void mqtt_yield(void)
  49. {
  50. MQTTClient_yield();
  51. }
  52. int mqtt_set_timeout(mqtt_client *m, int timeout)
  53. {
  54. if (!m) return -1;
  55. m->timeout = timeout;
  56. return MQTT_SUCCESS;
  57. }
  58. static int internal_callback_message_arrived(void *context, char *topicName, int topicLen, MQTTClient_message *message)
  59. {
  60. mqtt_client *m;
  61. m = (mqtt_client *)context;
  62. if (!m) return -1;
  63. if (m->on_message_arrived == NULL) return -1;
  64. if ( topicName[topicLen] != 0 )
  65. topicName[topicLen] = 0;
  66. return m->on_message_arrived(m, topicName, message->payload, message->payloadlen);
  67. }
  68. static void internal_callback_connectionLost(void *context, char *cause)
  69. {
  70. return;
  71. }
  72. void internal_callback_delivery_complete(void *context, MQTTClient_deliveryToken dt)
  73. {
  74. return;
  75. }
  76. int mqtt_set_callback_message_arrived(mqtt_client *m, CALLBACK_MESSAGE_ARRIVED * function)
  77. {
  78. int ret;
  79. if (!m) return -1;
  80. m->on_message_arrived = function;
  81. ret = MQTTClient_setCallbacks(m->client, m,
  82. internal_callback_connectionLost, //MQTTClient_connectionLost * cl,
  83. internal_callback_message_arrived, //MQTTClient_messageArrived * ma,
  84. internal_callback_delivery_complete //MQTTClient_deliveryComplete * dc
  85. );
  86. return ret;
  87. }
  88. int mqtt_subscribe(mqtt_client *m, char *topic, int Qos)
  89. {
  90. if (!m) return -1;
  91. return MQTTClient_subscribe (m->client, topic, Qos);
  92. }
  93. int mqtt_unsubscribe(mqtt_client *m, char *topic)
  94. {
  95. if (!m) return -1;
  96. return MQTTClient_unsubscribe (m->client, topic);
  97. }
  98. int mqtt_delete(mqtt_client *m)
  99. {
  100. if (!m) return -1;
  101. MQTTClient_destroy(&(m->client));
  102. return 0;
  103. }
  104. int mqtt_publish_data(mqtt_client * m, char *topic, void *data, int length, int Qos)
  105. {
  106. MQTTClient_message pubmsg = MQTTClient_message_initializer;
  107. MQTTClient_deliveryToken token = -1;
  108. int rc;
  109. if (!m) return -1;
  110. pubmsg.payload = data;
  111. pubmsg.payloadlen = length;
  112. pubmsg.qos = Qos;
  113. pubmsg.retained = 0;
  114. rc = MQTTClient_publishMessage(m->client, topic, &pubmsg, &token);
  115. if ( rc != MQTTCLIENT_SUCCESS )
  116. return rc;
  117. if ( m->timeout > 0 ) {
  118. rc = MQTTClient_waitForCompletion(m->client, token, m->timeout);
  119. if ( rc != MQTTCLIENT_SUCCESS )
  120. return rc;
  121. else
  122. return token;
  123. }
  124. return token;
  125. }
  126. int mqtt_publish(mqtt_client * m, char *topic, char *message, int Qos)
  127. {
  128. return mqtt_publish_data(m, topic, message, strlen(message), Qos);
  129. }
  130. static void mqtt_clear_received(mqtt_client *m)
  131. {
  132. if (!m) return;
  133. //MQTTClient_freeMessage();
  134. if ( m->received_msg != NULL ) {
  135. //free(m->received_msg->payload);
  136. //free(m->received_msg);
  137. m->received_msg = NULL;
  138. m->received_message = NULL;
  139. m->received_message_len = 0;
  140. m->received_message_id = 0;
  141. }
  142. /*
  143. if ( m->received_topic != NULL) {
  144. free(m->received_topic);
  145. m->received_topic = NULL;
  146. m->received_topic_len = 0;
  147. }
  148. */
  149. }
  150. int mqtt_receive(mqtt_client *m, unsigned long timeout)
  151. {
  152. int rc;
  153. if (!m) return -1;
  154. mqtt_clear_received(m);
  155. rc = MQTTClient_receive(m->client, &(m->received_topic),
  156. &(m->received_topic_len), &(m->received_msg), timeout);
  157. if ( rc == MQTTCLIENT_SUCCESS || rc == MQTTCLIENT_TOPICNAME_TRUNCATED ) {
  158. if ( m->received_msg == NULL) {
  159. rc = -1;
  160. } else {
  161. rc = MQTTCLIENT_SUCCESS;
  162. if ( m->received_topic[m->received_topic_len] != 0 )
  163. m->received_topic[m->received_topic_len] = 0;
  164. m->received_message = m->received_msg->payload;
  165. m->received_message_len = m->received_msg->payloadlen;
  166. m->received_message_id = m->received_msg->msgid;
  167. }
  168. }
  169. return rc;
  170. }
  171. void mqtt_sleep(int milliseconds)
  172. {
  173. MQTTClient_sleep(milliseconds);
  174. }

函数命名都是根据功能来的,所以笔者也没多打注释了。

把这两个文件也放到mqtt文件夹内。


MQTT数据订阅端

废话不多说,先上代码为敬:

  1. #include <stdio.h>
  2. #include <stdlib.h>
  3. #include <errno.h>
  4. #include <signal.h>
  5. #include "mqtt/mqtt_client.h"
  6. int running = 1;
  7. void stop_running(int sig)
  8. {
  9. signal(SIGINT, NULL);
  10. running = 0;
  11. }
  12. int main(int argc, char ** argv) {
  13. mqtt_client *m; //mqtt_client 对象指针
  14. int ret; //返回值
  15. char *host = "localhost:1883";//测试服务器
  16. char *topic = "MQTT/test"; //主题
  17. char *client_id = "fs1704";//客户端ID; 对测试服务器,可以随便写
  18. char *username = NULL;//用户名,用于验证身份。对测试服务器,无。
  19. char *password = NULL;//密码,用于验证身份。对测试服务器,无。
  20. int Qos; //Quality of Service
  21. //create new mqtt client object
  22. m = mqtt_new(host, MQTT_PORT, client_id); //创建对象,MQTT_PORT = 1883
  23. if ( m == NULL ) {
  24. printf("mqtt client create failure, return code = %d\n", errno);
  25. return 1;
  26. } else {
  27. printf("mqtt client created\n");
  28. }
  29. //connect to server
  30. ret = mqtt_connect(m, username, password); //连接服务器
  31. if (ret != MQTT_SUCCESS ) {
  32. printf("mqtt client connect failure, return code = %d\n", ret);
  33. return 1;
  34. } else {
  35. printf("mqtt client connect\n");
  36. }
  37. //subscribe
  38. Qos = QOS_EXACTLY_ONCE;
  39. ret = mqtt_subscribe(m, topic, Qos);//订阅消息
  40. printf("mqtt client subscribe %s, return code = %d\n", topic, ret);
  41. signal(SIGINT, stop_running);
  42. signal(SIGTERM, stop_running);
  43. printf("wait for message of topic: %s ...\n", topic);
  44. //loop: waiting message
  45. while (running) {
  46. int timeout = 200;
  47. if ( mqtt_receive(m, timeout) == MQTT_SUCCESS ) { //recieve message,接收消息
  48. printf("received Topic=%s, Message=%s\n", m->received_topic, m->received_message);
  49. }
  50. mqtt_sleep(200); //sleep a while
  51. }
  52. mqtt_disconnect(m); //disconnect
  53. printf("mqtt client disconnect");
  54. mqtt_delete(m); //delete mqtt client object
  55. return 0;
  56. }

可以看到,订阅消息是由mqtt_subscribe()函数完成,要订阅多个消息,只需要反复调用这个函数就行了。

要做收发信息处理,就在while(running)里面strump就行了,这个比较简单,就自己根据情况倒腾吧。


MQTT数据发布端

  1. #include <stdio.h>
  2. #include <stdlib.h>
  3. #include <errno.h>
  4. #include "mqtt/mqtt_client.h"
  5. int main(int argc, char ** argv) {
  6. mqtt_client *m; //mqtt_client 对象指针
  7. int ret; //返回值
  8. char *host = "localhost:1883";//测试服务器
  9. char *topic = "MQTT/test"; //主题
  10. char *client_id = "fs1704";//客户端ID; 对测试服务器,可以随便写
  11. char *username = NULL;//用户名,用于验证身份。对测试服务器,无。
  12. char *password = NULL;//密码,用于验证身份。对测试服务器,无。
  13. int Qos; //Quality of Service
  14. //create new mqtt client object
  15. m = mqtt_new(host, MQTT_PORT, client_id); //创建对象,MQTT_PORT = 1883
  16. if ( m == NULL ) {
  17. printf("mqtt client create failure, return code = %d\n", errno);
  18. return 1;
  19. } else {
  20. printf("mqtt client created\n");
  21. }
  22. //connect to server
  23. ret = mqtt_connect(m, username, password); //连接服务器
  24. if (ret != MQTT_SUCCESS ) {
  25. printf("mqtt client connect failure, return code = %d\n", ret);
  26. return 1;
  27. } else {
  28. printf("mqtt client connect\n");
  29. }
  30. //publish message
  31. Qos = QOS_EXACTLY_ONCE; //Qos
  32. ret = mqtt_publish(m, topic, "This is MQTT publicer", Qos);//发布消息
  33. printf("mqtt client publish, return code = %d\n", ret);
  34. mqtt_disconnect(m); //disconnect
  35. mqtt_delete(m); //delete mqtt client object
  36. return 0;
  37. }

可以看到发布话题是通过mqtt_publish函数实现的。具体内容看上面封装的函数库吧。


测试结果

笔者通过Jetson Nano亲测成功,因此,笔者更希望读者们能自行完成这些操作,这样可能会对自身学习更好一点。

咱也就不放图了,看源码应该就能想到会产生怎样的画面了吧。


闲谈

笔者辛辛苦苦的手撕paho.MQTT库,因为我的项目内容需要用到。但是当我测试完后,要把这个库移植到我项目里面,发现。我的项目环境是C++,而这版本的是C语言。C语言下不能调用带有C++语法的函数(命名空间类型的)。因此,笔者踏上了怎么用C++来套这库的路上。

本篇文章,最终代码见(5积分,买不了吃亏,买不了上当,绝对值了):https://download.csdn.net/download/qq_25662827/77129677https://download.csdn.net/download/qq_25662827/77129677

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/722361
推荐阅读
相关标签
  

闽ICP备14008679号