赞
踩
这里选择的是SIM7600CE 和EC20 4G通信模块,工作在透传模式
创建GPRS 拨号的通信接口源文件 GprsUtils.c ,实现4G模块 拨号及连接服务器功能
- #include <string.h>
- #include <stdint.h>
- #include "freertos/FreeRTOS.h"
- #include "freertos/task.h"
- #include "GprsUtils.h"
- #include "UartUtils.h"
- #include "driver/gpio.h"
-
- static uint8_t find_string(const char * recvBuff,uint16_t recvBuffLen,const char* p)
- {
- if(recvBuffLen>0&&strstr(recvBuff,p)!=NULL)
- {
- return 1;
- }
- else
- {
- return 0;
- }
- }
- static void delay_ms(int waitTimeMs){
- vTaskDelay(waitTimeMs/portTICK_PERIOD_MS);
-
- }
-
- void gprs_send_string(char* s){
- uartSendBytes(GPRSPORT,(uint8_t *)s,strlen(s));
- }
-
- void gprs_recv_ack(uint8_t *buf,int *len,int wait_time){
- uartRecvBytes(GPRSPORT, buf, len,MAX_UART_RECV_BUFF_SIZE,wait_time);
- }
-
-
- uint8_t gprs_send_cmd(char *b,char *a,uint8_t times,uint16_t wait_time)
- {
- uint8_t i;
- i = 0;
-
- while(i < times)
- {
- char gprsBuf[MAX_UART_RECV_BUFF_SIZE]={0};
- int gprsBufLen=0;
- gprs_send_string(b);
- gprs_send_string("\r\n"); // 回车换行
- gprs_recv_ack((uint8_t *)gprsBuf,&gprsBufLen,wait_time);
- if(find_string(gprsBuf,gprsBufLen,a))
- return 1;
- i++;
- }
- return 0;
- }
-
-
- /******************************************************************************
- * 函数名称:str_delim
- * 描 述:字符串分割函数
- * 输入参数: num 0---取出分隔字符串(delim)前面的字符串 1---取出分隔字符串(delim)后面的字符串
- * temp 要分割的字符串
- * delim 分隔符字符串
- * 输 出:
- * 返 回:
- * 说 明:
- *******************************************************************************/
- char *str_delim(uint8_t num,char *temp,char *delim)
- {
- int i;
- char *str[2]={0};
- char *tok=temp;
- char *restr;
- for(i=0;i<2;i++)
- {
- tok=strtok(tok,delim);
- str[i]=tok;
- tok = NULL;
- }
- restr=str[num];
- return restr;
- }
-
-
-
- uint8_t sim7600CheckStatus(void);
- uint8_t sim7600SelectNet(void);
- uint8_t sim7600NetConfig(void);
- uint8_t sim7600TcpConnect(const char *serverIp,int port);
- uint8_t sim7600ConnectServer(const char *serverIp,int port);
- void sim7600ExitDataMode(void);
- uint8_t sim7600CloseTcpConnect(void);
- uint8_t sim7600CloseNet(void);
-
- /******************************************************************************
- * 函数名称: check_status
- * 描 述: 核心板基本状态测试
- * 输入参数: 无
- * 输 出: 无
- * 返 回: 0 --- 出错 1 --- 正确
- * 说 明: 核心板开机后,先判断AT命令是否正常、能否读到卡、能否注册网络。确认无误后
- * 再进行其他功能测试
- *******************************************************************************/
- uint8_t sim7600CheckStatus(void)
- {
- sim7600ExitDataMode(); // 退出透传模式
- // 同步波特率
- if(!gprs_send_cmd("AT","OK",5,200))
- {
- printf("handshake failed\r\n");
- return 0;
- }
- // 关闭网络,避免后面因为网络已经开启,导致打开网络失败
- gprsCloseNet();
- // 取消回显
- if(!gprs_send_cmd("ATE0","OK",1,200)){
- printf("echo cancelled failed\r\n");
- return 0;
- }
-
- // 查询核心板能否读到SIM卡
- if(!gprs_send_cmd("AT+CPIN?","+CPIN: READY",2,500)){
- printf("no sim card detected\r\n");
- return 0;
- }
-
- if(!gprs_send_cmd("AT+CSQ","+CSQ",2,100))
- {
- printf("serching CSQ failed\r\n");
- return 0;
- }
- if(!gprs_send_cmd("AT+COPS?","OK",2,100))
- {
- printf("get Operator failed\r\n");
- return 0;
- }
-
- // 选择网络
- if(!sim7600SelectNet()) // 第一次选择好网路后,可直接去掉该函数
- return 0;
- return 1;
- }
-
- /******************************************************************************
- * 函数名称: select_net
- * 描 述: 核心板选择网络
- * 输入参数:
- * 输 出:
- * 返 回: 0 --- 出错 1 --- 正确
- * 说 明: SIM卡首次插入核心板测试时,需要选择网络,选择好网络后,如果不换卡,则不需要再执行该函数
- *******************************************************************************/
-
- uint8_t sim7600SelectNet(void)
- {
-
- // 设置APN
- if(!gprs_send_cmd("AT+CGSOCKCONT=1,\"IP\",\"CMNET\"","OK",1,500)){
- printf("set APN failed\r\n");
- return 0;
- }
-
- // 选择网络,可在常量声明中修改
- if(!gprs_send_cmd("AT+CNMP=38","OK",1,200)){
- printf("select network failed\r\n");
- return 0;
- }
- //查询核心板所处网络
- if(!gprs_send_cmd("AT+CNMP?","+CNMP: 38",3,500)){
- printf("register network failed\r\n");
- return 0;
- }
-
-
-
- // 查询核心板是否注册成功
- if(!gprs_send_cmd("AT+CREG?","+CREG: 0,1",5,1000)){
- printf("register network failed\r\n");
- return 0;
- }
- return 1;
- }
-
- /******************************************************************************
- * 函数名称: net_config
- * 描 述: 核心板进行连接前的网络配置
- * 输入参数: 无
- * 输 出: 无
- * 返 回: 0 --- 出错 1 --- 正确
- * 说 明: 判断核心板是否可以进行通信连接
- *******************************************************************************/
- uint8_t sim7600NetConfig(void)
- {
- // 激活启动场景
- if(!gprs_send_cmd("AT+CSOCKSETPN=1","OK",1,200) ){
- printf("activate mobile scene failed\r\n");
- return 0;
- }
-
- // 设置为非透传模式
- if(!gprs_send_cmd("AT+CIPMODE?","+CIPMODE: 1",1,200)){ // 这里不能直接设置非透传模式,避免因为重复设置导致ERROR
-
- if(!gprs_send_cmd("AT+CIPMODE=1","OK",1,200)){
- printf("Transparent transmission mode failed\r\n");
- return 0;
- }
-
- }
-
- // 打开网络
- if(!gprs_send_cmd("AT+NETOPEN","+NETOPEN: 0",1,5000)){ // 不能重复操作,否则会ERROR
- printf("open network failed\r\n");
- return 0;
- }
-
- // 获取本地IP地址
- if(gprs_send_cmd("AT+IPADDR","ERROR",1,2000))
- {
- printf("get local Ipaddress failed\r\n");
- return 0;
- }
-
- /************
- else
- {
- printf("11.获取本地IP成功\r\n");
- printf("%s",str_delim(1,gprsBuf,":")); // 将获取到的本地IP地址打印到串口调试助手
- }
- ************/
- return 1;
- }
-
- /******************************************************************************
- * 函数名称: tcp_connect
- * 描 述: 核心板与云服务器进行TCP透传模式通信
- * 输入参数: 无
- * 输 出: 无
- * 返 回: 0 --- 出错 1 --- 正确
- * 说 明: 无
- *******************************************************************************/
- uint8_t sim7600TcpConnect(const char *serverIp,int port)
- {
- char serverIpCmd[512]={0};
- sprintf(serverIpCmd,"AT+CIPOPEN=0,\"TCP\",\"%s\",%d\r\n",serverIp,port); // TCP服务器IP地址,可自行修改
- // 建立TCP连接
- if(!gprs_send_cmd((char*)serverIpCmd,"CONNECT",2,3000)){
- printf("set Tcp Transparent transmission mode failed \r\n");
- return 0;
- }
- return 1;
- }
-
-
- uint8_t sim7600ConnectServer(const char *serverIp,int port){
- if(!sim7600CheckStatus()) // 判断核心板状态是否正常
- {
- printf("*** check_status failed ***\r\n");
- return 0;
- }else
- printf("*** check_status ok ***\r\n");
- if(!sim7600NetConfig()){ // 判断通信连接是否正常
- printf("*** net_config failed ***\r\n");
- return 0;
- }else
- printf("*** net_config OK ***\r\n");
-
- return sim7600TcpConnect(serverIp,port);
- }
-
-
- /********************************************************************************
- * 函数名称: exit_data_mode
- * 描 述: 退出数据模式
- * 输入参数: 无
- * 输 出: 无
- * 返 回: 无
- * 说 明: 发送“+++”,确保核心板退出透传数据模式,前后1s延时
- *******************************************************************************/
- void sim7600ExitDataMode(void)
- {
- char gprsBuf[MAX_UART_RECV_BUFF_SIZE]={0};
- int gprsBufLen=0;
- delay_ms(1000);
- gprs_send_string("+++");
- gprs_recv_ack((uint8_t *)gprsBuf,&gprsBufLen,1000);
- }
-
- /********************************************************************************
- * 函数名称: close_tcp_connect
- * 描 述: 关闭TCP连接
- * 输入参数: 无
- * 输 出: 无
- * 返 回: 无
- * 说 明: 关闭TCP连接
- *******************************************************************************/
- uint8_t sim7600CloseTcpConnect(void)
- {
- // 退出透传数据模式
- sim7600ExitDataMode();
- delay_ms(1500);
- // 关闭TCP连接
- if(!gprs_send_cmd("AT+CIPCLOSE=0","OK",2,2000)){ // 关闭连接耗时较长(+CIPCLOSE: 0,0),可根据实际情况调整
- printf("close tcp port failed\n");
- return 0;
- }
- return 1;
- }
-
-
-
- uint8_t sim7600CloseNet(void){
- // 关闭网络
- sim7600CloseTcpConnect();
- if(!gprs_send_cmd("AT+NETCLOSE","+NETCLOSE:",5,3000))
- {
- printf("close network failed\r\n");
- return 0;
- }
- return 1;
- }
-
-
-
-
- /EC20 Start/
-
-
- uint8_t ec20ConnectServer(const char *serverIp,int port);
- uint8_t ec20CloseNet(void);
-
- uint8_t ec20ConnectServer(const char *serverIp,int port){
- printf("ec20ConnectServer:%s %d\n",serverIp,port);
- if(!gprs_send_cmd("AT","OK",2,1000)){
- printf("handshake failed\r\n");
- return 0;
- }
- // 查询核心板能否读到SIM卡
- if(!gprs_send_cmd("AT+CPIN?","+CPIN: READY",2,1000)){
- printf("no sim card detected\r\n");
- return 0;
- }
- if(!gprs_send_cmd("AT+CSQ","+CSQ",2,1000))
- {
- printf("serching CSQ failed\r\n");
- return 0;
- }
- // 查询核心板是否注册成功
- if(!gprs_send_cmd("AT+CREG?","+CREG: 0,1",5,1000)){
- printf("register GSM network failed\r\n");
- return 0;
- }
- if(!gprs_send_cmd("AT+CGREG?","OK",5,1000)){
- printf("register GPRS network failed\r\n");
- return 0;
- }
- // 设置APN
- if(!gprs_send_cmd("AT+QICSGP=1,1,\"CMNET\"","OK",2,1000)){
- printf("set APN failed\r\n");
- return 0;
- }
-
- if(!gprs_send_cmd("AT+QIDEACT=1","OK",2,1000)){
- printf("Deactivates a PDP context\r\n");
- return 0;
- }
- if(!gprs_send_cmd("AT+QIACT=1","OK",2,1000)){
- printf("activates a PDP context\r\n");
- return 0;
- }
-
- char serverIpCmd[512]={0};
- sprintf(serverIpCmd,"AT+QIOPEN=1,0,\"TCP\",\"%s\",%d,0,2\r\n",serverIp,port); // TCP服务器IP地址,可自行修改
- // 建立TCP连接
- if(!gprs_send_cmd((char*)serverIpCmd,"CONNECT",2,3000)){
- printf("set Tcp Transparent transmission mode failed \r\n");
- return 0;
- }
- return 1;
-
-
- }
-
- uint8_t ec20CloseNet(void){
-
- sim7600ExitDataMode();
- delay_ms(2000);
- // 关闭TCP连接
- if(!gprs_send_cmd("AT+QICLOSE=0","OK",2,2000)){ // 关闭连接耗时较长(+CIPCLOSE: 0,0),可根据实际情况调整
- printf("close tcp port failed\n");
- return 0;
- }
- return 1;
- }
-
- /EC20 End/
-
-
-
-
- //#define SIM7600CE_4G 1
- #define EC20_4G 1
-
-
-
-
-
- void gprsInit(int uartPort,uint32_t bound){
- uart_init(uartPort,bound);
- gprsPower(1);
- gprsReset(0);
- delay_ms(2000);
- }
-
- uint8_t gprsTryConnectServer(const char *serverIp,int port){
- #if SIM7600CE_4G
- return sim7600ConnectServer(serverIp,port);
- #else
- return ec20ConnectServer(serverIp,port);
- #endif
-
- }
-
- uint8_t gprsCloseNet(void)
- {
- #if SIM7600CE_4G
- return sim7600CloseNet();
- #else
- return ec20CloseNet();
- #endif
- }
-
- int gprsTcpWrite(uint8_t *data,int len){
- return uartSendBytes(GPRSPORT,data,len);
- }
-
- int gprsTcpRead(uint8_t *recvbuf,int maxRecvLen,int waitTime){
- int readLen=0;
- uartRecvBytes(GPRSPORT, recvbuf, &readLen,maxRecvLen,waitTime);
- return readLen;
- }
- int gprsTcpAvailable(int waitTime){
- return uartHasDataReceived(GPRSPORT, waitTime);
- }
-
- void gprsReboot(void){
- gpio_set_level(SIM_RST_Pin,1);
- delay_ms(2000);
- gpio_set_level(SIM_RST_Pin, 0);
- delay_ms(20000);
- }
-
- void gprsPower(uint8_t isOn){
- gpio_set_level(SIM_PEN_Pin,isOn);
- }
- void gprsReset(uint8_t isOn){
- gpio_set_level(SIM_RST_Pin, isOn);
- }
- void gprsRepowerOn(void) {
- gpio_set_level(SIM_PEN_Pin,1);
- delay_ms(4000);
- }
- void gprsRepowerOff(void) {
- gpio_set_level(SIM_PEN_Pin,0);
- delay_ms(2000);
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
移植MQTT 通信协议
从https://github.com/eclipse/paho.mqtt.embedded-c/tree/master/MQTTPacket 下载MQTT通信源码包, 将src目录下所有文件拷贝到自己的工程目录下,然后移植samples目录下的transport.c
- #include "transport.h"
- #include "lwip/opt.h"
- #include "lwip/arch.h"
- #include "lwip/api.h"
- #include "lwip/inet.h"
- #include "lwip/sockets.h"
- #include "string.h"
- #include "../GprsUtils/GprsUtils.h"
-
- static int mysock;
-
- /************************************************************************
- ** 函数名称: transport_sendPacketBuffer
- ** 函数功能: 以TCP方式发送数据
- ** 入口参数: unsigned char* buf:数据缓冲区
- ** int buflen:数据长度
- ** 出口参数: <0发送数据失败
- ************************************************************************/
- int transport_sendPacketBuffer( uint8_t* buf, int buflen)
- {
- return gprsTcpWrite(buf,buflen);
- }
-
- /************************************************************************
- ** 函数名称: transport_getdata
- ** 函数功能: 以阻塞的方式接收TCP数据
- ** 入口参数: unsigned char* buf:数据缓冲区
- ** int count:数据长度
- ** 出口参数: <=0接收数据失败
- ************************************************************************/
- int transport_getdata(uint8_t* buf, int count)
- {
- return gprsTcpRead(buf,count,500);
- }
-
-
- /************************************************************************
- ** 函数名称: transport_open
- ** 函数功能: 打开一个接口,并且和服务器 建立连接
- ** 入口参数: char* servip:服务器域名
- ** int port:端口号
- ** 出口参数: <0打开连接失败
- ************************************************************************/
- int transport_open(const char *servip, int port)
- {
- return gprsTryConnectServer(servip,port);
- }
-
-
- /************************************************************************
- ** 函数名称: transport_close
- ** 函数功能: 关闭套接字
- ** 入口参数: unsigned char* buf:数据缓冲区
- ** int buflen:数据长度
- ** 出口参数: <0发送数据失败
- ************************************************************************/
- int transport_close(void)
- {
- return gprsCloseNet();
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
创建MQTT通信接口源文件,包含连接MQTT服务器、订阅、发布,接收
- #include <string.h>
- #include "freertos/FreeRTOS.h"
- #include "freertos/task.h"
- #include "MqttUtils.h"
- #include "../GprsUtils/GprsUtils.h"
-
-
-
-
-
- /************************************************************************
- ** 函数名称: MQTTClientInit
- ** 函数功能: 初始化客户端并登录服务器
- ** 入口参数: int sock:网络描述符
- ** 出口参数: >=0:发送成功 <0:发送失败
- ** 备 注:
- ************************************************************************/
- static int MQTTClientInit(const char *username,const char *password,int keepalive){
- MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
- uint8_t buf[100];
- int buflen = sizeof(buf);
- int len;
- uint8_t sessionPresent,connack_rc;
-
-
- //创建MQTT客户端连接参数
- connectData.willFlag = 0;
- //MQTT版本
- connectData.MQTTVersion = 4;
- //客户端ID--必须唯一
- connectData.clientID.cstring = "MIMITOTO";
- //保活间隔
- connectData.keepAliveInterval = keepalive;
- if(username&&password){
- //用户名
- connectData.username.cstring = username;
- //用户密码
- connectData.password.cstring = password;
- }
- //清除会话
- connectData.cleansession = 1;
-
- //串行化连接消息
- len = MQTTSerialize_connect(buf, buflen, &connectData);
- //发送TCP数据
- if(transport_sendPacketBuffer(buf, len) < 0)
- return -1;
- if(gprsTcpAvailable(2000)<0)
- return -2;
- if(MQTTPacket_read(buf, buflen, transport_getdata) != CONNACK)
- return -3;
- //拆解连接回应包
- if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) != 1 || connack_rc != 0)
- return -4;
-
- if(sessionPresent == 1)
- return 1;//不需要重新订阅--服务器已经记住了客户端的状态
- else
- return 0;//需要重新订阅
- }
-
-
- int mqttConnnectServer(const char *servip, int port,const char *username,const char *password,int keepalive){
- int sessionPresent=-1;
- if(!transport_open(servip,port))
- return -1;
- if((sessionPresent=MQTTClientInit(username,password,keepalive))<0){
- transport_close();
- return -1;
- }
- return sessionPresent;
- }
-
-
- /************************************************************************
- ** 函数名称: ReadPacketTimeout
- ** 函数功能: 阻塞读取MQTT数据
- ** 入口参数: int sock:网络描述符
- ** uint8_t *buf:数据缓存区
- ** int buflen:缓冲区大小
- ** uint32_t timeout:超时时间--0-表示直接查询,没有数据立即返回
- ** 出口参数: -1:错误,其他--包类型
- ** 备 注:
- ************************************************************************/
- int ReadPacketTimeout(uint8_t *buf,int buflen,uint32_t timeout)
- {
- if(timeout != 0)
- {
- if(gprsTcpAvailable(timeout)<0)
- return -1;
- }
- //读取TCP/IP事件
- return MQTTPacket_read(buf, buflen, transport_getdata);
- }
-
- /************************************************************************
- ** 函数名称: MQTTSubscribe
- ** 函数功能: 订阅消息
- ** 入口参数: int sock:套接字
- ** char *topic:主题
- ** enum QoS pos:消息质量
- ** 出口参数: >=0:发送成功 <0:发送失败
- ** 备 注:
- ************************************************************************/
- int MQTTSubscribe(char *topic,enum QoS pos){
- static uint32_t PacketID = 0;
- uint16_t packetidbk = 0;
- int conutbk = 0;
- uint8_t buf[100];
- int buflen = sizeof(buf);
- MQTTString topicString = MQTTString_initializer;
- int len;
- int req_qos,qosbk;
-
- //复制主题
- topicString.cstring = (char *)topic;
- //订阅质量
- req_qos = pos;
-
- //串行化订阅消息
- len = MQTTSerialize_subscribe(buf, buflen, 0, PacketID++, 1, &topicString, &req_qos);
- //发送TCP数据
- if(transport_sendPacketBuffer(buf, len) < 0)
- return -1;
- if(gprsTcpAvailable(2000) <0 ){
- return -2;
- }
- //等待订阅返回--未收到订阅返回
- if(MQTTPacket_read(buf, buflen, transport_getdata) != SUBACK)
- return -4;
-
- //拆订阅回应包
- if(MQTTDeserialize_suback(&packetidbk,1, &conutbk, &qosbk, buf, buflen) != 1)
- return -5;
-
- //检测返回数据的正确性
- if((qosbk == 0x80)||(packetidbk != (PacketID-1)))
- return -6;
- //订阅成功
- return 0;
- }
-
-
-
-
- /************************************************************************
- ** 函数名称: GetNextPackID
- ** 函数功能: 产生下一个数据包ID
- ** 入口参数: 无
- ** 出口参数: uint16_t packetid:产生的ID
- ** 备 注:
- ************************************************************************/
- static uint16_t GetNextPackID(void)
- {
- static uint16_t pubpacketid = 0;
- return pubpacketid++;
- }
-
- /************************************************************************
- ** 函数名称: WaitForPacket
- ** 函数功能: 等待特定的数据包
- ** 入口参数: int sock:网络描述符
- ** uint8_t packettype:包类型
- ** uint8_t times:等待次数
- ** 出口参数: >=0:等到了特定的包 <0:没有等到特定的包
- ** 备 注:
- ************************************************************************/
- static int WaitForPacket(uint8_t packettype,uint8_t times)
- {
- int type;
- uint8_t buf[MSG_MAX_LEN];
- uint8_t n = 0;
- int buflen = sizeof(buf);
- do
- {
- //读取数据包
- type = ReadPacketTimeout(buf,buflen,2000);
- if(type != -1)
- mqtt_pktype_ctl(type,buf,buflen);
- n++;
- }while((type != packettype)&&(n < times));
- //收到期望的包
- if(type == packettype)
- return 0;
- else
- return -1;
- }
-
-
- int MQTTMsgPublish(char *topic, char qos, char retained,uint8_t* msg,uint32_t msg_len){
- uint8_t buf[MSG_MAX_LEN];
- int buflen = sizeof(buf),len;
- MQTTString topicString = MQTTString_initializer;
- uint16_t packid = 0,packetidbk=0;
-
- //填充主题
- topicString.cstring = (char *)topic;
- //填充数据包ID
- if((qos == QOS1)||(qos == QOS2))
- {
- packid = GetNextPackID();
- }
- else
- {
- qos = QOS0;
- retained = 0;
- packid = 0;
- }
-
- //推送消息
- len = MQTTSerialize_publish(buf, buflen, 0, qos, retained, packid, topicString, (unsigned char*)msg, msg_len);
- if(len <= 0)
- return -1;
- if(transport_sendPacketBuffer(buf, len) < 0)
- return -2;
-
- //质量等级0,不需要返回
- if(qos == QOS0)
- {
- return 0;
- }
-
- //等级1
- if(qos == QOS1)
- {
- //等待PUBACK
- if(WaitForPacket(PUBACK,5) < 0)
- return -3;
- return 1;
-
- }
- //等级2
- if(qos == QOS2)
- {
- //等待PUBREC
- if(WaitForPacket(PUBREC,5) < 0)
- return -3;
- //发送PUBREL
- len = MQTTSerialize_pubrel(buf, buflen,0, packetidbk);
- if(len == 0)
- return -4;
- if(transport_sendPacketBuffer(buf, len) < 0)
- return -6;
- //等待PUBCOMP
- if(WaitForPacket(PUBREC,5) < 0)
- return -7;
- return 2;
- }
- //等级错误
- return -8;
- }
-
-
- int my_mqtt_send_pingreq(){
- int len;
- uint8_t buf[200];
- int buflen = sizeof(buf);
- len = MQTTSerialize_pingreq(buf, buflen);
- transport_sendPacketBuffer(buf, len);
- if(gprsTcpAvailable(5000)<0)
- return -1;
- if(MQTTPacket_read(buf, buflen, transport_getdata) != PINGRESP)
- return -2;
- return 0;
- }
- int mqttHasDataIn(int waitTime){
- return gprsTcpAvailable(waitTime);
- }
-
-
-
-
- 消息处理/
-
-
- /************************************************************************
- ** 函数名称: deliverMessage
- ** 函数功能: 接受服务器发来的消息
- ** 入口参数: MQTTMessage *msg:MQTT消息结构体
- ** MQTT_USER_MSG *mqtt_user_msg:用户接受结构体
- ** MQTTString *TopicName:主题
- ** 出口参数: 无
- ** 备 注:
- ************************************************************************/
- static void deliverMessage(MQTTString *TopicName,MQTTMessage *msg,MQTT_USER_MSG *mqtt_user_msg)
- {
- //消息质量
- mqtt_user_msg->msgqos = msg->qos;
- //保存消息
- memcpy(mqtt_user_msg->msg,msg->payload,msg->payloadlen);
- mqtt_user_msg->msg[msg->payloadlen] = '\0';
- //保存消息长度
- mqtt_user_msg->msglenth = msg->payloadlen;
- //消息主题
- memcpy((char *)mqtt_user_msg->topic,TopicName->lenstring.data,TopicName->lenstring.len);
- mqtt_user_msg->topic[TopicName->lenstring.len] = 0;
- //消息ID
- mqtt_user_msg->packetid = msg->id;
- //标明消息合法
- mqtt_user_msg->valid = 1;
- }
-
- /************************************************************************
- ** 函数名称: UserMsgCtl
- ** 函数功能: 用户消息处理函数
- ** 入口参数: MQTT_USER_MSG *msg:消息结构体指针
- ** 出口参数: 无
- ** 备 注:
- ************************************************************************/
- void UserMsgCtl(MQTT_USER_MSG *msg)
- {
-
-
- //这里处理数据只是打印,用户可以在这里添加自己的处理方式
- printf("MQTT>>****收到客户端自己订阅的消息!!****\n");
- //返回后处理消息
- switch(msg->msgqos)
- {
- case 0:
- printf("MQTT>>消息质量:QoS0\n");
- break;
- case 1:
- printf("MQTT>>消息质量:QoS1\n");
- break;
- case 2:
- printf("MQTT>>消息质量:QoS2\n");
- break;
- default:
- printf("MQTT>>错误的消息质量\n");
- break;
- }
- printf("MQTT>>消息主题:%s\n",msg->topic);
- printf("MQTT>>消息类容:%s\n",msg->msg);
- printf("MQTT>>消息长度:%d\n",msg->msglenth);
- //处理完后销毁数据
- msg->valid = 0;
-
- char MqttSendbuf[512]={0};
- static int timeCount=0;
- sprintf(MqttSendbuf,"Hello server %d",timeCount++);
- MQTTMsgPublish("mqttSendTopic",QOS0, 0,(uint8_t *)MqttSendbuf,strlen(MqttSendbuf));
- }
-
-
-
- /************************************************************************
- ** 函数名称: mqtt_pktype_ctl
- ** 函数功能: 根据包类型进行处理
- ** 入口参数: uint8_t packtype:包类型
- ** 出口参数: 无
- ** 备 注:
- ************************************************************************/
- void mqtt_pktype_ctl(uint8_t packtype,uint8_t *buf,uint32_t buflen)
- {
- MQTTMessage msg;
- int rc;
- MQTTString receivedTopic;
- uint32_t len;
- MQTT_USER_MSG mqtt_user_msg;
- switch(packtype)
- {
- case PUBLISH:
- //拆析PUBLISH消息
- if(MQTTDeserialize_publish(&msg.dup,(int*)&msg.qos, &msg.retained, &msg.id, &receivedTopic,(unsigned char **)&msg.payload, &msg.payloadlen, buf, buflen) != 1)
- return;
- //接受消息
- deliverMessage(&receivedTopic,&msg,&mqtt_user_msg);
-
- //消息质量不同,处理不同
- if(msg.qos == QOS0)
- {
- //QOS0-不需要ACK
- //直接处理数据
- UserMsgCtl(&mqtt_user_msg);
- return;
- }
- //发送PUBACK消息
- if(msg.qos == QOS1)
- {
- len =MQTTSerialize_puback(buf,buflen,mqtt_user_msg.packetid);
- if(len == 0)
- return;
- //发送返回
- if(transport_sendPacketBuffer(buf,len)<0)
- return;
- //返回后处理消息
- UserMsgCtl(&mqtt_user_msg);
- return;
- }
-
- //对于质量2,只需要发送PUBREC就可以了
- if(msg.qos == QOS2)
- {
- len = MQTTSerialize_ack(buf, buflen, PUBREC, 0, mqtt_user_msg.packetid);
- if(len == 0)
- return;
- //发送返回
- transport_sendPacketBuffer(buf,len);
- }
- break;
- case PUBREL:
- //解析包数据,必须包ID相同才可以
- rc = MQTTDeserialize_ack(&msg.type,&msg.dup, &msg.id, buf,buflen);
- if((rc != 1)||(msg.type != PUBREL)||(msg.id != mqtt_user_msg.packetid))
- return ;
- //收到PUBREL,需要处理并抛弃数据
- if(mqtt_user_msg.valid == 1)
- {
- //返回后处理消息
- UserMsgCtl(&mqtt_user_msg);
- }
- //串行化PUBCMP消息
- len = MQTTSerialize_pubcomp(buf,buflen,msg.id);
- if(len == 0)
- return;
- //发送返回--PUBCOMP
- transport_sendPacketBuffer(buf,len);
- break;
- case PUBACK://等级1客户端推送数据后,服务器返回
- break;
- case PUBREC://等级2客户端推送数据后,服务器返回
- break;
- case PUBCOMP://等级2客户端推送PUBREL后,服务器返回
- break;
- default:
- break;
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
创建MQTT 任务
- #include <stdint.h>
- #include "freertos/FreeRTOS.h"
- #include "freertos/task.h"
- #include "string.h"
- #include "freertos/queue.h"
- #include "malloc.h"
-
- #include "taskMqttClient.h"
- #include "../MQTT/transport.h"
- #include "../MQTT/MQTTPacket.h"
- #include "../MqttUtils/MqttUtils.h"
- #include "../GprsUtils/GprsUtils.h"
-
-
- #define HOST_NAME "183.220.123.35"
- #define HOST_PORT 17788
- #define USER_NAME "jdtf1"
- #define USER_PASSWD "jdtf1"
-
-
-
- #define MQTT_RECV_TOPIC "mqttRecvTopic"
- #define MQTT_SEND_TOPIC "mqttSendTopic"
-
-
-
-
-
-
-
-
- #define MQTT_RECV_TASK_STACKSIZE (configMINIMAL_STACK_SIZE*8)
- #define MQTT_RECV_TASK_PRIO 4
- static TaskHandle_t MqttRecvTask_Handler;
- static void mqttRecvThread(void *pvParameters);
-
- #define MQTT_SEND_TASK_STACKSIZE (configMINIMAL_STACK_SIZE*8)
- #define MQTT_SEND_TASK_PRIO 5
- static TaskHandle_t MqttSendTask_Handler;
- static void mqttSendThread(void *pvParameters);
-
-
- QueueHandle_t MQTT_Data_Queue =NULL;
- #define MQTT_QUEUE_LEN 10 /* 队列的长度,最大可包含多少个消息 */
- #define MQTT_QUEUE_SIZE sizeof(MQTT_MSG_ITEM ) /* 队列中每个消息大小(字节) */
- void mqtt_thread_init(void){
- /* 创建Test_Queue */
- MQTT_Data_Queue = xQueueCreate((UBaseType_t ) MQTT_QUEUE_LEN,(UBaseType_t ) MQTT_QUEUE_SIZE);/* 消息的大小 */
- if(NULL != MQTT_Data_Queue)
- printf("The MQTT_Data_Queue was created successfully!\r\n");
- xTaskCreate(mqttRecvThread, (const char* ) "MQTTRecvTask", MQTT_RECV_TASK_STACKSIZE, NULL, MQTT_RECV_TASK_PRIO, (TaskHandle_t*)&MqttRecvTask_Handler);
- xTaskCreate(mqttSendThread, (const char* ) "MQTTSendTask", MQTT_SEND_TASK_STACKSIZE, NULL, MQTT_SEND_TASK_PRIO, (TaskHandle_t*)&MqttSendTask_Handler);
- }
-
-
- static void mqttSendThread(void *pvParameters){
- /* 定义一个创建信息返回值,默认为pdTRUE */
- BaseType_t xReturn = pdTRUE;
- /* 定义一个接收消息的变量 */
- MQTT_MSG_ITEM recvMqttMsg;
- while(1){
- xReturn = xQueueReceive(MQTT_Data_Queue, &recvMqttMsg,portMAX_DELAY); /* 等待时间 3000ms */
- if(xReturn == pdTRUE){
- printf("mqttSendThread: msg=%s len=%d\r\n",recvMqttMsg.msgStr,recvMqttMsg.msgLength);
- MQTTMsgPublish((char*)MQTT_SEND_TOPIC,QOS0, 0,(uint8_t *)recvMqttMsg.msgStr,recvMqttMsg.msgLength);
- free(recvMqttMsg.msgStr);
- }
- }
- }
-
-
- #define MAX_TRY_CONNECT_TIMES 15
- #define MAX_PING_FAILED_TIMES 3
- static void mqttRecvThread(void *pvParameters){
-
- uint32_t curtick=0;
- uint8_t no_mqtt_msg_exchange = 1;
- int sessionPresent = 0;
- uint8_t buf[MSG_MAX_LEN];
- int buflen = sizeof(buf),type;
- int isMqttServerConnected=0;
- int tryConnectServerFailedTime=0;
- int tryPingServerFFailedTimes=0;
- gprsInit(GPRSPORT,115200);
- gprsReboot();
- //获取当前滴答,作为心跳包起始时间
- curtick = xTaskGetTickCount();
- //无线循环
- printf("MQTT>>4.开始循环接收订阅的消息...\n");
- while(1)
- {
-
- if(!isMqttServerConnected){
- printf("MQTT>>1.开始创建网络连接...\n");
- if((sessionPresent=mqttConnnectServer(HOST_NAME,HOST_PORT,USER_NAME,USER_PASSWD,KEEPLIVE_TIME))<0){
- vTaskDelay(3000/portTICK_RATE_MS);
- if(tryConnectServerFailedTime++>MAX_TRY_CONNECT_TIMES){
- tryConnectServerFailedTime=0;
- gprsReboot();
- printf("Try max %d time to connect server:%s port:%d failed,then reboot 4G model\n",MAX_TRY_CONNECT_TIMES,HOST_NAME,HOST_PORT);
- }
- continue;
- }
- printf("连接MQTT 服务器成功 ...\n");
- isMqttServerConnected=1;
- tryConnectServerFailedTime=0;
- if(sessionPresent!=1){ //订阅消息
- if(MQTTSubscribe((char*)MQTT_RECV_TOPIC,QOS0) < 0)
- {
- //重连服务器
- printf("MQTT>>客户端订阅消息失败...\n");
- transport_close();
- isMqttServerConnected=0;
- continue;
- }else
- printf("订阅主题 %s 成功 \n",MQTT_RECV_TOPIC);
- }
- curtick = xTaskGetTickCount();
- }
- //表明无数据交换
- no_mqtt_msg_exchange = 1;
- //判断MQTT服务器是否有数据
- if((type=ReadPacketTimeout(buf,buflen,1000))!=-1)
- {
- mqtt_pktype_ctl(type,buf,buflen);
- //表明有数据交换
- no_mqtt_msg_exchange = 0;
- //获取当前滴答,作为心跳包起始时间
- curtick = xTaskGetTickCount();
-
- }
-
- //这里主要目的是定时向服务器发送PING保活命令
- if((xTaskGetTickCount() - curtick) >(pdMS_TO_TICKS(KEEPLIVE_TIME)))
- {
- curtick = xTaskGetTickCount();
- //判断是否有数据交换
- if(no_mqtt_msg_exchange == 0)
- {
- //如果有数据交换,这次就不需要发送PING消息
- continue;
- }
-
- if(my_mqtt_send_pingreq() < 0){
- //重连服务器
- printf("MQTT>>发送ping失败....\n");
- if(tryPingServerFFailedTimes++>=MAX_PING_FAILED_TIMES){
- tryPingServerFFailedTimes=0;
- transport_close();
- isMqttServerConnected=0;
- }
- continue;
- }
- tryPingServerFFailedTimes=0;
- printf("MQTT>>发送ping作为心跳成功....\n");
- //表明有数据交换
- no_mqtt_msg_exchange = 0;
- }
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
工程源码地址:https://download.csdn.net/download/du2005023029/87505817
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。