赞
踩
举一个钓鱼的例子
阻塞IO: 在内核将数据准备好之前, 系统调用会一直等待. 所有的套接字, 默认都是阻塞方式.
非阻塞IO: 如果内核还未将数据准备好, 系统调用仍然会直接返回, 并且返回EWOULDBLOCK错误码
非阻塞IO往往需要程序员循环的方式反复尝试读写文件描述符, 这个过程称为轮询. 这对CPU来说是较大的浪费,一般只有特定场景下才使用.
信号驱动IO: 内核将数据准备好的时候, 使用SIGIO信号通知应用程序进行IO操作
IO多路转接: 虽然从流程图上看起来和阻塞IO类似. 实际上最核心在于IO多路转接能够同时等待多个文件描述符的就绪状态.
异步IO: 由内核在数据拷贝完成时, 通知应用程序(而信号驱动是告诉应用程序何时可以开始拷贝数据).
设置为非阻塞
//将fd文件描述符设置为非阻塞状态
void SetNoBlock(int fd)
{
int fl = fcntl(fd, F_GETFL);
if (fl < 0)
{
perror("fcntl");
return;
}
fcntl(fd, F_SETFL, fl | O_NONBLOCK);
}
轮询方式读取标准输入
#include <iostream> #include <unistd.h> #include <fcntl.h> #include <cstdio> #include <cstring> void SetNoBlock(int fd) { int fl = fcntl(fd, F_GETFL); if (fl < 0) { perror("fcntl"); return; } fcntl(fd, F_SETFL, fl | O_NONBLOCK); } int main() { char buffer[64]; SetNoBlock(0); while (true) { printf(">>"); //fflush(stdout); ssize_t n = read(0, buffer, sizeof(buffer) - 1); if (n > 0) { buffer[n - 1] = 0; std::cout << "echo#" << buffer << std::endl; } else if (n == 0) // ctrl(按键) + d { std::cout << "end file" << std::endl; break; } else { //一旦低层没有数据就绪,以出错的形式返回,但是不算真正的出错。 if (errno == EAGAIN || errno == EWOULDBLOCK) { // 低层数据没有准备好,希望你下次继续来检测 sleep(1);//在这里你可以处理其他事件 continue; } else if (errno == EINTR) { // 这次IO被信号中断,也需要重新读取 continue; } else std::cout << "read error??" << strerror(errno) << "error code: " << errno << std::endl; break; } } return 0; }
阻塞I/O模式
read
系统调用被调用时,它会检查是否有数据可读。如果数据不可用(即,没有数据到达或者缓冲区中没有数据),read
会暂时挂起调用它的进程,直到数据变得可用。read
成功读取数据,它会返回读取的字节数。非阻塞I/O模式
read
系统调用仍然会检查是否有数据可读。但是,如果数据不可用,它不会挂起调用它的进程。read
会立即返回一个错误,通常设置errno为EAGAIN
或EWOULDBLOCK
。这意味着调用者需要在稍后再次尝试读取,或者使用其他机制(如select、poll或epoll)来监测文件描述符上的活动。在非阻塞模式下,read
不会“唤醒”进程,因为进程在read
调用中不会被挂起。相反,进程需要主动检查文件描述符的状态,以确定何时进行读取操作。
认识select
select调用失败时,错误码可能被设置为:
关于fd_set
结构体
typedef struct
{
/* XPG4.2 requires this member name. Otherwise avoid the name
from the global namespace. */
#ifdef __USE_XOPEN
__fd_mask fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->fds_bits)
#else
__fd_mask __fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->__fds_bits)
#endif
} fd_set;
其实这个结构就是一个整数数组, 更严格的说, 是一个 “位图”. 使用位图中对应的位来表示要监视的文件描述符
提供了一组操作fd_set的接口, 来比较方便的操作位图
void FD_CLR(int fd, fd_set *set); // 用来清除描述词组set中相关fd 的位
int FD_ISSET(int fd, fd_set *set); // 用来测试描述词组set中相关fd 的位是否为真
void FD_SET(int fd, fd_set *set); // 用来设置描述词组set中相关fd的位
void FD_ZERO(fd_set *set); // 用来清除描述词组set的全部位
select服务器
select服务器,使用时需要程序员自己维护一个第三方的数组,来进行已经获得的sock(文件描述符)进行管理。
// Sock.hpp #pragma once #include <iostream> #include <string> #include <cstdlib> #include <cstring> #include <netinet/in.h> #include <arpa/inet.h> #include <sys/socket.h> #include <unistd.h> class Sock { public: Sock(int ret = -1) :_sock(ret) {} void Socket() { _sock = socket(AF_INET, SOCK_STREAM, 0); if(-1 == _sock) { std::cout<< "socket error" << std::endl; exit(-1); } } void Bind(const uint16_t& port) { struct sockaddr_in local; memset(&local, 0, sizeof(local));//强烈建议做清空 local.sin_family = AF_INET;//通信方式 local.sin_port = htons(port);//端口号 local.sin_addr.s_addr = INADDR_ANY;//ip地址 if(bind(_sock, (struct sockaddr*)&local, sizeof(local)) < 0) { std::cout<< "bind error" << std::endl; exit(-1); } } void Listen() { if(listen(_sock, 0) < 0) { std::cout<< "listen error" << std::endl; exit(-1); } } int Accept(std::string* clientIp, uint16_t* clientPort) { struct sockaddr_in temp; socklen_t len = sizeof(temp); int sock = accept(_sock, (struct sockaddr*)&temp, &len); if(sock < 0) { std::cout<< "accept error" << std::endl; exit(-1); } else { *clientIp = inet_ntoa(temp.sin_addr); *clientPort = ntohs(temp.sin_port); } return sock; } // 输入: const & // 输出: * // 输入输出: & int Connect(std::string& serverIp, uint16_t& serverPort) { struct sockaddr_in server; memset(&server, 0, sizeof(server)); server.sin_family = AF_INET; server.sin_port = htons(serverPort); server.sin_addr.s_addr = inet_addr(serverIp.c_str()); return connect(_sock, (struct sockaddr *)&server, sizeof(server)); } int Fd() { return _sock; } void Close() { if (_sock != -1) close(_sock); } ~Sock() {} private: int _sock; }; //selectServer.hpp #pragma once #include <iostream> #include <sys/select.h> #include "Sock.hpp" const static uint16_t gport = 8888; typedef int type_t; static const int defaultfd = -1; class selectServer { static const int N = sizeof(fd_set) *8 ; public: selectServer(uint16_t port = gport) :_port(port) {} void InitServer() { _listenSock.Socket(); _listenSock.Bind(_port); _listenSock.Listen(); for(int i = 0; i < N; ++i) { fdarray[i] = defaultfd; } } void Accepter() { std::cout<< "Accepter();" << std::endl; std::string clientip; uint16_t clientport; int sock = _listenSock.Accept(&clientip,&clientport); if(sock < 0) { return; } int pos = 1; for(; pos < N; ++pos) { if(fdarray[pos] == defaultfd) { fdarray[pos] = sock; break; } } if(pos >= N) { close(sock); std::cerr << "fd_set的位图已经超过了其大小"<<std::endl; } } void HandlerEvent(fd_set &rfds) { for(int i = 0; i < N; ++i) { if(fdarray[i] == defaultfd) continue; if((fdarray[i] == _listenSock.Fd()) && (FD_ISSET(fdarray[i], &rfds)))//将监听套接字文件准备就绪 { Accepter(); } else if((fdarray[i] != _listenSock.Fd()) && (FD_ISSET(fdarray[i], &rfds))) { int fd = fdarray[i]; char buffer[1024]; ssize_t s = recv(fd, buffer, sizeof(buffer) - 1, 0); if (s > 0) { buffer[s-1] = 0; std::cout << "client# " << buffer << std::endl; // 发送回去也要被select管理的,TODO std::string echo = buffer; echo += " [select server echo]"; send(fd, echo.c_str(), echo.size(), 0); } else { if (s == 0) { close(fdarray[i]); fdarray[i] = defaultfd; } else { std::cerr<<errno<< ":"<< strerror(errno) << std::endl; } } } } } void Start() { // 1. 这里我们能够直接获取新的链接吗? // 2. 最开始的时候,我们的服务器是没有太多的sock的,甚至只有一个sock!listensock // 3. 在网络中, 新连接到来被当做 读事件就绪! fdarray[0] = _listenSock.Fd(); while(true) { // 因为rfds是一个输入输出型参数,注定了每次都要对rfds进行重置,重置,必定要知道我历史上都有哪些fd?fdarray[] // 因为服务器在运行中,sockfd的值一直在动态变化,所以maxfd也一定在变化, maxfd是不是也要进行动态更新, fdarray[] fd_set rfds; FD_ZERO(&rfds); int maxfd = fdarray[0]; for (int i = 0; i < N; ++i) { if(fdarray[i] == defaultfd) continue; FD_SET(fdarray[i], &rfds); if(maxfd < fdarray[i]) { maxfd = fdarray[i]; } } int n = select(maxfd + 1,&rfds,nullptr,nullptr,nullptr); switch(n) { case 0: std::cout << "没有文件描述符,就绪" << std::endl; break; case -1: std::cerr<<errno<< ":"<< strerror(errno) << std::endl; break; default: std::cout << "有一个就绪事件发生了" << std::endl; HandlerEvent(rfds); DebugPrint(); break; } } } void DebugPrint() { std::cout << "fdarray[]: "; for (int i = 0; i < N; i++) { if (fdarray[i] == defaultfd) continue; std::cout << fdarray[i] << " "; } std::cout << "\n"; sleep(1); } private: uint16_t _port; Sock _listenSock; type_t fdarray[N]; }; //test.cc #include <iostream> #include <memory> #include "selectServer.hpp" int main() { std::unique_ptr<selectServer> svr(new selectServer()); svr->InitServer(); svr->Start(); return 0; }
select缺点
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
struct pollfd {
int fd; /* 文件描述符 */
short events; /* 需要监视该文件描述符上的那些事件 */
short revents; /* poll函数返回时告知用户该文件描述符上的那些事件,已经就绪了 */
};
参数说明
events和revents的取值:
事件 | 描述 | 是否可作为输入 | 是否可作为输入 |
---|---|---|---|
POLLIN | 数据(包括普通数据和优先数据)可读 | 是 | 是 |
POLLRDNORM | 普通数据可读 | 是 | 是 |
POLLRDBAND | 优先级带数据可读(Linux不支持) | 是 | 是 |
POLLPRI | 高优先级数据可读,比如TCP带外数据 | 是 | 是 |
POLLOUT | 数据(包括普通数据和优先数据)可写 | 是 | 是 |
POLLWRNORM | 普通数据可写 | 是 | 是 |
POLLWRBAND | 优先级带数据可写 | 是 | 是 |
POLLRDHUP | TCP连接被对方关闭,或者对方关闭了写操作,它由GNU引入 | 是 | 是 |
POLLERR | 错误 | 否 | 是 |
POLLHUP | 挂起。比如管道的写端被关闭后,读端描述符上将收到POLLHUP事件 | 否 | 是 |
POLLNVAL | 文件描述符没有打开 | 否 | 是 |
这些取值实际都是以宏的方式进行定义的,它们的二进制序列当中有且只有一个比特位是1,且为1的比特位是各不相同的。
返回结果
poll服务器
//Sock.hpp #pragma once #include <iostream> #include <string> #include <cstdlib> #include <cstring> #include <netinet/in.h> #include <arpa/inet.h> #include <sys/socket.h> #include <unistd.h> class Sock { public: Sock(int ret = -1) :_sock(ret) {} void Socket() { _sock = socket(AF_INET, SOCK_STREAM, 0); if(-1 == _sock) { std::cout<< "socket error" << std::endl; exit(-1); } } void Bind(const uint16_t& port) { struct sockaddr_in local; memset(&local, 0, sizeof(local));//强烈建议做清空 local.sin_family = AF_INET;//通信方式 local.sin_port = htons(port);//端口号 local.sin_addr.s_addr = INADDR_ANY;//ip地址 if(bind(_sock, (struct sockaddr*)&local, sizeof(local)) < 0) { std::cout<< "bind error" << std::endl; exit(-1); } } void Listen() { if(listen(_sock, 0) < 0) { std::cout<< "listen error" << std::endl; exit(-1); } } int Accept(std::string* clientIp, uint16_t* clientPort) { struct sockaddr_in temp; socklen_t len = sizeof(temp); int sock = accept(_sock, (struct sockaddr*)&temp, &len); if(sock < 0) { std::cout<< "accept error" << std::endl; exit(-1); } else { *clientIp = inet_ntoa(temp.sin_addr); *clientPort = ntohs(temp.sin_port); } return sock; } // 输入: const & // 输出: * // 输入输出: & int Connect(std::string& serverIp, uint16_t& serverPort) { struct sockaddr_in server; memset(&server, 0, sizeof(server)); server.sin_family = AF_INET; server.sin_port = htons(serverPort); server.sin_addr.s_addr = inet_addr(serverIp.c_str()); return connect(_sock, (struct sockaddr *)&server, sizeof(server)); } int Fd() { return _sock; } void Close() { if (_sock != -1) close(_sock); } ~Sock() {} private: int _sock; }; //pollServer.hpp #pragma once #include <iostream> #include <sys/poll.h> #include "Sock.hpp" const static uint16_t gport = 8888; typedef struct pollfd type_t; const static int defaultfd = -1; const static int N = 1024; const static short defaultevent = 0; class pollServer { public: pollServer(uint16_t port = gport) :_port(port) {} void InitServer() { _listenSock.Socket(); _listenSock.Bind(_port); _listenSock.Listen(); fdarray = new type_t[N]; for(int i = 0; i < N; ++i) { fdarray[i].fd = defaultfd; fdarray[i].events = defaultevent; fdarray[i].revents = defaultevent; } } void Accepter() { std::string clientip; uint16_t clientport; int sock = _listenSock.Accept(&clientip,&clientport); if(sock < 0) { return; } int pos = 1; for(; pos < N; ++pos) { if(fdarray[pos].fd == defaultfd) { fdarray[pos].fd = sock; fdarray[pos].events = POLLIN; fdarray[pos].revents = defaultevent; break; } } if(pos >= N) { close(sock); std::cerr << "已经超过N了其大小"<<std::endl; } } void HandlerEvent() { for(int i = 0; i < N; ++i) { if(fdarray[i].fd == defaultfd) continue; if((fdarray[i].fd == _listenSock.Fd()) && ((fdarray[i].revents) & POLLIN))//将监听套接字文件准备就绪 { Accepter(); } else if((fdarray[i].fd != _listenSock.Fd()) && ((fdarray[i].revents) & POLLIN)) { int fd = fdarray[i].fd; char buffer[1024]; ssize_t s = recv(fd, buffer, sizeof(buffer) - 1, 0); if (s > 0) { buffer[s-1] = 0; std::cout << "client# " << buffer << std::endl; // 发送回去也要被select管理的,TODO std::string echo = buffer; echo += " [select server echo]"; send(fd, echo.c_str(), echo.size(), 0); } else { if (s == 0) { close(fdarray[i].fd); fdarray[i].fd = defaultfd; fdarray[i].events = defaultevent; fdarray[i].revents = defaultevent; } else { std::cerr<<errno<< ":"<< strerror(errno) << std::endl; } } } } } void Start() { fdarray[0].fd = _listenSock.Fd(); fdarray[0].events = POLLIN; while(true) { //poll内部会对struct pollfd结构体内的文件描述符做甄别,-1跳过。 int n = poll(fdarray,N,-1); switch(n) { case 0: std::cout << "没有文件描述符,就绪" << std::endl; break; case -1: std::cerr<<errno<< ":"<< strerror(errno) << std::endl; break; default: std::cout << "有一个就绪事件发生了" << std::endl; HandlerEvent(); DebugPrint(); break; } } } void DebugPrint() { std::cout << "fdarray[]: "; for (int i = 0; i < N; i++) { if (fdarray[i].fd == defaultfd) continue; std::cout << fdarray[i].fd << " "; } std::cout << "\n"; sleep(1); } ~pollServer() { _listenSock.Close(); if(fdarray) delete[] fdarray; } private: uint16_t _port; Sock _listenSock; type_t* fdarray; }; //test.cc #include <iostream> #include <memory> #include "pollServer.hpp" int main() { // fd_set fd; // std::cout << sizeof(fd) * 8<< std::endl; std::unique_ptr<pollServer> svr(new pollServer()); svr->InitServer(); svr->Start(); return 0; }
poll的缺点
poll中监听的文件描述符数目增多时
epoll_create
函数用于创建一个epoll
文件描述符,这是使用epoll
机制的起点。
int epoll_create(int size);
参数说明:
返回值:
对添加到红黑树中做节点,进行增改删
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
参数说明:
epoll_create
的返回值。第二个参数的取值:
EPOLL_CTL_ADD
:注册新的fd到epfd中;EPOLL_CTL_MOD
:修改已经注册的fd的监听事件;EPOLL_CTL_DEL
:从epfd中删除一个fdstruct epoll_event结构如下:
typedef union epoll_data
{
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event
{
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
} __EPOLL_PACKED;
events的常用取值如下:
EPOLLIN
:表示对应的文件描述符可以读(包括对端SOCKET正常关闭)。EPOLLOUT
:表示对应的文件描述符可以写。EPOLLPRI
:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来)。EPOLLERR
:表示对应的文件描述符发送错误。EPOLLHUP
:表示对应的文件描述符被挂断,即对端将文件描述符关闭了。EPOLLET
:将epoll的工作方式设置为边缘触发(Edge Triggered)模式。EPOLLONESHOT
:只监听一次事件,当监听完这次事件之后,如果还需要继续监听该文件描述符的话,需要重新将该文件描述符添加到epoll模型中。从就绪队列中获取就绪事件
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
参数说明:
epoll_create
的返回值。返回值:
当某一进程调用epoll_create
方法时,Linux内核会创建一个eventpoll
结构体,这个结构体中有两个成员与epoll的使用方式密切相关,如下所示:
struct eventpoll{
....
/*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/
struct rb_root rbr;
/*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/
struct list_head rdlist;
....
};
epoll_ctl
函数中的EPOLL_CTL_ADD
,EPOLL_CTL_MOD
,EPOLL_CTL_DEL
对红黑树进行增改删。epoll服务器
//Sock.hpp #pragma once #include <iostream> #include <string> #include <cstdlib> #include <cstring> #include <netinet/in.h> #include <arpa/inet.h> #include <sys/socket.h> #include <unistd.h> class Sock { public: Sock(int ret = -1) :_sock(ret) {} void Socket() { _sock = socket(AF_INET, SOCK_STREAM, 0); if(-1 == _sock) { std::cout<< "socket error" << std::endl; exit(-1); } } void Bind(const uint16_t& port) { struct sockaddr_in local; memset(&local, 0, sizeof(local));//强烈建议做清空 local.sin_family = AF_INET;//通信方式 local.sin_port = htons(port);//端口号 local.sin_addr.s_addr = INADDR_ANY;//ip地址 if(bind(_sock, (struct sockaddr*)&local, sizeof(local)) < 0) { std::cout<< "bind error" << std::endl; exit(-1); } } void Listen() { if(listen(_sock, 0) < 0) { std::cout<< "listen error" << std::endl; exit(-1); } } int Accept(std::string* clientIp, uint16_t* clientPort) { struct sockaddr_in temp; socklen_t len = sizeof(temp); int sock = accept(_sock, (struct sockaddr*)&temp, &len); if(sock < 0) { std::cout<< "accept error" << std::endl; exit(-1); } else { *clientIp = inet_ntoa(temp.sin_addr); *clientPort = ntohs(temp.sin_port); } return sock; } // 输入: const & // 输出: * // 输入输出: & int Connect(std::string& serverIp, uint16_t& serverPort) { struct sockaddr_in server; memset(&server, 0, sizeof(server)); server.sin_family = AF_INET; server.sin_port = htons(serverPort); server.sin_addr.s_addr = inet_addr(serverIp.c_str()); return connect(_sock, (struct sockaddr *)&server, sizeof(server)); } int Fd() { return _sock; } void Close() { if (_sock != -1) close(_sock); } ~Sock() {} private: int _sock; }; //epollServer.hpp #pragma once #include <iostream> #include <sys/epoll.h> #include "Sock.hpp" const static uint16_t gport = 8888; #define GUNM 64 class epollServer { public: epollServer(int epfd = -1,uint16_t port = gport) :_port(port) {} void InitServer() { _listenSock.Socket(); _listenSock.Bind(_port); _listenSock.Listen(); _epfd = epoll_create(1); if(_epfd < 0) { std::cerr<<errno<< "epoll_create errror:"<< strerror(errno) << std::endl; exit(-1); } } void Accepter() { } void HandlerEvent(int n) { for(int i = 0; i < n; ++i) { int fd = fdarry[i].data.fd; uint32_t events = fdarry[i].events; if(events & EPOLLIN) { if(fd == _listenSock.Fd())//1.新连接到来 { std::string temp; uint16_t port; int sock = _listenSock.Accept(&temp, &port); if(sock < 0) { continue; } std::cout<<"这个连接已经连上服务器了" << temp << ": " << sock << std::endl; struct epoll_event event; event.events = EPOLLIN; event.data.fd = sock; // 用户数据,epoll低层不对该数据做任何修改,就是为了给未来就绪返回的。 epoll_ctl(_epfd,EPOLL_CTL_ADD,sock,&event); } else//2.读取事件就绪 { char buffer[1024]; ssize_t s = recv(fd, buffer, sizeof(buffer) - 1, 0);//我们目前无法保证读取到完整的报文。 if (s > 0) { buffer[s - 1] = 0; std::cout << "client# " << buffer << std::endl; // 发送回去也要被select管理的,TODO std::string echo = buffer; echo += " [select server echo]"; send(fd, echo.c_str(), echo.size(), 0); } else { if (s == 0) { //要先从epoll中移除,然后再关闭文件(必须的) epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr); close(fd); std::cout << "要关闭的文件描述符是:" << fd << std::endl; } else { std::cerr << errno << ":" << strerror(errno) << std::endl; } } } } } } void Start() { //1.将listeSock添加到epoll中 struct epoll_event event; event.events = EPOLLIN; event.data.fd = _listenSock.Fd();//用户数据,epoll低层不对该数据做任何修改,就是为了给未来就绪返回的。 int n = epoll_ctl(_epfd,EPOLL_CTL_ADD,_listenSock.Fd(), &event); if(n > 0) { std::cerr<<errno<< "epoll_create errror:"<< strerror(errno) << std::endl; exit(-1); } while(true) { int n = epoll_wait(_epfd, fdarry,GUNM,-1); switch (n) { case 0: break; case -1: break; default: HandlerEvent(n); break; } } } ~epollServer() { _listenSock.Close(); if(_epfd != -1) { close(_epfd); } } private: uint16_t _port; Sock _listenSock; int _epfd; struct epoll_event fdarry[GUNM]; }; //test.cc #include <iostream> #include <memory> #include "epollServer.hpp" int main() { std::unique_ptr<epollServer> svr(new epollServer()); svr->InitServer(); svr->Start(); return 0; }
epoll的优点(和 select 的缺点对应)
EPOLL_CTL_ADD
将文件描述符结构拷贝到内核中, 这个操作并不频繁(而select/poll
都是每次循环都要进行拷贝)epoll_wait
返回直接访问就绪队列就知道哪些文件描述符就绪. 这个操作时间复杂度O(1). 即使文件描述符数目很多, 效率也不会受到影响.Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。