赞
踩
陈拓 2021/04/16-2021/04/16
在《用C语言实现mosquitto MQTT订阅消息》
https://zhuanlan.zhihu.com/p/365190438
https://blog.csdn.net/chentuo2000/article/details/115747492
一文中我们用C语言实现了mosquitto MQTT同步订阅消息。mosquitto的同步函数是以阻塞方式工作的,也就是订阅程序一直等待接收消息,阻塞了其他程序的运行,效率很低。
下面我们用异步mosquitto的函数实现MQTT消息订阅,异步是非阻塞的方式,比同步方式性能更好。
该函数连接MQTT代理。这是一个非阻塞调用。如果使用mosquitto_connect_async,则客户端必须使用线程接口mosquitto_loop_start。
注意:如果代码中直接使用mosquitto_loop循环,则必须使用同步mosquitto_connect函数连接代理。
例如用下面的循环代替mosquitto_loop_forever函数时
while(running) {
mosquitto_loop(mosq, -1, 1);
}
此函数开启一个新线程,在线程里循环调用 mosquitto_loop。
而同步循环函数mosquitto_loop_forever在无限阻塞循环中调用mosquitto_loop。
mosquitto_loop是客户端的主循环函数,必须经常调用它以保持客户机和代理之间的通信正常工作。mosquitto_loop_forever和mosquitto_loop_start都是通过调用mosquitto_loop来实现的。
你也可以直接使用此函数,但不能在回调中调用它。
例如上面用循环mosquitto_loop代替mosquitto_loop_forever函数的例子。
- #include <mosquitto.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <unistd.h>
-
-
- // 定义运行标志决定是否需要结束
- static int running = 1;
-
- // 当客户端从代理接收到CONNACK消息时调用回调
- void on_connect(struct mosquitto *mosq, void *obj, int reason_code)
- {
- int rc;
-
- printf("on_connect: %s\n", mosquitto_connack_string(reason_code));
- if(reason_code != 0){
- mosquitto_disconnect(mosq);
- }
-
- rc = mosquitto_subscribe(mosq, NULL, "example/temperature", 1);
- if(rc != MOSQ_ERR_SUCCESS){
- fprintf(stderr, "Error subscribing: %s\n", mosquitto_strerror(rc));
- mosquitto_disconnect(mosq);
- }
- }
-
- // 当代理在响应订阅发送SUBACK时调用回调
- void on_subscribe(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos)
- {
- int i;
- bool have_subscription = false;
-
- for(i=0; i<qos_count; i++){
- printf("on_subscribe: %d:granted qos = %d\n", i, granted_qos[i]);
- if(granted_qos[i] <= 2){
- have_subscription = true;
- }
- }
- if(have_subscription == false){
- fprintf(stderr, "Error: All subscriptions rejected.\n");
- mosquitto_disconnect(mosq);
- }
- }
-
- // 当客户端收到消息时调用回调该函数
- void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg)
- {
- // 打印有效载荷
- printf("%s %d %s\n", msg->topic, msg->qos, (char *)msg->payload);
- }
-
- // 当断开连接时调用回调该函数
- void on_disconnect(struct mosquitto *mosq, void *obj, int rc)
- {
- printf("Call the function: my_disconnect_callback\n");
- running = 0;
- }
-
- int main(int argc, char *argv[])
- {
- struct mosquitto *mosq;
- int rc;
-
- // 初始化mosquitto库
- mosquitto_lib_init();
-
- // 创建新的客户端实例。
- mosq = mosquitto_new(NULL, true, NULL);
- if(mosq == NULL){
- fprintf(stderr, "Error: Out of memory.\n");
- return 1;
- }
-
- // 配置回调函数
- mosquitto_connect_callback_set(mosq, on_connect);
- mosquitto_subscribe_callback_set(mosq, on_subscribe);
- mosquitto_message_callback_set(mosq, on_message);
- mosquitto_disconnect_callback_set(mosq, on_disconnect);
-
- // 连接服务器
- mosquitto_username_pw_set(mosq, "ct", "1qaz2wsx");
- rc = mosquitto_connect_async (mosq, "raspberrypi", 1883, 60);
- if(rc != MOSQ_ERR_SUCCESS){
- mosquitto_destroy(mosq);
- fprintf(stderr, "Error: %s\n", mosquitto_strerror(rc));
- return 1;
- }
-
- // 异步循环
- rc = mosquitto_loop_start(mosq);
- if(rc != MOSQ_ERR_SUCCESS)
- {
- mosquitto_destroy(mosq);
- fprintf(stderr, "Error: %s\n", mosquitto_strerror(rc));
- return 1;
- }
-
- // 开始循环
- printf("Start!\n");
- while(running)
- {
- sleep(1);
- }
-
- // 结束后的清理工作
- mosquitto_destroy(mosq);
- mosquitto_lib_cleanup();
- printf("End!\n");
-
- return 0;
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
cd mosquitto-2.0.9/examples/subscribe/
cp basic-1.c basic-1.c-bak
nano basic-1.c
用重写的C程序替换原来的程序。
gcc -o basic-1 basic-1.c -lmosquitto
./basic-1
从rc = mosquitto_subscribe(mosq, NULL, "example/temperature", 1);可知,订阅的消息主题为example/temperature。
mosquitto_pub -p 1883 -u ct -P xxxxxxxx -t example/temperature -m "26.6"
详细说明见《树莓派MQTT服务远程测试MQTT.fx》
https://zhuanlan.zhihu.com/p/363373024
https://blog.csdn.net/chentuo2000/article/details/115539377
点击Publish:
订阅测试窗口收到消息
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。