当前位置:   article > 正文

paho mqtt 重连后无法订阅问题_paho-mqtt mqtt断网重连无法订阅

paho-mqtt mqtt断网重连无法订阅

参考代码

https://github.com/eclipse/paho.mqtt.c/blob/master/src/samples/paho_c_sub.c

方法1:

conn_opts.cleanstart = 0;

缺点:

当 MQTT 服务器清除数据并重新部署后,服务器端保存的会话(包括订阅信息)会丢失;此时 MQTT 客户端虽然能够自动重连,但不会自动重新订阅,因此收不到消息。

 

方法2:

 

  1. conn_opts.cleanstart = 0;
  2. ......
  3. while (1)
  4. {
  5. mysleep(1000);
  6. if (finished)
  7. {
  8. MQTTAsync_disconnect(client, &disc_opts);
  9. printf("reconnet\n");
  10. finished = 0;
  11. MQTTAsync_connect(client, &conn_opts);
  12. }
  13. }

 完整代码(在 paho.mqtt.c-master\src\samples\paho_c_sub.c 的基础上修改):

  1. /*******************************************************************************
  2. * Copyright (c) 2012, 2018 IBM Corp., and others
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v1.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * http://www.eclipse.org/legal/epl-v10.html
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Ian Craggs - initial contribution
  15. * Ian Craggs - fix for bug 413429 - connectionLost not called
  16. * Guilherme Maciel Ferreira - add keep alive option
  17. * Ian Craggs - add full capability
  18. *******************************************************************************/
  19. #include "MQTTAsync.h"
  20. #include "MQTTClientPersistence.h"
  21. #include "pubsub_opts.h"
  22. #include <stdio.h>
  23. #include <signal.h>
  24. #include <string.h>
  25. #include <stdlib.h>
  26. #if defined(WIN32)
  27. #include <windows.h>
  28. #define sleep Sleep
  29. #else
  30. #include <sys/time.h>
  31. #include <unistd.h>
  32. #endif
  33. #if defined(_WRS_KERNEL)
  34. #include <OsWrapper.h>
  35. #endif
  36. volatile int finished = 0;
  37. int subscribed = 0;
  38. int disconnected = 0;
  /**
   * Suspend the calling thread for approximately the given time.
   *
   * @param ms  time to sleep, in milliseconds; negative values are ignored
   */
  void mysleep(int ms)
  {
      if (ms < 0)
          return;
  #if defined(WIN32)
      Sleep(ms);
  #else
      /*
       * POSIX permits usleep() to fail with EINVAL when the argument is
       * 1,000,000 or more, and main() calls mysleep(1000). Sleep whole
       * seconds with sleep() and only the sub-second remainder with usleep().
       * This also avoids the int overflow of ms * 1000 for large ms.
       */
      if (ms >= 1000)
          sleep((unsigned int)(ms / 1000));
      if (ms % 1000 != 0)
          usleep((ms % 1000) * 1000);
  #endif
  }
  47. void cfinish(int sig)
  48. {
  49. signal(SIGINT, NULL);
  50. finished = 1;
  51. }
  /*
   * Default settings for this subscriber sample. main() passes &opts to
   * getopts() together with argc/argv.
   * The initializer is positional, so the values must stay in the field order
   * of struct pubsub_opts (declared in pubsub_opts.h, not visible here); the
   * trailing comments name the group each row belongs to.
   */
  struct pubsub_opts opts =
  {
  0, 0, 0, 0, "\n", 100, /* debug/app options */
  NULL, NULL, 1, 0, 0, /* message options */
  MQTTVERSION_DEFAULT, NULL, "paho-c-sub", 0, 0, NULL, NULL, "localhost", "1883", NULL, 10, /* MQTT options */
  NULL, NULL, 0, 0, /* will options */
  0, NULL, NULL, NULL, NULL, NULL, NULL, /* TLS options */
  0, {NULL, NULL}, /* MQTT V5 options */
  };
  61. int messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
  62. {
  63. size_t delimlen = 0;
  64. if (opts.verbose)
  65. printf("%d %s\t", message->payloadlen, topicName);
  66. if (opts.delimiter)
  67. delimlen = strlen(opts.delimiter);
  68. if (opts.delimiter == NULL || (message->payloadlen > delimlen &&
  69. strncmp(opts.delimiter, &((char*)message->payload)[message->payloadlen - delimlen], delimlen) == 0))
  70. printf("%.*s", message->payloadlen, (char*)message->payload);
  71. else
  72. printf("%.*s%s", message->payloadlen, (char*)message->payload, opts.delimiter);
  73. if (message->struct_version == 1 && opts.verbose)
  74. logProperties(&message->properties);
  75. fflush(stdout);
  76. MQTTAsync_freeMessage(&message);
  77. MQTTAsync_free(topicName);
  78. return 1;
  79. }
  80. void onDisconnect(void* context, MQTTAsync_successData* response)
  81. {
  82. disconnected = 1;
  83. }
  84. void onSubscribe5(void* context, MQTTAsync_successData5* response)
  85. {
  86. subscribed = 1;
  87. }
  88. void onSubscribe(void* context, MQTTAsync_successData* response)
  89. {
  90. subscribed = 1;
  91. }
  92. void onSubscribeFailure5(void* context, MQTTAsync_failureData5* response)
  93. {
  94. if (!opts.quiet)
  95. fprintf(stderr, "Subscribe failed, rc %s reason code %s\n",
  96. MQTTAsync_strerror(response->code),
  97. MQTTReasonCode_toString(response->reasonCode));
  98. finished = 1;
  99. }
  100. void onSubscribeFailure(void* context, MQTTAsync_failureData* response)
  101. {
  102. if (!opts.quiet)
  103. fprintf(stderr, "Subscribe failed, rc %s\n",
  104. MQTTAsync_strerror(response->code));
  105. finished = 1;
  106. }
  107. void onConnectFailure5(void* context, MQTTAsync_failureData5* response)
  108. {
  109. if (!opts.quiet)
  110. fprintf(stderr, "Connect failed, rc %s reason code %s\n",
  111. MQTTAsync_strerror(response->code),
  112. MQTTReasonCode_toString(response->reasonCode));
  113. finished = 1;
  114. }
  115. void onConnectFailure(void* context, MQTTAsync_failureData* response)
  116. {
  117. if (!opts.quiet)
  118. fprintf(stderr, "Connect failed, rc %s\n", response ? MQTTAsync_strerror(response->code) : "none");
  119. finished = 1;
  120. }
  121. void onConnect5(void* context, MQTTAsync_successData5* response)
  122. {
  123. MQTTAsync client = (MQTTAsync)context;
  124. MQTTAsync_callOptions copts = MQTTAsync_callOptions_initializer;
  125. int rc;
  126. if (opts.verbose)
  127. printf("Subscribing to topic %s with client %s at QoS %d\n", opts.topic, opts.clientid, opts.qos);
  128. copts.onSuccess5 = onSubscribe5;
  129. copts.onFailure5 = onSubscribeFailure5;
  130. copts.context = client;
  131. if ((rc = MQTTAsync_subscribe(client, opts.topic, opts.qos, &copts)) != MQTTASYNC_SUCCESS)
  132. {
  133. if (!opts.quiet)
  134. fprintf(stderr, "Failed to start subscribe, return code %s\n", MQTTAsync_strerror(rc));
  135. finished = 1;
  136. }
  137. }
  138. void onConnect(void* context, MQTTAsync_successData* response)
  139. {
  140. MQTTAsync client = (MQTTAsync)context;
  141. MQTTAsync_responseOptions ropts = MQTTAsync_responseOptions_initializer;
  142. int rc;
  143. if (opts.verbose)
  144. printf("Subscribing to topic %s with client %s at QoS %d\n", opts.topic, opts.clientid, opts.qos);
  145. ropts.onSuccess = onSubscribe;
  146. ropts.onFailure = onSubscribeFailure;
  147. ropts.context = client;
  148. if ((rc = MQTTAsync_subscribe(client, opts.topic, opts.qos, &ropts)) != MQTTASYNC_SUCCESS)
  149. {
  150. if (!opts.quiet)
  151. fprintf(stderr, "Failed to start subscribe, return code %s\n", MQTTAsync_strerror(rc));
  152. finished = 1;
  153. }
  154. }
  /* Connect options at file scope: main() fills them in and passes them to
   * MQTTAsync_connect() for both the initial connect and each reconnect. */
  MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
  156. void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message)
  157. {
  158. fprintf(stderr, "Trace : %d, %s\n", level, message);
  159. }
  160. int main(int argc, char** argv)
  161. {
  162. MQTTAsync client;
  163. MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
  164. MQTTAsync_createOptions create_opts = MQTTAsync_createOptions_initializer;
  165. MQTTAsync_willOptions will_opts = MQTTAsync_willOptions_initializer;
  166. MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer;
  167. int rc = 0;
  168. char* url = NULL;
  169. const char* version = NULL;
  170. const char* program_name = "paho_c_sub";
  171. MQTTAsync_nameValue* infos = MQTTAsync_getVersionInfo();
  172. #if !defined(WIN32)
  173. struct sigaction sa;
  174. #endif
  175. if (argc < 2)
  176. usage(&opts, (pubsub_opts_nameValue*)infos, program_name);
  177. if (getopts(argc, argv, &opts) != 0)
  178. usage(&opts, (pubsub_opts_nameValue*)infos, program_name);
  179. if (strchr(opts.topic, '#') || strchr(opts.topic, '+'))
  180. opts.verbose = 1;
  181. if (opts.connection)
  182. url = opts.connection;
  183. else
  184. {
  185. url = malloc(100);
  186. sprintf(url, "%s:%s", opts.host, opts.port);
  187. }
  188. if (opts.verbose)
  189. printf("URL is %s\n", url);
  190. if (opts.tracelevel > 0)
  191. {
  192. MQTTAsync_setTraceCallback(trace_callback);
  193. MQTTAsync_setTraceLevel(opts.tracelevel);
  194. }
  195. if (opts.MQTTVersion >= MQTTVERSION_5)
  196. create_opts.MQTTVersion = MQTTVERSION_5;
  197. rc = MQTTAsync_createWithOptions(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE,
  198. NULL, &create_opts);
  199. if (rc != MQTTASYNC_SUCCESS)
  200. {
  201. if (!opts.quiet)
  202. fprintf(stderr, "Failed to create client, return code: %s\n", MQTTAsync_strerror(rc));
  203. exit(EXIT_FAILURE);
  204. }
  205. rc = MQTTAsync_setCallbacks(client, client, NULL, messageArrived, NULL);
  206. if (rc != MQTTASYNC_SUCCESS)
  207. {
  208. if (!opts.quiet)
  209. fprintf(stderr, "Failed to set callbacks, return code: %s\n", MQTTAsync_strerror(rc));
  210. exit(EXIT_FAILURE);
  211. }
  212. #if defined(WIN32)
  213. signal(SIGINT, cfinish);
  214. signal(SIGTERM, cfinish);
  215. #else
  216. memset(&sa, 0, sizeof(struct sigaction));
  217. sa.sa_handler = cfinish;
  218. sa.sa_flags = 0;
  219. sigaction(SIGINT, &sa, NULL);
  220. sigaction(SIGTERM, &sa, NULL);
  221. #endif
  222. if (opts.MQTTVersion == MQTTVERSION_5)
  223. {
  224. MQTTAsync_connectOptions conn_opts5 = MQTTAsync_connectOptions_initializer5;
  225. conn_opts = conn_opts5;
  226. conn_opts.onSuccess5 = onConnect5;
  227. conn_opts.onFailure5 = onConnectFailure5;
  228. conn_opts.cleanstart = 1;
  229. }
  230. else
  231. {
  232. conn_opts.onSuccess = onConnect;
  233. conn_opts.onFailure = onConnectFailure;
  234. conn_opts.cleansession = 0;//conn_opts.cleansession = 1;
  235. }
  236. conn_opts.keepAliveInterval = opts.keepalive;
  237. conn_opts.username = opts.username;
  238. conn_opts.password = opts.password;
  239. conn_opts.MQTTVersion = opts.MQTTVersion;
  240. conn_opts.context = client;
  241. conn_opts.automaticReconnect = 1;
  242. if (opts.will_topic) /* will options */
  243. {
  244. will_opts.message = opts.will_payload;
  245. will_opts.topicName = opts.will_topic;
  246. will_opts.qos = opts.will_qos;
  247. will_opts.retained = opts.will_retain;
  248. conn_opts.will = &will_opts;
  249. }
  250. if (opts.connection && (strncmp(opts.connection, "ssl://", 6) == 0 ||
  251. strncmp(opts.connection, "wss://", 6) == 0))
  252. {
  253. if (opts.insecure)
  254. ssl_opts.verify = 0;
  255. ssl_opts.CApath = opts.capath;
  256. ssl_opts.keyStore = opts.cert;
  257. ssl_opts.trustStore = opts.cafile;
  258. ssl_opts.privateKey = opts.key;
  259. ssl_opts.privateKeyPassword = opts.keypass;
  260. ssl_opts.enabledCipherSuites = opts.ciphers;
  261. conn_opts.ssl = &ssl_opts;
  262. }
  263. if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
  264. {
  265. if (!opts.quiet)
  266. fprintf(stderr, "Failed to start connect, return code %s\n", MQTTAsync_strerror(rc));
  267. exit(EXIT_FAILURE);
  268. }
  269. while (1)
  270. {
  271. mysleep(1000);
  272. if (finished)
  273. {
  274. MQTTAsync_disconnect(client, &disc_opts);
  275. printf("reconnet\n");
  276. finished = 0;
  277. MQTTAsync_connect(client, &conn_opts);
  278. }
  279. }
  280. /*
  281. while (!subscribed)
  282. mysleep(100);
  283. if (finished)
  284. goto exit;
  285. while (!finished)
  286. mysleep(100);
  287. disc_opts.onSuccess = onDisconnect;
  288. if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS)
  289. {
  290. if (!opts.quiet)
  291. fprintf(stderr, "Failed to start disconnect, return code: %s\n", MQTTAsync_strerror(rc));
  292. exit(EXIT_FAILURE);
  293. }
  294. while (!disconnected)
  295. mysleep(100);
  296. */
  297. exit:
  298. MQTTAsync_destroy(&client);
  299. return EXIT_SUCCESS;
  300. }

 

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/176457
推荐阅读
相关标签
  

闽ICP备14008679号