赞
踩
近年来,物联网的发展如火如荼,已经渗透到我们生活的方方面面。从智能家居到工业自动化,从智慧城市到智慧农业,物联网正在以前所未有的速度改变着我们的生活。 大家现在可能已经习惯了通过手机控制家里的灯光、空调和电视,这就是物联网在智能家居领域的应用,如果在10年前看到这种设备的应用肯定觉得很牛批,而现在只要是个设备都能上云,这种家电设备的远程控制已经成了大家习以为常的配置了。而在工业领域,物联网技术可以帮助企业实现自动化生产、设备监控和预防性维护,提高生产效率和产品质量。在智慧城市建设中,物联网技术可以用于交通管理、环境监测和公共安全等方面,提升城市管理和居民生活的质量。
从物联网开始兴起的时候,各大厂家都纷纷推出了自家的IOT物联网平台。 比如: 机智云、中国移动的onenet、阿里云的IOT、百度的天工物接入、华为云的IOT、腾讯云IOT等等。 这些大厂家的物联网服务器都支持标准的MQTT协议接入,大家不用自己搭建MQTT服务器可以直接使用这些现成的服务器接入设备开发是非常的方便的。
我在这几年也写了很多物联网开发的案例,不管是、中国移动的onenet、阿里云的IOT、百度的天工物接入、华为云的IOT、腾讯云IOT 这些服务器都写了很多教程,演示设备接入平台,完成设备上云,手机APP对接,电脑程序对接,微信小程序接入,实现远程数据监测控制等等。这些案例都放在了智能家居与物联网项目实战
专栏里。 这些案例里设备实现上云的方式主要是两种方式:HTTP协议、MQTT协议方式上云。 MQTT协议是标准的物联网协议,支持双向数据传输,也就是可以上传数据到服务器,也可以接收服务器下发的控制命令完成远程控制。 我写的这些案例里硬件端联网的模块主要是用到了4G模块、ESP8266-WIFI模块、GSM模块、NBIOT模块等等,通过它们联网,让单片机设备实现上云。
这些设备中有些是支持MQTT协议的(也就是本身的固件就支持MQTT协议),有些不支持的(可能有固件支持,需要自己烧写)。 如果说固件不支持MQTT协议,但只要设备支持TCP协议,那么我们也可以自己封装MQTT协议完成与MQTT服务器之间的通信。 比如:ESP8266-WIFI模块,正常的官方默认固件中,ESP8266-WIFI是不支持MQTT协议的,如果我们不烧写固件的情况下,如何自己实现MQTT协议上云? 这篇文章就介绍,通过TCP协议自己封装MQTT协议报文,完成数据上云。 直接从0开始手撸MQTT协议报文,组合报文,完成与服务器之间的通信。
MQTT协议也是分为两种,分MQTT和MQTTS,就像HTTP协议一样也分HTTP和HTTPS,那么区别呢? 带S就是要支持SSL协议,支持认证,更加安全,那么复杂度自然就上来了。 MQTT协议的端口是1883,MQTTS的端口是8883。 当前这篇文章介绍非加密的MQTT协议,也就是1883端口。MQTTS协议也手撸不了,这玩意涉及到SSL协议,那就很复杂了,如果要用,直接使用现成的开源库就行,但本篇文章不讨论这个,后面文章再单独介绍如何实现MQTTS协议。
本篇文章的环境是在windows下,利用VS2022开发程序,使用windows下网络编程接口作为基础,封装MQTT协议连接华为云MQTT服务器,完成数据上云。
所以,大家只要有一台windows电脑,电脑上安装了VS开发环境,任何版本都可以(VS2010、VS2013、VS2015、VS2017、VS2019、VS2022等等都可以的) 跟着这篇文章进行学习,不需要其他任何硬件设备,我们现在是单纯的去学习MQTT协议。
前提呢,大家还是要懂得一点网络编程的知识,了解TCP协议,大致知道TCP协议通信的简单过程,如果网络编程知识是完全0基础,建议先看另一篇文章学习下网络编程(我博客有专门讲解网络编程相关知识的文章)。 这篇文章也会简单介绍下TCP协议和基本网络编程知识
那么接下来,我们就开始动手学习吧。
如果大家电脑已经有开发环境,这章节直接忽略。 这里贴出来为了给 完全0基础 的小伙伴学习
我这里介绍下我用的环境安装过程。 所有版本的VS都可以的。
我当前环境是在Windows下,IDE用的是地表最强IDE VS2022。
下载地址:https://visualstudio.microsoft.com/zh-hans/downloads/
因为我这里只需要用到C++和C语言编程,那么安装的时候可以自己选择需要安装的包。
安装好之后,创建项目。
如果是老手了,这章节可以直接忽略。 如果对网络编程是 0基础 的小伙伴,那么就认真看一下,了解下基本知识。
网络编程是通过使用IP地址和端口号等网络信息,使两台以上的计算机能够相互通信,按照规定的协议交换数据的编程方式。
在网络编程中,程序员使用各种协议和技术,使得不同的设备可以通过网络进行数据交换和信息共享。
要实现网络编程,程序员需要了解并掌握各种网络通信协议,比如TCP/IP协议族,包括TCP、UDP、IP等,这些协议是实现设备间通信的基础。网络编程内部涉及到数据的打包、组装、发送、接收、解析等一系列过程,以实现信息的正确传输。
在TCP/IP协议族中,TCP和UDP是位于IP协议之上的传输层协议。 在OSI模型中,传输层是第四层,负责总体数据传输和数据控制,为会话层等高三层提供可靠的传输服务,为网络层提供可靠的目的地点信息。在TCP/IP协议族中,TCP和UDP正是位于这一层的协议。
这篇文章主要介绍 TCP 和 UDP 协议 以及 使用方法。
TCP协议:
TCP(传输控制协议)是一种面向连接的、可靠的传输层协议。在传输数据之前需要先建立连接,确保数据的顺序和完整性。TCP通过三次握手建立连接,并通过确认、超时和重传机制确保数据的可靠传输。TCP采用流量控制和拥塞控制机制,以避免网络拥塞,确保数据的顺利传输。因为TCP的这些特性,通常被应用于需要高可靠性和顺序性的应用,如网页浏览、电子邮件等。
UDP协议:
UDP(用户数据报协议)是一种无连接的、不可靠的传输层协议。与TCP不同,UDP在传输数据之前不需要建立连接,直接将数据打包成数据报并发送出去。因此,UDP没有TCP的那些确认、超时和重传机制,也就不保证数据的可靠传输。UDP也没有TCP的流量控制和拥塞控制机制。因为UDP的简单性和高效性,通常被应用于实时性要求较高,但对数据可靠性要求不高的应用,如语音通话、视频直播等。
要实现TCP通信,两端必须要知道对方的IP和端口号:
(1)IP地址:TCP协议是基于IP协议进行通信的,因此需要知道对方的IP地址,才能建立连接。
(2)端口号:每个TCP连接都有一个唯一的端口号,用于标识进程和应用程序。建立连接时,需要指定本地端口号和远端端口号。
(3)应用层协议:TCP协议只提供数据传输服务,应用程序需要定义自己的应用层协议,用于解析报文和处理数据。例如,HTTP协议就是基于TCP协议的应用层协议。
在正常的TCP通信过程中,第一步需要建立连接,这个过程称为“三次握手”。建立连接时,客户端向服务器发送一个SYN包,表示请求建立连接;服务器接收到SYN包后,向客户端发送一个ACK包,表示确认收到了SYN包;最后客户端再向服务器发送一个ACK包,表示确认收到了服务器的ACK包,此时连接建立成功。建立连接后,数据传输就可以开始了。
因为当前文章是在Windows下介绍MQTT协议,要用到网络编程的知识,需要使用Windows系统提供的API完成网络编程。Windows本身就有一套原生的网络编程接口可以直接使用。 在Linux系统下也是一样,都有自己一套原生的网络编程接口。
如果没有接触这些API的小伙伴不用慌~~~。 你至少用过C语言里的printf、scanf、strlen之类的函数吧? 下面介绍的这些网络编程API函数其实和它们没什么区别,都是普通的函数,功能不一样而已。 对你来说,只是多学了几个库函数,只要了解每个函数的功能就可以调用了。
那么接下来就学习一下常用的网络编程相关的函数。
微软的官方文档地址:https://learn.microsoft.com/zh-cn/windows/win32/api/_winsock/
在Windows下进行网络编程,可以使用Winsock API(Windows Sockets API)来实现。Winsock API是Windows平台上的标准网络编程接口,提供了一系列函数和数据结构,用于创建、连接、发送和接收网络数据等操作。
下面是常用的Winsock API接口函数:
(1)WSAStartup
:初始化Winsock库,必须在使用其他Winsock函数之前调用。
(2)socket
:创建一个套接字,用于网络通信。
(3)bind
:将套接字与本地地址(IP地址和端口号)绑定。
(4)listen
:开始监听连接请求,将套接字设置为被动模式。
(5)accept
:接受客户端的连接请求,创建一个新的套接字用于与客户端通信。
(6)connect
:与远程服务器建立连接。
(7)send
:发送数据到已连接的套接字。
(8)recv
:从已连接的套接字接收数据。
(9)sendto
:发送数据到指定的目标地址。
(10)recvfrom
:从指定的地址接收数据。
(11)closesocket
:关闭套接字。
(12)getaddrinfo
:根据主机名和服务名获取地址信息。
(13)gethostbyname
:根据主机名获取主机的IP地址。
(14)gethostname
:获取本地主机名。
下面是常用的几个Winsock API函数及其函数原型和参数含义的介绍:
(1)WSAStartup
:
int WSAStartup(WORD wVersionRequested, LPWSADATA lpWSAData);
wVersionRequested
:请求的Winsock版本号。lpWSAData
:指向WSADATA结构的指针,用于接收初始化结果和相关信息。(2)socket
:
SOCKET socket(int af, int type, int protocol);
af
:地址族(Address Family),如AF_INET表示IPv4。type
:套接字类型,如SOCK_STREAM表示面向连接的TCP套接字。protocol
:指定协议。通常为0,表示根据type
自动选择合适的协议。(3)bind
:
int bind(SOCKET s, const struct sockaddr* name, int namelen);
s
:要绑定的套接字。name
:指向sockaddr结构的指针,包含要绑定的本地地址信息。namelen
:name
结构的长度。(4)listen
:
int listen(SOCKET s, int backlog);
s
:要监听的套接字。backlog
:等待连接队列的最大长度。(5)accept
:
SOCKET accept(SOCKET s, struct sockaddr* addr, int* addrlen);
s
:监听套接字。addr
:用于存储客户端地址信息的sockaddr结构。addrlen
:addr
结构的长度。(6)connect
:
int connect(SOCKET s, const struct sockaddr* name, int namelen);
s
:要连接的套接字。name
:指向目标地址信息的sockaddr结构指针。namelen
:name
结构的长度。(7)send
:
int send(SOCKET s, const char* buf, int len, int flags);
s
:要发送数据的套接字。buf
:要发送的数据缓冲区。len
:要发送的数据长度。flags
:额外选项,如MSG_DONTROUTE等。(8)recv
:
int recv(SOCKET s, char* buf, int len, int flags);
s
:要接收数据的套接字。buf
:用于存储接收数据的缓冲区。len
:要接收的数据长度。flags
:额外选项。(9)sendto
:
int sendto(SOCKET s, const char* buf, int len, int flags, const struct sockaddr* to, int tolen);
s
:要发送数据的套接字。buf
:要发送的数据缓冲区。len
:要发送的数据长度。flags
:额外选项。to
:指向目标地址信息的sockaddr结构指针。tolen
:to
结构的长度。(10)recvfrom
:
int recvfrom(SOCKET s, char* buf, int len, int flags, struct sockaddr* from, int* fromlen);
s
:要接收数据的套接字。buf
:用于存储接收数据的缓冲区。len
:要接收的数据长度。flags
:额外选项。from
:用于存储发送方地址信息的sockaddr结构指针。fromlen
:from
结构的长度。(11)closesocket
:
int closesocket(SOCKET s);
s
:要关闭的套接字。(12)getaddrinfo
:
int getaddrinfo(const char* nodename, const char* servname, const struct addrinfo* hints, struct addrinfo** res);
nodename
:目标主机名或IP地址。servname
:服务名或端口号。hints
:指向addrinfo结构的指针,提供关于地址查找的提示。res
:指向addrinfo结构链表的指针,用于接收查找结果。(13)gethostbyname
:
struct hostent* gethostbyname(const char* name);
name
:要查询的主机名。(14)gethostname
:
int gethostname(char* name, int namelen);
name
:用于接收主机名的缓冲区。namelen
:name
缓冲区的长度。上面了解了这些函数,可能不知道如何使用。 这里就写一个例子,以TCP客户端的身份去连接TCP服务器,完成数据传输。
**下面代码实现一个TCP客户端,连接到指定的服务器并完成通信。 ** 可以直接将代码贴到你的工程里,运行,体验效果。
#include <iostream> #include <winsock2.h> #include <ws2tcpip.h> #pragma comment(lib, "ws2_32.lib") //告诉编译器链接Winsock库 int main() { WSADATA wsaData; //创建一个结构体变量,用于存储关于Winsock库的信息 int result = WSAStartup(MAKEWORD(2, 2), &wsaData); //初始化Winsock库,指定版本号2.2,检查返回值 if (result != 0) { std::cout << "WSAStartup failed: " << result << std::endl; //输出错误信息并退出程序 return 1; } SOCKET connectSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); //创建一个TCP套接字,检查返回值 if (connectSocket == INVALID_SOCKET) { std::cout << "socket failed with error: " << WSAGetLastError() << std::endl; //输出错误信息并退出程序 WSACleanup(); //清除Winsock库 return 1; } sockaddr_in service; //创建一个结构体变量,用于存储服务器地址信息 service.sin_family = AF_INET; //指定地址族为IPv4 inet_pton(AF_INET, "127.0.0.1", &service.sin_addr); //将字符串类型的IP地址转换为二进制网络字节序的IP地址,并存储在结构体中 service.sin_port = htons(12345); //将端口号从主机字节序转换为网络字节序,并存储在结构体中 result = connect(connectSocket, (SOCKADDR*)&service, sizeof(service)); //连接到服务器,检查返回值 if (result == SOCKET_ERROR) { std::cout << "connect failed with error: " << WSAGetLastError() << std::endl; //输出错误信息并退出程序 closesocket(connectSocket); //关闭套接字 WSACleanup(); //清除Winsock库 return 1; } std::cout << "Connected to server." << std::endl; //连接成功,输出消息 char sendBuffer[1024] = "Hello, server!"; //创建发送缓冲区,存储待发送的数据 result = send(connectSocket, sendBuffer, sizeof(sendBuffer), 0); //向服务器发送数据,检查返回值 if (result == SOCKET_ERROR) { std::cout << "send failed with error: " << WSAGetLastError() << std::endl; //输出错误信息并退出程序 closesocket(connectSocket); //关闭套接字 WSACleanup(); //清除Winsock库 return 1; } char recvBuffer[1024]; //创建接收缓冲区,用于存储从服务器接收到的数据 result = recv(connectSocket, recvBuffer, sizeof(recvBuffer), 0); //从服务器接收数据,检查返回值 if (result == SOCKET_ERROR) { std::cout << "recv failed with error: " << WSAGetLastError() << std::endl; //输出错误信息并退出程序 closesocket(connectSocket); //关闭套接字 WSACleanup(); //清除Winsock库 return 1; } std::cout << "Received message from server: " << recvBuffer << std::endl; //输出从服务器收到的数据 closesocket(connectSocket); //关闭套接字 WSACleanup(); //清除Winsock库 return 0; }
运行效果:
地址: https://www.huaweicloud.com/product/iothub.html
现在可以免费创建的,按需付费。 只要是不超过规格,就可以免费使用,对于个人项目Demo来说,完全够用的
点击立即创建之后,会开始创建实例,需要等待片刻,再刷新浏览器就可以看到创建成功了。
创建完成,点击实例,即可进入实例详情页面。
进入实例详情页面后,可以看到接入信息的描述,我们当前设备准备采用MQTT协议接入华为云平台,这里可以看到MQTT协议的地址和端口号等信息。
总结:
端口号: MQTT (1883)
接入地址: 252d4bd608.st1.iotda-device.cn-north-4.myhuaweicloud.com
**根据域名地址得到IP地址信息: ** 打开windows的CMD窗口
Microsoft Windows [版本 10.0.19045.3693] (c) Microsoft Corporation。保留所有权利。 C:\Users\11266>ping 252d4bd608.st1.iotda-device.cn-north-4.myhuaweicloud.com 正在 Ping 252d4bd608.st1.iotda-device.cn-north-4.myhuaweicloud.com [117.78.5.125] 具有 32 字节的数据: 来自 117.78.5.125 的回复: 字节=32 时间=41ms TTL=94 来自 117.78.5.125 的回复: 字节=32 时间=38ms TTL=94 来自 117.78.5.125 的回复: 字节=32 时间=37ms TTL=94 来自 117.78.5.125 的回复: 字节=32 时间=39ms TTL=94 117.78.5.125 的 Ping 统计信息: 数据包: 已发送 = 4,已接收 = 4,丢失 = 0 (0% 丢失), 往返行程的估计时间(以毫秒为单位): 最短 = 37ms,最长 = 41ms,平均 = 38ms C:\Users\11266>
MQTT协议接入端口号有两个,1883是非加密端口,8883是证书加密端口,单片机无法加载证书,所以使用1883端口比较合适。 接下来的ESP8266就采用1883端口连接华为云物联网平台。
点击产品页,再点击左上角创建产品。
根据自己产品名字填写,下面的设备类型选择自定义类型。
创建成功之后,点击产品的名字就可以进入到产品的详情页。
产品创建完成之后,点击进入产品详情页面,翻到最下面可以看到模型定义。
模型简单来说: 就是存放设备上传到云平台的数据。
先点击自定义模型。
再创建一个服务ID。
接着点击新增属性。
这里就创建一个温度的属性。我们这个设备用来测温的。
产品是属于上层的抽象模型,接下来在产品模型下添加实际的设备。添加的设备最终需要与真实的设备关联在一起,完成数据交互。
创建完毕之后,点击保存并关闭,得到创建的设备密匙信息。该信息在后续生成MQTT三元组的时候需要使用。
{
"device_id": "65697df3585c81787ad4da82_stm32",
"secret": "12345678"
}
点击设备名称可以进入到设备详情页。
描述 | 限制 |
---|---|
支持的MQTT协议版本 | 3.1.1 |
与标准MQTT协议的区别 | 支持Qos 0和Qos 1支持Topic自定义不支持QoS2不支持will、retain msg |
MQTTS支持的安全等级 | 采用TCP通道基础 + TLS协议(最高TLSv1.3版本) |
单帐号每秒最大MQTT连接请求数 | 无限制 |
单个设备每分钟支持的最大MQTT连接数 | 1 |
单个MQTT连接每秒的吞吐量,即带宽,包含直连设备和网关 | 3KB/s |
MQTT单个发布消息最大长度,超过此大小的发布请求将被直接拒绝 | 1MB |
MQTT连接心跳时间建议值 | 心跳时间限定为30至1200秒,推荐设置为120秒 |
产品是否支持自定义Topic | 支持 |
消息发布与订阅 | 设备只能对自己的Topic进行消息发布与订阅 |
每个订阅请求的最大订阅数 | 无限制 |
帮助文档地址:https://support.huaweicloud.com/devg-iothub/iot_02_2200.html
对于设备而言,一般会订阅平台下发消息给设备 这个主题。
设备想接收平台下发的消息,就需要订阅平台下发消息给设备 的主题,订阅后,平台下发消息给设备,设备就会收到消息。
如果设备想要知道平台下发的消息,需要订阅上面图片里标注的主题。
以当前设备为例,最终订阅主题的格式如下:
$oc/devices/{device_id}/sys/messages/down
最终的格式:
$oc/devices/65697df3585c81787ad4da82_stm32/sys/messages/down
对于设备来说,主题发布表示向云平台上传数据,将最新的传感器数据,设备状态上传到云平台。
这个操作称为:属性上报。
帮助文档地址:https://support.huaweicloud.com/usermanual-iothub/iot_06_v5_3010.html
根据帮助文档的介绍, 当前设备发布主题,上报属性的格式总结如下:
发布的主题格式: $oc/devices/{device_id}/sys/properties/report 最终的格式: $oc/devices/65697df3585c81787ad4da82_stm32/sys/properties/report 发布主题时,需要上传数据,这个数据格式是JSON格式。 上传的JSON数据格式如下: { "services": [ { "service_id": <填服务ID>, "properties": { "<填属性名称1>": <填属性值>, "<填属性名称2>": <填属性值>, .......... } } ] } 根据JSON格式,一次可以上传多个属性字段。 这个JSON格式里的,服务ID,属性字段名称,属性值类型,在前面创建产品的时候就已经介绍了,不记得可以翻到前面去查看。 根据这个格式,组合一次上传的属性数据: {"services": [{"service_id": "stm32","properties":{"TEMP":36.2}}]}
MQTT协议登录需要填用户ID,设备ID,设备密码等信息,就像我们平时登录QQ,微信一样要输入账号密码才能登录。MQTT协议登录的这3个参数,一般称为MQTT三元组。
接下来介绍,华为云平台的MQTT三元组参数如何得到。
要登录MQTT服务器,首先记得先知道服务器的地址是多少,端口是多少。
帮助文档地址:https://console.huaweicloud.com/iotdm/?region=cn-north-4#/dm-portal/home
MQTT协议的端口支持1883和8883,它们的区别是:8883 是加密端口更加安全。但是单片机上使用比较困难,所以当前的设备是采用1883端口进连接的。
根据上面的域名和端口号,得到下面的IP地址和端口号信息: 如果设备支持填写域名可以直接填域名,不支持就直接填写IP地址。 (IP地址就是域名解析得到的)
华为云的MQTT服务器地址:117.78.5.125
华为云的MQTT端口号:1883
华为云提供了一个在线工具,用来生成MQTT鉴权三元组: https://iot-tool.obs-website.cn-north-4.myhuaweicloud.com/
打开这个工具,填入设备的信息(也就是刚才创建完设备之后保存的信息),点击生成,就可以得到MQTT的登录信息了。
下面是打开的页面:
填入设备的信息: (上面两行就是设备创建完成之后保存得到的)
直接得到三元组信息。
得到三元组之后,设备端通过MQTT协议登录鉴权的时候,填入参数即可。
ClientId 65697df3585c81787ad4da82_stm32_0_0_2023120106
Username 65697df3585c81787ad4da82_stm32
Password 12cc9b1f637da8d755fa2cbd007bb669e6f292e3e63017538b5e6e13eef0cf58
到此,云平台的部署已经完成,设备已经可以正常上传数据了。
IP地址:117.78.5.125
端口号:1883
ClientId 65697df3585c81787ad4da82_stm32_0_0_2023120106
Username 65697df3585c81787ad4da82_stm32
Password 12cc9b1f637da8d755fa2cbd007bb669e6f292e3e63017538b5e6e13eef0cf58
订阅主题:$oc/devices/65697df3585c81787ad4da82_stm32/sys/messages/down
发布主题:$oc/devices/65697df3585c81787ad4da82_stm32/sys/properties/report
发布数据:{"services": [{"service_id": "stm32","properties":{"TEMP":36.2}}]}
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅模式的“轻量级”的消息协议,可在发布者和订阅者之间传递消息。MQTT协议构建于TCP/IP协议上,由IBM在1999年发布,当前已经成为了一种主流的物联网通信协议。
MQTT最大的优点在于,能够以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。它是一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。由于其小巧、高效和可靠的特点,MQTT在物联网领域得到了广泛的应用。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT),且已经广泛应用于通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中。
MQTT协议的工作原理是基于发布/订阅模式。在这种模式下,发布者可以向一个或多个主题发布消息,而订阅者可以订阅这些主题以接收相关消息。这种模式允许多个发布者和订阅者同时存在,实现了一种灵活的消息传递机制。此外,MQTT协议还支持三种消息传递质量等级,可根据需要进行选择。
MQTT协议的另一个重要特点是其轻量级和简单的设计。它的消息头非常小,只有2个字节,这意味着在网络带宽有限的环境下也能够实现高效的消息传递。此外,MQTT协议还支持持久化连接和消息队列等高级功能,可进一步提高消息的可靠性和传递效率。
MQTT协议的应用范围非常广泛。例如,在智能家居领域,可以使用MQTT协议将各种智能设备连接在一起,实现设备的远程控制和监测。在工业领域,MQTT协议可以用于实现设备的远程监控和维护,提高生产效率和产品质量。在智慧城市建设中,MQTT协议可以用于交通管理、环境监测和公共安全等方面,提升城市管理和居民生活的质量。
目前MQTT协议主要是3.1.1 和 5.0 两个版本。 本篇文章是介绍3.1.1版本的MQTT协议。 各大标准的MQTT服务器都支持3.1.1.
链接:https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
在文档的下方就是介绍MQTT协议的每个包如何封装的。照着协议写代码就行了。
整个MQTT协议里,主要是实现3个函数就行了(其他的接口看自己需求)。
下面列出的3个函数,在一般的MQTT通信里是必备的。我们只要实现了这3个函数,那么完成基本的MQTT通信就没有问题了。
//发布主题
unsigned char MQTT_PublishData(char* topic, char* message, unsigned char qos);
//订阅或者取消订阅主题
unsigned char MQTT_SubscribeTopic(char* topic, unsigned char qos, unsigned char whether);
//登录MQTT服务器
unsigned char MQTT_Connect(char* ClientID, char* Username, char* Password);
【1】打开文档,打开目录,翻到第3章节—MQTT控制报文。
【2】在第3章,控制报文里,找到对应的子章节,也就是我们接线需要照着文档实现协议组包的章节。
MQTT_Connect
函数先认真阅读文档: 了解这个报文的规则,以及出现错误之后错误代码的含义。
客户端到服务端的网络连接建立后,客户端发送给服务端的第一个报文必须是 CONNECT
报文。 在一个网络连接上,客户端只能发送一次CONNECT
报文。服务端必须将客户端发送的第二个 CONNECT
报文当作协议违规处理并断开客户端的连接。
接下来就按顺序查看文档,了解协议报文里每个字节如何组成的。
接下来就开始编写代码,按照文档的提示,组合报文。
首先,定义一个数组,用来存放我们按照MQTT协议封装的数据。
unsigned char mqtt_txbuf[256];//接收数据缓存区
在定义一个变量,用来保存数组的下标,每赋值一次,就自增++;
int mqtt_txlen = 0;
文档说了,数组的第一字节固定为:0x10。 (不懂为什么是0X10,仔细看下面文档里的红色框框,文档已经把二进制位的每个位都标注出来了,如果还是看不懂,就需要补习一下C语言的位运算,熟悉位运算之后,再来看应该就很容易了)。
那么第一行代码就是这么写:
//固定报头
//控制报文类型
mqtt_txbuf[mqtt_txlen++] = 0x10; //MQTT Message Type CONNECT
接下来第2个字节,文档让看2.2.3小节的说明,那么翻到2.2.3小节,了解如何填写剩余长度值。
根据文档说明,那么编写代码如下:
//剩余长度(不包括固定头部)
do
{
unsigned char encodedByte = DataLen % 128;
DataLen = DataLen / 128;
// if there are more data to encode, set the top bit of this byte
if (DataLen > 0)
encodedByte = encodedByte | 128;
mqtt_txbuf[mqtt_txlen++] = encodedByte;
} while (DataLen > 0);
好了,现在第2个字节赋值已经完毕。
那么接着看剩下的字节如何填写:
根据文档说明,编写代码如下: (继续按顺序赋值就行了)
//可变报头
//协议名
mqtt_txbuf[mqtt_txlen++] = 0; // Protocol Name Length MSB
mqtt_txbuf[mqtt_txlen++] = 4; // Protocol Name Length LSB
mqtt_txbuf[mqtt_txlen++] = 'M'; // ASCII Code for M
mqtt_txbuf[mqtt_txlen++] = 'Q'; // ASCII Code for Q
mqtt_txbuf[mqtt_txlen++] = 'T'; // ASCII Code for T
mqtt_txbuf[mqtt_txlen++] = 'T'; // ASCII Code for T
看文档说明。
编写代码:
//协议级别
mqtt_txbuf[mqtt_txlen++] = 4; // MQTT Protocol version = 4
关于每个标志的含义,文档向下翻,下面有详细的介绍,每个标志位的含义。
编写代码:
//连接标志
mqtt_txbuf[mqtt_txlen++] = 0xc2; // conn flags
查看文档说明:
编写代码:
mqtt_txbuf[mqtt_txlen++] = 0; // Keep-alive Time Length MSB
mqtt_txbuf[mqtt_txlen++] = 100; // Keep-alive Time Length LSB 100S心跳包
这里面提到的客户端标识符、用户名、密码。 就是在前面章节创建华为云IOT服务器,得到的MQTT三元组信息。
查看文档:
编写代码:
mqtt_txbuf[mqtt_txlen++] = BYTE1(ClientIDLen);// Client ID length MSB mqtt_txbuf[mqtt_txlen++] = BYTE0(ClientIDLen);// Client ID length LSB memcpy(&mqtt_txbuf[mqtt_txlen], ClientID, ClientIDLen); mqtt_txlen += ClientIDLen; if (UsernameLen > 0) { mqtt_txbuf[mqtt_txlen++] = BYTE1(UsernameLen); //username length MSB mqtt_txbuf[mqtt_txlen++] = BYTE0(UsernameLen); //username length LSB memcpy(&mqtt_txbuf[mqtt_txlen], Username, UsernameLen); mqtt_txlen += UsernameLen; } if (PasswordLen > 0) { mqtt_txbuf[mqtt_txlen++] = BYTE1(PasswordLen); //password length MSB mqtt_txbuf[mqtt_txlen++] = BYTE0(PasswordLen); //password length LSB memcpy(&mqtt_txbuf[mqtt_txlen], Password, PasswordLen); mqtt_txlen += PasswordLen; }
上面报文封装完毕之后,直接就可以通过网络接口发送出去就行了。
前提是创建套接字,连接上MQTT服务器,然后再将上面封装好的报文发送过去就行了。
关于如何Windows下如何创建套接字,连接服务器,前面章节专门讲过了,忘记了可以回去再看看。
编写代码发送出去:
MQTT_SendBuf(mqtt_txbuf, mqtt_txlen);
这个函数里面的代码: 就是直接调用的网络接口函数发送的。
int result = send(connectSocket,(const char*)buff, len, 0); //向服务器发送数据,检查返回值
if (result == SOCKET_ERROR)
{
std::cout << "send failed with error: " << WSAGetLastError() << std::endl; //输出错误信息并退出程序
return -1;
}
发送过去之后,服务器肯定有返回值的。 失败?成功? 那么我们得判断。
通过响应
章节的文档说明,这里提到一个CONNACK
响应报文,是由服务器下发给客户端的。
在3.2小节,讲解了CONNACK
报文的字段含义。如果客户端的连接报文是正确的,服务器会下发0x20 0x00 的确认连接报文给客户端,告诉客户端连接成功。
编写代码: 写代码接收服务器返回的数据,判断返回的数据是不是符合要求。
Client_GetData(buff);//从服务器获取数据
const unsigned char parket_connetAck[] = { 0x20,0x02};
if (mqtt_rxbuf[0] == parket_connetAck[0] && mqtt_rxbuf[1] == parket_connetAck[1]) //连接成功
{
return 0;//连接成功
}
CONNACK 报文
除了固定报文头之外还有可变报头。也就是后面还有2个字节。 我们可以继续看文档下面的介绍。
一个叫连接确认标志,一个叫连接返回码。正确的情况下,这两个值应该为0x00 0x00.
关于返回码的值,我们可以对它进行判断。如果连接失败,也可以知道具体的原因。
/* 函数功能: 登录服务器 函数返回值: 0表示成功 1表示失败 */ unsigned char MQTT_Connect(char* ClientID, char* Username, char* Password) { unsigned short i, j; int ClientIDLen = (int)strlen(ClientID); int UsernameLen = (int)strlen(Username); int PasswordLen = (int)strlen(Password); unsigned int DataLen; mqtt_txlen = 0; unsigned int size = 0; unsigned char buff[256]; //可变报头+Payload 每个字段包含两个字节的长度标识 DataLen = 10 + (ClientIDLen + 2) + (UsernameLen + 2) + (PasswordLen + 2); //固定报头 //控制报文类型 mqtt_txbuf[mqtt_txlen++] = 0x10; //MQTT Message Type CONNECT //剩余长度(不包括固定头部) do { unsigned char encodedByte = DataLen % 128; DataLen = DataLen / 128; // if there are more data to encode, set the top bit of this byte if (DataLen > 0) encodedByte = encodedByte | 128; mqtt_txbuf[mqtt_txlen++] = encodedByte; } while (DataLen > 0); //可变报头 //协议名 mqtt_txbuf[mqtt_txlen++] = 0; // Protocol Name Length MSB mqtt_txbuf[mqtt_txlen++] = 4; // Protocol Name Length LSB mqtt_txbuf[mqtt_txlen++] = 'M'; // ASCII Code for M mqtt_txbuf[mqtt_txlen++] = 'Q'; // ASCII Code for Q mqtt_txbuf[mqtt_txlen++] = 'T'; // ASCII Code for T mqtt_txbuf[mqtt_txlen++] = 'T'; // ASCII Code for T //协议级别 mqtt_txbuf[mqtt_txlen++] = 4; // MQTT Protocol version = 4 //连接标志 mqtt_txbuf[mqtt_txlen++] = 0xc2; // conn flags mqtt_txbuf[mqtt_txlen++] = 0; // Keep-alive Time Length MSB mqtt_txbuf[mqtt_txlen++] = 100; // Keep-alive Time Length LSB 100S心跳包 mqtt_txbuf[mqtt_txlen++] = BYTE1(ClientIDLen);// Client ID length MSB mqtt_txbuf[mqtt_txlen++] = BYTE0(ClientIDLen);// Client ID length LSB memcpy(&mqtt_txbuf[mqtt_txlen], ClientID, ClientIDLen); mqtt_txlen += ClientIDLen; if (UsernameLen > 0) { mqtt_txbuf[mqtt_txlen++] = BYTE1(UsernameLen); //username length MSB mqtt_txbuf[mqtt_txlen++] = BYTE0(UsernameLen); //username length LSB memcpy(&mqtt_txbuf[mqtt_txlen], Username, UsernameLen); mqtt_txlen += UsernameLen; } if (PasswordLen > 0) { mqtt_txbuf[mqtt_txlen++] = BYTE1(PasswordLen); //password length MSB mqtt_txbuf[mqtt_txlen++] = BYTE0(PasswordLen); //password length LSB memcpy(&mqtt_txbuf[mqtt_txlen], Password, PasswordLen); mqtt_txlen += PasswordLen; } for (i = 0; i < 5; i++) { memset(mqtt_rxbuf, 0, mqtt_rxlen); MQTT_SendBuf(mqtt_txbuf, mqtt_txlen); size = Client_GetData(buff);//从服务器获取数据 if (size <= 0)continue; memcpy(mqtt_rxbuf, buff, size); printf("登录应答:\r\n"); for (j = 0; j < size; j++) { printf("%#X ", buff[j]); } printf("\r\n"); if (mqtt_rxbuf[0] == parket_connetAck[0] && mqtt_rxbuf[1] == parket_connetAck[1]) //连接成功 { return 0;//连接成功 } } return 1; }
和前面一章节一样,看文档说明编写代码。
通过文档了解到,发布消息的固定报文头由2个字节组成。第一个字节每个位的组成含义可看文档的表格介绍。
最高4位是MQTT控制报文类型,固定的值:0x3
后面4个字节分为是DUP(重发标志)、QOS等级(服务质量等级)、RETAIN(消息的保留标志)。
DUP(重发标志):
QOS等级(服务质量等级):
RETAIN(保留标志–固定位0):
那么经过文档的解释,我们编写代码如下: (关于后面4个字节可以根据自己的需求设置)
//固定报头
//控制报文类型
mqtt_txbuf[mqtt_txlen++] = 0x30; // MQTT Message Type PUBLISH
固定报文头的第2个字节是填写剩余字段长度。
在文档里对剩余长度字段
计算的介绍:
剩余长度字段: 等于可变报头的长度加上有效载荷的长度。
这里先要了解: 什么是可变报头? 什么是有效载荷的长度?
往下翻文档,可看到可变报头
的介绍。 **可变报头是:按顺序包含主题名和报文标识符。 ** 而报文标识符只有在QOS为1或者2的时候才有。
再往下翻文档,可看到有效载荷的长度
的介绍。有效载荷包含将被发布的应用消息。数据的内容和格式是应用特定的。有效载荷的长度这样计算:用固定 报头中的剩余长度字段的值减去可变报头的长度。包含零长度有效载荷的 PUBLISH 报文是合法的。
经过文档的介绍,我们知道了剩余长度字段如何介绍。
那么这个剩余长度字段计算出来之后,如何赋值到报文数组里去? 其实在Connect
报文的固定字段第2个字节也是要天剩余长度字段,在Connect
报文的章节已经介绍过,在文档的2.2.3小节的有详细说明,那么翻到2.2.3小节,了解如何填写剩余长度值。 (其实我们上一节已经介绍了一遍)
接下来就编写代码:
unsigned int topicLength = (int)strlen(topic); //计算主题的长度 unsigned int messageLength = (int)strlen(message); //计算发送的消息长度 unsigned int DataLen; //保存最终长度 //有效载荷的长度这样计算:用固定报头中的剩余长度字段的值减去可变报头的长度 //QOS为0时没有标识符 //数据长度 主题名 报文标识符 有效载荷 if (qos) DataLen = (2 + topicLength) + 2 + messageLength; else DataLen = (2 + topicLength) + messageLength; //剩余长度 do { unsigned char encodedByte = DataLen % 128; DataLen = DataLen / 128; // if there are more data to encode, set the top bit of this byte if (DataLen > 0) encodedByte = encodedByte | 128; mqtt_txbuf[mqtt_txlen++] = encodedByte; } while (DataLen > 0);
PUBLISH 报文可变报头非规范示例:
编写代码:
mqtt_txbuf[mqtt_txlen++] = BYTE1(topicLength);//主题长度MSB
mqtt_txbuf[mqtt_txlen++] = BYTE0(topicLength);//主题长度LSB
memcpy(&mqtt_txbuf[mqtt_txlen], topic, topicLength);//拷贝主题
mqtt_txlen += topicLength;
//报文标识符
if (qos)
{
mqtt_txbuf[mqtt_txlen++] = BYTE1(id);
mqtt_txbuf[mqtt_txlen++] = BYTE0(id);
id++;
}
接着就添加有效载荷,也就是实际发送的消息内容。
编写代码:
memcpy(&mqtt_txbuf[mqtt_txlen], message, messageLength);
mqtt_txlen += messageLength;
最后将报文发送出去:
MQTT_SendBuf(mqtt_txbuf, mqtt_txlen);
//MQTT发布数据打包函数 //topic 主题 //message 消息 //qos 消息等级 unsigned char MQTT_PublishData(char* topic, char* message, unsigned char qos) { unsigned int topicLength = (int)strlen(topic); unsigned int messageLength = (int)strlen(message); unsigned short id = 0; unsigned int DataLen; mqtt_txlen = 0; printf("上报JSON消息长度:%d\r\n", messageLength); printf("message=%s\r\n", message); //有效载荷的长度这样计算:用固定报头中的剩余长度字段的值减去可变报头的长度 //QOS为0时没有标识符 //数据长度 主题名 报文标识符 有效载荷 if (qos) DataLen = (2 + topicLength) + 2 + messageLength; else DataLen = (2 + topicLength) + messageLength; //固定报头 //控制报文类型 mqtt_txbuf[mqtt_txlen++] = 0x30; // MQTT Message Type PUBLISH //剩余长度 do { unsigned char encodedByte = DataLen % 128; DataLen = DataLen / 128; // if there are more data to encode, set the top bit of this byte if (DataLen > 0) encodedByte = encodedByte | 128; mqtt_txbuf[mqtt_txlen++] = encodedByte; } while (DataLen > 0); mqtt_txbuf[mqtt_txlen++] = BYTE1(topicLength);//主题长度MSB mqtt_txbuf[mqtt_txlen++] = BYTE0(topicLength);//主题长度LSB memcpy(&mqtt_txbuf[mqtt_txlen], topic, topicLength);//拷贝主题 mqtt_txlen += topicLength; //报文标识符 if (qos) { mqtt_txbuf[mqtt_txlen++] = BYTE1(id); mqtt_txbuf[mqtt_txlen++] = BYTE0(id); id++; } memcpy(&mqtt_txbuf[mqtt_txlen], message, messageLength); mqtt_txlen += messageLength; MQTT_SendBuf(mqtt_txbuf, mqtt_txlen); return mqtt_txlen; }
如果消息质量大于0,那么可以继续看文档下面的发布确认,对本次发送的消息进行响应处理。
和前面一样,查看文档的说明,编写代码。
订阅主题和取消订阅主题格式一样的,只是固定报头不一样。
可以封装一个函数,传入一个参数实现两种功能。
编写判断代码:
//固定报头
//控制报文类型
if (whether)mqtt_txbuf[mqtt_txlen++] = 0x82; //消息类型和标志订阅
else mqtt_txbuf[mqtt_txlen++] = 0xA2; //取消订阅
剩余长度字段:
编写代码:
unsigned int topiclen = (int)strlen(topic);
unsigned int DataLen = 2 + (topiclen + 2) + (whether ? 1 : 0);//可变报头的长度(2字节)加上有效载荷的长度
剩余长度字段的填写规则与前面一样。
编写代码:
//剩余长度
do
{
unsigned char encodedByte = DataLen % 128;
DataLen = DataLen / 128;
// if there are more data to encode, set the top bit of this byte
if (DataLen > 0)
encodedByte = encodedByte | 128;
mqtt_txbuf[mqtt_txlen++] = encodedByte;
} while (DataLen > 0);
编写代码:
//可变报头
mqtt_txbuf[mqtt_txlen++] = 0; //消息标识符 MSB
mqtt_txbuf[mqtt_txlen++] = 0x01; //消息标识符 LSB
/* 函数功能: MQTT订阅/取消订阅数据打包函数 函数参数: topic 主题 qos 消息等级 0:最多分发一次 1: 至少分发一次 2: 仅分发一次 whether 订阅/取消订阅请求包 (1表示订阅,0表示取消订阅) 返回值: 0表示成功 1表示失败 */ unsigned char MQTT_SubscribeTopic(char* topic, unsigned char qos, unsigned char whether) { unsigned char i, j; mqtt_txlen = 0; unsigned int size = 0; unsigned char buff[256]; unsigned int topiclen = (int)strlen(topic); unsigned int DataLen = 2 + (topiclen + 2) + (whether ? 1 : 0);//可变报头的长度(2字节)加上有效载荷的长度 //固定报头 //控制报文类型 if (whether)mqtt_txbuf[mqtt_txlen++] = 0x82; //消息类型和标志订阅 else mqtt_txbuf[mqtt_txlen++] = 0xA2; //取消订阅 //剩余长度 do { unsigned char encodedByte = DataLen % 128; DataLen = DataLen / 128; // if there are more data to encode, set the top bit of this byte if (DataLen > 0) encodedByte = encodedByte | 128; mqtt_txbuf[mqtt_txlen++] = encodedByte; } while (DataLen > 0); //可变报头 mqtt_txbuf[mqtt_txlen++] = 0; //消息标识符 MSB mqtt_txbuf[mqtt_txlen++] = 0x01; //消息标识符 LSB //有效载荷 mqtt_txbuf[mqtt_txlen++] = BYTE1(topiclen);//主题长度 MSB mqtt_txbuf[mqtt_txlen++] = BYTE0(topiclen);//主题长度 LSB memcpy(&mqtt_txbuf[mqtt_txlen], topic, topiclen); mqtt_txlen += topiclen; if (whether) { mqtt_txbuf[mqtt_txlen++] = qos;//QoS级别 } for (i = 0; i < 100; i++) { memset(mqtt_rxbuf, 0, mqtt_rxlen); MQTT_SendBuf(mqtt_txbuf, mqtt_txlen); //printf("订阅消息发布成功\n"); size = Client_GetData(buff);//从服务器获取数据 if (size <= 0) { continue; } memcpy(mqtt_rxbuf, buff, size); printf("订阅应答:\r\n"); for (j = 0; j < size; j++) { printf("%#X ", buff[j]); } printf("\r\n"); if (mqtt_rxbuf[0] == parket_subAck[0] && mqtt_rxbuf[1] == parket_subAck[1]) //连接成功 { return 0;//连接成功 } Sleep(1000); } return 1; //失败 }
前面章节陆续已经编写好了重要的函数,那么这里就贴出我编写好的整体的代码,进行运行测试:
#include <stdio.h> #include <stdlib.h> #include <time.h> #pragma warning(disable:4996) #include <string.h> #include <stdio.h> #include <iostream> #include <winsock2.h> #include <ws2tcpip.h> #pragma comment(lib, "ws2_32.lib") //告诉编译器链接Winsock库 //---------------------------------------MQTT协议相关的子函数声明------------------------------------------------------- //发布主题 unsigned char MQTT_PublishData(char* topic, char* message, unsigned char qos); //订阅或者取消订阅主题 unsigned char MQTT_SubscribeTopic(char* topic, unsigned char qos, unsigned char whether); //登录MQTT服务器 unsigned char MQTT_Connect(char* ClientID, char* Username, char* Password); //MQTT协议缓冲区初始化 void MQTT_Init(void); //调用底层接口发送数据包 void MQTT_SendBuf(unsigned char* buf, unsigned short len); //MQTT协议里最底层的接口,最底层的如果要移植协议到其他地方运行,那么改这里就行了。其他地方不用改的。 int Client_SendData(unsigned char* buff, unsigned int len);//发送数据到服务器 int Client_GetData(unsigned char* buff);//从服务器获取数据 //---------------------------------------全局变量定义-------------------------------------------------------------------- #define BYTE0(dwTemp) (*( char *)(&dwTemp)) #define BYTE1(dwTemp) (*((char *)(&dwTemp) + 1)) #define BYTE2(dwTemp) (*((char *)(&dwTemp) + 2)) #define BYTE3(dwTemp) (*((char *)(&dwTemp) + 3)) unsigned char mqtt_rxbuf[1024 * 1024];//发送数据缓存区 unsigned char mqtt_txbuf[256];//接收数据缓存区 unsigned int mqtt_rxlen; unsigned int mqtt_txlen; typedef enum { //名字 值 报文流动方向 描述 M_RESERVED1 = 0, // 禁止 保留 M_CONNECT, // 客户端到服务端 客户端请求连接服务端 M_CONNACK, // 服务端到客户端 连接报文确认 M_PUBLISH, // 两个方向都允许 发布消息 M_PUBACK, // 两个方向都允许 QoS 1消息发布收到确认 M_PUBREC, // 两个方向都允许 发布收到(保证交付第一步) M_PUBREL, // 两个方向都允许 发布释放(保证交付第二步) M_PUBCOMP, // 两个方向都允许 QoS 2消息发布完成(保证交互第三步) M_SUBSCRIBE, // 客户端到服务端 客户端订阅请求 M_SUBACK, // 服务端到客户端 订阅请求报文确认 M_UNSUBSCRIBE, // 客户端到服务端 客户端取消订阅请求 M_UNSUBACK, // 服务端到客户端 取消订阅报文确认 M_PINGREQ, // 客户端到服务端 心跳请求 M_PINGRESP, // 服务端到客户端 心跳响应 M_DISCONNECT, // 客户端到服务端 客户端断开连接 M_RESERVED2, // 禁止 保留 }_typdef_mqtt_message; //连接成功服务器回应 20 02 00 00 //客户端主动断开连接 e0 00 const unsigned char parket_connetAck[] = { 0x20,0x02,0x00,0x00 }; const unsigned char parket_disconnet[] = { 0xe0,0x00 }; const unsigned char parket_heart[] = { 0xc0,0x00 }; const unsigned char parket_heart_reply[] = { 0xc0,0x00 }; const unsigned char parket_subAck[] = { 0x90,0x03 }; void MQTT_Init(void) { //缓冲区赋值 mqtt_rxlen = sizeof(mqtt_rxbuf); mqtt_txlen = sizeof(mqtt_txbuf); memset(mqtt_rxbuf, 0, mqtt_rxlen); memset(mqtt_txbuf, 0, mqtt_txlen); } /* 函数功能: 登录服务器 函数返回值: 0表示成功 1表示失败 */ unsigned char MQTT_Connect(char* ClientID, char* Username, char* Password) { unsigned short i, j; int ClientIDLen = (int)strlen(ClientID); int UsernameLen = (int)strlen(Username); int PasswordLen = (int)strlen(Password); unsigned int DataLen; mqtt_txlen = 0; unsigned int size = 0; unsigned char buff[256]; //可变报头+Payload 每个字段包含两个字节的长度标识 DataLen = 10 + (ClientIDLen + 2) + (UsernameLen + 2) + (PasswordLen + 2); //固定报头 //控制报文类型 mqtt_txbuf[mqtt_txlen++] = 0x10; //MQTT Message Type CONNECT //剩余长度(不包括固定头部) do { unsigned char encodedByte = DataLen % 128; DataLen = DataLen / 128; // if there are more data to encode, set the top bit of this byte if (DataLen > 0) encodedByte = encodedByte | 128; mqtt_txbuf[mqtt_txlen++] = encodedByte; } while (DataLen > 0); //可变报头 //协议名 mqtt_txbuf[mqtt_txlen++] = 0; // Protocol Name Length MSB mqtt_txbuf[mqtt_txlen++] = 4; // Protocol Name Length LSB mqtt_txbuf[mqtt_txlen++] = 'M'; // ASCII Code for M mqtt_txbuf[mqtt_txlen++] = 'Q'; // ASCII Code for Q mqtt_txbuf[mqtt_txlen++] = 'T'; // ASCII Code for T mqtt_txbuf[mqtt_txlen++] = 'T'; // ASCII Code for T //协议级别 mqtt_txbuf[mqtt_txlen++] = 4; // MQTT Protocol version = 4 //连接标志 mqtt_txbuf[mqtt_txlen++] = 0xc2; // conn flags mqtt_txbuf[mqtt_txlen++] = 0; // Keep-alive Time Length MSB mqtt_txbuf[mqtt_txlen++] = 100; // Keep-alive Time Length LSB 100S心跳包 mqtt_txbuf[mqtt_txlen++] = BYTE1(ClientIDLen);// Client ID length MSB mqtt_txbuf[mqtt_txlen++] = BYTE0(ClientIDLen);// Client ID length LSB memcpy(&mqtt_txbuf[mqtt_txlen], ClientID, ClientIDLen); mqtt_txlen += ClientIDLen; if (UsernameLen > 0) { mqtt_txbuf[mqtt_txlen++] = BYTE1(UsernameLen); //username length MSB mqtt_txbuf[mqtt_txlen++] = BYTE0(UsernameLen); //username length LSB memcpy(&mqtt_txbuf[mqtt_txlen], Username, UsernameLen); mqtt_txlen += UsernameLen; } if (PasswordLen > 0) { mqtt_txbuf[mqtt_txlen++] = BYTE1(PasswordLen); //password length MSB mqtt_txbuf[mqtt_txlen++] = BYTE0(PasswordLen); //password length LSB memcpy(&mqtt_txbuf[mqtt_txlen], Password, PasswordLen); mqtt_txlen += PasswordLen; } for (i = 0; i < 5; i++) { memset(mqtt_rxbuf, 0, mqtt_rxlen); MQTT_SendBuf(mqtt_txbuf, mqtt_txlen); size = Client_GetData(buff);//从服务器获取数据 if (size <= 0)continue; memcpy(mqtt_rxbuf, buff, size); printf("登录应答:\r\n"); for (j = 0; j < size; j++) { printf("%#X ", buff[j]); } printf("\r\n"); if (mqtt_rxbuf[0] == parket_connetAck[0] && mqtt_rxbuf[1] == parket_connetAck[1]) //连接成功 { return 0;//连接成功 } } return 1; } /* 函数功能: MQTT订阅/取消订阅数据打包函数 函数参数: topic 主题 qos 消息等级 0:最多分发一次 1: 至少分发一次 2: 仅分发一次 whether 订阅/取消订阅请求包 (1表示订阅,0表示取消订阅) 返回值: 0表示成功 1表示失败 */ unsigned char MQTT_SubscribeTopic(char* topic, unsigned char qos, unsigned char whether) { unsigned char i, j; mqtt_txlen = 0; unsigned int size = 0; unsigned char buff[256]; unsigned int topiclen = (int)strlen(topic); unsigned int DataLen = 2 + (topiclen + 2) + (whether ? 1 : 0);//可变报头的长度(2字节)加上有效载荷的长度 //固定报头 //控制报文类型 if (whether)mqtt_txbuf[mqtt_txlen++] = 0x82; //消息类型和标志订阅 else mqtt_txbuf[mqtt_txlen++] = 0xA2; //取消订阅 //剩余长度 do { unsigned char encodedByte = DataLen % 128; DataLen = DataLen / 128; // if there are more data to encode, set the top bit of this byte if (DataLen > 0) encodedByte = encodedByte | 128; mqtt_txbuf[mqtt_txlen++] = encodedByte; } while (DataLen > 0); //可变报头 mqtt_txbuf[mqtt_txlen++] = 0; //消息标识符 MSB mqtt_txbuf[mqtt_txlen++] = 0x01; //消息标识符 LSB //有效载荷 mqtt_txbuf[mqtt_txlen++] = BYTE1(topiclen);//主题长度 MSB mqtt_txbuf[mqtt_txlen++] = BYTE0(topiclen);//主题长度 LSB memcpy(&mqtt_txbuf[mqtt_txlen], topic, topiclen); mqtt_txlen += topiclen; if (whether) { mqtt_txbuf[mqtt_txlen++] = qos;//QoS级别 } for (i = 0; i < 100; i++) { memset(mqtt_rxbuf, 0, mqtt_rxlen); MQTT_SendBuf(mqtt_txbuf, mqtt_txlen); //printf("订阅消息发布成功\n"); size = Client_GetData(buff);//从服务器获取数据 if (size <= 0) { continue; } memcpy(mqtt_rxbuf, buff, size); printf("订阅应答:\r\n"); for (j = 0; j < size; j++) { printf("%#X ", buff[j]); } printf("\r\n"); if (mqtt_rxbuf[0] == parket_subAck[0] && mqtt_rxbuf[1] == parket_subAck[1]) //连接成功 { return 0;//连接成功 } Sleep(1000); } return 1; //失败 } //MQTT发布数据打包函数 //topic 主题 //message 消息 //qos 消息等级 unsigned char MQTT_PublishData(char* topic, char* message, unsigned char qos) { unsigned int topicLength = (int)strlen(topic); unsigned int messageLength = (int)strlen(message); unsigned short id = 0; unsigned int DataLen; mqtt_txlen = 0; printf("上报JSON消息长度:%d\r\n", messageLength); printf("message=%s\r\n", message); //有效载荷的长度这样计算:用固定报头中的剩余长度字段的值减去可变报头的长度 //QOS为0时没有标识符 //数据长度 主题名 报文标识符 有效载荷 if (qos) DataLen = (2 + topicLength) + 2 + messageLength; else DataLen = (2 + topicLength) + messageLength; //固定报头 //控制报文类型 mqtt_txbuf[mqtt_txlen++] = 0x30; // MQTT Message Type PUBLISH //剩余长度 do { unsigned char encodedByte = DataLen % 128; DataLen = DataLen / 128; // if there are more data to encode, set the top bit of this byte if (DataLen > 0) encodedByte = encodedByte | 128; mqtt_txbuf[mqtt_txlen++] = encodedByte; } while (DataLen > 0); mqtt_txbuf[mqtt_txlen++] = BYTE1(topicLength);//主题长度MSB mqtt_txbuf[mqtt_txlen++] = BYTE0(topicLength);//主题长度LSB memcpy(&mqtt_txbuf[mqtt_txlen], topic, topicLength);//拷贝主题 mqtt_txlen += topicLength; //报文标识符 if (qos) { mqtt_txbuf[mqtt_txlen++] = BYTE1(id); mqtt_txbuf[mqtt_txlen++] = BYTE0(id); id++; } memcpy(&mqtt_txbuf[mqtt_txlen], message, messageLength); mqtt_txlen += messageLength; MQTT_SendBuf(mqtt_txbuf, mqtt_txlen); return mqtt_txlen; } void MQTT_SendBuf(unsigned char* buf, unsigned short len) { Client_SendData(buf, len);//发送数据到服务器 } //-----------------------------------------MQTT服务器的参数------------------------------------------------------------ //服务器IP #define SERVER_IP "117.78.5.125" #define SERVER_PORT 1883 //端口号 //MQTT三元组 #define ClientID "65697df3585c81787ad4da82_stm32_0_0_2023120106" #define Username "65697df3585c81787ad4da82_stm32" #define Password "12cc9b1f637da8d755fa2cbd007bb669e6f292e3e63017538b5e6e13eef0cf58"//密文 //订阅主题: #define SET_TOPIC "$oc/devices/65697df3585c81787ad4da82_stm32/sys/messages/down"//订阅 //发布主题: #define POST_TOPIC "$oc/devices/65697df3585c81787ad4da82_stm32/sys/properties/report"//发布 //-----------------------------------------主函数------------------------------------------------------------ char mqtt_message[1024];//数据缓存区 SOCKET connectSocket; //网络套接字 WSADATA wsaData; //创建一个结构体变量,用于存储关于Winsock库的信息 double TEMP = 10.0; int main() { int result = WSAStartup(MAKEWORD(2, 2), &wsaData); //初始化Winsock库,指定版本号2.2,检查返回值 if (result != 0) { printf("WSAStartup failed: %d\r\n", result);//输出错误信息并退出程序 return 1; } connectSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); //创建一个TCP套接字,检查返回值 if (connectSocket == INVALID_SOCKET) { printf("socket failed with error: %d", WSAGetLastError());//输出错误信息并退出程序 WSACleanup(); //清除Winsock库 return 1; } sockaddr_in service; //创建一个结构体变量,用于存储服务器地址信息 service.sin_family = AF_INET; //指定地址族为IPv4 inet_pton(AF_INET, SERVER_IP, &service.sin_addr); //将字符串类型的IP地址转换为二进制网络字节序的IP地址,并存储在结构体中 service.sin_port = htons(SERVER_PORT); //将端口号从主机字节序转换为网络字节序,并存储在结构体中 result = connect(connectSocket, (SOCKADDR*)&service, sizeof(service)); //连接到服务器,检查返回值 if (result == SOCKET_ERROR) { std::cout << "connect failed with error: " << WSAGetLastError() << std::endl; //输出错误信息并退出程序 closesocket(connectSocket); //关闭套接字 WSACleanup(); //清除Winsock库 return 1; } std::cout << "Connected to server." << std::endl; //连接成功,输出消息 MQTT_Init(); while (1) { /*登录服务器*/ if (MQTT_Connect((char*)ClientID, (char*)Username, (char*)Password) == 0) { break; } // 延时1000毫秒,即1秒 Sleep(1000); printf("MQTT服务器登录校验中....\n"); } printf("连接成功_666\r\n"); //订阅物联网平台数据 int stat = MQTT_SubscribeTopic((char*)SET_TOPIC, 1, 1); if (stat) { printf("订阅失败\r\n"); closesocket(connectSocket); //关闭套接字 WSACleanup(); //清除Winsock库 return 1; } printf("订阅成功\r\n"); /*创建线程*/ while (1) { sprintf(mqtt_message, "{\"services\": [{\"service_id\": \"stm32\",\"properties\":{\"TEMP\":%.1f}}]}", (double)(TEMP+=0.2));//温度 //发布主题 MQTT_PublishData((char*)POST_TOPIC, mqtt_message, 0); printf("发布消息成功\r\n"); Sleep(5000); } } /*发送数据到服务器*/ int Client_SendData(unsigned char* buff, unsigned int len) { int result = send(connectSocket,(const char*)buff, len, 0); //向服务器发送数据,检查返回值 if (result == SOCKET_ERROR) { std::cout << "send failed with error: " << WSAGetLastError() << std::endl; //输出错误信息并退出程序 return -1; } return 0; } /*获取服务器下发数据*/ int Client_GetData(unsigned char* buff) { int result = recv(connectSocket, (char*)buff,200, 0); //从服务器接收数据,检查返回值 if (result == SOCKET_ERROR) { std::cout << "recv failed with error: " << WSAGetLastError() << std::endl; //输出错误信息并退出程序 return -1; } return result; }
这里填写MQTT服务器的信息,也就是前面创建华为云IOT服务器得到的信息。
这里是主函数,登录服务器后订阅主题,发布消息。
按下Ctrl+F5
运行代码。 弹出控制台窗口之后,可以看到,我们已经连接了华为云MQTT服务器,并且完成数据上传。
可以看到设备已经在线了。
可以看到我们的消息也在实时的上传。
到此,说明我们的MQTT协议已经封装完成,可以正常的运行了。
一般MQTT设备端除了上传数据以外,还需要接收MQTT服务器下发的控制命令。
那么我们接下来就完善一下代码,接收华为云MQTT服务器下发的命令,并进行回应。
要测试命令下发,那么首先需要再华为云IOT平台添加命令。
添加一个控制命令。
命令添加完成:
注意:下发命令是同步的,设备端必须在线才可以下发命令。
这个下发的命令是有反馈。设备端收到之后,可以向服务器反馈状态,这样服务器才能知道刚才的控制命令确实发送成功了。
设备收到信息之后,上传回应给服务器的主题和内容格式:
Topic:$oc/devices/{device_id}/sys/commands/response/request_id={request_id}
数据格式:
{
"result_code": 0,
"response_name": "COMMAND_RESPONSE",
"paras": {
"result": "success"
}
}
云端发送控制命令之后,设备收到的消息如下:
$oc/devices/65697df3585c81787ad4da82_stm32/sys/commands/request_id=d49f0bb9-ba87-4c9b-b915-98a1f0fcf689{"paras":{"lock":true},"service_id":"lock","command_name":"锁开关控制"}
其中request_id=d49f0bb9-ba87-4c9b-b915-98a1f0fcf689
就是本次的请求ID。回应的时候需要加上请求ID。服务器才好对应。
以当前设备为例:
发布的主题这样填: $oc/devices/65113d05a559fd7cd41435f8_lock1/sys/commands/response/request_id=ce49181e-7636-4b24-946d-c034ca083c1c
发布的内容这样填:
{"result_code":0,"response_name":"COMMAND_RESPONSE","paras":{"result":"success"}}
为了能够实时接收服务器的代码,我们单独增加一个线程来接收服务器的消息。
在主函数里MQTT服务器连接成功之后,增加以下代码:
/*创建线程*/
HANDLE hThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)ReceiveData, NULL, 0, NULL);
if (hThread == NULL) {
printf("CreateThread failed.\n");
return 1;
}
编写线程的工作函数:
// 处理服务器下发的数据 void ReceiveData(void) { // 接收数据 char buffer[1024]; char request_id[100]; char send_cmd[500]; int recvSize; while (1) { //等待服务器下发消息 recvSize = recv(connectSocket, buffer, 1024, 0); if (recvSize == SOCKET_ERROR) { std::cout << "网络断开连接: " << WSAGetLastError() << std::endl; //输出错误信息并退出程序 return; } if (recvSize > 0) { printf("服务器下发消息:\r\n"); //接收下发的数据 for (int i = 0; i < recvSize; i++) { printf("%c", buffer[i]); } printf("\r\n"); //下发指令请求回应给服务器(命令下发) if (strstr((char*)&buffer[5], "sys/commands/request_id=")) { char* p = NULL; p = strstr((char*)&buffer[5], "request_id="); if (p) { //解析数据 //$oc/devices/65697df3585c81787ad4da82_stm32/sys/commands/request_id=6e925cc1-4a8d-4eab-8d85-6c7f15d72189 strncpy(request_id, p, 47); } //上报数据 sprintf(mqtt_message, "{\"result_code\":0,\"response_name\":\"COMMAND_RESPONSE\",\"paras\":{\"result\":\"success\"}}"); sprintf(send_cmd, "$oc/devices/65697df3585c81787ad4da82_stm32/sys/commands/response/%s", request_id); MQTT_PublishData(send_cmd, mqtt_message, 0); printf("(命令)发布主题:%s\r\n", send_cmd); printf("(命令)发布数据:%s\r\n", mqtt_message); } } } }
先运行客户端的代码,登录MQTT服务器。
然后,在华为云IOT平台下发命令。 如果点击下发之后,右上角弹出了 命令下发成功
,就表示我们客户端代码写OK了。
我们看设备端收到的消息:
到此,我们的MQTT协议已经开发完成了。如果大家详细阅读了文章,并且跟着步骤操作了一次,相信你此刻对MQTT协议应该有所认识了。我是DS小龙哥
,欢迎关注我,后续会有更多的技术文章、项目文章发布。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。