赞
踩
继续上次《Liunx下用C语言实现MQTT的接收与发送(上)(下)》,本人探寻了如何基于paho.mqtt.c库开发C++版本的MQTT。事实上,paho官网有paho.mqtt.c++库,不过笔者进去后玩不怎么转,可能是笔者不太灵光。
笔者忽然想到,在《Liunx下用C语言实现MQTT的接收与发送(上)》中,好像安装了些mqtt的动态库,他们是啥情况呢?
本人推断是封装了,在src里面,被我之前排除的库文件。简单测试调用了一下,还真是。
因此,本人又开始封装了。(这里的脑回路太漫长了,几句话也说不清楚,这MQTT任务倒腾了我三天半终于结束了)。
mqtt.h:
- #ifndef MQTT_H
- #define MQTT_H
-
- #include <MQTTAsync.h>
- #include <string>
-
- using namespace std;
-
- class MQTT {
- public:
- bool useSSL;
- //实例 ID,购买后从控制台获取
- string instanceId;
- //接入点域名,从控制台获取
- string host;
- //客户端使用的 GroupID,从控制台申请
- string groupId;
- //客户端 ClientID 的后缀,由业务自行指定,只需要保证全局唯一即可
- string deviceId;
- //账号 AccessKey,从账号控制台获取
- string accessKey;
- //账号 SecretKey,从账号控制台获取
- string secretKey;
- //使用的协议端口,默认 tcp 协议使用1883,如果需要使用 SSL 加密,端口设置成8883,具体协议和端口参考文档链接https://help.aliyun.com/document_detail/44867.html?spm=a2c4g.11186623.6.547.38d81cf7XRnP0C
- int port = 1883;
- //测试收发消息的 Topic
- string topic;
-
- int (*messageArrived)(void *context, char *topicName, int topicLen, MQTTAsync_message *m);
- void (*onConnectFailure)(void *context, MQTTAsync_failureData *response);
- void (*onSubcribe)(void *context, MQTTAsync_successData *response);
- void (*onConnectServer)(void *context, MQTTAsync_successData *response);
- void (*onConnectClient)(void *context, MQTTAsync_successData *response);
- void (*onDisconnect)(void *context, MQTTAsync_successData *response);
- void (*onPublishFailure)(void *context, MQTTAsync_failureData *response);
- void (*onPublish)(void *context, MQTTAsync_successData *response);
- void (*connectionLost)(void *context, char *cause);
-
- MQTT();
-
- void init(void);
- int subscript_connect(void);
- int subscript_disconnect(void);
- int publish_connect(void);
- int publish_send(char *msg);
- int publish_disconnect(void);
-
- MQTTAsync client;
- char clientIdUrl[64];
- char passWord[100];
- char userName[128];
-
- int connected;
- };
-
- #endif
mqtt.cpp:
- #include <signal.h>
- #include <memory.h>
- #include <stdlib.h>
-
- #if defined(WIN32)
- #define sleep Sleep
- #else
-
- #include <unistd.h>
- #include <openssl/hmac.h>
- #include <openssl/bio.h>
-
- #endif
-
- #include "mqtt.h"
-
- MQTT::MQTT()
- {
- instanceId = "mqtt-cn-oew1w1cq50a";
- host = "localhost";// ip地址
- groupId = "GID_DEVICE";
- deviceId = "fs1704";
- accessKey = "LTAI4G6gf4KQpMTmMEsaf6oF";
- secretKey = "pwAzZb7DFMjju3TYI5lmcShw2nlZUs";
- topic = "MQTT/test";
- port = 1883;
- connected = 0;
- useSSL = false;
- }
-
- void MQTT::init(void)
- {
- char tempData[100];
- int len = 0;
- printf("mqtt:init:topic:%s:deviceId:%s\n", topic.c_str(), deviceId.c_str());
- //ClientID要求使用 GroupId 和 DeviceId 拼接而成,长度不得超过64个字符
- snprintf(clientIdUrl, 64, "%s@@@%s", groupId.c_str(), deviceId.c_str());
- //username和 Password 签名模式下的设置方法,参考文档 https://help.aliyun.com/document_detail/48271.html?spm=a2c4g.11186623.6.553.217831c3BSFry7
- HMAC(EVP_sha1(), (unsigned char *)secretKey.c_str(), strlen(secretKey.c_str()), (const unsigned char *)clientIdUrl, strlen(clientIdUrl), (unsigned char *)tempData, (unsigned int *)&len);
- int passWordLen = EVP_EncodeBlock((unsigned char *) passWord, (const unsigned char *)tempData, len);
- passWord[passWordLen] = '\0';
- printf("passWord is %s\n", passWord);
- snprintf(userName, 128, "Signature|%s|%s", accessKey.c_str(), instanceId.c_str());
- }
-
- int MQTT::subscript_connect(void)
- {
- int cleanSession = 1;
- int rc = 0;
- //1.create client
- MQTTAsync_createOptions create_opts = MQTTAsync_createOptions_initializer;
- create_opts.sendWhileDisconnected = 0;
- create_opts.maxBufferedMessages = 10;
- char url[100];
- if (useSSL) {
- snprintf(url, 100, "ssl://%s:%d", host.c_str(), port);
- } else {
- snprintf(url, 100, "tcp://%s:%d", host.c_str(), port);
- }
- rc = MQTTAsync_createWithOptions(&client, url, clientIdUrl, MQTTCLIENT_PERSISTENCE_NONE, NULL, &create_opts);
- rc = MQTTAsync_setCallbacks(client, client, connectionLost, messageArrived, NULL);
- //2.connect to server
- MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
- conn_opts.MQTTVersion = MQTTVERSION_3_1_1;
- conn_opts.keepAliveInterval = 60;
- conn_opts.cleansession = cleanSession;
- conn_opts.username = userName;
- conn_opts.password = passWord;
- conn_opts.onSuccess = onConnectServer;
- conn_opts.onFailure = onConnectFailure;
- conn_opts.context = client;
- //如果需要使用 SSL 加密
- if (useSSL) {
- MQTTAsync_SSLOptions ssl =MQTTAsync_SSLOptions_initializer;
- conn_opts.ssl = &ssl;
- } else {
- conn_opts.ssl = NULL;
- }
- conn_opts.automaticReconnect = 1;
- conn_opts.connectTimeout = 3;
- if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) {
- printf("Failed to start connect, return code %d\n", rc);
- return EXIT_FAILURE;
- }
- return EXIT_SUCCESS;
- }
-
- int MQTT::subscript_disconnect(void)
- {
- int rc = 0;
- MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
- disc_opts.onSuccess = onDisconnect;
- if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS) {
- printf("Failed to start disconnect, return code %d\n", rc);
- return EXIT_FAILURE;
- }
- while (connected)
- sleep(1);
- MQTTAsync_destroy(&client);
- return EXIT_SUCCESS;
- }
-
- int MQTT::publish_connect(void)
- {
- int cleanSession = 1;
- int rc = 0;
- //1.create client
- MQTTAsync_createOptions create_opts = MQTTAsync_createOptions_initializer;
- create_opts.sendWhileDisconnected = 0;
- create_opts.maxBufferedMessages = 10;
- char url[100];
- if (useSSL) {
- snprintf(url, 100, "ssl://%s:%d", host.c_str(), port);
- } else {
- snprintf(url, 100, "tcp://%s:%d", host.c_str(), port);
- }
- rc = MQTTAsync_createWithOptions(&client, url, clientIdUrl, MQTTCLIENT_PERSISTENCE_NONE, NULL, &create_opts);
- rc = MQTTAsync_setCallbacks(client, client, connectionLost, messageArrived, NULL);
- //2.connect to server
- MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
- conn_opts.MQTTVersion = MQTTVERSION_3_1_1;
- conn_opts.keepAliveInterval = 60;
- conn_opts.cleansession = cleanSession;
- conn_opts.username = userName;
- conn_opts.password = passWord;
- conn_opts.onSuccess = onConnectClient;
- conn_opts.onFailure = onConnectFailure;
- conn_opts.context = client;
- //如果需要使用 SSL 加密
- if (useSSL) {
- MQTTAsync_SSLOptions ssl =MQTTAsync_SSLOptions_initializer;
- conn_opts.ssl = &ssl;
- } else {
- conn_opts.ssl = NULL;
- }
- conn_opts.automaticReconnect = 1;
- conn_opts.connectTimeout = 3;
- if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) {
- printf("Failed to start connect, return code %d\n", rc);
- return EXIT_FAILURE;
- }
- return EXIT_SUCCESS;
- }
-
- int MQTT::publish_send(char *msg)
- {
- int rc = 0;
-
- //3.publish msg
- MQTTAsync_responseOptions pub_opts = MQTTAsync_responseOptions_initializer;
- pub_opts.onSuccess = onPublish;
- pub_opts.onFailure = onPublishFailure;
- rc = MQTTAsync_send(client, topic.c_str(), strlen(msg), msg, 0, 0, &pub_opts);
- if(rc != MQTTASYNC_SUCCESS) {
- printf("mqtt:publish:failed:%s\n", msg);
- return EXIT_FAILURE;
- } else {
- printf("mqtt:publish:success:%s\n", msg);
- return EXIT_SUCCESS;
- }
- }
-
- int MQTT::publish_disconnect(void)
- {
- int rc = 0;
- MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
- disc_opts.onSuccess = onDisconnect;
- if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS) {
- printf("Failed to start disconnect, return code %d\n", rc);
- return EXIT_FAILURE;
- }
- while (connected)
- sleep(1);
- MQTTAsync_destroy(&client);
- return EXIT_SUCCESS;
- }
可以看到,代码中出现了#include <MQTTAsync.h>,这个库在哪里呢?
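MQTTAsync.h 是 paho.mqtt.c 库异步客户端的头文件,安装 paho.mqtt.c 后默认位于 /usr/local/include 下;与它对应的动态库则在 /usr/local/lib 下面,具体是哪个我忘了,没记录(按 paho.mqtt.c 的命名规则,应为 libpaho-mqtt3a.so,启用 SSL 时为 libpaho-mqtt3as.so)。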
为了方便管理,我也把这两个文件放到mqtt文件夹里。
#include <iostream>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#include "mqtt.h"
-
- MQTT mqttServer;
-
- using namespace std;
-
- int messageArrivedServer(void *context, char *topicName, int topicLen, MQTTAsync_message *m)
- {
- printf("recv message from %s ,body is %s\n", topicName, (char *) m->payload);
-
- // TODO::在此处增加接收内容解析程序
-
-
- MQTTAsync_freeMessage(&m);
- MQTTAsync_free(topicName);
- return 1;
- }
-
- void onConnectFailureServer(void *context, MQTTAsync_failureData *response)
- {
- if(mqttServer.connected)
- mqttServer.connected = 0;
-
- printf("mqtt:connect failed, rc %d\n", response ? response->code : -1);
- MQTTAsync client = (MQTTAsync) context;
- }
-
- void onSubcribeServer(void *context, MQTTAsync_successData *response)
- {
- printf("subscribe success\n");
- }
-
- void onConnectServer(void *context, MQTTAsync_successData *response)
- {
- if(!mqttServer.connected)
- mqttServer.connected = 1;
-
- //连接成功的回调,只会在第一次 connect 成功后调用,后续自动重连成功时并不会调用,因此应用需要自行保证每次 connect 成功后重新订阅
- printf("mqtt:server:connect success\n");
- MQTTAsync client = (MQTTAsync) context;
- //do sub when connect success
- MQTTAsync_responseOptions sub_opts = MQTTAsync_responseOptions_initializer;
- sub_opts.onSuccess = mqttServer.onSubcribe;
- int rc = 0;
- if ((rc = MQTTAsync_subscribe(client, mqttServer.topic.c_str(), 1, &sub_opts)) != MQTTASYNC_SUCCESS) {
- printf("Failed to subscribe, return code %d\n", rc);
- }
- }
-
- void onDisconnectServer(void *context, MQTTAsync_successData *response)
- {
- if(mqttServer.connected)
- mqttServer.connected = 0;
-
- printf("mqtt:connect disconnect\n");
- }
-
- void onPublishFailureServer(void *context, MQTTAsync_failureData *response)
- {
- printf("Publish failed, rc %d\n", response ? -1 : response->code);
- }
-
- void connectionLostServer(void *context, char *cause)
- {
- if(mqttServer.connected)
- mqttServer.connected = 0;
-
- printf("mqtt:server:connection lost\n");
- }
-
- int main()
- {
- mqttServer.topic = "MQTT/#";
- mqttServer.deviceId = "fs1704";//mac地址; id不能相同
- mqttServer.messageArrived = messageArrivedServer;
- mqttServer.onConnectFailure = onConnectFailureServer;
- mqttServer.onSubcribe = onSubcribeServer;
- mqttServer.onConnectServer = onConnectServer;
- mqttServer.onDisconnect = onDisconnectServer;
- mqttServer.onPublishFailure = onPublishFailureServer;
- mqttServer.connectionLost = connectionLostServer;
-
- mqttServer.init();
-
- while(true) {
- if(mqttServer.subscript_connect()) {
- sleep(1);
- continue;
- }
- break;
- }
- while(true) {
- sleep(2);
- }
- mqttServer.subscript_disconnect();
- return 0;
- }
可以看到,接收内容处理是在int messageArrivedServer(void *context, char *topicName, int topicLen, MQTTAsync_message *m)函数的,我也在此处打了TODO。
订阅话题是在void onConnectServer(void *context, MQTTAsync_successData *response)函数完成。
- #include <iostream>
- #include <stdlib.h>
- #include <stdio.h>
- #include <unistd.h>
-
- #include "mqtt.h"
-
- MQTT mqttClient;
-
- using namespace std;
-
- void onConnectFailureClient(void *context, MQTTAsync_failureData *response)
- {
- mqttClient.connected = 0;
- printf("mqtt:connect failed, rc %d\n", response ? response->code : -1);
- MQTTAsync client = (MQTTAsync) context;
- }
-
- void onConnectClient(void *context, MQTTAsync_successData *response)
- {
- mqttClient.connected = 1;
- //连接成功的回调,只会在第一次 connect 成功后调用,后续自动重连成功时并不会调用,因此应用需要自行保证每次 connect 成功后重新订阅
- printf("mqtt:client:connect success\n");
- }
-
- void onDisconnectClient(void *context, MQTTAsync_successData *response)
- {
- mqttClient.connected = 0;
- printf("mqtt:connect disconnect\n");
- }
-
- void onPublishFailureClient(void *context, MQTTAsync_failureData *response)
- {
- printf("Publish failed, rc %d\n", response ? -1 : response->code);
- }
-
- void onPublishClient(void *context, MQTTAsync_successData *response)
- {
- printf("mqtt:publish:send success\n");
- }
-
- void connectionLostClient(void *context, char *cause)
- {
- mqttClient.connected = 0;
- printf("mqtt:client:connection lost\n");
- }
-
- int main()
- {
- char msg[1024];
-
- mqttClient.topic = "MQTT/test";
- mqttClient.deviceId = "fs0504";//mac地址; id不能相同
- mqttClient.onConnectFailure = onConnectFailureClient;
- mqttClient.onConnectClient = onConnectClient;
- mqttClient.onDisconnect = onDisconnectClient;
- mqttClient.onPublishFailure = onPublishFailureClient;
- mqttClient.onPublish = onPublishClient;
- mqttClient.connectionLost = connectionLostClient;
-
- mqttClient.init();
-
- while(true) {
- if(mqttClient.publish_connect()) {
- sleep(1);
- continue;
- }
-
- while(true) {
- sprintf(msg, "hello!! this is mqtt demo\n");
-
- mqttClient.publish_send(msg);
-
- sleep(1);
-
- }
- mqttClient.publish_disconnect();
- }
-
- return 0;
- }
mqttClient.publish_send(msg); 即实现了内容的发布。通常MQTT作为组件存在时,并不是一直处于运行状态的,而是在需要用到它的发布功能时才连接上,用完或者不再需要时,就调用 mqttClient.publish_disconnect(); 断开连接。
笔者已经把实现的Demo上传到资源了,读者可以自行测试。可以直接作为组件库使用。
本人的测试环境是Jetson Nano B01下搭载英伟达官方镜像Ubuntu20.04.
注意一定要先安装paho.mqtt.c库哦,安装流程请移步到《Liunx下用C语言实现MQTT的接收与发送(上)》。
本篇文章,最终代码见(5积分,买不了吃亏,买不了上当,绝对值了):
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。