赞
踩
TCP短连接
:就是当客户端与服务端建立起了TCP连接后,以后两端发送数据和回应数据都是短时间迅速回应,并且发送完数据即可快速断开连接TCP长连接
:就是当客户端与服务端建立起了TCP连接后,以后两端发送数据和回应数据可以自己设定时间间隔来回应,并且可以一直保持TCP连接,因此可以长时间连接,例如微信中我与别人通信,所需要的就是两边客户端与微信服务器建立长时间连接,如果不是长时间则将导致两端无法及时和正确接收到所有的消息问题
:在TCP长连接中,因对端非正常退出,而因为TCP是一个虚拟连接,导致本端无法检测连接是否断开,而导致的浪费资源问题心跳机制(HeartBeat)
:就是像心跳一样,定时的去发送心跳包判断对端是否存活,以此来进行判断执行保持连接、断开连接或清理数据等操作概念:
心跳检测机制
也就是解决问题的方法,需要定时发送心跳包来判活,而心跳检测机制通常实现方式有两种:
心跳检测机制实现方式:
- 在应用层实现心跳机制
- 使用TCP中的keepalive选项
- 使用方式1和2结合使用
心跳机制在游戏开发中是经常需要在应用层实现自己的心跳机制,通常是客户端发送心跳包给服务端,服务端检测心跳判断客户端是否存活,如果非存活即清理用户数据(资源清理),断开连接
注意:因此后面的应用层实现都只围绕着客户端发送心跳包,服务端判活机制
作用:
保活
用于在一些需要长连接应用场景(例如即时通讯/IM),来发送心跳包,来让客户端与服务端有一定的来往,将对端进行保活
检测死链
用于解决对端因非正常原因退出(网络问题、宕机或者断电崩溃等),而导致的本端未及时检测出对端状态而导致的"死链"
实现方式优缺点:
在应用层自定义实现
:
优点:
- 灵活自由,可以自定义心跳包的消息结构,可以自定义心跳间隔,而在特殊场景需要,可以携带业务数据
- 通用,可以不再约束于TCP协议,也可以用于UDP中,例如一些IM软件中,使用UDP的时效性,而在应用层保证数据以及连接机制
缺点:
- 复杂,需要自己实现一个心跳机制还是增加了开发的工作量,增加了代码的复杂度
- 资源耗费更多,毕竟在应用层的心跳检测的流量消耗还是很大的
TCP中的keepalive选项
:
优点:
- 简单快捷,代码量少,设置简单,并且底层自动帮你发送心跳包,减少了代码的复杂度
- 资源浪费少,因为TCP中的心跳包不携带数据,带宽资源浪费少
缺点:
- 检活时间太长,也就是定时发送心跳包时间过长(两小时)
应用层+TCP中keepalive
:用于更加细腻的应用场景,TCP keepalive解决网络可用性,应用层keepalive解决网络和服务可用性,应用层无法区别网络还是服务问题,加上tcp心跳包可以进一步区分,有助于客户端做动态调整
实现思路:
心跳检测步骤:
1.服务端每隔一个时间间隔发生一个探测包给客户端
2.服务端发包时启动一个超时定时器
3.客户端端接收到检测包,应该回应一个包
4.如果服务端收到客户端的应答包,则说明客户端正常,删除超时定时器
5.如果服务端端的超时定时器超时,依然没有收到应答包则说明客户端挂了
以下是示例实现,以一个非阻塞ET模式的回声服务器为例。
服务端具体实现:
变量定义
:
- 定义一个心跳类,当作定时器,用于检测超时
- 使用list列表作为超时队列,用于将管理客户端连接的添加、删除以及更新操作
- 使用哈希表映射客户端套接字与在超时队列中对应位置,用来利于更新时间以及从超时队列中删除数据
- 定义超时队列锁,因为超时队列是公共资源,所以我们需要用一个锁用来保护数据的安全性与正确性
插入超时队列以及更新超时队列数据
:
- 当有新连接来时,将其加入到超时队列中
- 当客户端与服务端通信或者服务端接收了客户端的心跳包时,则进行更新数据(从超时队列中删除,然后更新好后加入超时队列队尾)
超时检测函数
:如果当所有客户端都没有与服务端通信的话,以超时队列的特性,先进先出,也就是队首如果没超时则后面的都没超时,如果队首超时,则需要从队首逐一检测,这样可以避免大多数的遍历所有连接的操作。超时的连接就将其踢出
删除超时队列数据函数
:用于客户端断开连接时,服务端将超时队列以及映射表中的客户端连接数据删除
服务端函数:
心跳类
:
//心跳类 class HeartBeat{ public: HeartBeat(int fd,struct timespec abstime); bool timecheck(unsigned int outtime); //判断是否超时,超时返回true int getfd(); private: int fd; struct timespec tm; }; HeartBeat::HeartBeat(int fd,struct timespec abstime){ this->fd=fd; this->tm=abstime; //构造时将连接时的系统时间传入 } bool HeartBeat::timecheck(unsigned int outtime){ struct timespec abstime; clock_gettime(CLOCK_REALTIME,&abstime); if(abstime.tv_sec-this->tm.tv_sec>outtime) return true; else return false; } int HeartBeat::getfd(){ return this->fd; }
变量定义
:
std::list<HeartBeat*>timeque; //超时队列
std::unordered_map<int, std::list<HeartBeat*>::iterator> up; //映射表
pthread_mutex_t quemutex; //超时队列锁
插入超时队列函数
:
//加入超时队列
void insert_que(int fd){
pthread_mutex_lock(&quemutex); //加锁
struct timespec abstime;
clock_gettime(CLOCK_REALTIME,&abstime);
HeartBeat *newheart=new HeartBeat(fd,abstime);
timeque.push_back(newheart);
std::list<HeartBeat*>::iterator it;
it=timeque.end();
it--;
up[fd]=it;
pthread_mutex_unlock(&quemutex); //解锁
return;
}
更新超时队列数据函数
:
void update_que(int fd){
pthread_mutex_lock(&quemutex);
std::list<HeartBeat*>::iterator it;
it=up[fd];
timeque.erase(it);
struct timespec abstime;
clock_gettime(CLOCK_REALTIME,&abstime);
HeartBeat *newheart=new HeartBeat(fd,abstime);
timeque.push_back(newheart);
it=timeque.end();
it--;
up[fd]=it;
pthread_mutex_unlock(&quemutex);
return;
}
检测超时函数
:
void check_out(int epfd){ while(timeque.size()>0){ pthread_mutex_lock(&quemutex); HeartBeat *heart=timeque.front(); bool flag=heart->timecheck(60); if(flag){ timeque.pop_front(); int fd=heart->getfd(); // std::unordered_map<int,std::list<HeartBeat*>::iterator>::iterator it; auto it=up.find(fd); if(it!=up.end()){ up.erase(it); } epoll_event tp; tp.data.fd=fd; tp.events=EPOLLIN|EPOLLET; epoll_ctl(epfd,EPOLL_CTL_DEL,fd,&tp); pthread_mutex_unlock(&quemutex); printf("client[%d] timeout exit\n",fd); }else { pthread_mutex_unlock(&quemutex); break; } } return; }
删除超时队列函数
:
void del_que(int fd){
pthread_mutex_lock(&quemutex);
timeque.erase(up[fd]);
auto it=up.find(fd);
up.erase(it);
pthread_mutex_unlock(&quemutex);
return;
}
客户端具体实现:
变量定义
:
- 定义一个心跳机制配置类,用于设置心跳机制参数的
- 创建一个检测超时、发送心跳包的子线程
- 定义一个用于更新时间的锁,防止检测线程与主线程争抢,保证数据的正确性和安全性
- 定义一个标志位,用来判断服务端关闭,终止客户端运行
检测线程回调函数send_heart
:
- 用来检测客户端与服务端未通信时间,当达到了时间限制,向服务端发心跳包
- 如果未收到服务端回应,判断是否服务端关闭,未关闭则重复定时发送心跳包,发送心跳包超出限制则超时断开连接
- 服务端关闭则客户端断开连接,标志位置为true表示服务端关闭
主线程函数run_loop
:
- 用于阻塞监听终端数据,像服务端发送数据以及接收回应
- 判断标志位是否为true,如果为true则终止客户端允许
客户端函数:
心跳机制配置类
:
//用于超时检测线程的传参
struct heartbg{
int fd; //客户端套接字
timespec tm; //客户端与服务端连接的时间
unsigned int outtime; //设置超时时间
int cnt; //重试次数
unsigned int difftime; //重试间隔
};
变量定义
:
heartbg *cli; //用于检测超时发送心跳包的线程所需的参数
pthread_mutex_t climutex; //用于更新时间的锁,防止检测线程与主线程争抢,导致数据不正确
bool flag=false; //标志位判断服务端关闭,而终止主线程运行
检测线程函数
:
void *send_heart(void *arg){ heartbg *cli=(struct heartbg *)(arg); char buf[BUFSIZ]; int i,n; while (1) { timespec nowtime; clock_gettime(CLOCK_REALTIME,&nowtime); if(nowtime.tv_sec-cli->tm.tv_sec>cli->outtime){ for(i=0;i<cli->cnt+1;++i){ Write(cli->fd,Heartpg,strlen(Heartpg)); printf("发送心跳包[%d]\n",i+1); bzero(&buf,sizeof(buf)); n=Read(cli->fd,buf,sizeof(buf)); if(n==0){ break; } if(strcmp(buf,Heart_ACK)==0){ pthread_mutex_lock(&climutex); printf("收到服务端回应\n"); clock_gettime(CLOCK_REALTIME,&cli->tm); pthread_mutex_unlock(&climutex); break; } sleep(cli->difftime); } if(n==0){ printf("服务端断开连接\n"); flag=true; pthread_mutex_destroy(&climutex); pthread_detach(pthread_self()); pthread_exit(NULL); } //断开连接 if(i==cli->cnt){ printf("超时断开连接\n"); flag=true; pthread_mutex_destroy(&climutex); pthread_detach(pthread_self()); pthread_exit(NULL); //线程自行结束 } } } }
主线程函数
:
void client::run_loop(){ fcntl(STDIN_FILENO,F_SETFL,(fcntl(STDIN_FILENO,F_GETFL))|O_NONBLOCK); flag=false; while (!flag) { if(fgets(buf,BUFSIZ,stdin)!=NULL){ pthread_mutex_lock(&climutex); Write(cfd,buf,strlen(buf)); int ret=Read(cfd,buf,BUFSIZ); if(ret==0){ printf("the other side has been closed.\n"); break; } clock_gettime(CLOCK_REALTIME,&cli->tm); pthread_mutex_unlock(&climutex); Write(STDOUT_FILENO,buf,ret); }else { continue; } } //关闭客户端连接 Close(cfd); }
server代码:
#include "wrap.h" #include <cstdio> #include <ctime> #include <list> #include <pthread.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <unistd.h> #include <sys/epoll.h> #include <strings.h> #include <fcntl.h> #include <errno.h> #include <ctype.h> #include <unordered_map> #include <string.h> #include <stdlib.h> #define SER_PORT 5005 #define OPEN_MAX 5000 #define Heartpg "HeartBeat" #define Heart_ACK "OK" //心跳类 class HeartBeat{ public: HeartBeat(int fd,struct timespec abstime); bool timecheck(unsigned int outtime); //判断是否超时,超时返回true int getfd(); private: int fd; struct timespec tm; }; //非阻塞et模式epoll服务端 class et_ser{ public: et_ser(){}; ~et_ser(){}; void init(); //初始化 void run_loop(); //主程序 private: int lfd,epfd; //监听套接字,以及epoll套接字 char buf[BUFSIZ]; //缓冲区 sockaddr_in ser_addr,cli_addr; //服务端、客户端地址结构 socklen_t cli_addr_len; //客户端地址结构长度 epoll_event tep,ep[OPEN_MAX]; //tep为监听和客户端套接字的监听事件结构体,ep为epoll_wait的传出参数->满足事件的事件结构体数组 }; //加入超时队列 void insert_que(int fd); //收到心跳包,更新超时时间 void update_que(int fd); //超时检测 void check_out(int epfd); //用于删除超时队列中的连接 void del_que(int fd); std::list<HeartBeat*>timeque; //超时队列 std::unordered_map<int, std::list<HeartBeat*>::iterator> up; //映射表 pthread_mutex_t quemutex; //超时队列锁 HeartBeat::HeartBeat(int fd,struct timespec abstime){ this->fd=fd; this->tm=abstime; //构造时将连接时的系统时间传入 } bool HeartBeat::timecheck(unsigned int outtime){ struct timespec abstime; clock_gettime(CLOCK_REALTIME,&abstime); if(abstime.tv_sec-this->tm.tv_sec>outtime) return true; else return false; } int HeartBeat::getfd(){ return this->fd; } //加入超时队列 void insert_que(int fd){ pthread_mutex_lock(&quemutex); //加锁 struct timespec abstime; clock_gettime(CLOCK_REALTIME,&abstime); HeartBeat *newheart=new HeartBeat(fd,abstime); timeque.push_back(newheart); std::list<HeartBeat*>::iterator it; it=timeque.end(); it--; up[fd]=it; pthread_mutex_unlock(&quemutex); //解锁 return; } void update_que(int fd){ pthread_mutex_lock(&quemutex); std::list<HeartBeat*>::iterator it; it=up[fd]; timeque.erase(it); struct timespec abstime; clock_gettime(CLOCK_REALTIME,&abstime); HeartBeat *newheart=new HeartBeat(fd,abstime); timeque.push_back(newheart); it=timeque.end(); it--; up[fd]=it; pthread_mutex_unlock(&quemutex); return; } void check_out(int epfd){ while(timeque.size()>0){ pthread_mutex_lock(&quemutex); HeartBeat *heart=timeque.front(); bool flag=heart->timecheck(60); if(flag){ timeque.pop_front(); int fd=heart->getfd(); // std::unordered_map<int,std::list<HeartBeat*>::iterator>::iterator it; auto it=up.find(fd); if(it!=up.end()){ up.erase(it); } epoll_event tp; tp.data.fd=fd; tp.events=EPOLLIN|EPOLLET; epoll_ctl(epfd,EPOLL_CTL_DEL,fd,&tp); pthread_mutex_unlock(&quemutex); printf("client[%d] timeout exit\n",fd); }else { pthread_mutex_unlock(&quemutex); break; } } return; } void del_que(int fd){ pthread_mutex_lock(&quemutex); timeque.erase(up[fd]); auto it=up.find(fd); up.erase(it); pthread_mutex_unlock(&quemutex); return; } //初始化 void et_ser::init(){ //创建监听套接字,并设置非阻塞 lfd=Socket(AF_INET,SOCK_STREAM|SOCK_NONBLOCK,0); //初始化服务端地址结构 bzero(&ser_addr,sizeof(ser_addr)); ser_addr.sin_family=AF_INET; ser_addr.sin_port=htons(SER_PORT); ser_addr.sin_addr.s_addr=htonl(INADDR_ANY); int opt=1; int ret=setsockopt(lfd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt)); if(ret==-1){ perr_exit("setsockopt error"); } Bind(lfd,(struct sockaddr *)&ser_addr,sizeof(ser_addr)); Listen(lfd,128); epfd=epoll_create(OPEN_MAX); if(epfd==-1) perr_exit("epoll_create error"); tep.events=EPOLLIN; tep.data.fd=lfd; ret=epoll_ctl(epfd,EPOLL_CTL_ADD,lfd,&tep); if(ret==-1) perr_exit("EPOLL_CTL_ADD error"); cli_addr_len=sizeof(cli_addr); if(pthread_mutex_init(&quemutex,NULL)!=0){ printf("init quemutex error\n"); exit(1); } return; } void et_ser::run_loop(){ int ret,cfd; while (1) { check_out(epfd); int nready=epoll_wait(epfd,ep,OPEN_MAX,5); if(nready==0) continue; if(nready==-1){ perr_exit("epoll_wait error"); } for(int i=0;i<nready;++i){ if(!(ep[i].events&EPOLLIN)) continue; if(ep[i].data.fd==lfd){ cfd=Accept(lfd,(struct sockaddr *)&cli_addr,&cli_addr_len); printf("recevied from %s at PORT %d\n",inet_ntoa(cli_addr.sin_addr),ntohs(cli_addr.sin_port)); insert_que(cfd); //加入到超时队列中 tep.events=EPOLLIN|EPOLLET; tep.data.fd=cfd; fcntl(cfd,F_SETFL,(fcntl(cfd,F_GETFL))|O_NONBLOCK); ret=epoll_ctl(epfd,EPOLL_CTL_ADD,cfd,&tep); if(ret==-1) perr_exit("EPOLL_CTL_ADD error"); }else { cfd=ep[i].data.fd; while (1) { // bzero(&buf,sizeof(buf)); int n=Read(cfd,buf,sizeof(buf)); if(n==0){ ret=epoll_ctl(epfd,EPOLL_CTL_DEL,cfd,NULL); if(ret==-1) perr_exit("EPOLL_CTL_DEL error"); Close(cfd); del_que(cfd); printf("client[%d] closed connection\n",cfd); break; }else if (n<0) { if(errno==EAGAIN||errno==EWOULDBLOCK) break; printf("read n<0 error\n"); ret=epoll_ctl(epfd,EPOLL_CTL_DEL,cfd,NULL); if(ret==-1) perr_exit("EPOLL_CTL_DEL error"); Close(cfd); del_que(cfd); break; }else { update_que(cfd); if(strcmp(buf,Heartpg)==0){ // bzero(&buf,sizeof(buf)); strcpy(buf,Heart_ACK); Write(cfd,buf,strlen(Heartpg)); printf("收到心跳包Heartpg\n"); break; }else { for(i=0;i<n;++i){ buf[i]=toupper(buf[i]); } Write(cfd,buf,n); Write(STDOUT_FILENO,buf,n); } } } } } } Close(lfd); Close(epfd); } int main (int argc, char *argv[]) { et_ser ser; ser.init(); ser.run_loop(); pthread_mutex_destroy(&quemutex); return 0; }
client代码:
#include "wrap.h" #include <fcntl.h> #include <sys/socket.h> #include <arpa/inet.h> #include <netinet/in.h> #include <unistd.h> #include <cstdio> #include <strings.h> #include <string.h> #include <ctime> #include <pthread.h> #define SER_PORT 5005 #define Heartpg "HeartBeat" #define Heart_ACK "OK" class client{ public: client(){}; ~client(){}; void init(); void run_loop(); void stop(); private: int cfd; sockaddr_in ser_addr; char buf[BUFSIZ]; }; //用于超时检测线程的传参 struct heartbg{ int fd; //客户端套接字 timespec tm; //客户端与服务端连接的时间 unsigned int outtime; //设置超时时间 int cnt; //重试次数 unsigned int difftime; //重试间隔 }; heartbg *cli; //用于检测超时发送心跳包的线程所需的参数 pthread_mutex_t climutex; //用于更新时间的锁,防止检测线程与主线程争抢,导致数据不正确 bool flag=false; //标志位判断服务端关闭,而终止主线程运行 void *send_heart(void *arg); //超时检测线程,超时 void client::init(){ bzero(&ser_addr,sizeof(ser_addr)); ser_addr.sin_family=AF_INET; ser_addr.sin_port=htons(SER_PORT); ser_addr.sin_addr.s_addr=htonl(INADDR_ANY); cfd=Socket(AF_INET,SOCK_STREAM,0); int opt=1; int ret=setsockopt(cfd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt)); if(ret==-1){ perr_exit("setsockopt error"); } Connect(cfd,(struct sockaddr *)&ser_addr,sizeof(ser_addr)); cli=new heartbg; clock_gettime(CLOCK_REALTIME,&cli->tm); cli->fd=cfd; cli->outtime=25; cli->cnt=5; cli->difftime=5; pthread_t tid; pthread_mutex_init(&climutex,NULL); pthread_create(&tid,NULL,send_heart,(void *)cli); return; } void client::run_loop(){ fcntl(STDIN_FILENO,F_SETFL,(fcntl(STDIN_FILENO,F_GETFL))|O_NONBLOCK); flag=false; while (!flag) { if(fgets(buf,BUFSIZ,stdin)!=NULL){ pthread_mutex_lock(&climutex); Write(cfd,buf,strlen(buf)); int ret=Read(cfd,buf,BUFSIZ); if(ret==0){ printf("the other side has been closed.\n"); break; } clock_gettime(CLOCK_REALTIME,&cli->tm); pthread_mutex_unlock(&climutex); Write(STDOUT_FILENO,buf,ret); }else { continue; } } //关闭客户端连接 Close(cfd); } void *send_heart(void *arg){ heartbg *cli=(struct heartbg *)(arg); char buf[BUFSIZ]; int i,n; while (1) { timespec nowtime; clock_gettime(CLOCK_REALTIME,&nowtime); if(nowtime.tv_sec-cli->tm.tv_sec>cli->outtime){ for(i=0;i<cli->cnt+1;++i){ Write(cli->fd,Heartpg,strlen(Heartpg)); printf("发送心跳包[%d]\n",i+1); bzero(&buf,sizeof(buf)); n=Read(cli->fd,buf,sizeof(buf)); if(n==0){ break; } if(strcmp(buf,Heart_ACK)==0){ pthread_mutex_lock(&climutex); printf("收到服务端回应\n"); clock_gettime(CLOCK_REALTIME,&cli->tm); pthread_mutex_unlock(&climutex); break; } sleep(cli->difftime); } if(n==0){ printf("服务端断开连接\n"); flag=true; pthread_mutex_destroy(&climutex); pthread_detach(pthread_self()); pthread_exit(NULL); } //断开连接 if(i==cli->cnt){ printf("超时断开连接\n"); flag=true; pthread_mutex_destroy(&climutex); pthread_detach(pthread_self()); pthread_exit(NULL); //线程自行结束 } } } } int main (int argc, char *argv[]) { client ct; ct.init(); ct.run_loop(); return 0; }
运行结果:
实现原理:
使用SOL_SOCKET协议层下的SO_KEEPALIVE来保持连接
SO_KEEPALIVE功能
:
保持连接检测对方主机是否崩溃,避免(服务器)永远阻塞于TCP连接的输入,设置该选项后,如果两小时内在此套接口的任一方向都没有数据交换,TCP就自动给对方发一个保持存活探测分节。这是一个对方必须响应的TCP分节,它会导致以下三种情况:
- 对方接收一切正常,以期望的ACK响应
- 2小时后,TCP将发出另一个探测分节,对方已崩溃且已重新启动,已RST响应,套接口的待处理错误被置为ECONNRESET,套接口本身则被关闭
- 2小时后,TCP将发出另一个探测分节,对方无任何响应,源自berkelery的TCP发送另外8个探测分节,相隔75秒一个,试图得到一个响应,在发出第一个探测分节11分钟15秒后若仍无响应就放弃,套接口的待处理错误被置为ETIMEOUT,套接口本身则被关闭
实现思路:
实现代码:
Linux下
#include <sys/socket.h> #include <netinet/in.h> int opt=1; int ret=setsockopt(lfd,SOL_SOCKET,SO_KEEPALIVE,&opt,sizeof(opt)); if(ret==-1){ perr_exit("setsockopt error"); } //发送 keepalive 报文的时间间隔 int val = 7200; //默认为7500s,也就是两个小时 setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &val, sizeof(val)); //两次重试报文的时间间隔 int interval = 75; //默认为75s setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &interval, sizeof(interval)); int cnt = 9; //默认是共9次,总共发送心跳包次数 setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &cnt, sizeof(cnt));
windows下
//开启 keepalive 选项`
const char on = 1;
setsockopt(socket, SOL_SOCKET, SO_KEEPALIVE, (char *)&on, sizeof(on);
// 设置超时详细信息
DWORD cbBytesReturned;
tcp_keepalive klive;
// 启用保活
klive.onoff = 1;
//发送 keepalive 报文的时间间隔
klive.keepalivetime = 7200;
// 重试间隔为10秒
klive.keepaliveinterval = 1000 * 10;
WSAIoctl(socket, SIO_KEEPALIVE_VALS, &klive, sizeof(tcp_keepalive), NULL, 0, &cbBytesReturned, NULL, NULL);
我的博客中还有很多笔记欢迎大家进行访问哦
地址1:codebooks.xyz
地址2:moonfordream.github.io
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。