赞
踩
补充:
总的来说,IO灰常地形象,就拿钓鱼举例,「鱼竿」就可理解为「文件描述符」,「鱼」就类似于「数据」,「鱼上钩」就类似于「IO事件就绪」,「把鱼放到桶里」就可理解为「拷贝数据」的过程。根据「你」参不参与IO的过程,通常可以分为同步IO和异步IO。
同步IO
我们简单的讨论一下这几种IO模式的效率——
简单总结一下,同步IO参与等和拷贝的过程;而异步IO不参与等和拷贝,只参与发起和接收执行完毕之后的结果的过程。
接口说明
/* 头文件: */ #include <unistd.h> #include <fcntl.h> /* 函数声明: */ int fcntl(int fd, int cmd, ... /* arg */ ); /* 参数: 1.文件描述符。 2.命令选项,通常的有F_GETFL(获取属性),F_SETFL(设置属性), 3.选项,可变参数列表。 其它常见的命令选项: 1.F_DUPFD,返回一个新的文件描述符,但与原文件描述符指向同一个文件。 2.F_GETFD或F_SETFD,用于获取与设置描述符标记,目前只有FD_CLOEXEC,意味在execl时,自动将文件描述符进行关闭。 3.F_GETFL或F_SETFL,用于获取与设置描述符状态属性,常见的有O_APPEND,O_NONBLOCK,O_CREAT,O_RDONLY,O_WRONLY,O_RDWR。 4.F_GETOWN或F_SETOWN,获取与设置异步IO进程拥有者的pid。 5.F_GETLK用于获取锁,F_SETLK用于设置锁,F_SETLKW用于表示没有冲突的锁存在。 返回值: 1. 设置失败,返回一个小于0的数表示。 2. 否则表示成功。 */
下面我们通过代码实现来进一步了解IO模型,并熟练使用此函数。
非阻塞IO
#include<iostream> #include<stdio.h> #include<unistd.h> #include<cstring> int main() { char buff[128]; while(true) { printf("please enter@"); fflush(stdout); //按下ctrl + D,结束输入。 int n = read(0,buff,sizeof(buff) - 1); //1.返回值为0时,这里我们是读到了终端,需要根据。 //2.返回值为-1时,可能读取出错 //3.返回值大于0时,读取成功的字节数。 if(n == 0) { std::cout << "read end of file." << std::endl; break; } else if(n > 0) { //实际上我们按下回车时,本质上就是输入了一个\n。 buff[n-1] = '\0'; std::cout <<"echo: "<< buff << std::endl; } else { std::cout << "error reason is " << strerror(errno) << ", errno code is " << errno << std::endl; break; } } return 0; }
非阻塞式
#include<iostream> #include<stdio.h> #include<unistd.h> #include<fcntl.h> #include<cstring> void SetNoBlock(int fd) { //第一步:获取当前文件描述符的状态。 int n = fcntl(fd, F_GETFL); if(n < 0) { perror("fcntl:F_GETFL:"); return; } //第二步:添加非阻塞模式到当前文件描述符。 n = fcntl(fd,F_SETFL,n | O_NONBLOCK); if(n < 0) { perror("fcntl:F_SETFL:"); } std::cout << "set none block done" << std::endl; } int main() { char buff[128]; SetNoBlock(0); while(true) { //按下ctrl + D,结束输入。 int n = read(0,buff,sizeof(buff) - 1); //1.返回值为0时,读到了文件的结尾。 //2.返回值为-1时,可能读取出错 //3.返回值大于0时,读取成功的字节数。 if(n == 0) { std::cout << "read end of file." << std::endl; break; } else if(n > 0) { //实际上读取时,按下\n会刷新缓存区。 buff[n-1] = '\0'; std::cout <<"echo: "<< buff << std::endl; } else { /* 1.这里当缓存区没有资源时,会出现11号报错,即Resource temporarily unavailable, 这其实不算是报错,只是缓存区暂时还没有资源。 2.11号用宏来表示就是EWOULDBLOCK */ if(errno == EWOULDBLOCK) { std::cout << "do other things...." << std::endl; sleep(1); } else if(errno == EINTR) { //被信号中断了。 continue; } else { std::cout << "read error,error code is " << errno << ", reason is " << strerror(errno) << std::endl; break; } } } return 0; }
实验结果:
说明:
信号驱动式
#include<iostream> #include<cstring> #include<sys/types.h> #include<sys/stat.h> #include<fcntl.h> #include<unistd.h> #include<signal.h> int fd = 0; void handler(int sig) { char buffer[1024]; int n = read(fd,buffer,sizeof(buffer)); if(n == 0) { std::cout << "read end of file" << std::endl; } else if(n > 0) { buffer[n-1] = '\0'; std::cout << "echo: " << buffer << std::endl;; } else { if(errno != EWOULDBLOCK) perror("read:"); } } void SetSigDive(int fd) { //Centos 7下必须设置读取为非阻塞模式,因为打印信息也会向进程发送信号。 //fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK); //ubuntu下要开启异步模式,可以不用设置为非阻塞 fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_ASYNC); //设置文件描述符的所有者为当前进程,以便接收信号 int n = fcntl(fd, F_SETOWN, getpid()); if(n < 0) { perror("fcntl:"); return; } //将信号设置进操作系统中 struct sigaction act; memset(&act,0,sizeof(act));//清空信号集。 act.sa_handler = handler; if(sigaction(SIGIO,&act,nullptr) == -1) std::cout << "fail to add signal..." << std::endl; else std::cout << "Set Signal Dive IO Success." << std::endl; } void Sleep(int sec); int main() { SetSigDive(fd); while(true) { std::cout << "do nothing...." << std::endl; Sleep(1); } return 0; } void Sleep(int sec) { sigset_t mask,pre_mask; sigemptyset(&mask); sigaddset(&mask,SIGIO); sigprocmask(SIG_BLOCK,&mask,&pre_mask); sleep(sec); sigprocmask(SIG_SETMASK,&pre_mask,nullptr); }
说明一下:
异步式
#include<iostream> #include<cstring> #include<sys/types.h> #include<sys/stat.h> #include<fcntl.h> #include<unistd.h> #include<signal.h> int fd = 0; void handler(int sig) { char buffer[1024]; int n = read(fd,buffer,sizeof(buffer)); if(n == 0) { std::cout << "read end of file" << std::endl; } else if(n > 0) { buffer[n - 1] = '\0'; std::cout << "echo: " << buffer << std::endl; } else { if(errno != EWOULDBLOCK) perror("read:"); } } int main() { signal(SIGIO,handler);//跟上述的sigaction函数的功能一样,不过用起来更为简单。 fcntl(fd,F_SETFL,fcntl(fd, F_GETFL) | O_ASYNC); pid_t pid = fork(); if(pid == 0) { while(true) { sleep(1); } } else { fcntl(fd,F_SETOWN,pid); while(true) { std::cout << "do other things..." << std::endl; sleep(1); } } return 0; }
如果信号驱动设置为异步没有明白的话,想必这个例子更浅显易懂,父进程获取到子进程的pid之后,将读取信息的任务交给了子进程进行读取,完成任务的发起动作,在之后都是子进程在收到信号并对数据进行拷贝和处理,而父进程则没有参与到这个过程。这是一个非常典型的异步demo,结合之前的理论知识希望能够帮助读者进一步理解异步IO。
接口说明
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout); struct timeval { time_t tv_sec; //seconds,秒 suseconds_t tv_usec; //microseconds,毫秒 }; /* 参数: 1:nfds,最大的文件描述符加1,当做左闭右开理解式记忆。 2,3,4: readfds,writefds,exceptfds,输入输出型参数,输入要关心的文件描述符,输出已经就绪的文件描述符。 5:输入输出型参数,输入设置的秒数,输出剩下的秒数,最低为0. 返回值: 1.等于0,表示等待超时,超过设定的时间,且没有文件描述符就绪。 2.大于0,表示已经就绪的文件描述符的个数,不过没啥用。 3.小于0,表示等待过程中出错。 */ //位图操作函数: void FD_CLR(int fd, fd_set *set);//从位图中移除文件描述符。 int FD_ISSET(int fd, fd_set *set);//查看文件描述符在位图中是否存在。 void FD_SET(int fd, fd_set *set);//在位图中设置文件描述符。 void FD_ZERO(fd_set *set);//清空或者初始化位图。 //内核中的fd_set #define __NFDBITS (8 * sizeof(unsigned long)) #define __FD_SETSIZE 1024 #define __FDSET_LONGS (__FD_SETSIZE/__NFDBITS) // 1024 / (8 * 4 or 8) => 2^10^ / 2^5^ or 2^6^ => 32 or 16 typedef struct { unsigned long fds_bits [__FDSET_LONGS]; //32 * 4 or 16 * 8,但总大小都是128*8 = 1024比特位,即最大能存放这么多个文件描述符。 } __kernel_fd_set;
设计思路
实现代码:
#include<unistd.h> #include<sys/types.h> #include<sys/time.h> #include<algorithm> #include "../Tools/Log.hpp" #include "../Tools/Socket.hpp" const int max_fd_nums = 1024; const int max_buffer_size = 10240; class SelectServer { public: SelectServer(uint16_t port = 8080,string ip = "0.0.0.0") :list_fd(port,ip) { //初始化时,将文件描述符数组,全置为-1 memset(fd_array,-1,sizeof(fd_array)); } void Init() { list_fd.Socket(); list_fd.Bind(); list_fd.Listen(); } void Accepter() { //如果就绪直接获取新连接即可 //输出型参数: sockaddr_in client_msg; socklen_t len; int new_fd = list_fd.Accept(&client_msg,&len); if(new_fd < 0) return; //这里就要解决第一个问题:即将关心的读事件添加到下次调用的rset中,这就需要一个中间数组进行存储。 //进行遍历找到空位,即找到值为-1的下标即可。 int pos = 1; while(pos < max_fd_nums) { if(fd_array[pos] == -1) break; } if(pos == max_fd_nums) { //说明没位置了,只能将获取的套接字进行关闭 close(new_fd); } else { fd_array[pos] = new_fd; lg(INFORE,"add a new fd..."); } } void Recver(int fd,int i) { //说明就绪了,直接进行读取即可 char buffer[max_buffer_size] = {0}; int n = read(fd,buffer,sizeof(buffer) - 1); if(n > 0) { buffer[n - 1] = '\0'; //读取成功,打印查看读取的内容 cout <<"echo: "<< buffer << endl; } else if(n == 0) { //说明,读取到文件的结尾,或者客户端的连接关闭了,关闭套接字,将所在位置的值置为-1,让出来即可。 close(fd); fd_array[i] = -1; lg(INFORE,"close fd: %d",fd); } else { //说明读取出错,查看错误原因即可。 lg(WARNNING,"read error, fd is %d ,errno code is %d, reason is %s",fd,errno,strerror(errno)); } } void Dispatcher(fd_set& rset) { //1.首先看监听事件是否就绪。 if(FD_ISSET(list_fd.GetSocket(),&rset)) { Accepter(); } //其次遍历其余就绪的事件进行读数据即可 for(int i = 1; i < max_fd_nums; i++) { int cur_fd = fd_array[i]; if(FD_ISSET(cur_fd,&rset)) { Recver(cur_fd,i); } } } void Run() { //起初就将监听套接字放在0号下标处 fd_array[0] = list_fd.GetSocket(); for(;;) { fd_set rset; //内核提供的128字节的一个定长数组,当做位图进行使用,最多能够容纳128 * 8 = 1024个文件描述符 //1.初始化,将位图清零,并添加初始套接字。 FD_ZERO(&rset); for(auto fd : fd_array) { if(fd != -1) { FD_SET(fd,&rset); } } //select首参数,文件描述符的最大值加1 int nfds = *max_element(fd_array,fd_array + max_fd_nums) + 1; //遍历更新最大值,或者直接用接口 /* struct timeval { time_t tv_sec; //seconds suseconds_t tv_usec; //microseconds }; */ //设置等待时间,这里设置3秒 struct timeval w_time = {3,0}; int n = select(nfds,&rset,nullptr,nullptr,&w_time); /* 返回值: 1.等于0,表示等待时间超时,没有事件就绪。 2.大于0,表示就绪的事件个数。 3.小于0,表示出错。 第二到第四个参数: 1.输入时,为要关心的文件描述符。 2.输出时,为已经就绪的文件描述符。 第五个参数: 1.输入时,为等待的时间,设置为0,表示非阻塞等待。 2.输出时,为剩余的时间。 */ switch (n) { case 0: cout << "time out...." << endl; //下一个循环会重新刷新等待的时间。 break; case -1: cout << "select errnor...." << endl; break; default: //事件就绪,进行处理即可。 cout << "Event is already...." << endl; Dispatcher(rset); break; } } } private: Sock list_fd; int fd_array[max_fd_nums]; };
#include<iostream>
#include<memory>
#include"SelectServer.hpp"
int main()
{
std::unique_ptr<SelectServer> ss_ptr(new SelectServer());
ss_ptr->Init();
ss_ptr->Run();
return 0;
}
注意:
缺陷:
接口说明:
int poll(struct pollfd *fds, nfds_t nfds, int timeout); struct pollfd { int fd; /* 文件描述符 */ short events; /* 所关心的事件 */ short revents; /* 返回时已经就绪的事件 */ }; /* 参数: 1.fds,指的是数组的指针。 2.nfds,指的时数组的最大大小。 3.timeout,等待的时长。 返回值: 1.等于0,表示等待超时,超过设定的时间,且没有文件描述符就绪。 2.大于0,表示已经就绪的文件描述符的个数,不过没啥用。 3.小于0,表示等待过程中出错。 */ /* 常见的事件就绪的情况~ POLLIN:读事件 POLLOUT:写事件 POLLPRI:表示有紧急数据可读取。 POLLNVAL表示无效请求:文件描述符未打开。 当写出错时: POLLERR:发生了错误条件,具体要根据情况分析。 POLLHUP:表示发生了挂断的情况,通常表示被监视的文件描述符已经断开连接,或者管道关闭。 当Linux定义了,_XOPEN_SOURCE时,如下的宏可使用: POLLRDNORM:等效于POLLIN,表示普通数据可读取。 POLLRDBAND:表示优先级带数据可读取,在Linux上通常未使用。 POLLWRNORM:等效于POLLOUT,表示普通数据可写入。 POLLWRBAND:表示可以写入优先级带数据。 当定义了_GNU_SOURCE时,可以使用: POLLRDHUP(自Linux 2.6.17起):表示流套接字的对等端关闭了连接,或者关闭了连接的写一半。 */
设计思路:
说明:如果我们实现过上面的select的话,其实这里的实现就简单了很多。
基本实现:
#include<unistd.h> #include<sys/types.h> #include<sys/time.h> #include<poll.h> #include<algorithm> #include "../Tools/Log.hpp" #include "../Tools/Socket.hpp" const int max_buffer_size = 1024; const int default_event = 0; const int default_fd = -1; const int default_sz = 2; class PollServer { public: PollServer(uint16_t port = 8080,string ip = "0.0.0.0") :list_fd(port,ip),sz(default_sz) { // event_fds = (struct pollfd*)malloc(sizeof(struct pollfd) * sz); event_fds = new struct pollfd[sz]; //进行初始化 for(int i = 0; i < sz; i++) { event_fds[i].events = event_fds[i].revents = default_event; event_fds[i].fd = default_fd; } } ~PollServer() { delete[] event_fds; } void Init() { list_fd.Socket(); //在绑定监听之前使用有效! int opt = 1; int fd = list_fd.GetSocket(); setsockopt(fd,SOL_SOCKET,SO_REUSEADDR|SO_REUSEPORT\ ,&opt,sizeof(opt)); list_fd.Bind(); list_fd.Listen(); } void Accepter() { //如果就绪直接获取新连接即可 //输出型参数: sockaddr_in client_msg; socklen_t len; int new_fd = list_fd.Accept(&client_msg,&len); event_fds[0].revents = default_event; if(new_fd < 0) return; //这里就要解决第一个问题:即将关心的读事件添加到下次调用的rset中,这就需要一个中间数组进行存储。 //进行遍历找到空位,即找到值为-1的下标即可。 int pos = 1; while(pos < sz) { if(event_fds[pos].fd == default_fd) break; else pos++; } if(pos == sz) { //说明没位置了,进行扩容 int new_sz = sz * 2; struct pollfd* tmp = new struct pollfd[new_sz]; memcpy(tmp,event_fds,sizeof(struct pollfd) * sz); delete[] event_fds; //扩容。 event_fds = tmp; //进行后续的初始化 for(int i = sz; i < new_sz; i++) { event_fds[i].events = event_fds[i].revents = default_event; event_fds[i].fd = default_fd; } //更新sz sz = new_sz; cout << "扩容成功....." << endl; } event_fds[pos].fd = new_fd; event_fds[pos].events = POLLIN; event_fds[pos].revents = default_event; lg(INFORE,"add a new fd..."); } void Recver(int fd,int i) { //说明就绪了,直接进行读取即可 char buffer[max_buffer_size]; int n = read(fd,buffer,sizeof(buffer) - 1); if(n > 0) { buffer[n - 1] = '\0'; //读取成功,打印查看读取的内容 cout <<"echo: "<< buffer << endl; } else if(n == 0) { //说明,读取到文件的结尾,或者客户端的连接关闭了,关闭套接字让出来即可。 close(fd); event_fds[i].fd = default_fd; event_fds[i].events = event_fds[i].revents = default_event; lg(INFORE,"close fd: %d",fd); } else { //说明读取出错,查看错误原因即可。 lg(WARNNING,"read error, fd is %d ,errno code is %d, reason is %s",fd,errno,strerror(errno)); } } void Dispatcher() { if(event_fds[0].revents & POLLIN) { Accepter(); } for(int i = 1; i < sz; i++) { if(event_fds[i].revents & POLLIN) { Recver(event_fds[i].fd,i); } } } void Run() { //起初就将监听套接字放在0号下标处 event_fds[0].fd = list_fd.GetSocket(); event_fds[0].events = POLLIN; int wait_time = 3000;//3000ms == 3s for(;;) { int n = poll(event_fds,sz,wait_time); switch (n) { case 0: cout << "time out...." << endl; break; case -1: cout << "select errnor...." << endl; break; default: //事件就绪,进行处理即可。 cout << "Event is already...." << endl; Dispatcher(); break; } } } private: Sock list_fd; struct pollfd* event_fds; int sz; };
#include<iostream>
#include<memory>
#include"PollServer.hpp"
int main()
{
std::unique_ptr<PollServer> ps_ptr(new PollServer());
ps_ptr->Init();
ps_ptr->Run();
return 0;
}
优点:
缺点:
接口说明:
int epoll_create(int size); /* 参数:设置内核结构的大小,从Linux2.6.8之后参数已经是无效的了,但需要传入一个大于0的数。 返回值:成功返回epfd,用于之后的传参控制要关心的事件。失败返回-1,并设置合适的错误码。 */ int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout); /* 参数: 1.epfd,eppoll_create创建成功之后的返回值。 2.events,输出型参数,返回已经就绪的事件。 3.maxevents,设置的返回就绪事件的最大个数。 4.timeout,调用函数的等待的时间,单位为ms。 返回值: 1.等于0,表示等待超时,超过设定的时间,且没有文件描述符就绪。 2.大于0,表示已经就绪的文件描述符的个数,不过没啥用。 3.小于0,表示等待过程中出错,设置合适的错误码。 */ int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); /* 参数: 1.epfd,eppoll_create创建成功之后的返回值。 2.op,要对,对应事件进行的可能操作,如下: EPOLL_CTL_ADD:添加事件。 EPOLL_CTL_MOD:修改事件。 EPOLL_CTL_DEL:删除事件。 3.fd,要操作的文件描述符。 4.events,输入型参数,所要关心的事件。 返回值: 1.成功返回0. 2.失败返回-1,并设置合适的错误码。 */ typedef union epoll_data //1.这是一个联合体,也就意味着,epoll_data可能不仅是描述符,也可能是一个指针,变量。 //2.这样设计拓宽了epoll的使用场景。 { void *ptr; int fd; uint32_t u32; uint64_t u64; } epoll_data_t; struct epoll_event { uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */ };
基本原理:
//相关的结构体信息: struct eventpoll { rwlock_t lock;//读写锁 struct rw_semaphore sem;//信号量 wait_queue_head_t wq;//一个被sys_epoll_wait()接口使用的等待队列。 wait_queue_head_t poll_wait; //一个被file->poll()使用的等待队列。 struct list_head rdllist; //已经就绪的文件描述符的队列。 struct rb_root rbr; //红黑树的根结点。 }; struct epitem { struct rb_node rbn;//红黑树的结点。 struct list_head rdllink;//用于将该结构体链接到事件轮询的准备就绪队列。 struct epoll_filefd ffd;//文件指针以及文件描述符的结构体。 int nwait;//等待队列的数量 struct list_head pwqlist;//用于存储轮询等待队列 struct eventpoll *ep;//指向该结构体所属的事件的指针 struct epoll_event event;//关心的事件的相关信息。 atomic_t usecnt;//引用计数。 struct list_head fllink;//文件的链表头,便于访问文件实例。 struct list_head txlink;//文本信息的链表头,便于访问传输信息实例。 unsigned int revents;//收集就绪的事件。 };
形象图解:
说明:
设计思路:
实现代码:
#include<unistd.h> #include<sys/epoll.h> #include<cstring> #include"../Tools/nocopy.hpp" #include"../Tools/Log.hpp" enum { EPOLL_CREATE_FAIL = 3, }; class Epoller : public Nocpy { public: Epoller(int capacity = 64) :_capacity(capacity) { _events = new struct epoll_event[capacity]; _epfd = epoll_create(_init_size); if(_epfd == -1) { lg(CRIT,"epoll_create fail...."); exit(EPOLL_CREATE_FAIL); } } ~Epoller() { delete[] _events; } pair<struct epoll_event*,int> Epoll_Wait() { int sz = epoll_wait(_epfd,_events,_capacity,_timeout); struct epoll_event* ret = sz > 0 ? _events : nullptr; return {ret,sz}; } pair<int,int> Epoll_Parse(const struct epoll_event& event) { return {event.data.fd,event.events}; } struct epoll_event Epoll_Create(int fd,uint32_t event) { struct epoll_event obj; obj.data.fd = fd; obj.events = event; return obj; } void Set_Wait_Way(int choice) { //0为非阻塞,-1为阻塞式等待 _timeout = choice; } int Epoll_Ctl(int fd,int oper,struct epoll_event* event = nullptr) { int ret; if(oper == EPOLL_CTL_DEL) { //这里只是为了强调删除操作,event参数传入空即可,这也是缺省参数给空的原因,外面无需再传。 ret = epoll_ctl(_epfd,oper,fd,event); } else { ret = epoll_ctl(_epfd,oper,fd,event); } if(ret == -1) { lg(ERRO,strerror(errno)); } return ret; } private: int _epfd; struct epoll_event* _events; int _capacity; static const int _init_size = 1; int _timeout = 3000; };
说明
成员变量:
方法:
#include<unistd.h> #include<sys/types.h> #include<sys/time.h> #include<poll.h> #include<algorithm> #include"Epoller.hpp" #include "../Tools/Log.hpp" #include "../Tools/Socket.hpp" const int max_buffer_size = 1024; const int default_event = 0; const int default_fd = -1; const int default_capcity = 64; class EpollServer { public: EpollServer(uint16_t port = 8080,string ip = "0.0.0.0",int cap = default_capcity) :_list_fd_ptr(new Sock(port,ip)),_epoller_ptr(new Epoller(cap)) {} void Init() { _list_fd_ptr->Socket(); //在绑定监听之前使用有效! int opt = 1; _listen_fd = _list_fd_ptr->GetSocket();//初始化_listen_fd的同时,也将端口设为重复使用。 setsockopt(_listen_fd,SOL_SOCKET,SO_REUSEADDR|SO_REUSEPORT,&opt,sizeof(opt)); _list_fd_ptr->Bind(); _list_fd_ptr->Listen(); } void Accepter() { sockaddr_in client_msg; socklen_t len; int fd = _list_fd_ptr->Accept(&client_msg,&len); if(fd < 0) { lg(ERRO,strerror(errno)); } else { auto event = _epoller_ptr->Epoll_Create(fd,EPOLLIN); _epoller_ptr->Epoll_Ctl(fd,EPOLL_CTL_ADD,&event); } } void Recver(int fd) { char buffer[1024] = {0}; int n = read(fd,buffer,sizeof(buffer) - 1); if(n == 0) { lg(INFORE,"read end of file or client closed connect..."); //注意:在文件描述符有效之前先删除内核中的结构。再关闭文件描述符。 _epoller_ptr->Epoll_Ctl(fd,EPOLL_CTL_DEL); close(fd); } else if(n > 0) { buffer[n-1] = '\0'; lg(INFORE,buffer); } else { lg(ERRO,strerror(errno)); } } void Dispatcher(int sz,struct epoll_event* events) { for(int i = 0; i < sz; i++) { auto[fd,revents] = _epoller_ptr->Epoll_Parse(events[i]); if(revents & EPOLLIN) { if(fd == _listen_fd) { //新连接到了。 Accepter(); } else { //处理就绪事件。 Recver(fd); } } } } void Run() { //将监听套接字加入到内核的红黑树当中 auto event = _epoller_ptr->Epoll_Create(_listen_fd,EPOLLIN); _epoller_ptr->Epoll_Ctl(_listen_fd,EPOLL_CTL_ADD,&event); for(;;) { auto [events,n] = _epoller_ptr->Epoll_Wait(); if(n == 0) { lg(INFORE,"time out..."); } else if(n > 0) { Dispatcher(n,events); sleep(1); } else { lg(ERRO,strerror(errno)); } } } private: unique_ptr<Sock> _list_fd_ptr; int _listen_fd; unique_ptr<Epoller> _epoller_ptr; };
#include<iostream>
#include<functional>
#include<memory>
#include"EpollServer.hpp"
int main()
{
std::unique_ptr<EpollServer> eptr(new EpollServer());
eptr->Init();
eptr->Run();
return 0;
}
优点:
重点:
下面的内容我们将基于ET模式与封装的Epoller类简单实现一个Recator模式的服务器。
在事件驱动编程中,主要包含以下几个概念:
一般的处理模式
特点:
缺点:
单Recator单线程的处理模式
特点:
缺点:
单Recator多线程的处理模式
特点:
基本设计思路
时间检查类
实现框架:
#pragma once #include<time.h> #include<memory> class Connection; class RecatorSvr; enum { MAX_WAIT_TIME = 6, MAX_TIME = 15, }; using s_c_t = std::shared_ptr<Connection>; using fun_t = RecatorSvr*; struct Timer { Timer(fun_t rsvr ,s_c_t con ,time_t outdate) :_rsvr(rsvr),_con(con),_out_date(outdate),_recent_time(outdate) {} void OutOfTimeHandler(); void UpDateAccpetTime(); void UpDateRunTime(); bool IsOutTime(); bool operator < (time_t cur_time); s_c_t _con;//建立连接时,设置引用指向连接。 time_t _recent_time;//最近一次被调用的时间 + MAX_WAIT_TIME,用于堆结构中重新更新时间,。 time_t _out_date;//初始化超时时间戳,后续如果调用的话就更新,超时的话执行对应的异常处理函数即可。 fun_t _rsvr;//指向Recator的回指指针。 };
成员变量:
方法:
说明:博主这里是维护了一个小根堆管理的Timer指针,且排序是根据_out_date进行的,如果在调用时直接进行更新,那么小根堆就失效了,因此我们需要_recent_time保留最新一次的超时时间戳,等到所有事件处理完毕之后,我们再根据_out_date和_recent_time进行统一的更新。具体详见最后给出的Recator关于Timer的实现方法。
功能实现代码:
void OutOfTimeHandler() { _con->_excep(_con); } void UpDateAccpetTime() { _recent_time = time(nullptr) + MAX_TIME; } void UpDateRunTime() { _recent_time = time(nullptr) + MAX_WAIT_TIME; } bool IsOutTime() { //如果不大于,自动将_out_date更新为_recent_time if(_recent_time > time(nullptr)) { _out_date = _recent_time; return false;//说明不超时。 } return true; } bool operator < (time_t cur_time) { return _out_date < cur_time; }
连接管理类
实现框架
//... class Connection; class RecatorSvr; class Timer; using cal_t = std::function<void(std::shared_ptr<Connection>)>; using s_c_t = std::shared_ptr<Connection>; typedef RecatorSvr* fun_t; //链接管理器 class Connection { public: Connection() {} ~Connection() {} //默认的是服务器的配置。 Connection(int fd,fun_t rsvr,cal_t recv = nullptr,cal_t send = nullptr\ ,cal_t excep = nullptr,int port = 8888,const std::string& ip = "0.0.0.0") :_fd(fd),_recv(recv),_send(send),_excep(excep),_port(port),_ip(ip),_rsvr(rsvr) {} cal_t _recv; cal_t _send; cal_t _excep; fun_t _rsvr; Timer* _tp;//用来更新对应的时间。 //初始化_tp成员变量 void InitTime(Timer* tm); //获取成员变量_fd int Getfd(); //追加字符串到_inbuffer之后。 void Append(const std::string& str); //获取_inbuffer,注意这里是引用。 std::string& GetInbuf(); //获取_outbuffer,同理。 std::string& GetOutBuf(); private: //输入输出,一次性存放的地方。 std::string _inbuffer;//a bug: 二进制流的处理问题。 std::string _outbuffer; int _fd; std::string _ip; int _port; };
成员变量:
方法:
方法实现:
void InitTime(Timer* tm) { _tp = tm; } int Getfd() { return _fd; } void Append(const std::string& str) { // std::cout << inbuffer.size() << std::endl; inbuffer = inbuffer + str; // std::cout << inbuffer.size() << std::endl; // std::cout << inbuffer << std::endl; } std::string& GetInbuf() { return inbuffer; } std::string& GetOutBuf() { return outbuffer; }
连接监听类
实现框架
#pragma once #include<iostream> #include<functional> #include"TcpServer.hpp" //监听器。 class Listener { public: //构造函数初始化 Listener(fun_t svr) :_lisptr(new Sock()),_rsvr(svr); //进行连接的获取 void Accepter(s_c_t con); private: std::shared_ptr<Sock> _lisptr; fun_t _rsvr;//回指指针,方便添加连接。 };
成员:
方法:
方法实现代码
Listener(fun_t svr) :_lisptr(new Sock()),_rsvr(svr) { //1.创建绑定监听套接字。 _lisptr->Socket(); int fd = _lisptr->GetSocket(); //对端口号进行复用 int opt = 1; setsockopt(fd,SOL_SOCKET,SO_REUSEADDR|SO_REUSEPORT,&opt,sizeof(opt)); _lisptr->Bind(); _lisptr->Listen(); //2.将连接添加到服务器管理的队列中。 if(_rsvr != nullptr) { _rsvr->AddConnection(fd,std::bind(&Listener::Accepter,this,std::placeholders::_1), nullptr,std::bind(&RecatorSvr::Exceper,_rsvr,std::placeholders::_1)); lg(INFORE,"listen_fd is added into Recator successfully."); } } void Accepter(s_c_t con) { //设置超时时间为15秒,保证监听套接字是最后退出的。 con->_tp->_recent_time = time(nullptr) + 60; while(true) { sockaddr_in client_msg; socklen_t len; int fd = con->Getfd(); int sock = accept(fd,(sockaddr*)&client_msg,&len); if(sock > 0) { //首先,将网络序列转为主机序列。 uint32_t ip = client_msg.sin_addr.s_addr; uint16_t port = ntohs(client_msg.sin_port); char ip_buff[100] = {0}; inet_ntop(AF_INET,&ip,ip_buff,sizeof(ip_buff) - 1); lg(INFORE,"accept a new link,the user is %s:%d",ip_buff,port); //其次,添加到对应的哈希表和放入到内核当中。 _rsvr->AddConnection(sock,std::bind(&RecatorSvr::Recver,_rsvr,std::placeholders::_1),\ std::bind(&RecatorSvr::Sender,_rsvr,std::placeholders::_1)\ ,std::bind(&RecatorSvr::Exceper,_rsvr,std::placeholders::_1),port,ip_buff); } else { if(errno == EWOULDBLOCK) { //说明连接获取完了。 break; } else if(errno == EINTR) { //说明被异常信号唤醒了。 continue; } lg(ERRO,strerror(errno)); //执行对应的异常处理逻辑。 con->_excep(con); return; } } }
任务类
实现代码:
#pragma once #include<functional> #include<memory> #include"../../Tools/Log.hpp" #include"Connection.hpp" class Connection; struct Task { Task(s_c_t con,cal_t handler = nullptr) :_con(con),_handler(handler) {} void Service() { _handler(_con); } void operator()() { Service(); }; s_c_t _con; cal_t _handler; };
成员变量:
线程池
说明:
实现代码:
#pragma once #include<vector> #include<queue> #include<unistd.h> #include<string> #include<vector> #include<strings.h> #include<cstring> #include<sys/types.h> #include<sys/socket.h> #include<netinet/in.h> #include<arpa/inet.h> #include"Task.hpp" using std::vector; using std::queue; typedef void(*cal)(); class Task; class ThreadPool { public: static ThreadPool* GetInstance() { if(tpool == nullptr) { UnLock(); if(tpool == nullptr) { tpool = new ThreadPool(); } Lock(); } return tpool; } void Lock() { pthread_mutex_lock(&_t_mutex); } void UnLock() { pthread_mutex_unlock(&_t_mutex); } ~ThreadPool() { for(int i = 0; i < _capacity; i++) { pthread_join(tids[i],nullptr); } } void start() { for(int i = 0; i < _capacity; i++) { pthread_create(&tids[i],nullptr,handler,this); } } void Push(const Task& data) { //push这里只有主线程在push,因此没必要加锁。 _que.push(data); pthread_cond_signal(&_t_cond); } static void* handler(void* args) { ThreadPool* ptr = static_cast<ThreadPool*>(args); ptr->_handler(); return nullptr; } void _handler() { while(true) { Lock(); while(_que.empty()) { pthread_cond_wait(&_t_cond,&_t_mutex); } _que.front()();_que.pop(); UnLock(); } } private: static ThreadPool* tpool; ThreadPool(int num = defaultnum) :_capacity(num),tids(num) { pthread_mutex_init(&_t_mutex,nullptr); pthread_cond_init(&_t_cond,nullptr); } const static int defaultnum = 5; //线程的锁和条件变量 pthread_cond_t _t_cond; pthread_mutex_t _t_mutex; queue<Task> _que; //任务的场所 vector<pthread_t> tids; int _capacity; int cnt = 0; }; ThreadPool* ThreadPool::tpool = nullptr;
Reactor
框架:
#pragma once #include"Connection.hpp" #include"Timer.hpp" #include"Task.hpp" #include"threadpool.hpp" namespace cmp { template<class T> struct Greater { bool operator()(const Timer*x,const Timer* y) { return x->_out_date > y->_out_date; } }; } //Reactor服务器 class RecatorSvr { public: //获取单例并给予数据的处理方法。 RecatorSvr(cal_t handler = nullptr) :_eptr(new Epoller()),_thptr(ThreadPool::GetInstance()); ~RecatorSvr(); //添加连接 void AddConnection(int fd,cal_t recv = nullptr,、 cal_t send = nullptr,cal_t excep = nullptr,int port = 8888,const string& ip = "0.0.0.0"); //默认的用户处理信息的方法 void DefaultUsrHandler(s_c_t con); //接收信息的处理函数 void Recver(s_c_t con); //发送信息的处理函数 void Sender(s_c_t con); //对事件进行派发,简称事件派发器。 void Dispatcher(struct epoll_event* events,int sz); //异常处理器 void Exceper(s_c_t con); //用于更新与处理超时时间 void UpDate(); //运行Recator的主逻辑 void Run(); private: //用于实现多路转接 std::shared_ptr<Epoller> _eptr; //用于快速根据文件描述符查找对应的连接 std::unordered_map<int,std::shared_ptr<Connection>> hash; //维护小根堆,用于更新与处理超时时间。 std::priority_queue<Timer*,vector<Timer*>,cmp::Greater<Timer*>> heap; //用户的处理接收信息的函数。 cal_t _usr_handler; //指向线程池的指针用于获取单例 ThreadPool* _thptr; };
说明
成员变量:
方法:
方法实现代码
//Reactor服务器 RecatorSvr(cal_t handler = nullptr) :_eptr(new Epoller()),_thptr(ThreadPool::GetInstance()) { _thptr->start(); if(handler == nullptr) { _usr_handler = std::bind(&RecatorSvr::DefaultUsrHandler,this,std::placeholders::_1); } else { _usr_handler = handler; } } ~RecatorSvr() { lg(INFORE,"~RecatorSvr:%d",__LINE__); } void AddConnection(int fd,cal_t recv = nullptr,cal_t send = nullptr,\ cal_t excep = nullptr,int port = 8888,const string& ip = "0.0.0.0") { if(!hash.count(fd)) { //将文件描述符设置为非阻塞的。 SetNoneBlock(fd); std::shared_ptr<Connection> res(new Connection(fd,this,recv,send,excep,port,ip)); //初始化对应文件描述符的超时时间。这里的time //这里就会出现内存泄漏的问题。 Timer* tp = new Timer(this,res,time(nullptr) + MAX_WAIT_TIME); res->InitTime(tp); heap.push(tp);//这里设置超时时间为6 //在哈希表中生成对应的结构 hash.insert({fd,res}); //在Epoller中进行添加fd。 auto event = _eptr->Epoll_Create(fd,EPOLLIN | EPOLLET);//添加ET模式 _eptr->Epoll_Ctl(fd,EPOLL_CTL_ADD,&event); //将时间更新出来,并添加到堆中。 } else { lg(ERRO,"did not clear completly"); } } void DefaultUsrHandler(s_c_t con) { //将输入缓存区的内容放入到输出缓冲区 string& inbuf = con->GetInbuf(); string& outbuf = con->GetOutBuf(); outbuf += inbuf; inbuf.resize(0); //然后调用Recver进行处理数据 con->_send(con); } void Recver(s_c_t con) { con->_tp->UpDateRunTime(); //更新最近一次调用的时间 for(;;) { char buffer[1024] = {0}; int n = recv(con->Getfd(),buffer,sizeof(buffer) - 1,0); if(n == 0) { lg(INFORE,"client has closed the connect."); //执行对应的异常事件 con->_excep(con); return; } else if(n < 0) { if(errno == EWOULDBLOCK) { lg(INFORE,"nothing to read in Linux"); break; } else if(errno == EINTR) { //说明被异常信号唤醒了。 continue; } lg(ERRO,strerror(errno)); con->_excep(con); return; } else { //将数据追加到输入缓存区当中。 //使用telnet时需要对数据做处理,因为回车是\r\n而不是\n。 const string& str = buffer; // auto pos = str.find("\r\n"); // con.Append(str.substr(0,pos)); con->Append(str); lg(INFORE,"echo: %s",con->GetInbuf().c_str()); } } //读完之后,对数据进行处理。。。。 _usr_handler(con); } //a small Bug? void Sender(s_c_t con) { con->_tp->UpDateRunTime(); for(;;) { int fd = con->Getfd(); string& outbuf = con->GetOutBuf(); int n = send(con->Getfd(),outbuf.c_str(),outbuf.size(),0); if(n < 0) { if(errno == EWOULDBLOCK) { lg(INFORE,"the buffer in Linux has no space."); return; } else if(errno == EINTR) { //说明被异常信号唤醒了。 lg(INFORE,"Sender is called by a signal"); continue; } else { lg(ERRO,strerror(errno)); //执行对应的异常处理函数 con->_excep(con); return; } } else { lg(INFORE,"write %d bytes to Linux buffer in fact.",n); //将已经写入的数据进行丢弃,继续写。 outbuf.erase(0,n);//for debug if(outbuf.size() == 0) { //说明写完了:取消对写事件的关心 auto event = _eptr->Epoll_Create(fd,EPOLLIN | EPOLLET); _eptr->Epoll_Ctl(fd,EPOLL_CTL_MOD,&event); break; } else { lg(INFORE,"the buffer in Linux has no space."); //我们设置对写事件的关心 auto event = _eptr->Epoll_Create(fd,EPOLLOUT | EPOLLET); _eptr->Epoll_Ctl(fd,EPOLL_CTL_MOD,&event); } } } } void Dispatcher(struct epoll_event* events,int sz) { for(int i = 0; i < sz; i++) { auto[fd,event] = _eptr->Epoll_Parse(events[i]); //更新超时时间会出问题的,那小根堆就失效了,我们这里只更新最近一次的调用的时间。 //将异常事件统一转换为读写事件。 if((event & EPOLLERR) || (event & EPOLLHUP)) event |= (EPOLLIN | EPOLLOUT); if(event & EPOLLIN) { // hash[fd]->_recv(hash[fd]);//fix a bug //交给线程池去执行。 _thptr->Push(Task(hash[fd],hash[fd]->_recv)); // } else if(event & EPOLLOUT) { _thptr->Push(Task(hash[fd],hash[fd]->_send)); } } } void Exceper(s_c_t con) { //首先将fd在内核的结构删除。 int fd = con->Getfd(); lg(INFORE,"%d Exceper:delete a fd in rb_tree of Linux.",__LINE__); auto it = hash.find(fd); if(it != hash.end()) { //其次删除在哈希表中的相应的结构 _eptr->Epoll_Ctl(fd,EPOLL_CTL_DEL); hash.erase(it); //最后关闭文件描述符 close(fd); lg(INFORE,"%d Exceper:closed a %d in Linux.",__LINE__,fd); } else { lg(INFORE,"%d Exceper: fd: %d had been delted.",__LINE__,fd); return; } } void UpDate() { vector<Timer*> ans; while(!heap.empty()) { Timer* top = heap.top(); heap.pop(); time_t tim = time(nullptr); //第一次判断是用于更新超时时间 if(*top < tim) { //第二次判断是判断是否真正超时 if(top->IsOutTime()) { //bug,可能客户端断开连接的更新的时候也进行了异常的处理,这里再进行处理就不太合适。 top->OutOfTimeHandler();//执行对应的处理即可。 lg(INFORE,"fd:%d is time out, will be closed.",top->_con->Getfd()); //处理完毕释放对应的空间。 delete top; } else { //最后判断完之后再统一入堆即可 ans.push_back(top); } } else { //说明没有超时的,再将其入堆,直接返回即可。 heap.push(top); break; } } //将更新之后的超时再进行入堆。 for(auto& val : ans) { heap.push(val); } lg(INFORE,"heap’s size is %d.",heap.size()); } void Run() { for(;;) { auto[events,n] = _eptr->Epoll_Wait(); if(n == 0) { //首先在等待超时时处理数据。 lg(INFORE,"time was out...."); } else if(n > 0) { lg(INFORE,"%d events is already.",n); Dispatcher(events,n); //其次在处理完毕时处理数据。 } else { lg(ERRO,strerror(errno)); } //将连接进行更新。 UpDate(); //没有连接了。 if(heap.size() == 0) break; } }
测试模块
说明:文章:网络 —— “?“ (下),中对网络版本计算器的代码进行分模块以及原理的讲解,这里就直接将相关的代码贴出,便于最终的测试。
自定义协议代码:
#pragma once #include<iostream> #include<string> #include<jsoncpp/json/json.h> using std::to_string; using std::stoi; using std::cout; using std::endl; using std::string; // #define SELF 1 char space = ' '; char newline = '\n'; //解码 string Decode(string& str) { int pos = str.find(newline); if(pos == string::npos) return ""; int len = stoi(str.substr(0,pos)); int totalsize = pos + len + 2; if(totalsize > str.size()) { return ""; } //将有效载荷截取出来 string actual_load = str.substr(pos + 1,len); //将完整的报文丢弃,便于下一次进行读取。 str.erase(0,totalsize); return actual_load; } //编码 string InCode(const string& str) { string text = to_string(str.size()) + newline + str + newline; return text; } struct Request { Request(int x,int y,char oper) :_x(x),_y(y),_oper(oper) {} Request() {} bool Deserialize(string& str) { cout << "+++++++++++++++++++++++++++++" << endl; //首先把字符串的报头和有效载荷进行分离 string content = Decode(str); if(content == "") return false; #ifdef SELF //解析字符串:字符 + 空格 + 字符 int left = content.find(space); int right = content.rfind(space); if(left + 1 != right - 1) { //说明是无效的字符 return false; } _x = stoi(content.substr(0,left)); _y = stoi(content.substr(right + 1)); _oper = content[left + 1]; #else Json::Value root; Json::Reader r; r.parse(content,root); _x = root["_x"].asInt(); _y = root["_y"].asInt(); _oper = root["_oper"].asInt(); #endif cout << "解析的字符串:"<< _x << _oper << _y << endl; cout <<"待读取的字符串:" << endl << str << endl; cout << "-------------------------------" << endl; return true; } string Serialize() { // cout << "+++++++++++++++++++++++++++++" << endl; string package; #ifdef SELF //首先对结构体进行编码 //编码格式:字符 + 空格 + 操作符 + 空格 + 字符 package = to_string(_x) + space + _oper + space + to_string(_y); // cout << "有效载荷:" << package << endl; //对报文再进行封装 #else Json::Value root; root["_x"] = _x; root["_y"] = _y; root["_oper"] = _oper; Json::FastWriter w; // Json::StyledWriter sw; package = w.write(root); #endif package = InCode(package); // cout << "报文:"<< package << endl; // cout << "-------------------------------" << endl; return package; } int _x; int _y; char _oper; }; struct Response { Response(int res,int code) :_res(res),_code(code) {} Response() {} bool Deserialize(string& str) { string content = Decode(str); if(content == "") return false; cout << "+++++++++++++++++++++++++++++++" << endl; #ifdef SELF //编码格式:字符 + 空格 + 字符 int pos = content.find(space); _res = stoi(content.substr(0,pos)); _code = stoi(content.substr(pos + 1)); #else Json::Value root; Json::Reader r; r.parse(content,root); _res = root["_res"].asInt(); _code = root["_code"].asInt(); #endif cout <<"转换结果:"<< _res << " " << _code << endl; cout << "待读取的字符串:" << endl << str << endl; cout << "-------------------------------" << endl; return true; } string Serialize() { // cout << "+++++++++++++++++++++++++++++++" << endl; #ifdef SELF //首先对结构体进行编码 //编码格式:字符 + 空格 + 字符 string package = to_string(_res) + space + to_string(_code); #else string package; Json::Value root; root["_res"] = _res; root["_code"] = _code; Json::FastWriter w; package = w.write(root); #endif // cout << "有效载荷:" << endl << package << endl; //对报文再进行封装 package = InCode(package); // cout << "报文:" << endl << package << endl; // cout << "-------------------------------" << endl; return package; } int _res; int _code; };
计算逻辑实现代码:
#pragma once #include<iostream> #include"Log.hpp" #include"protocol.hpp" enum CAL { DIV_ZERO = 1, MOD_ZERO, }; struct CalHelper { string Calculation(string& str) { Request req; if(req.Deserialize(str) == false) return ""; int x = req._x; int y = req._y; char op = req._oper; int res = 0, code = 0; switch(op) { case '+': res = x + y; break; case '-': res = x - y; break; case '*': res = x * y; break; case '/': if(!y) { code = DIV_ZERO; break; } res = x / y; break; case '%': if(!y) { code = MOD_ZERO; break; } res = x % y; break; default: break; } return Response(res,code).Serialize(); } };
主程序Main.cc实现代码:
#include<iostream> #include<memory> #include<functional> #include"TcpServer.hpp" #include"Listener.hpp" #include"../../Tools/servercal.hpp" CalHelper usr_cal; void UsrHandler(s_c_t con) { string& inbuff = con->GetInbuf(); string res = usr_cal.Calculation(inbuff); string& outbuff = con->GetOutBuf(); outbuff += move(res); con->_send(con); } int main() { std::unique_ptr<RecatorSvr> rsvr(new RecatorSvr(UsrHandler)); std::unique_ptr<Listener> lsr(new Listener(&(*rsvr))); rsvr->Run(); return 0; }
恭喜你看到这里!我是舜华,期待与你的下一次相遇!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。