赞
踩
1.1 MQTT简介
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。
MQTT是机器对机器(M2M)/物联网(IoT)连接协议。它被设计为一个极其轻量级的发布/订阅消息传输协议。对于需要较小代码占用空间和/或网络带宽非常宝贵的远程连接非常有用,是专为受限设备和低带宽、高延迟或不可靠的网络而设计。这些原则也使该协议成为新兴的“机器到机器”(M2M)或物联网(IoT)世界的连接设备,以及带宽和电池功率非常高的移动应用的理想选择。例如,它已被用于通过卫星链路与代理通信的传感器、与医疗服务提供者的拨号连接,以及一系列家庭自动化和小型设备场景。它也是移动应用的理想选择,因为它体积小,功耗低,数据包最小,并且可以有效地将信息分配给一个或多个接收器。MQTT的实现模型如下。
1.2 主要特性
MQTT协议工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,它具有以下主要的几项特性:
(1)使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。
这一点很类似于XMPP,但是MQTT的信息冗余远小于XMPP,,因为XMPP使用XML格式文本来传递数据。
(2)对负载内容屏蔽的消息传输。
(3)使用TCP/IP提供网络连接。
主流的MQTT是基于TCP连接进行数据推送的,但是同样有基于UDP的版本,叫做MQTT-SN。这两种版本由于基于不同的连接方式,优缺点自然也就各有不同了。
(4)有三种消息发布服务质量:
“至多一次”,消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。这一种方式主要普通APP的推送,倘若你的智能设备在消息推送时未联网,推送过去没收到,再次联网也就收不到了。
“至少一次”,确保消息到达,但消息重复可能会发生。
“只有一次”,确保消息到达一次。在一些要求比较严格的计费系统中,可以使用此级别。在计费系统中,消息重复或丢失会导致不正确的结果。这种最高质量的消息发布服务还可以用于即时通讯类的APP的推送,确保用户收到且只会收到一次。
(5)小型传输,开销很小(固定长度的头部是2字节),协议交换最小化,以降低网络流量。
这就是为什么在介绍里说它非常适合“在物联网领域,传感器与服务器的通信,信息的收集”,要知道嵌入式设备的运算能力和带宽都相对薄弱,使用这种协议来传递消息再适合不过了。
(6)使用Last Will和Testament特性通知有关各方客户端异常中断的机制。
Last Will:即遗言机制,用于通知同一主题下的其他设备发送遗言的设备已经断开了连接。
Testament:遗嘱机制,功能类似于Last Will。
1.3 MQTT协议实现方式
实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。
MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:
(1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);
(2)payload,可以理解为消息的内容,是指订阅者具体要使用的内容
1.4 网络传输与应用消息
MQTT会构建底层网络传输:它将建立客户端到服务器的连接,提供两者之间的一个有序的、无损的、基于字节流的双向传输。
当应用数据通过MQTT网络发送时,MQTT会把与之相关的服务质量(QoS)和主题名(Topic)相关连。
1.5 MQTT客户端
一个使用MQTT协议的应用程序或者设备,它总是建立到服务器的网络连接。客户端可以:
(1)发布其他客户端可能会订阅的信息;
(2)订阅其它客户端发布的消息;
(3)退订或删除应用程序的消息;
(4)断开与服务器连接。
1.6 MQTT服务器
MQTT服务器以称为“消息代理”(Broker),可以是一个应用程序或一台设备。它是位于消息发布者和订阅者之间,它可以:
(1)接受来自客户的网络连接;
(2)接受客户发布的应用信息;
(3)处理来自客户端的订阅和退订请求;
(4)向订阅的客户转发应用程序消息
1.6 MQTT协议中的订阅、主题、会话
一、订阅(Subscription)
订阅包含主题筛选器(Topic Filter)和最大服务质量(QoS)。订阅会与一个会话(Session)关联。一个会话可以包含多个订阅。每一个会话中的每个订阅都有一个不同的主题筛选器。
二、会话(Session)
每个客户端与服务器建立连接后就是一个会话,客户端和服务器之间有状态交互。会话存在于一个网络之间,也可能在客户端和服务器之间跨越多个连续的网络连接。
三、主题名(Topic Name)
连接到一个应用程序消息的标签,该标签与服务器的订阅相匹配。服务器会将消息发送给订阅所匹配标签的每个客户端。
四、主题筛选器(Topic Filter)
一个对主题名通配符筛选器,在订阅表达式中使用,表示订阅所匹配到的多个主题。
五、负载(Payload)
消息订阅者所具体接收的内容
1.7 MQTT协议中的方法
MQTT协议中定义了一些方法(也被称为动作),来于表示对确定资源所进行操作。这个资源可以代表预先存在的数据或动态生成数据,这取决于服务器的实现。通常来说,资源指服务器上的文件或输出。主要方法有:
(1)Connect。等待与服务器建立连接。
(2)Disconnect。等待MQTT客户端完成所做的工作,并与服务器断开TCP/IP会话。
(3)Subscribe。等待完成订阅。
(4)UnSubscribe。等待服务器取消客户端的一个或多个topics订阅。
(5)Publish。MQTT客户端发送消息请求,发送完成后返回应用程序线程。
1.8 MQTT协议数据包结构
在MQTT协议中,一个MQTT数据包由:固定头(Fixed header)、可变头(Variable header)、消息体(payload)三部分构成。MQTT数据包结构如下:
(1)固定头(Fixed header)。存在于所有MQTT数据包中,表示数据包类型及数据包的分组类标识。
(2)可变头(Variable header)。存在于部分MQTT数据包中,数据包类型决定了可变头是否存在及其具体内容。
(3)消息体(Payload)。存在于部分MQTT数据包中,表示客户端收到的具体内容。
参考博客:
https://blog.csdn.net/qq_28877125/article/details/78325003
2.1使用apt-get命令安装
1、安装服务器端
sudo apt-get install mosquitto
完成安装后,服务器就搭建好了,系统会自动运行mosquitto,默认端口为1883。
2、安装客户端
前面服务器端搭建好了,但是客户端还没有安装。这一步是可选的,如果需要在终端上测试MQTT订阅/发布的通信就需要执行这一步,这里我们也安装上去才有后续的这些测试。
sudo apt install mosquitto-clients
3、查看运行状态
sudo systemctl status mosquitto
4、重启服务器程序
查看运行进程号:ps -aux | grep mosquitto
执行命令杀死进程:kill -9 进程号
启动:mosquitto -v
-v 详细模式——启用所有日志记录类型。
关于启动参数:可以通过 --help 查看
5,测试(默认配置)
使用securecrt首先打开三个终端,
1、启动代理服务:mosquitto -v
-v 详细模式 打印调试信息
2、订阅主题:mosquitto_sub -v -t hello
-t 指定订阅的主题,主题为:hello
-v 详细模式 打印调试信息
3、发布内容:mosquitto_pub -t hello -m world
-t 指定订阅的主题,主题为:hello
-m 指定发布的消息的内容
当发布者推送消息之后,订阅者获得其订阅的主题的内容,而代理服务器控制台中会出现——连接、消息发布和心跳等调试信息。通过代理服务器的调试输出可以对MQTT协议的相关过程有所了解。
2.2 Ubuntu搭建mosquitto方式二(下载源码编译)
1.安装mosquitto所需要依赖
sudo apt-get install libssl-dev
sudo apt-get install uuid-dev
sudo apt-get install cmake
2.下载源码包
wget http://mosquitto.org/files/source/mosquitto-2.0.14.tar.gz
下载地址:https://mosquitto.org/download/
3、解压源码
tar -zxvf mosquitto-2.0.14.tar.gz
4.进入源码目录:
cd mosquitto-2.0.14/
5.编译与安装源码
make
sudo make install
6.可能遇到的问题:
【1】编译找不到openssl/ssl.h
【解决方法】——安装openssl
sudo apt-get install libssl-dev
【2】编译过程g++命令未找到:
sudo apt-get install g++
【3】编译过程找不到ares.h
sudo apt-get install libc-ares-dev
【4】编译过程找不到uuid/uuid.h
sudo apt-get install uuid-dev
【5】使用过程中找不到libmosquitto.so.1
error while loading shared libraries: libmosquitto.so.1: cannot open shared object file: No such file or directory
【解决方法】——修改libmosquitto.so位置
创建链接
sudo ln -s /usr/local/lib/libmosquitto.so.1 /usr/lib/libmosquitto.so.1
更新动态链接库
sudo ldconfig
参考博客:
https://blog.csdn.net/qq_33406883/article/details/107429946
https://blog.csdn.net/xukai871105/article/details/39252653
https://blog.csdn.net/lu_embedded/article/details/76305105
1.mosquitto_lib_init
int mosquitto_lib_init(void)
struct mosquitto *mosquitto_new( const char * id, bool clean_session, void * obj )
①id :用作客户端ID的字符串。如果为NULL,将生成一个随机客户端ID。如果id为NULL,clean_session必须为true。
②clean_session:设置为true以指示代理在断开连接时清除所有消息和订阅,设置为false以指示其保留它们,客户端将永远不会在断开连接时丢弃自己的传出消息。调用mosquitto_connect或mosquitto_reconnect将导致重新发送消息。使mosquitto_reinitialise将客户端重置为其原始状态。如果id参数为NULL,则必须将其设置为true。简言之:就是断开后是否保留订阅信息true/false
③obj: 用户指针,将作为参数传递给指定的任何回调,(回调参数)
ENOMEM 内存不足。
EINVAL 输入参数无效。
int mosquitto_connect( struct mosquitto * mosq, const char * host, int port, int keepalive )
①mosq : 有效的mosquitto实例,mosquitto_new()返回的mosq.
②host : 服务器ip地址
③port:服务器的端口号
④keepalive:保持连接的时间间隔, 单位秒。如果在这段时间内没有其他消息交换,则代理应该将PING消息发送到客户端的秒数。
MOSQ_ERR_SUCCESS 成功。
MOSQ_ERR_INVAL 如果输入参数无效。
MOSQ_ERR_ERRNO 如果系统调用返回错误。变量errno包含错误代码
int mosquitto_disconnect( struct mosquitto * mosq )
MOSQ_ERR_SUCCESS 成功。
MOSQ_ERR_INVAL 如果输入参数无效。
MOSQ_ERR_NO_CONN 如果客户端未连接到代理。
int mosquitto_publish( struct mosquitto * mosq, int * mid, const char * topic, int payloadlen, const void * payload, int qos, bool retain )
①mosq:有效的mosquitto实例,客户端
②mid:指向int的指针。如果不为NULL,则函数会将其设置为该特定消息的消息ID。然后可以将其与发布回调一起使用,以确定何时发送消息。请注意,尽管MQTT协议不对QoS = 0的消息使用消息ID,但libmosquitto为其分配了消息ID,以便可以使用此参数对其进行跟踪。
③topic:要发布的主题,以null结尾的字符串
④payloadlen:有效负载的大小(字节),有效值在0到268,435,455之间;主题消息的内容长度
⑤payload: 主题消息的内容,指向要发送的数据的指针,如果payloadlen >0,则它必须时有效的存储位置。
⑥qos:整数值0、1、2指示要用于消息的服务质量。
⑦retain:设置为true以保留消息。
MOSQ_ERR_SUCCESS 成功。
MOSQ_ERR_INVAL 如果输入参数无效。
MOSQ_ERR_NOMEM 如果发生内存不足的情况
MOSQ_ERR_NO_CONN 如果客户端未连接到代理。
MOSQ_ERR_PROTOCOL 与代理进行通信时是否存在协议错误。
MOSQ_ERR_PAYLOAD_SIZE 如果payloadlen太大。
MOSQ_ERR_MALFORMED_UTF8 如果主题无效,则为UTF-8
MOSQ_ERR_QOS_NOT_SUPPORTED 如果QoS大于代理支持的QoS。
MOSQ_ERR_OVERSIZE_PACKET 如果结果包大于代理支持的包。
int mosquitto_subscribe( struct mosquitto * mosq, int * mid, const char * sub, int qos )
①mosq:有效的mosquitto实例,客户端
②mid: 指向int的指针。如果不为NULL,则函数会将其设置为该特定消息的消息ID。然后可以将其与订阅回调一起使用,以确定何时发送消息。;主题的消息ID
③sub: 主题名称,订阅模式。
④qos : 此订阅请求的服务质量。
MOSQ_ERR_SUCCESS 成功。
MOSQ_ERR_INVAL 如果输入参数无效。
MOSQ_ERR_NOMEM 如果发生内存不足的情况。
MOSQ_ERR_NO_CONN 如果客户端未连接到代理。
MOSQ_ERR_MALFORMED_UTF8 如果主题无效,则为UTF-8
MOSQ_ERR_OVERSIZE_PACKET 如果结果包大于代理支持的包。
参考博客:
https://blog.csdn.net/weixin_53361650/article/details/116954595
https://mosquitto.org/api/files/mosquitto-h.html
订阅端
/********************************************************************************* * Copyright: (C) 2022 hubeiwuhan * All rights reserved. * * Filename: mosquitto_sub.c * Description: This file * * Version: 1.0.0(24/01/22) * Author: yanp <2405204881@qq.com> * ChangeLog: 1, Release initial version on "24/01/22 07:12:01" * ********************************************************************************/ #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include "mosquitto.h" #define HOST "localhost" #define PORT 1883 #define KEEP_ALIVE 60 #define MSG_MAX_SIZE 512 static int running =1; void my_connect_callback(struct mosquitto *mosq, void *obj, int rc) { printf("Call the function: on_connect\n"); if(rc) { printf("on_connect error!\n"); exit(1); } else { if(mosquitto_subscribe(mosq, NULL, "topic", 2)) { printf("Set the topic error!\n"); exit(1); } } } void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc) { printf("Call the function: my_disconnect_callback\n"); running = 0; } void my_subscribe_callback(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos) { printf("Call the function: on_subscribe\n"); } void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg) { printf("Call the function: on_message\n"); printf("Recieve a message of %s : %s\n", (char *)msg->topic, (char *)msg->payload); if(0 == strcmp(msg->payload, "quit")) { mosquitto_disconnect(mosq); } } int main() { int ret; struct mosquitto *mosq; ret = mosquitto_lib_init(); if(ret) { printf("Init lib error!\n"); return -1; } mosq = mosquitto_new("sub_test", true, NULL); if(mosq == NULL) { printf("New sub_test error!\n"); mosquitto_lib_cleanup(); return -1; } printf("creat a sub_er success!\n"); mosquitto_connect_callback_set(mosq, my_connect_callback); mosquitto_disconnect_callback_set(mosq, my_disconnect_callback); mosquitto_subscribe_callback_set(mosq, my_subscribe_callback); mosquitto_message_callback_set(mosq, my_message_callback); ret = mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE); if(ret) { printf("Connect server error!\n"); mosquitto_destroy(mosq); mosquitto_lib_cleanup(); return -1; } printf("Start!\n"); while(running) { mosquitto_loop(mosq, -1, 1); } mosquitto_destroy(mosq); mosquitto_lib_cleanup(); printf("End!\n"); }
发布端
/********************************************************************************* * Copyright: (C) 2022 hubeiwuhan * All rights reserved. * * Filename: mosquitto_temp.c * Description: This file * * Version: 1.0.0(23/01/22) * Author: yanp <2405204881@qq.com> * ChangeLog: 1, Release initial version on "23/01/22 07:05:45" * ********************************************************************************/ #include <stdio.h> #include <errno.h> #include <string.h> #include <signal.h> #include <time.h> #include <unistd.h> #include <getopt.h> #include <sys/types.h> #include <sys/stat.h> #include <netdb.h> #include <stdlib.h> #include <fcntl.h> #include <libgen.h> #include <netinet/in.h> #include <mosquitto.h> #include <sys/socket.h> #include <arpa/inet.h> #include <dirent.h> #include <mosquitto.h> #define BUF_SIZE 1024 #define ALIVE_Time 60 #define hostname "localhost" int running=1; void printf_usage(char *program); int ds18b20_get_temper(float * temp); void sig_handler(int SIG_NUM); int get_time(char *tim); int main(int argc,char **argv) { int daemon_run=0; int port; int opt; char *topic = NULL; char tim[32]; float temper; char buf[512]; char tem[32]; char *user="yanpan"; struct mosquitto *mosq = NULL; int mid; char *program = basename(argv[0]); struct option long_options[] = { {"port",required_argument, NULL, 'p'}, {"help",no_argument, NULL,'h'}, {NULL, 0, NULL, 0} }; while ((opt = getopt_long(argc, argv, "p:h", long_options, NULL)) != -1) { switch (opt) { case 'p': port = atoi(optarg); break; case 'h': printf_usage(argv[0]); break; default: break; } } if(!port) { printf_usage(program) ; return 0 ; } signal(SIGUSR1, sig_handler); while(running) { if(ds18b20_get_temper(&temper)<0) { printf("ds18b20_get_temper() failed\n") ; return -1; } sprintf(tem,"\ttemperature:%5.3f",temper); get_time(tim); memset(buf,0,sizeof(buf)); snprintf(buf,sizeof(buf),"%s%s%s",user,tim,tem); printf("%s\n",buf); mosquitto_lib_init(); mosq = mosquitto_new("pub_test", true, NULL) ; if(mosq == NULL) { printf("New sub_test error!\n"); mosquitto_lib_cleanup(); return -1; } printf("Create mosquitto sucessfully!\n"); if(mosquitto_connect(mosq,hostname,port,ALIVE_Time)!= MOSQ_ERR_SUCCESS) { printf("Mosq_Connect() failed: %s\n", strerror(errno) ); mosquitto_destroy(mosq); mosquitto_lib_cleanup(); return -2; } printf("Connect %s:%d Sucessfully!\n", hostname, port); int loop = mosquitto_loop_start(mosq); if(loop != MOSQ_ERR_SUCCESS) { printf("mosquitto loop error\n"); mosquitto_destroy(mosq) ; mosquitto_lib_cleanup() ; return -3; } if( mosquitto_publish(mosq,&mid,"topic",strlen(buf),buf,0,0) != MOSQ_ERR_SUCCESS ) { printf("Mosq_Publish() error: %s\n", strerror(errno)); mosquitto_destroy(mosq) ; mosquitto_lib_cleanup() ; return -4; } else printf("Publish information of temperature Ok!\n") ; sleep(5); } } void printf_usage(char *program) { printf("使用方法:%s【选项】 \n", program); printf("\n传入参数\n"); printf(" -p[port ] 指定连接的端口号\n"); printf(" -h[help ] 打印帮助信息\n"); printf("\n例如: %s -b -p 8900\n", program); return; } int ds18b20_get_temper(float * temp) { char w1_path[128]="/sys/bus/w1/devices/"; char f_name[64]; char buff[128]; char *data_p=NULL; struct dirent *file=NULL; DIR *dir=NULL; int data_fd; int found = -1; if((dir=opendir(w1_path))<0) { printf("open w1_path failure:%s\n",strerror(errno)); return -1; } while((file=readdir(dir))!=NULL) { if(strstr(file->d_name,"28-")) { strncpy(f_name,file->d_name,sizeof(f_name)); found=1; } } closedir(dir); if(!found) { printf("can not found the folder\n"); return 0; } strncat(w1_path, f_name, sizeof(w1_path)-strlen(w1_path)); strncat(w1_path, "/w1_slave", sizeof(w1_path)-strlen(w1_path)); //printf("folder path is %s\n",w1_path); data_fd=open(w1_path,O_RDONLY); if(data_fd<0) { printf("open file failure: %s\n",strerror(errno)); return -2; } memset(buff,0,sizeof(buff)); if(read(data_fd,buff,sizeof(buff))<0) { printf("read temperature from w1_path failure:%s\n",strerror(errno)); return -3; } data_p=strstr(buff, "t="); data_p=data_p+2; if(!data_p) { printf("can't get temperature :%s\n",strerror(errno)); return -4; } *temp=atof(data_p)/1000.0; close(data_fd); return 0; } void sig_handler(int SIG_NUM) { if(SIG_NUM == SIGUSR1) running = 0 ; } int get_time(char *tim) { time_t time_val,time_mk; struct tm *time_gm,*time_local; time(&time_val); time_gm=gmtime(&time_val); sprintf(tim,"\tdate:%04d/%02d/%02d,time:%02d:%02d:%02d",time_gm->tm_year+1900,time_gm->tm_mon+1,time_gm->tm_mday,time_gm->tm_hour,time_gm->tm_min,time_gm->tm_sec); return 0; }
对发布端和接收端编译
gcc mosquitto_sub.c -o mosquitto_sub -l mosquitto
gcc mosquitto_temp.c -l mosquitto
然后打开mosquitto服务器
mosquitto -v
发布温度(每5秒发布一次)
订阅端接收
参考博客:
https://blog.csdn.net/caijiwyj/article/details/86671211
https://blog.csdn.net/qq_45125250/article/details/110390880
https://blog.csdn.net/qq_33406883/article/details/107466430
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。