当前位置:   article > 正文

Liunx下用C++实现MQTT的接收与发送_c++ mqtt demo

c++ mqtt demo

继续上次《Liunx下用C语言实现MQTT的接收与发送(上)(下)》,本人探寻了如何基于paho.mqtt.c库开发C++版本的MQTT。事实上,paho官网有paho.mqtt.c++库,不过笔者进去后玩不怎么转,可能是笔者不太灵光。


环境准备

笔者忽然想到,在《Liunx下用C语言实现MQTT的接收与发送(上)》中,好像安装了些mqtt的动态库,他们是啥情况呢?

本人推断是封装了,在src里面,被我之前排除的库文件。简单测试调用了一下,还真是。

因此,本人又开始封装了。(这里的脑回路太漫长了,几句话也说不清楚,这MQTT任务倒腾了我三天半终于结束了)。


封装mqtt的C++版本库

mqtt.h:

  1. #ifndef MQTT_H
  2. #define MQTT_H
  3. #include <MQTTAsync.h>
  4. #include <string>
  5. using namespace std;
  6. class MQTT {
  7. public:
  8. bool useSSL;
  9. //实例 ID,购买后从控制台获取
  10. string instanceId;
  11. //接入点域名,从控制台获取
  12. string host;
  13. //客户端使用的 GroupID,从控制台申请
  14. string groupId;
  15. //客户端 ClientID 的后缀,由业务自行指定,只需要保证全局唯一即可
  16. string deviceId;
  17. //账号 AccessKey,从账号控制台获取
  18. string accessKey;
  19. //账号 SecretKey,从账号控制台获取
  20. string secretKey;
  21. //使用的协议端口,默认 tcp 协议使用1883,如果需要使用 SSL 加密,端口设置成8883,具体协议和端口参考文档链接https://help.aliyun.com/document_detail/44867.html?spm=a2c4g.11186623.6.547.38d81cf7XRnP0C
  22. int port = 1883;
  23. //测试收发消息的 Topic
  24. string topic;
  25. int (*messageArrived)(void *context, char *topicName, int topicLen, MQTTAsync_message *m);
  26. void (*onConnectFailure)(void *context, MQTTAsync_failureData *response);
  27. void (*onSubcribe)(void *context, MQTTAsync_successData *response);
  28. void (*onConnectServer)(void *context, MQTTAsync_successData *response);
  29. void (*onConnectClient)(void *context, MQTTAsync_successData *response);
  30. void (*onDisconnect)(void *context, MQTTAsync_successData *response);
  31. void (*onPublishFailure)(void *context, MQTTAsync_failureData *response);
  32. void (*onPublish)(void *context, MQTTAsync_successData *response);
  33. void (*connectionLost)(void *context, char *cause);
  34. MQTT();
  35. void init(void);
  36. int subscript_connect(void);
  37. int subscript_disconnect(void);
  38. int publish_connect(void);
  39. int publish_send(char *msg);
  40. int publish_disconnect(void);
  41. MQTTAsync client;
  42. char clientIdUrl[64];
  43. char passWord[100];
  44. char userName[128];
  45. int connected;
  46. };
  47. #endif

mqtt.cpp:

  1. #include <signal.h>
  2. #include <memory.h>
  3. #include <stdlib.h>
  4. #if defined(WIN32)
  5. #define sleep Sleep
  6. #else
  7. #include <unistd.h>
  8. #include <openssl/hmac.h>
  9. #include <openssl/bio.h>
  10. #endif
  11. #include "mqtt.h"
  12. MQTT::MQTT()
  13. {
  14. instanceId = "mqtt-cn-oew1w1cq50a";
  15. host = "localhost";// ip地址
  16. groupId = "GID_DEVICE";
  17. deviceId = "fs1704";
  18. accessKey = "LTAI4G6gf4KQpMTmMEsaf6oF";
  19. secretKey = "pwAzZb7DFMjju3TYI5lmcShw2nlZUs";
  20. topic = "MQTT/test";
  21. port = 1883;
  22. connected = 0;
  23. useSSL = false;
  24. }
  25. void MQTT::init(void)
  26. {
  27. char tempData[100];
  28. int len = 0;
  29. printf("mqtt:init:topic:%s:deviceId:%s\n", topic.c_str(), deviceId.c_str());
  30. //ClientID要求使用 GroupId 和 DeviceId 拼接而成,长度不得超过64个字符
  31. snprintf(clientIdUrl, 64, "%s@@@%s", groupId.c_str(), deviceId.c_str());
  32. //username和 Password 签名模式下的设置方法,参考文档 https://help.aliyun.com/document_detail/48271.html?spm=a2c4g.11186623.6.553.217831c3BSFry7
  33. HMAC(EVP_sha1(), (unsigned char *)secretKey.c_str(), strlen(secretKey.c_str()), (const unsigned char *)clientIdUrl, strlen(clientIdUrl), (unsigned char *)tempData, (unsigned int *)&len);
  34. int passWordLen = EVP_EncodeBlock((unsigned char *) passWord, (const unsigned char *)tempData, len);
  35. passWord[passWordLen] = '\0';
  36. printf("passWord is %s\n", passWord);
  37. snprintf(userName, 128, "Signature|%s|%s", accessKey.c_str(), instanceId.c_str());
  38. }
  39. int MQTT::subscript_connect(void)
  40. {
  41. int cleanSession = 1;
  42. int rc = 0;
  43. //1.create client
  44. MQTTAsync_createOptions create_opts = MQTTAsync_createOptions_initializer;
  45. create_opts.sendWhileDisconnected = 0;
  46. create_opts.maxBufferedMessages = 10;
  47. char url[100];
  48. if (useSSL) {
  49. snprintf(url, 100, "ssl://%s:%d", host.c_str(), port);
  50. } else {
  51. snprintf(url, 100, "tcp://%s:%d", host.c_str(), port);
  52. }
  53. rc = MQTTAsync_createWithOptions(&client, url, clientIdUrl, MQTTCLIENT_PERSISTENCE_NONE, NULL, &create_opts);
  54. rc = MQTTAsync_setCallbacks(client, client, connectionLost, messageArrived, NULL);
  55. //2.connect to server
  56. MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
  57. conn_opts.MQTTVersion = MQTTVERSION_3_1_1;
  58. conn_opts.keepAliveInterval = 60;
  59. conn_opts.cleansession = cleanSession;
  60. conn_opts.username = userName;
  61. conn_opts.password = passWord;
  62. conn_opts.onSuccess = onConnectServer;
  63. conn_opts.onFailure = onConnectFailure;
  64. conn_opts.context = client;
  65. //如果需要使用 SSL 加密
  66. if (useSSL) {
  67. MQTTAsync_SSLOptions ssl =MQTTAsync_SSLOptions_initializer;
  68. conn_opts.ssl = &ssl;
  69. } else {
  70. conn_opts.ssl = NULL;
  71. }
  72. conn_opts.automaticReconnect = 1;
  73. conn_opts.connectTimeout = 3;
  74. if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) {
  75. printf("Failed to start connect, return code %d\n", rc);
  76. return EXIT_FAILURE;
  77. }
  78. return EXIT_SUCCESS;
  79. }
  80. int MQTT::subscript_disconnect(void)
  81. {
  82. int rc = 0;
  83. MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
  84. disc_opts.onSuccess = onDisconnect;
  85. if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS) {
  86. printf("Failed to start disconnect, return code %d\n", rc);
  87. return EXIT_FAILURE;
  88. }
  89. while (connected)
  90. sleep(1);
  91. MQTTAsync_destroy(&client);
  92. return EXIT_SUCCESS;
  93. }
  94. int MQTT::publish_connect(void)
  95. {
  96. int cleanSession = 1;
  97. int rc = 0;
  98. //1.create client
  99. MQTTAsync_createOptions create_opts = MQTTAsync_createOptions_initializer;
  100. create_opts.sendWhileDisconnected = 0;
  101. create_opts.maxBufferedMessages = 10;
  102. char url[100];
  103. if (useSSL) {
  104. snprintf(url, 100, "ssl://%s:%d", host.c_str(), port);
  105. } else {
  106. snprintf(url, 100, "tcp://%s:%d", host.c_str(), port);
  107. }
  108. rc = MQTTAsync_createWithOptions(&client, url, clientIdUrl, MQTTCLIENT_PERSISTENCE_NONE, NULL, &create_opts);
  109. rc = MQTTAsync_setCallbacks(client, client, connectionLost, messageArrived, NULL);
  110. //2.connect to server
  111. MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
  112. conn_opts.MQTTVersion = MQTTVERSION_3_1_1;
  113. conn_opts.keepAliveInterval = 60;
  114. conn_opts.cleansession = cleanSession;
  115. conn_opts.username = userName;
  116. conn_opts.password = passWord;
  117. conn_opts.onSuccess = onConnectClient;
  118. conn_opts.onFailure = onConnectFailure;
  119. conn_opts.context = client;
  120. //如果需要使用 SSL 加密
  121. if (useSSL) {
  122. MQTTAsync_SSLOptions ssl =MQTTAsync_SSLOptions_initializer;
  123. conn_opts.ssl = &ssl;
  124. } else {
  125. conn_opts.ssl = NULL;
  126. }
  127. conn_opts.automaticReconnect = 1;
  128. conn_opts.connectTimeout = 3;
  129. if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) {
  130. printf("Failed to start connect, return code %d\n", rc);
  131. return EXIT_FAILURE;
  132. }
  133. return EXIT_SUCCESS;
  134. }
  135. int MQTT::publish_send(char *msg)
  136. {
  137. int rc = 0;
  138. //3.publish msg
  139. MQTTAsync_responseOptions pub_opts = MQTTAsync_responseOptions_initializer;
  140. pub_opts.onSuccess = onPublish;
  141. pub_opts.onFailure = onPublishFailure;
  142. rc = MQTTAsync_send(client, topic.c_str(), strlen(msg), msg, 0, 0, &pub_opts);
  143. if(rc != MQTTASYNC_SUCCESS) {
  144. printf("mqtt:publish:failed:%s\n", msg);
  145. return EXIT_FAILURE;
  146. } else {
  147. printf("mqtt:publish:success:%s\n", msg);
  148. return EXIT_SUCCESS;
  149. }
  150. }
  151. int MQTT::publish_disconnect(void)
  152. {
  153. int rc = 0;
  154. MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
  155. disc_opts.onSuccess = onDisconnect;
  156. if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS) {
  157. printf("Failed to start disconnect, return code %d\n", rc);
  158. return EXIT_FAILURE;
  159. }
  160. while (connected)
  161. sleep(1);
  162. MQTTAsync_destroy(&client);
  163. return EXIT_SUCCESS;
  164. }

可以看到,代码中出现了#include <MQTTAsync.h>,这个库在哪里呢?

其实它就在动态库里/usr/local/lib下面的,具体是哪个我忘了,没记录。

为了方便管理,我也把这两个文件放到mqtt文件夹里。


MQTT数据订阅端

  1. #include <iostream>
  2. #include <stdlib.h>
  3. #include <stdio.h>
  4. #include <unistd.h>
  5. #include "mqtt.h"
  6. MQTT mqttServer;
  7. using namespace std;
  8. int messageArrivedServer(void *context, char *topicName, int topicLen, MQTTAsync_message *m)
  9. {
  10. printf("recv message from %s ,body is %s\n", topicName, (char *) m->payload);
  11. // TODO::在此处增加接收内容解析程序
  12. MQTTAsync_freeMessage(&m);
  13. MQTTAsync_free(topicName);
  14. return 1;
  15. }
  16. void onConnectFailureServer(void *context, MQTTAsync_failureData *response)
  17. {
  18. if(mqttServer.connected)
  19. mqttServer.connected = 0;
  20. printf("mqtt:connect failed, rc %d\n", response ? response->code : -1);
  21. MQTTAsync client = (MQTTAsync) context;
  22. }
  23. void onSubcribeServer(void *context, MQTTAsync_successData *response)
  24. {
  25. printf("subscribe success\n");
  26. }
  27. void onConnectServer(void *context, MQTTAsync_successData *response)
  28. {
  29. if(!mqttServer.connected)
  30. mqttServer.connected = 1;
  31. //连接成功的回调,只会在第一次 connect 成功后调用,后续自动重连成功时并不会调用,因此应用需要自行保证每次 connect 成功后重新订阅
  32. printf("mqtt:server:connect success\n");
  33. MQTTAsync client = (MQTTAsync) context;
  34. //do sub when connect success
  35. MQTTAsync_responseOptions sub_opts = MQTTAsync_responseOptions_initializer;
  36. sub_opts.onSuccess = mqttServer.onSubcribe;
  37. int rc = 0;
  38. if ((rc = MQTTAsync_subscribe(client, mqttServer.topic.c_str(), 1, &sub_opts)) != MQTTASYNC_SUCCESS) {
  39. printf("Failed to subscribe, return code %d\n", rc);
  40. }
  41. }
  42. void onDisconnectServer(void *context, MQTTAsync_successData *response)
  43. {
  44. if(mqttServer.connected)
  45. mqttServer.connected = 0;
  46. printf("mqtt:connect disconnect\n");
  47. }
  48. void onPublishFailureServer(void *context, MQTTAsync_failureData *response)
  49. {
  50. printf("Publish failed, rc %d\n", response ? -1 : response->code);
  51. }
  52. void connectionLostServer(void *context, char *cause)
  53. {
  54. if(mqttServer.connected)
  55. mqttServer.connected = 0;
  56. printf("mqtt:server:connection lost\n");
  57. }
  58. int main()
  59. {
  60. mqttServer.topic = "MQTT/#";
  61. mqttServer.deviceId = "fs1704";//mac地址; id不能相同
  62. mqttServer.messageArrived = messageArrivedServer;
  63. mqttServer.onConnectFailure = onConnectFailureServer;
  64. mqttServer.onSubcribe = onSubcribeServer;
  65. mqttServer.onConnectServer = onConnectServer;
  66. mqttServer.onDisconnect = onDisconnectServer;
  67. mqttServer.onPublishFailure = onPublishFailureServer;
  68. mqttServer.connectionLost = connectionLostServer;
  69. mqttServer.init();
  70. while(true) {
  71. if(mqttServer.subscript_connect()) {
  72. sleep(1);
  73. continue;
  74. }
  75. break;
  76. }
  77. while(true) {
  78. sleep(2);
  79. }
  80. mqttServer.subscript_disconnect();
  81. return 0;
  82. }

可以看到,接收内容处理是在int messageArrivedServer(void *context, char *topicName, int topicLen, MQTTAsync_message *m)函数的,我也在此处打了TODO。

订阅话题是在void onConnectServer(void *context, MQTTAsync_successData *response)函数完成。


MQTT数据发布端

  1. #include <iostream>
  2. #include <stdlib.h>
  3. #include <stdio.h>
  4. #include <unistd.h>
  5. #include "mqtt.h"
  6. MQTT mqttClient;
  7. using namespace std;
  8. void onConnectFailureClient(void *context, MQTTAsync_failureData *response)
  9. {
  10. mqttClient.connected = 0;
  11. printf("mqtt:connect failed, rc %d\n", response ? response->code : -1);
  12. MQTTAsync client = (MQTTAsync) context;
  13. }
  14. void onConnectClient(void *context, MQTTAsync_successData *response)
  15. {
  16. mqttClient.connected = 1;
  17. //连接成功的回调,只会在第一次 connect 成功后调用,后续自动重连成功时并不会调用,因此应用需要自行保证每次 connect 成功后重新订阅
  18. printf("mqtt:client:connect success\n");
  19. }
  20. void onDisconnectClient(void *context, MQTTAsync_successData *response)
  21. {
  22. mqttClient.connected = 0;
  23. printf("mqtt:connect disconnect\n");
  24. }
  25. void onPublishFailureClient(void *context, MQTTAsync_failureData *response)
  26. {
  27. printf("Publish failed, rc %d\n", response ? -1 : response->code);
  28. }
  29. void onPublishClient(void *context, MQTTAsync_successData *response)
  30. {
  31. printf("mqtt:publish:send success\n");
  32. }
  33. void connectionLostClient(void *context, char *cause)
  34. {
  35. mqttClient.connected = 0;
  36. printf("mqtt:client:connection lost\n");
  37. }
  38. int main()
  39. {
  40. char msg[1024];
  41. mqttClient.topic = "MQTT/test";
  42. mqttClient.deviceId = "fs0504";//mac地址; id不能相同
  43. mqttClient.onConnectFailure = onConnectFailureClient;
  44. mqttClient.onConnectClient = onConnectClient;
  45. mqttClient.onDisconnect = onDisconnectClient;
  46. mqttClient.onPublishFailure = onPublishFailureClient;
  47. mqttClient.onPublish = onPublishClient;
  48. mqttClient.connectionLost = connectionLostClient;
  49. mqttClient.init();
  50. while(true) {
  51. if(mqttClient.publish_connect()) {
  52. sleep(1);
  53. continue;
  54. }
  55. while(true) {
  56. sprintf(msg, "hello!! this is mqtt demo\n");
  57. mqttClient.publish_send(msg);
  58. sleep(1);
  59. }
  60. mqttClient.publish_disconnect();
  61. }
  62. return 0;
  63. }

mqttClient.publish_send(msg);即实现了内容的发布。通常MQTT作为组件存在时,并不是一直处于运行状态的,还是要用到他发布功能的时候就连接上,用完或者不要用的时候就要 mqttClient.publish_disconnect();


测试结果

笔者已经把实现的Demo上传到资源了,读者可以自行测试。可以直接作为组件库使用。

本人的测试环境是Jetson Nano B01下搭载英伟达官方镜像Ubuntu20.04.

注意一定要先,安装paho.mqtt.c库哦,安装流程请移步到

Liunx下用C语言实现MQTT的接收与发送(上)_星羽空间-CSDN博客Liunx下用C语言实现MQTT的接收与发送https://blog.csdn.net/qq_25662827/article/details/122567741


本篇文章,最终代码见(5积分,买不了吃亏,买不了上当,绝对值了):

https://download.csdn.net/download/qq_25662827/77133536icon-default.png?t=M0H8https://download.csdn.net/download/qq_25662827/77133536

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号