当前位置:   article > 正文

ESP32 基于4G模块透传 模式 实现MQTT通信_esp32 4g上网

esp32 4g上网

这里选择的是SIM7600CE 和EC20 4G通信模块,工作在透传模式

  1. 创建GPRS 拨号的通信接口源文件 GprsUtils.c ,实现4G模块 拨号及连接服务器功能

  1. #include <string.h>
  2. #include <stdint.h>
  3. #include "freertos/FreeRTOS.h"
  4. #include "freertos/task.h"
  5. #include "GprsUtils.h"
  6. #include "UartUtils.h"
  7. #include "driver/gpio.h"
  8. static uint8_t find_string(const char * recvBuff,uint16_t recvBuffLen,const char* p)
  9. {
  10. if(recvBuffLen>0&&strstr(recvBuff,p)!=NULL)
  11. {
  12. return 1;
  13. }
  14. else
  15. {
  16. return 0;
  17. }
  18. }
  19. static void delay_ms(int waitTimeMs){
  20. vTaskDelay(waitTimeMs/portTICK_PERIOD_MS);
  21. }
  22. void gprs_send_string(char* s){
  23. uartSendBytes(GPRSPORT,(uint8_t *)s,strlen(s));
  24. }
  25. void gprs_recv_ack(uint8_t *buf,int *len,int wait_time){
  26. uartRecvBytes(GPRSPORT, buf, len,MAX_UART_RECV_BUFF_SIZE,wait_time);
  27. }
  28. uint8_t gprs_send_cmd(char *b,char *a,uint8_t times,uint16_t wait_time)
  29. {
  30. uint8_t i;
  31. i = 0;
  32. while(i < times)
  33. {
  34. char gprsBuf[MAX_UART_RECV_BUFF_SIZE]={0};
  35. int gprsBufLen=0;
  36. gprs_send_string(b);
  37. gprs_send_string("\r\n"); // 回车换行
  38. gprs_recv_ack((uint8_t *)gprsBuf,&gprsBufLen,wait_time);
  39. if(find_string(gprsBuf,gprsBufLen,a))
  40. return 1;
  41. i++;
  42. }
  43. return 0;
  44. }
  45. /******************************************************************************
  46. * 函数名称:str_delim
  47. * 描 述:字符串分割函数
  48. * 输入参数: num 0---取出分隔字符串(delim)前面的字符串 1---取出分隔字符串(delim)后面的字符串
  49. * temp 要分割的字符串
  50. * delim 分隔符字符串
  51. * 输 出:
  52. * 返 回:
  53. * 说 明:
  54. *******************************************************************************/
  55. char *str_delim(uint8_t num,char *temp,char *delim)
  56. {
  57. int i;
  58. char *str[2]={0};
  59. char *tok=temp;
  60. char *restr;
  61. for(i=0;i<2;i++)
  62. {
  63. tok=strtok(tok,delim);
  64. str[i]=tok;
  65. tok = NULL;
  66. }
  67. restr=str[num];
  68. return restr;
  69. }
  70. uint8_t sim7600CheckStatus(void);
  71. uint8_t sim7600SelectNet(void);
  72. uint8_t sim7600NetConfig(void);
  73. uint8_t sim7600TcpConnect(const char *serverIp,int port);
  74. uint8_t sim7600ConnectServer(const char *serverIp,int port);
  75. void sim7600ExitDataMode(void);
  76. uint8_t sim7600CloseTcpConnect(void);
  77. uint8_t sim7600CloseNet(void);
  78. /******************************************************************************
  79. * 函数名称: check_status
  80. * 描 述: 核心板基本状态测试
  81. * 输入参数: 无
  82. * 输 出: 无
  83. * 返 回: 0 --- 出错 1 --- 正确
  84. * 说 明: 核心板开机后,先判断AT命令是否正常、能否读到卡、能否注册网络。确认无误后
  85. * 再进行其他功能测试
  86. *******************************************************************************/
  87. uint8_t sim7600CheckStatus(void)
  88. {
  89. sim7600ExitDataMode(); // 退出透传模式
  90. // 同步波特率
  91. if(!gprs_send_cmd("AT","OK",5,200))
  92. {
  93. printf("handshake failed\r\n");
  94. return 0;
  95. }
  96. // 关闭网络,避免后面因为网络已经开启,导致打开网络失败
  97. gprsCloseNet();
  98. // 取消回显
  99. if(!gprs_send_cmd("ATE0","OK",1,200)){
  100. printf("echo cancelled failed\r\n");
  101. return 0;
  102. }
  103. // 查询核心板能否读到SIM卡
  104. if(!gprs_send_cmd("AT+CPIN?","+CPIN: READY",2,500)){
  105. printf("no sim card detected\r\n");
  106. return 0;
  107. }
  108. if(!gprs_send_cmd("AT+CSQ","+CSQ",2,100))
  109. {
  110. printf("serching CSQ failed\r\n");
  111. return 0;
  112. }
  113. if(!gprs_send_cmd("AT+COPS?","OK",2,100))
  114. {
  115. printf("get Operator failed\r\n");
  116. return 0;
  117. }
  118. // 选择网络
  119. if(!sim7600SelectNet()) // 第一次选择好网路后,可直接去掉该函数
  120. return 0;
  121. return 1;
  122. }
  123. /******************************************************************************
  124. * 函数名称: select_net
  125. * 描 述: 核心板选择网络
  126. * 输入参数:
  127. * 输 出:
  128. * 返 回: 0 --- 出错 1 --- 正确
  129. * 说 明: SIM卡首次插入核心板测试时,需要选择网络,选择好网络后,如果不换卡,则不需要再执行该函数
  130. *******************************************************************************/
  131. uint8_t sim7600SelectNet(void)
  132. {
  133. // 设置APN
  134. if(!gprs_send_cmd("AT+CGSOCKCONT=1,\"IP\",\"CMNET\"","OK",1,500)){
  135. printf("set APN failed\r\n");
  136. return 0;
  137. }
  138. // 选择网络,可在常量声明中修改
  139. if(!gprs_send_cmd("AT+CNMP=38","OK",1,200)){
  140. printf("select network failed\r\n");
  141. return 0;
  142. }
  143. //查询核心板所处网络
  144. if(!gprs_send_cmd("AT+CNMP?","+CNMP: 38",3,500)){
  145. printf("register network failed\r\n");
  146. return 0;
  147. }
  148. // 查询核心板是否注册成功
  149. if(!gprs_send_cmd("AT+CREG?","+CREG: 0,1",5,1000)){
  150. printf("register network failed\r\n");
  151. return 0;
  152. }
  153. return 1;
  154. }
  155. /******************************************************************************
  156. * 函数名称: net_config
  157. * 描 述: 核心板进行连接前的网络配置
  158. * 输入参数: 无
  159. * 输 出: 无
  160. * 返 回: 0 --- 出错 1 --- 正确
  161. * 说 明: 判断核心板是否可以进行通信连接
  162. *******************************************************************************/
  163. uint8_t sim7600NetConfig(void)
  164. {
  165. // 激活启动场景
  166. if(!gprs_send_cmd("AT+CSOCKSETPN=1","OK",1,200) ){
  167. printf("activate mobile scene failed\r\n");
  168. return 0;
  169. }
  170. // 设置为非透传模式
  171. if(!gprs_send_cmd("AT+CIPMODE?","+CIPMODE: 1",1,200)){ // 这里不能直接设置非透传模式,避免因为重复设置导致ERROR
  172. if(!gprs_send_cmd("AT+CIPMODE=1","OK",1,200)){
  173. printf("Transparent transmission mode failed\r\n");
  174. return 0;
  175. }
  176. }
  177. // 打开网络
  178. if(!gprs_send_cmd("AT+NETOPEN","+NETOPEN: 0",1,5000)){ // 不能重复操作,否则会ERROR
  179. printf("open network failed\r\n");
  180. return 0;
  181. }
  182. // 获取本地IP地址
  183. if(gprs_send_cmd("AT+IPADDR","ERROR",1,2000))
  184. {
  185. printf("get local Ipaddress failed\r\n");
  186. return 0;
  187. }
  188. /************
  189. else
  190. {
  191. printf("11.获取本地IP成功\r\n");
  192. printf("%s",str_delim(1,gprsBuf,":")); // 将获取到的本地IP地址打印到串口调试助手
  193. }
  194. ************/
  195. return 1;
  196. }
  197. /******************************************************************************
  198. * 函数名称: tcp_connect
  199. * 描 述: 核心板与云服务器进行TCP透传模式通信
  200. * 输入参数: 无
  201. * 输 出: 无
  202. * 返 回: 0 --- 出错 1 --- 正确
  203. * 说 明: 无
  204. *******************************************************************************/
  205. uint8_t sim7600TcpConnect(const char *serverIp,int port)
  206. {
  207. char serverIpCmd[512]={0};
  208. sprintf(serverIpCmd,"AT+CIPOPEN=0,\"TCP\",\"%s\",%d\r\n",serverIp,port); // TCP服务器IP地址,可自行修改
  209. // 建立TCP连接
  210. if(!gprs_send_cmd((char*)serverIpCmd,"CONNECT",2,3000)){
  211. printf("set Tcp Transparent transmission mode failed \r\n");
  212. return 0;
  213. }
  214. return 1;
  215. }
  216. uint8_t sim7600ConnectServer(const char *serverIp,int port){
  217. if(!sim7600CheckStatus()) // 判断核心板状态是否正常
  218. {
  219. printf("*** check_status failed ***\r\n");
  220. return 0;
  221. }else
  222. printf("*** check_status ok ***\r\n");
  223. if(!sim7600NetConfig()){ // 判断通信连接是否正常
  224. printf("*** net_config failed ***\r\n");
  225. return 0;
  226. }else
  227. printf("*** net_config OK ***\r\n");
  228. return sim7600TcpConnect(serverIp,port);
  229. }
  230. /********************************************************************************
  231. * 函数名称: exit_data_mode
  232. * 描 述: 退出数据模式
  233. * 输入参数: 无
  234. * 输 出: 无
  235. * 返 回: 无
  236. * 说 明: 发送“+++”,确保核心板退出透传数据模式,前后1s延时
  237. *******************************************************************************/
  238. void sim7600ExitDataMode(void)
  239. {
  240. char gprsBuf[MAX_UART_RECV_BUFF_SIZE]={0};
  241. int gprsBufLen=0;
  242. delay_ms(1000);
  243. gprs_send_string("+++");
  244. gprs_recv_ack((uint8_t *)gprsBuf,&gprsBufLen,1000);
  245. }
  246. /********************************************************************************
  247. * 函数名称: close_tcp_connect
  248. * 描 述: 关闭TCP连接
  249. * 输入参数: 无
  250. * 输 出: 无
  251. * 返 回: 无
  252. * 说 明: 关闭TCP连接
  253. *******************************************************************************/
  254. uint8_t sim7600CloseTcpConnect(void)
  255. {
  256. // 退出透传数据模式
  257. sim7600ExitDataMode();
  258. delay_ms(1500);
  259. // 关闭TCP连接
  260. if(!gprs_send_cmd("AT+CIPCLOSE=0","OK",2,2000)){ // 关闭连接耗时较长(+CIPCLOSE: 0,0),可根据实际情况调整
  261. printf("close tcp port failed\n");
  262. return 0;
  263. }
  264. return 1;
  265. }
  266. uint8_t sim7600CloseNet(void){
  267. // 关闭网络
  268. sim7600CloseTcpConnect();
  269. if(!gprs_send_cmd("AT+NETCLOSE","+NETCLOSE:",5,3000))
  270. {
  271. printf("close network failed\r\n");
  272. return 0;
  273. }
  274. return 1;
  275. }
  276. /EC20 Start/
  277. uint8_t ec20ConnectServer(const char *serverIp,int port);
  278. uint8_t ec20CloseNet(void);
  279. uint8_t ec20ConnectServer(const char *serverIp,int port){
  280. printf("ec20ConnectServer:%s %d\n",serverIp,port);
  281. if(!gprs_send_cmd("AT","OK",2,1000)){
  282. printf("handshake failed\r\n");
  283. return 0;
  284. }
  285. // 查询核心板能否读到SIM卡
  286. if(!gprs_send_cmd("AT+CPIN?","+CPIN: READY",2,1000)){
  287. printf("no sim card detected\r\n");
  288. return 0;
  289. }
  290. if(!gprs_send_cmd("AT+CSQ","+CSQ",2,1000))
  291. {
  292. printf("serching CSQ failed\r\n");
  293. return 0;
  294. }
  295. // 查询核心板是否注册成功
  296. if(!gprs_send_cmd("AT+CREG?","+CREG: 0,1",5,1000)){
  297. printf("register GSM network failed\r\n");
  298. return 0;
  299. }
  300. if(!gprs_send_cmd("AT+CGREG?","OK",5,1000)){
  301. printf("register GPRS network failed\r\n");
  302. return 0;
  303. }
  304. // 设置APN
  305. if(!gprs_send_cmd("AT+QICSGP=1,1,\"CMNET\"","OK",2,1000)){
  306. printf("set APN failed\r\n");
  307. return 0;
  308. }
  309. if(!gprs_send_cmd("AT+QIDEACT=1","OK",2,1000)){
  310. printf("Deactivates a PDP context\r\n");
  311. return 0;
  312. }
  313. if(!gprs_send_cmd("AT+QIACT=1","OK",2,1000)){
  314. printf("activates a PDP context\r\n");
  315. return 0;
  316. }
  317. char serverIpCmd[512]={0};
  318. sprintf(serverIpCmd,"AT+QIOPEN=1,0,\"TCP\",\"%s\",%d,0,2\r\n",serverIp,port); // TCP服务器IP地址,可自行修改
  319. // 建立TCP连接
  320. if(!gprs_send_cmd((char*)serverIpCmd,"CONNECT",2,3000)){
  321. printf("set Tcp Transparent transmission mode failed \r\n");
  322. return 0;
  323. }
  324. return 1;
  325. }
  326. uint8_t ec20CloseNet(void){
  327. sim7600ExitDataMode();
  328. delay_ms(2000);
  329. // 关闭TCP连接
  330. if(!gprs_send_cmd("AT+QICLOSE=0","OK",2,2000)){ // 关闭连接耗时较长(+CIPCLOSE: 0,0),可根据实际情况调整
  331. printf("close tcp port failed\n");
  332. return 0;
  333. }
  334. return 1;
  335. }
  336. /EC20 End/
  337. //#define SIM7600CE_4G 1
  338. #define EC20_4G 1
  339. void gprsInit(int uartPort,uint32_t bound){
  340. uart_init(uartPort,bound);
  341. gprsPower(1);
  342. gprsReset(0);
  343. delay_ms(2000);
  344. }
  345. uint8_t gprsTryConnectServer(const char *serverIp,int port){
  346. #if SIM7600CE_4G
  347. return sim7600ConnectServer(serverIp,port);
  348. #else
  349. return ec20ConnectServer(serverIp,port);
  350. #endif
  351. }
  352. uint8_t gprsCloseNet(void)
  353. {
  354. #if SIM7600CE_4G
  355. return sim7600CloseNet();
  356. #else
  357. return ec20CloseNet();
  358. #endif
  359. }
  360. int gprsTcpWrite(uint8_t *data,int len){
  361. return uartSendBytes(GPRSPORT,data,len);
  362. }
  363. int gprsTcpRead(uint8_t *recvbuf,int maxRecvLen,int waitTime){
  364. int readLen=0;
  365. uartRecvBytes(GPRSPORT, recvbuf, &readLen,maxRecvLen,waitTime);
  366. return readLen;
  367. }
  368. int gprsTcpAvailable(int waitTime){
  369. return uartHasDataReceived(GPRSPORT, waitTime);
  370. }
  371. void gprsReboot(void){
  372. gpio_set_level(SIM_RST_Pin,1);
  373. delay_ms(2000);
  374. gpio_set_level(SIM_RST_Pin, 0);
  375. delay_ms(20000);
  376. }
  377. void gprsPower(uint8_t isOn){
  378. gpio_set_level(SIM_PEN_Pin,isOn);
  379. }
  380. void gprsReset(uint8_t isOn){
  381. gpio_set_level(SIM_RST_Pin, isOn);
  382. }
  383. void gprsRepowerOn(void) {
  384. gpio_set_level(SIM_PEN_Pin,1);
  385. delay_ms(4000);
  386. }
  387. void gprsRepowerOff(void) {
  388. gpio_set_level(SIM_PEN_Pin,0);
  389. delay_ms(2000);
  390. }

  1. 移植MQTT 通信协议

https://github.com/eclipse/paho.mqtt.embedded-c/tree/master/MQTTPacket 下载MQTT通信源码包, 将src目录下所有文件拷贝到自己的工程目录下,然后移植samples目录下的transport.c

  1. #include "transport.h"
  2. #include "lwip/opt.h"
  3. #include "lwip/arch.h"
  4. #include "lwip/api.h"
  5. #include "lwip/inet.h"
  6. #include "lwip/sockets.h"
  7. #include "string.h"
  8. #include "../GprsUtils/GprsUtils.h"
  9. static int mysock;
  10. /************************************************************************
  11. ** 函数名称: transport_sendPacketBuffer
  12. ** 函数功能: 以TCP方式发送数据
  13. ** 入口参数: unsigned char* buf:数据缓冲区
  14. ** int buflen:数据长度
  15. ** 出口参数: <0发送数据失败
  16. ************************************************************************/
  17. int transport_sendPacketBuffer( uint8_t* buf, int buflen)
  18. {
  19. return gprsTcpWrite(buf,buflen);
  20. }
  21. /************************************************************************
  22. ** 函数名称: transport_getdata
  23. ** 函数功能: 以阻塞的方式接收TCP数据
  24. ** 入口参数: unsigned char* buf:数据缓冲区
  25. ** int count:数据长度
  26. ** 出口参数: <=0接收数据失败
  27. ************************************************************************/
  28. int transport_getdata(uint8_t* buf, int count)
  29. {
  30. return gprsTcpRead(buf,count,500);
  31. }
  32. /************************************************************************
  33. ** 函数名称: transport_open
  34. ** 函数功能: 打开一个接口,并且和服务器 建立连接
  35. ** 入口参数: char* servip:服务器域名
  36. ** int port:端口号
  37. ** 出口参数: <0打开连接失败
  38. ************************************************************************/
  39. int transport_open(const char *servip, int port)
  40. {
  41. return gprsTryConnectServer(servip,port);
  42. }
  43. /************************************************************************
  44. ** 函数名称: transport_close
  45. ** 函数功能: 关闭套接字
  46. ** 入口参数: unsigned char* buf:数据缓冲区
  47. ** int buflen:数据长度
  48. ** 出口参数: <0发送数据失败
  49. ************************************************************************/
  50. int transport_close(void)
  51. {
  52. return gprsCloseNet();
  53. }
  1. 创建MQTT通信接口源文件,包含连接MQTT服务器、订阅、发布,接收

  1. #include <string.h>
  2. #include "freertos/FreeRTOS.h"
  3. #include "freertos/task.h"
  4. #include "MqttUtils.h"
  5. #include "../GprsUtils/GprsUtils.h"
  6. /************************************************************************
  7. ** 函数名称: MQTTClientInit
  8. ** 函数功能: 初始化客户端并登录服务器
  9. ** 入口参数: int sock:网络描述符
  10. ** 出口参数: >=0:发送成功 <0:发送失败
  11. ** 备 注:
  12. ************************************************************************/
  13. static int MQTTClientInit(const char *username,const char *password,int keepalive){
  14. MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
  15. uint8_t buf[100];
  16. int buflen = sizeof(buf);
  17. int len;
  18. uint8_t sessionPresent,connack_rc;
  19. //创建MQTT客户端连接参数
  20. connectData.willFlag = 0;
  21. //MQTT版本
  22. connectData.MQTTVersion = 4;
  23. //客户端ID--必须唯一
  24. connectData.clientID.cstring = "MIMITOTO";
  25. //保活间隔
  26. connectData.keepAliveInterval = keepalive;
  27. if(username&&password){
  28. //用户名
  29. connectData.username.cstring = username;
  30. //用户密码
  31. connectData.password.cstring = password;
  32. }
  33. //清除会话
  34. connectData.cleansession = 1;
  35. //串行化连接消息
  36. len = MQTTSerialize_connect(buf, buflen, &connectData);
  37. //发送TCP数据
  38. if(transport_sendPacketBuffer(buf, len) < 0)
  39. return -1;
  40. if(gprsTcpAvailable(2000)<0)
  41. return -2;
  42. if(MQTTPacket_read(buf, buflen, transport_getdata) != CONNACK)
  43. return -3;
  44. //拆解连接回应包
  45. if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) != 1 || connack_rc != 0)
  46. return -4;
  47. if(sessionPresent == 1)
  48. return 1;//不需要重新订阅--服务器已经记住了客户端的状态
  49. else
  50. return 0;//需要重新订阅
  51. }
  52. int mqttConnnectServer(const char *servip, int port,const char *username,const char *password,int keepalive){
  53. int sessionPresent=-1;
  54. if(!transport_open(servip,port))
  55. return -1;
  56. if((sessionPresent=MQTTClientInit(username,password,keepalive))<0){
  57. transport_close();
  58. return -1;
  59. }
  60. return sessionPresent;
  61. }
  62. /************************************************************************
  63. ** 函数名称: ReadPacketTimeout
  64. ** 函数功能: 阻塞读取MQTT数据
  65. ** 入口参数: int sock:网络描述符
  66. ** uint8_t *buf:数据缓存区
  67. ** int buflen:缓冲区大小
  68. ** uint32_t timeout:超时时间--0-表示直接查询,没有数据立即返回
  69. ** 出口参数: -1:错误,其他--包类型
  70. ** 备 注:
  71. ************************************************************************/
  72. int ReadPacketTimeout(uint8_t *buf,int buflen,uint32_t timeout)
  73. {
  74. if(timeout != 0)
  75. {
  76. if(gprsTcpAvailable(timeout)<0)
  77. return -1;
  78. }
  79. //读取TCP/IP事件
  80. return MQTTPacket_read(buf, buflen, transport_getdata);
  81. }
  82. /************************************************************************
  83. ** 函数名称: MQTTSubscribe
  84. ** 函数功能: 订阅消息
  85. ** 入口参数: int sock:套接字
  86. ** char *topic:主题
  87. ** enum QoS pos:消息质量
  88. ** 出口参数: >=0:发送成功 <0:发送失败
  89. ** 备 注:
  90. ************************************************************************/
  91. int MQTTSubscribe(char *topic,enum QoS pos){
  92. static uint32_t PacketID = 0;
  93. uint16_t packetidbk = 0;
  94. int conutbk = 0;
  95. uint8_t buf[100];
  96. int buflen = sizeof(buf);
  97. MQTTString topicString = MQTTString_initializer;
  98. int len;
  99. int req_qos,qosbk;
  100. //复制主题
  101. topicString.cstring = (char *)topic;
  102. //订阅质量
  103. req_qos = pos;
  104. //串行化订阅消息
  105. len = MQTTSerialize_subscribe(buf, buflen, 0, PacketID++, 1, &topicString, &req_qos);
  106. //发送TCP数据
  107. if(transport_sendPacketBuffer(buf, len) < 0)
  108. return -1;
  109. if(gprsTcpAvailable(2000) <0 ){
  110. return -2;
  111. }
  112. //等待订阅返回--未收到订阅返回
  113. if(MQTTPacket_read(buf, buflen, transport_getdata) != SUBACK)
  114. return -4;
  115. //拆订阅回应包
  116. if(MQTTDeserialize_suback(&packetidbk,1, &conutbk, &qosbk, buf, buflen) != 1)
  117. return -5;
  118. //检测返回数据的正确性
  119. if((qosbk == 0x80)||(packetidbk != (PacketID-1)))
  120. return -6;
  121. //订阅成功
  122. return 0;
  123. }
  124. /************************************************************************
  125. ** 函数名称: GetNextPackID
  126. ** 函数功能: 产生下一个数据包ID
  127. ** 入口参数: 无
  128. ** 出口参数: uint16_t packetid:产生的ID
  129. ** 备 注:
  130. ************************************************************************/
  131. static uint16_t GetNextPackID(void)
  132. {
  133. static uint16_t pubpacketid = 0;
  134. return pubpacketid++;
  135. }
  136. /************************************************************************
  137. ** 函数名称: WaitForPacket
  138. ** 函数功能: 等待特定的数据包
  139. ** 入口参数: int sock:网络描述符
  140. ** uint8_t packettype:包类型
  141. ** uint8_t times:等待次数
  142. ** 出口参数: >=0:等到了特定的包 <0:没有等到特定的包
  143. ** 备 注:
  144. ************************************************************************/
  145. static int WaitForPacket(uint8_t packettype,uint8_t times)
  146. {
  147. int type;
  148. uint8_t buf[MSG_MAX_LEN];
  149. uint8_t n = 0;
  150. int buflen = sizeof(buf);
  151. do
  152. {
  153. //读取数据包
  154. type = ReadPacketTimeout(buf,buflen,2000);
  155. if(type != -1)
  156. mqtt_pktype_ctl(type,buf,buflen);
  157. n++;
  158. }while((type != packettype)&&(n < times));
  159. //收到期望的包
  160. if(type == packettype)
  161. return 0;
  162. else
  163. return -1;
  164. }
  165. int MQTTMsgPublish(char *topic, char qos, char retained,uint8_t* msg,uint32_t msg_len){
  166. uint8_t buf[MSG_MAX_LEN];
  167. int buflen = sizeof(buf),len;
  168. MQTTString topicString = MQTTString_initializer;
  169. uint16_t packid = 0,packetidbk=0;
  170. //填充主题
  171. topicString.cstring = (char *)topic;
  172. //填充数据包ID
  173. if((qos == QOS1)||(qos == QOS2))
  174. {
  175. packid = GetNextPackID();
  176. }
  177. else
  178. {
  179. qos = QOS0;
  180. retained = 0;
  181. packid = 0;
  182. }
  183. //推送消息
  184. len = MQTTSerialize_publish(buf, buflen, 0, qos, retained, packid, topicString, (unsigned char*)msg, msg_len);
  185. if(len <= 0)
  186. return -1;
  187. if(transport_sendPacketBuffer(buf, len) < 0)
  188. return -2;
  189. //质量等级0,不需要返回
  190. if(qos == QOS0)
  191. {
  192. return 0;
  193. }
  194. //等级1
  195. if(qos == QOS1)
  196. {
  197. //等待PUBACK
  198. if(WaitForPacket(PUBACK,5) < 0)
  199. return -3;
  200. return 1;
  201. }
  202. //等级2
  203. if(qos == QOS2)
  204. {
  205. //等待PUBREC
  206. if(WaitForPacket(PUBREC,5) < 0)
  207. return -3;
  208. //发送PUBREL
  209. len = MQTTSerialize_pubrel(buf, buflen,0, packetidbk);
  210. if(len == 0)
  211. return -4;
  212. if(transport_sendPacketBuffer(buf, len) < 0)
  213. return -6;
  214. //等待PUBCOMP
  215. if(WaitForPacket(PUBREC,5) < 0)
  216. return -7;
  217. return 2;
  218. }
  219. //等级错误
  220. return -8;
  221. }
  222. int my_mqtt_send_pingreq(){
  223. int len;
  224. uint8_t buf[200];
  225. int buflen = sizeof(buf);
  226. len = MQTTSerialize_pingreq(buf, buflen);
  227. transport_sendPacketBuffer(buf, len);
  228. if(gprsTcpAvailable(5000)<0)
  229. return -1;
  230. if(MQTTPacket_read(buf, buflen, transport_getdata) != PINGRESP)
  231. return -2;
  232. return 0;
  233. }
  234. int mqttHasDataIn(int waitTime){
  235. return gprsTcpAvailable(waitTime);
  236. }
  237. 消息处理/
  238. /************************************************************************
  239. ** 函数名称: deliverMessage
  240. ** 函数功能: 接受服务器发来的消息
  241. ** 入口参数: MQTTMessage *msg:MQTT消息结构体
  242. ** MQTT_USER_MSG *mqtt_user_msg:用户接受结构体
  243. ** MQTTString *TopicName:主题
  244. ** 出口参数: 无
  245. ** 备 注:
  246. ************************************************************************/
  247. static void deliverMessage(MQTTString *TopicName,MQTTMessage *msg,MQTT_USER_MSG *mqtt_user_msg)
  248. {
  249. //消息质量
  250. mqtt_user_msg->msgqos = msg->qos;
  251. //保存消息
  252. memcpy(mqtt_user_msg->msg,msg->payload,msg->payloadlen);
  253. mqtt_user_msg->msg[msg->payloadlen] = '\0';
  254. //保存消息长度
  255. mqtt_user_msg->msglenth = msg->payloadlen;
  256. //消息主题
  257. memcpy((char *)mqtt_user_msg->topic,TopicName->lenstring.data,TopicName->lenstring.len);
  258. mqtt_user_msg->topic[TopicName->lenstring.len] = 0;
  259. //消息ID
  260. mqtt_user_msg->packetid = msg->id;
  261. //标明消息合法
  262. mqtt_user_msg->valid = 1;
  263. }
  264. /************************************************************************
  265. ** 函数名称: UserMsgCtl
  266. ** 函数功能: 用户消息处理函数
  267. ** 入口参数: MQTT_USER_MSG *msg:消息结构体指针
  268. ** 出口参数: 无
  269. ** 备 注:
  270. ************************************************************************/
  271. void UserMsgCtl(MQTT_USER_MSG *msg)
  272. {
  273. //这里处理数据只是打印,用户可以在这里添加自己的处理方式
  274. printf("MQTT>>****收到客户端自己订阅的消息!!****\n");
  275. //返回后处理消息
  276. switch(msg->msgqos)
  277. {
  278. case 0:
  279. printf("MQTT>>消息质量:QoS0\n");
  280. break;
  281. case 1:
  282. printf("MQTT>>消息质量:QoS1\n");
  283. break;
  284. case 2:
  285. printf("MQTT>>消息质量:QoS2\n");
  286. break;
  287. default:
  288. printf("MQTT>>错误的消息质量\n");
  289. break;
  290. }
  291. printf("MQTT>>消息主题:%s\n",msg->topic);
  292. printf("MQTT>>消息类容:%s\n",msg->msg);
  293. printf("MQTT>>消息长度:%d\n",msg->msglenth);
  294. //处理完后销毁数据
  295. msg->valid = 0;
  296. char MqttSendbuf[512]={0};
  297. static int timeCount=0;
  298. sprintf(MqttSendbuf,"Hello server %d",timeCount++);
  299. MQTTMsgPublish("mqttSendTopic",QOS0, 0,(uint8_t *)MqttSendbuf,strlen(MqttSendbuf));
  300. }
  301. /************************************************************************
  302. ** 函数名称: mqtt_pktype_ctl
  303. ** 函数功能: 根据包类型进行处理
  304. ** 入口参数: uint8_t packtype:包类型
  305. ** 出口参数: 无
  306. ** 备 注:
  307. ************************************************************************/
  308. void mqtt_pktype_ctl(uint8_t packtype,uint8_t *buf,uint32_t buflen)
  309. {
  310. MQTTMessage msg;
  311. int rc;
  312. MQTTString receivedTopic;
  313. uint32_t len;
  314. MQTT_USER_MSG mqtt_user_msg;
  315. switch(packtype)
  316. {
  317. case PUBLISH:
  318. //拆析PUBLISH消息
  319. if(MQTTDeserialize_publish(&msg.dup,(int*)&msg.qos, &msg.retained, &msg.id, &receivedTopic,(unsigned char **)&msg.payload, &msg.payloadlen, buf, buflen) != 1)
  320. return;
  321. //接受消息
  322. deliverMessage(&receivedTopic,&msg,&mqtt_user_msg);
  323. //消息质量不同,处理不同
  324. if(msg.qos == QOS0)
  325. {
  326. //QOS0-不需要ACK
  327. //直接处理数据
  328. UserMsgCtl(&mqtt_user_msg);
  329. return;
  330. }
  331. //发送PUBACK消息
  332. if(msg.qos == QOS1)
  333. {
  334. len =MQTTSerialize_puback(buf,buflen,mqtt_user_msg.packetid);
  335. if(len == 0)
  336. return;
  337. //发送返回
  338. if(transport_sendPacketBuffer(buf,len)<0)
  339. return;
  340. //返回后处理消息
  341. UserMsgCtl(&mqtt_user_msg);
  342. return;
  343. }
  344. //对于质量2,只需要发送PUBREC就可以了
  345. if(msg.qos == QOS2)
  346. {
  347. len = MQTTSerialize_ack(buf, buflen, PUBREC, 0, mqtt_user_msg.packetid);
  348. if(len == 0)
  349. return;
  350. //发送返回
  351. transport_sendPacketBuffer(buf,len);
  352. }
  353. break;
  354. case PUBREL:
  355. //解析包数据,必须包ID相同才可以
  356. rc = MQTTDeserialize_ack(&msg.type,&msg.dup, &msg.id, buf,buflen);
  357. if((rc != 1)||(msg.type != PUBREL)||(msg.id != mqtt_user_msg.packetid))
  358. return ;
  359. //收到PUBREL,需要处理并抛弃数据
  360. if(mqtt_user_msg.valid == 1)
  361. {
  362. //返回后处理消息
  363. UserMsgCtl(&mqtt_user_msg);
  364. }
  365. //串行化PUBCMP消息
  366. len = MQTTSerialize_pubcomp(buf,buflen,msg.id);
  367. if(len == 0)
  368. return;
  369. //发送返回--PUBCOMP
  370. transport_sendPacketBuffer(buf,len);
  371. break;
  372. case PUBACK://等级1客户端推送数据后,服务器返回
  373. break;
  374. case PUBREC://等级2客户端推送数据后,服务器返回
  375. break;
  376. case PUBCOMP://等级2客户端推送PUBREL后,服务器返回
  377. break;
  378. default:
  379. break;
  380. }
  381. }
  1. 创建MQTT 任务

  1. #include <stdint.h>
  2. #include "freertos/FreeRTOS.h"
  3. #include "freertos/task.h"
  4. #include "string.h"
  5. #include "freertos/queue.h"
  6. #include "malloc.h"
  7. #include "taskMqttClient.h"
  8. #include "../MQTT/transport.h"
  9. #include "../MQTT/MQTTPacket.h"
  10. #include "../MqttUtils/MqttUtils.h"
  11. #include "../GprsUtils/GprsUtils.h"
  12. #define HOST_NAME "183.220.123.35"
  13. #define HOST_PORT 17788
  14. #define USER_NAME "jdtf1"
  15. #define USER_PASSWD "jdtf1"
  16. #define MQTT_RECV_TOPIC "mqttRecvTopic"
  17. #define MQTT_SEND_TOPIC "mqttSendTopic"
  18. #define MQTT_RECV_TASK_STACKSIZE (configMINIMAL_STACK_SIZE*8)
  19. #define MQTT_RECV_TASK_PRIO 4
  20. static TaskHandle_t MqttRecvTask_Handler;
  21. static void mqttRecvThread(void *pvParameters);
  22. #define MQTT_SEND_TASK_STACKSIZE (configMINIMAL_STACK_SIZE*8)
  23. #define MQTT_SEND_TASK_PRIO 5
  24. static TaskHandle_t MqttSendTask_Handler;
  25. static void mqttSendThread(void *pvParameters);
  26. QueueHandle_t MQTT_Data_Queue =NULL;
  27. #define MQTT_QUEUE_LEN 10 /* 队列的长度,最大可包含多少个消息 */
  28. #define MQTT_QUEUE_SIZE sizeof(MQTT_MSG_ITEM ) /* 队列中每个消息大小(字节) */
  29. void mqtt_thread_init(void){
  30. /* 创建Test_Queue */
  31. MQTT_Data_Queue = xQueueCreate((UBaseType_t ) MQTT_QUEUE_LEN,(UBaseType_t ) MQTT_QUEUE_SIZE);/* 消息的大小 */
  32. if(NULL != MQTT_Data_Queue)
  33. printf("The MQTT_Data_Queue was created successfully!\r\n");
  34. xTaskCreate(mqttRecvThread, (const char* ) "MQTTRecvTask", MQTT_RECV_TASK_STACKSIZE, NULL, MQTT_RECV_TASK_PRIO, (TaskHandle_t*)&MqttRecvTask_Handler);
  35. xTaskCreate(mqttSendThread, (const char* ) "MQTTSendTask", MQTT_SEND_TASK_STACKSIZE, NULL, MQTT_SEND_TASK_PRIO, (TaskHandle_t*)&MqttSendTask_Handler);
  36. }
  37. static void mqttSendThread(void *pvParameters){
  38. /* 定义一个创建信息返回值,默认为pdTRUE */
  39. BaseType_t xReturn = pdTRUE;
  40. /* 定义一个接收消息的变量 */
  41. MQTT_MSG_ITEM recvMqttMsg;
  42. while(1){
  43. xReturn = xQueueReceive(MQTT_Data_Queue, &recvMqttMsg,portMAX_DELAY); /* 等待时间 3000ms */
  44. if(xReturn == pdTRUE){
  45. printf("mqttSendThread: msg=%s len=%d\r\n",recvMqttMsg.msgStr,recvMqttMsg.msgLength);
  46. MQTTMsgPublish((char*)MQTT_SEND_TOPIC,QOS0, 0,(uint8_t *)recvMqttMsg.msgStr,recvMqttMsg.msgLength);
  47. free(recvMqttMsg.msgStr);
  48. }
  49. }
  50. }
  51. #define MAX_TRY_CONNECT_TIMES 15
  52. #define MAX_PING_FAILED_TIMES 3
  53. static void mqttRecvThread(void *pvParameters){
  54. uint32_t curtick=0;
  55. uint8_t no_mqtt_msg_exchange = 1;
  56. int sessionPresent = 0;
  57. uint8_t buf[MSG_MAX_LEN];
  58. int buflen = sizeof(buf),type;
  59. int isMqttServerConnected=0;
  60. int tryConnectServerFailedTime=0;
  61. int tryPingServerFFailedTimes=0;
  62. gprsInit(GPRSPORT,115200);
  63. gprsReboot();
  64. //获取当前滴答,作为心跳包起始时间
  65. curtick = xTaskGetTickCount();
  66. //无线循环
  67. printf("MQTT>>4.开始循环接收订阅的消息...\n");
  68. while(1)
  69. {
  70. if(!isMqttServerConnected){
  71. printf("MQTT>>1.开始创建网络连接...\n");
  72. if((sessionPresent=mqttConnnectServer(HOST_NAME,HOST_PORT,USER_NAME,USER_PASSWD,KEEPLIVE_TIME))<0){
  73. vTaskDelay(3000/portTICK_RATE_MS);
  74. if(tryConnectServerFailedTime++>MAX_TRY_CONNECT_TIMES){
  75. tryConnectServerFailedTime=0;
  76. gprsReboot();
  77. printf("Try max %d time to connect server:%s port:%d failed,then reboot 4G model\n",MAX_TRY_CONNECT_TIMES,HOST_NAME,HOST_PORT);
  78. }
  79. continue;
  80. }
  81. printf("连接MQTT 服务器成功 ...\n");
  82. isMqttServerConnected=1;
  83. tryConnectServerFailedTime=0;
  84. if(sessionPresent!=1){ //订阅消息
  85. if(MQTTSubscribe((char*)MQTT_RECV_TOPIC,QOS0) < 0)
  86. {
  87. //重连服务器
  88. printf("MQTT>>客户端订阅消息失败...\n");
  89. transport_close();
  90. isMqttServerConnected=0;
  91. continue;
  92. }else
  93. printf("订阅主题 %s 成功 \n",MQTT_RECV_TOPIC);
  94. }
  95. curtick = xTaskGetTickCount();
  96. }
  97. //表明无数据交换
  98. no_mqtt_msg_exchange = 1;
  99. //判断MQTT服务器是否有数据
  100. if((type=ReadPacketTimeout(buf,buflen,1000))!=-1)
  101. {
  102. mqtt_pktype_ctl(type,buf,buflen);
  103. //表明有数据交换
  104. no_mqtt_msg_exchange = 0;
  105. //获取当前滴答,作为心跳包起始时间
  106. curtick = xTaskGetTickCount();
  107. }
  108. //这里主要目的是定时向服务器发送PING保活命令
  109. if((xTaskGetTickCount() - curtick) >(pdMS_TO_TICKS(KEEPLIVE_TIME)))
  110. {
  111. curtick = xTaskGetTickCount();
  112. //判断是否有数据交换
  113. if(no_mqtt_msg_exchange == 0)
  114. {
  115. //如果有数据交换,这次就不需要发送PING消息
  116. continue;
  117. }
  118. if(my_mqtt_send_pingreq() < 0){
  119. //重连服务器
  120. printf("MQTT>>发送ping失败....\n");
  121. if(tryPingServerFFailedTimes++>=MAX_PING_FAILED_TIMES){
  122. tryPingServerFFailedTimes=0;
  123. transport_close();
  124. isMqttServerConnected=0;
  125. }
  126. continue;
  127. }
  128. tryPingServerFFailedTimes=0;
  129. printf("MQTT>>发送ping作为心跳成功....\n");
  130. //表明有数据交换
  131. no_mqtt_msg_exchange = 0;
  132. }
  133. }
  134. }

工程源码地址:https://download.csdn.net/download/du2005023029/87505817

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小桥流水78/article/detail/940767
推荐阅读
相关标签
  

闽ICP备14008679号