赞
踩
我们之前学到的所有关于IO的接口都可以称作拷贝接口,以send为例子
以接收为例子,如果缓冲区满了,就通告对方,然后对方就不发数据了,我的缓冲区为什么会满呢?无非就是上层来不及取或者取的太慢了,数据来的太快导致缓冲区被打满了。
当缓冲区满了,如何让发送方尽快发呢?肯定就是通知上层尽快把数据取走。我们可以认为是一种生产消费者模型,其中缓冲区就是交易场所:
输入就是操作系统将数据从外设拷贝到内存的过程,操作系统一定要通过某种方法得知特定外设上是否有数据就绪:
首先需要明确的是:并不是操作系统想要从外设读取数据时外设上就一定有数据。
但操作系统不会主动去检测外设上是否有数据就绪,这种做法一定会降低操作系统的工作效率。
操作系统实际采用的是中断的方式来得知外设上是否有数据就绪的,当某个外设上面有数据就绪时,该外设就会向CPU当中的中断控制器发送中断信号,中断控制器再根据产生的中断信号的优先级按顺序发送给CPU。
每一个中断信号都有一个对应的中断处理程序,存储中断信号和中断处理程序映射关系的表叫做中断向量表,当CPU收到某个中断信号时就会自动停止正在运行的程序,然后根据该中断向量表执行该中断信号对应的中断处理程序,处理完毕后再返回原被暂停的程序继续运行。
需要注意的是,CPU不直接和外设打交道指的是在数据层面上,而外设其实是可以直接将某些控制信号发送给CPU当中的某些控制器的。
操作系统任何时刻都可能会收到大量的数据包,因此操作系统必须将这些数据包管理起来,所谓的管理就是“先描述,再组织”。在内核当中有一个结构叫做sk_buff,该结构就是用来管理和控制接收或发送数据包的信息的。
(1)如下是一个简化版的sk_buff结构:
struct sk_buff
{
char* transport_header;
char* network_header;
char* mac_header;
char* data;
struct sk_buff* next;
struct sk_buff* prev;
};
(2)当操作系统从网卡当中读取到一个数据包后,会将该数据依次交给链路层、网络层、传输层、应用层进行解包和分用,最终将数据包中的数据交给了上层用户,如下是sk_buff结构是如何进行数据包的解包和分用的过程:
(3)发送数据时对数据进行封装也是同样的道理,就是依次在数据前面拷贝上对应的报头,最后再将数据发送出去(UDP)或拷贝到发送缓冲区(TCP)即可。也就是说,数据包在进行封装和解包的过程中,本质数据的存储位置是没有发生变化的,我们实际只是在用不同的指针对数据进行操作而已。
但内核中的sk_buff并不像不是那样简单:
当程序运行到IO函数时一般都在阻塞式等待,这也算作IO过程中的一个环节,但真正意义上的IO就是讲数据拷贝到缓冲区中,任何IO过程都要包含两个步骤:等待和拷贝 即: IO = 等待+数据拷贝。
要提高IO的效率主要分为两步:
任何IO的过程,都包含“等”和“拷贝”这两个步骤,在实际的应用场景中,等待消耗的时间往往都远远高于拷贝的时间所以提高IO效率的本质是:尽可能地减少单位时间内“等待”的比重。
提高IO效率的方式有两种:改变等待的方式和减少等待的比重高效的IO本质就是减少单位时间内“等待”的比重
(1)我们以钓鱼为例子:IO的过程其实和钓鱼是非常类似的。
(2)在谈论高效的IO之前,我们先来看看什么样的钓鱼方式才是高效的,下面给出五个人的钓鱼方式:
(3)那么张三、李四、王五的钓鱼效率是否一样呢?为什么?
其实张三、李四、王五的钓鱼效率本质上是一样的。
因此张三、李四、王五他们三个人的钓鱼的效率是一样的,他们只是等鱼上钩的方式不同而已,张三是死等,李四是定期检测浮漂,而王五是通过铃铛来判断是否有鱼上钩。
注意:这里问的是他们的钓鱼效率是否是一样的,而不是问他们整体谁做的事最多,如果说整体做事情的量的话,那一定是王五做得最多,李四次之,张三最少。
(4)张三、李四、王五它们三个人分别和赵六比较,谁的钓鱼效率更高?
赵六毫无疑问是这四个人当中钓鱼效率最高的,因为赵六同时在等多个鱼竿上有鱼上钩,因此在单位时间内,赵六的鱼竿有鱼上钩的概率是最大的。
而高效的钓鱼就是要减少单位时间内“等”的时间,增加“拷贝”的时间,所以说赵六的钓鱼效率是这四个人当中最高的。赵六的钓鱼效率之所以高,是因为赵六一次等待多个鱼竿上的鱼上钩,此时就可以将“等”的时间进行重叠。
(5)如何看待田七的这种钓鱼方式
(6)这五个人的钓鱼方式分别对应的就是五种IO模型:
IO模型 | 简单对比解释 |
---|---|
阻塞IO | 一心一意地等待数据到来 |
非阻塞IO | 三心二意地轮询式等待 |
信号驱动 | 信号递达时再来读取或写入数据 |
多路转接 | 让大批线程等待,自身读取数据 |
异步通信IO | 让其他进程或线程进行等待和读取,自身获取结果 |
阻塞IO,非阻塞IO和信号驱动IO本质上是不能提高IO的效率的,但非阻塞IO和信号驱动IO能提高整体做事的效率。
上述钓鱼的例子中:钓鱼的河对应就是内核,这里的每一个人都是进程或线程,鱼竿对应的就是文件描述符fd或套接字,装鱼的桶对应的就是用户缓冲区。
(7)如何区分同步IO和异步IO
阻塞IO就是在内核还没将数据准备好,系统调用会一直等待,阻塞IO是最常见的IO模型,所有的套接字默认就是阻塞等待:
以阻塞方式进行IO操作的进程或线程,在“等”和“拷贝”没有结束期间都不会返回,在用户看来就像是阻塞住了,因此我们称之为阻塞IO。
(1)非阻塞IO:如果内核还未将数据准备好,系统调用仍然会直接返回,并且返回EWOULDBLOCK错误码。
非阻塞IO往往需要程序员以循环的方式反复尝试读写文件描述符,这个过程称为轮询,这对CPU来说是较大的浪费,一般只有特定场景下才使用:
(2)阻塞IO和非阻塞IO的区别:
(1)信号驱动IO就是当内核将数据准备好的时候,使用SIGIO信号通知应用程序进行IO操作:
当底层数据就绪的时候会向当前进程或线程递交SIGIO信号(29号信号----可以通告kill -l进行查看所有的信号),因此可以通过signal或sigaction函数进行自定义捕捉,将SIGIO的信号处理程序自定义为需要进行的IO操作,当底层数据就绪时就会自动执行对应的IO操作。
(2)信号的产生是异步的,但信号驱动IO是同步IO的一种。
信号驱动只使用了recv的拷贝功能,等待数据的过程是异步的(信号的产生是异步的),但拷贝数据是同步的。
IO多路转接也叫做IO多路复用,能够同时等待多个文件描述符的就绪状态:
IO多路转接的思想:
IO多路转接就像现实生活中的黄牛一样,只不过IO多路转接更像是帮人排队的黄牛,因为多路转接接口实际并没有帮我们进行数据拷贝的操作,这些排队黄牛可以一次帮多个人排队,此时就将多个人排队的时间进行了重叠。
异步IO就是由内核在数据拷贝完成时,通知应用程序 (异步IO不参与等待和拷贝的过程):
我们将缓冲区变量提供给异步接口,接口会等待并将数据放到缓冲区中,并通知进程,进程可以直接处理数据,并不会参与任何IO过程,所以是异步的。
异步IO系统提供有一些对应的系统接口,但大多使用复杂,也不建议使用,异步IO也有更好的替代方案。
(1)同步和异步关注的是消息通信机制:
(2)为什么非阻塞IO在没有得到结果之前就返回了
因此在进行非阻塞IO时,在没有得到结果之前,虽然这个调用会返回,但后续还需要继续进行轮询检测,因此可以理解成调用还没有返回,而只有当某次轮询检测到数据就绪,并且完成数据拷贝后才认为该调用返回了。
(3)同步IO通信 VS 同步与互斥
在多进程和多线程当中有同步与互斥的概念,但是这里的同步通信和进程或线程之间的同步是完全不相干的概念。
因此当看到“同步”这个词的时候,一定要先明确这个同步是同步通信的同步,还是同步与互斥的同步。
(1)阻塞和非阻塞关注的是程序在等待调用结果(消息、返回值)时的状态:
注意:如果非阻塞调用没有获取到数据时,是以出错的形式返回的,但并不算真正的错误,通过错误码errno区分出错和条件未就绪。
(1)系统中大部分的接口都是阻塞式接口。比如我们可以用read函数从标准输入(键盘)当中读取数据,然后使用write函数输出到标准输出(屏幕)当中:
#include<iostream>
#include<unistd.h>
using namespace std;
int main()
{
while(1)
{
char buffer[1024] = {0};
size_t s = read(0,buffer,sizeof(buffer)-1);
if(s > 0)
{
buffer[s] = 0;
write(1,buffer,sizeof(buffer));
}
else
{
cout << "read errno"<<endl;
break;
}
}
return 0;
}
(2)程序运行后,如果我们不进行输入操作,此时该进程就会阻塞住,根本原因就是因为此时底层数据不就绪,因此read函数需要进行阻塞等待:
(3)我们也可以发现当前进程的状态是S状态,也就是放到了等待队列当中。
(4)一旦我们进行了输入操作,此时read函数就会检测到底层数据就绪,然后立马将数据读取到从内核拷贝到我们传入的buffer数组当中,并且调用write函数,将读取到的数据输出到显示器上面,最后我们就看到了我们输入的字符串。
打开文件时默认都是以阻塞的方式打开的,如果要以非阻塞的方式打开某个文件,需要在使用open函数打开文件时携带O_NONBLOCK或O_NDELAY选项,此时就能够以非阻塞的方式打开文件。
如果要将已经打开的某个文件或套接字设置为非阻塞,此时就需要用到fcntl函数。
(1)函数原型:
#include <unistd.h>
#include <fcntl.h>
int fcntl(int fildes, int cmd, ...); //可变参数列表
(2)参数说明:
(3)fcntl函数常用的5种功能与其对应的cmd取值如下:
(4)返回值:
(5)使用例子:我们可以定义一个函数,该函数就用于将指定的文件描述符设置为非阻塞状态:
void SetNonBlock(int fd)
{
int f1 = fcntl(fd,F_GETFL);//获取文件描述符对应的文件状态标记
if(f1 < 0)
{
perror("fcntl");
return ;
}
fcntl(fd,F_SETFL,f1 | O_NONBLOCK);//添加非阻塞标记 O_NONBLOCK,对文件状态标记进行设置
}
此时就将该文件描述符设置为了非阻塞状态。
(6)以非阻塞轮询方式读取标准输入err版本:
此时在调用read函数读取标准输入之前,调用SetNonBlock函数将0号文件描述符设置为非阻塞就行了。
int main()
{
SetNonBlock(0);//将0号描述符设置为非阻塞状态
while(1)
{
char buffer[1024] = {0};
size_t s = read(0,buffer,sizeof(buffer)-1);//少读取一个,是为了在最后的位置放\0
if(s > 0)
{
buffer[s] = 0;
write(1,buffer,sizeof(buffer));
printf("read success: s:%d errno:%d \n",s,errno);
}
else
{
printf("read failed: s:%d errno:%d \n",s,errno);
}
sleep(1);
}
return 0;
}
注意:上面的是错误的!如果非阻塞调用没有获取到数据时,是以出错的形式返回的,但并不算真正的错误,通过错误码errno区分出错和条件未就绪。
(7)区别真正的出错和数据没有就绪的"出错":
②因此在以非阻塞方式读取数据时,如果调用read函数时得到的返回值是-1,此时还需要通过错误码进一步进行判断,如果错误码的值是EAGAIN或EWOULDBLOCK,说明本次调用read函数出错是因为底层数据还没有就绪,因此后续还应该继续调用read函数进行轮询检测数据是否就绪,当数据继续时再进行数据的读取。
③调用read函数在读取到数据之前可能会被其他信号中断,此时read函数也会以出错的形式返回,此时的错误码会被设置为EINTR,此时应该重新执行read函数进行数据的读取。
(8)正确的非阻塞轮询的代码:
int main()
{
SetNonBlock(0);//将0号描述符设置为非阻塞状态
while (true)
{
char buffer[1024] = {0};
ssize_t s = read(0, buffer, sizeof(buffer)-1);//少读取一个,是为了在最后的位置放\0
if(s > 0)
{
buffer[s] = 0;
write(1,buffer,sizeof(buffer));
printf("read success: s:%d errno:%d \n",s,errno);
}
else
{
if (errno == EAGAIN || errno == EWOULDBLOCK)//底层数据没有就绪
{
printf("数据没有准备好,再试一下吧!\n");
printf("read failed: s:%d errno:%d \n",s,errno);
sleep(1);
continue;
}
else if (errno == EINTR)//在读取数据之前被信号中断
{
printf("数据被信号中断了,再试一下吧!\n");
sleep(1);
continue;
}
else
{
printf("读取错误!\n");
break;
}
}
}
return 0;
}
采用0号描述符进行操作是因为0号描述符必须要用户参与,必须得有数据输入,条件才能就绪。
IO事件就绪可以理解为两方面:一是读事件就绪,一是写事件就绪,可以认为接收缓冲区有数据就是读事件就绪,发送缓冲区里无数据就是写事件就绪。一旦等的事件就绪,我们就可以进行拷贝/读取了。
但事实上,频繁读写系统内核中的发送接收缓冲区会进行状态切换,期间会进行一系列的处理工作,会带来效率的下降,所以一般接收缓冲区中设有高水位,发送缓冲区中设有低水位的概念。
当超过"水位"才进行发送/通知上层读取。
系统提供select函数来实现多路复用输入/输出模型:
int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);
(1)参数解释:
(2)参数timeout的设置:
struct timeval timeout = {0, 0} //表示非阻塞
struct timeval timeout = {5, 0} //5秒以内阻塞式,超过5秒,非阻塞返回一次
(4)fd_set类型解析
①在使用select函数时,就免不了用到fd_set结构体。那fd_set就是怎么样的?下面我们先看在man手册中关于select:
②那么fd_set究竟是什么?
typedef long int __fd_mask;
/* It's easier to assume 8-bit bytes than to get CHAR_BIT. */
#define __NFDBITS (8 * (int) sizeof (__fd_mask))
#define __FDELT(d) ((d) / __NFDBITS)
#define __FDMASK(d) ((__fd_mask) 1 << ((d) % __NFDBITS))
/* fd_set for select and pselect. */
typedef struct
{
/* XPG4.2 requires this member name. Otherwise avoid the name
from the global namespace. */
#ifdef __USE_XOPEN
__fd_mask fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->fds_bits)
#else
__fd_mask __fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->__fds_bits)
#endif
} fd_set;
/* Maximum number of file descriptors in `fd_set'. */
#define FD_SETSIZE __FD_SETSIZE //__FD_SETSIZE等于1024
/* Access macros for `fd_set'. */
#define FD_SET(fd, fdsetp) __FD_SET (fd, fdsetp)
#define FD_CLR(fd, fdsetp) __FD_CLR (fd, fdsetp)
#define FD_ISSET(fd, fdsetp) __FD_ISSET (fd, fdsetp)
#define FD_ZERO(fdsetp) __FD_ZERO (fdsetp)
③根据分析,我么可以把这个结构理解为一个整数数组,更严格的说是一个 “位图”。使用位图中对应的位来表示要监视的文件描述符。并且还提供了一组操作fd_set的接口函数,来方便的操作该位图!
void FD_CLR(int fd, fd_set *set); // 用来清除描述词组set中相关fd 的位
int FD_ISSET(int fd, fd_set *set); // 用来测试描述词组set中相关fd 的位是否为真
void FD_SET(int fd, fd_set *set); // 用来设置描述词组set中相关fd的位
void FD_ZERO(fd_set *set); // 用来清除描述词组set的全部位
(5)函数的返回值
(6)其中,错误值可能为:
理解select模型的关键在于理解fd_set,为说明方便,取fd_set长度为1字节,fd_set中的每一bit可以对应一个文件描述符fd。则1字节长的fd_set最大可以对应8个fd。
这里我们思考为什么需要额外的数据结构array保存放到select监控集中的fd?
也正是因为以上两点,Select也有不得不说的缺点!
(1)SelectServer.hpp:
#pragma once
#include <iostream>
#include <sys/select.h>
#include <sys/time.h>
#include "Socket.hpp"
static const uint16_t defaultport = 8080;
static const int fd_num_max = (sizeof(fd_set) * 8);
int defaultfd = -1;
class SelectServer
{
public:
SelectServer(uint16_t port = defaultport)
:_port(port)
{
for(int i = 0; i < fd_num_max; i++)
{
fd_array[i] = defaultfd;
}
}
bool Init()
{
_listensock.Socket();
_listensock.Bind(_port);
_listensock.Listen();
return true;
}
void Start()
{
int listensock = _listensock.Getsock();
fd_array[0] = listensock;
while(1)
{
fd_set rfds;
FD_ZERO(&rfds); // 用来清除描述词组set的全部位
int maxfd = fd_array[0];
for(int i = 0; i < fd_num_max; i++)
{
if(fd_array[i] == defaultfd)
{
continue;
}
FD_SET(fd_array[i], &rfds); // 用来设置描述词组set中相关fd的位
if(fd_array[i] > maxfd)
{
maxfd = fd_array[i];
LOG(NORMAL, "max fd update");
}
}
timeval timeout = {3, 0};
int n = select(maxfd + 1, &rfds, nullptr, nullptr, &timeout);
switch (n)
{
case 0:
std::cout << "time out, timeout: " << timeout.tv_sec << "." << timeout.tv_usec << std::endl;
break;
case -1:
std::cerr << "select error" << std::endl;
break;
default:
// 有事件就绪了,TODO
std::cout << "get a new link!!!!!" << std::endl;
Dispatcher(rfds); // 就绪的事件和fd你怎么知道只有一个呢???
break;
}
}
}
void Dispatcher(fd_set rfds)
{
for(int i = 0; i < fd_num_max; i++)
{
int fd = fd_array[i];
if (fd == defaultfd)
{
continue;
}
if(FD_ISSET(fd, &rfds)) // 用来测试描述词组set中相关fd 的位是否为真
{
if (fd == _listensock.Getsock()) //说明是监听套接字就绪
{
Accepter(); // 连接管理器
}
else // non listenfd
{
Recver(fd, i);
}
}
}
}
void Accepter()
{
// 连接事件就绪了
int sockfd = _listensock.Accept();
if(sockfd < 0)
{
return;
}
LOG(NORMAL, "accept success");
int pos = 0;
for(pos = 1; pos < fd_num_max; pos++)
{
if(fd_array[pos] != defaultfd)
{
continue;
}
else
{
break;
}
}
if(pos == fd_num_max)
{
LOG(WARNING, "server is full");
close(sockfd);
}
else
{
fd_array[pos] = sockfd;
PrintFd();
}
}
void Recver(int fd, int pos)
{
char buffer[1024];
int n = read(fd, buffer, sizeof(buffer) - 1);
if(n > 0)
{
buffer[n] = 0;
std::cout << "get a messge: " << buffer << std::endl;
}
else if(n == 0)
{
LOG(NORMAL, "client quit");
close(fd);
fd_array[pos] = defaultfd; // 这里本质是从select中移除
}
else
{
LOG(WARNING, "client quit");
close(fd);
fd_array[pos] = defaultfd; // 这里本质是从select中移除
}
}
void PrintFd()
{
std::cout << "online fd list: ";
for (int i = 0; i < fd_num_max; i++)
{
if (fd_array[i] == defaultfd)
{
continue;
}
std::cout << fd_array[i] << " ";
}
std::cout << std::endl;
}
~SelectServer()
{
_listensock.Close();
}
private:
Sock _listensock;
uint16_t _port;
int fd_array[fd_num_max]; // 数组, 用户维护的!
// int wfd_array[fd_num_max];
};
(2)test.cpp:
#include "SelectServer.hpp"
#include <memory>
int main()
{
std::unique_ptr<SelectServer> svr(new SelectServer());
svr->Init();
svr->Start();
return 0;
}
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
(1)参数说明:
(2)pollfd结构体的定义:
// pollfd结构
struct pollfd
{
int fd; /* file descriptor */
short events; /* requested events */
short revents; /* returned events */
};
(3)events 和 revents 的取值:
事件 | 描述 | 是否可作为输入 | 是否可作为输出 |
---|---|---|---|
POLLIN | 数据(包括普通数据和优先数据)可读 | 是 | 是 |
POLLRDNORM | 普通数据可读 | 是 | 是 |
POLLRDBAND | 优先级带数据可读(Linux不支持) | 是 | 是 |
POLLPRI | 高优先级数据可读,比如TCP带外数据 | 是 | 是 |
POLLOUT | 数据(包括普通数据和优先数据)可读 | 是 | 是 |
POLLWRNORM | 普通数据可写 | 是 | 是 |
POLLWRBAND | 优先级带数据可写 | 是 | 是 |
POLLRDHUP | TCP连接被对方关闭,或者对方关闭了写操作,它由GNU引入 | 是 | 是 |
POLLERR | 错误 | 否 | 是 |
POLLHUP | 挂起,比如管道的写端被关闭后,读端描述符上将收到POLLHUP事件 | 否 | 是 |
POLLNVAL | 文件描述符没有打开 | 否 | 是 |
这些取值实际上都是以宏的方式进行定义的,二进制序列当中只要一个比特位是1,且1的位置各不相同:
(4)返回结果:
(1)PollServer.hpp:
#pragma once
#include <iostream>
#include <poll.h>
#include <sys/time.h>
#include "Socket.hpp"
static const uint16_t defaultport = 8080;
static const int fd_num_max = (sizeof(fd_set) * 8);
int defaultfd = -1;
int non_event = 0;
class PollServer
{
public:
PollServer(uint16_t port = defaultport)
:_port(port)
{
for(int i = 0; i < fd_num_max; i++)
{
_event_fds[i].fd = defaultfd;
_event_fds[i].events = non_event;
_event_fds[i].revents = non_event;
}
}
bool Init()
{
_listensock.Socket();
_listensock.Bind(_port);
_listensock.Listen();
return true;
}
void Start()
{
int listensock = _listensock.Getsock();
_event_fds[0].fd = listensock;
_event_fds[0].events = POLLIN;
int timeout = 3000; // 3s
while(1)
{
int n = poll(_event_fds, fd_num_max, timeout);
switch (n)
{
case 0:
std::cout << "time out, timeout: " << std::endl;
break;
case -1:
std::cerr << "select error" << std::endl;
break;
default:
// 有事件就绪了,TODO
std::cout << "get a new link!!!!!" << std::endl;
Dispatcher(); // 就绪的事件和fd你怎么知道只有一个呢???
break;
}
}
}
void Dispatcher()
{
for(int i = 0; i < fd_num_max; i++)
{
int fd = _event_fds[i].fd;
if (fd == defaultfd)
{
continue;
}
if(_event_fds[i].events & POLLIN) // 用来测试描述词组set中相关fd 的位是否为真
{
if (fd == _listensock.Getsock()) //说明是监听套接字就绪
{
Accepter(); // 连接管理器
}
else // non listenfd
{
Recver(fd, i);
}
}
}
}
void Accepter()
{
// 连接事件就绪了
int sockfd = _listensock.Accept();
if(sockfd < 0)
{
return;
}
LOG(NORMAL, "accept success");
int pos = 0;
for(pos = 1; pos < fd_num_max; pos++)
{
if(_event_fds[pos].fd != defaultfd)
{
continue;
}
else
{
break;
}
}
if(pos == fd_num_max)
{
LOG(WARNING, "server is full");
close(sockfd);
}
else
{
_event_fds[pos].fd = sockfd;
_event_fds[pos].events = POLLIN;
_event_fds[pos].revents = non_event;
PrintFd();
}
}
void Recver(int fd, int pos)
{
char buffer[1024];
int n = read(fd, buffer, sizeof(buffer) - 1);
if(n > 0)
{
buffer[n] = 0;
std::cout << "get a messge: " << buffer << std::endl;
}
else if(n == 0)
{
LOG(NORMAL, "client quit");
close(fd);
_event_fds[pos].fd = defaultfd; // 这里本质是从select中移除
}
else
{
LOG(WARNING, "client quit");
close(fd);
_event_fds[pos].fd = defaultfd; // 这里本质是从select中移除
}
}
void PrintFd()
{
std::cout << "online fd list: ";
for (int i = 0; i < fd_num_max; i++)
{
if (_event_fds[i].fd == defaultfd)
{
continue;
}
std::cout << _event_fds[i].fd << " ";
}
std::cout << std::endl;
}
~PollServer()
{
_listensock.Close();
}
private:
Sock _listensock;
uint16_t _port;
pollfd _event_fds[fd_num_max]; // 数组, 用户维护的!
// int wfd_array[fd_num_max];
};
(2)test.cpp:
#include "PollServer.hpp"
#include <memory>
int main()
{
std::unique_ptr<PollServer> svr(new PollServer());
svr->Init();
svr->Start();
return 0;
}
按照man手册的说法:是为处理大批量句柄而作了改进的poll。
它是在2.5.44内核中被引进的(epoll(4) is a new API introduced in Linux kernel 2.5.44)它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。
epoll 有3个相关的系统调用,这一点和之前的poll是完全不一样的,还有一点是epoll还是一样只负责等待。
#include <sys/epoll.h>
int epoll_create(int size);
创建一个epoll的句柄:
#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
(1)epoll的事件注册函数:
(2)第二个参数的取值:
(3)struct epoll_event 类型数据定义如下,该类型包含两个成员,其中一个为uint32_t类型成员events,用于控制该事件的属性是可读、可写还是异常等。events可以为表示4.2中宏的集合,还有一个为联合自定义类型数据,其中该联合类型可以传四种不同的数据表达不同的意义,但一般使用fd来指定文件描述符。
typedef union epoll_data
{
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event
{
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
(4)events可以是以下几个宏的集合:
#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
函数功能是收集在epoll监控的事件中已经发送的事件:
(1)当某一进程调用epoll_create方法时, Linux内核会创建一个eventpoll结构体,这个结构体中有两个成员与epoll的使用方式密切相关:
struct eventpoll
{
....
/*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/
struct rb_root rbr;
/*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/
struct list_head rdlist;
....
};
struct epitem
{
struct rb_node rbn;//红黑树节点
struct list_head rdllink;//双向链表节点
struct epoll_filefd ffd; //事件句柄信息
struct eventpoll *ep; //指向其所属的eventpoll对象
struct epoll_event event; //期待发生的事件类型
}
(2)总结一下,epoll的使用过程就是三部曲:
epoll有2种工作方式:水平触发(LT)和边缘触发(ET)
你妈喊你吃饭的例子:当你正在玩吃鸡的时候,眼看进入了决赛圈,你妈饭做好了,喊你吃饭的时候有两种方式:
如果我们在第1步将socket添加到epoll描述符的时候使用了EPOLLET标志, epoll进入ET工作模式。
epoll的高性能,是有一定的特定场景的,如果场景选择的不适宜,epoll的性能可能适得其反。
使用 ET 模式的 epoll需要将文件描述设置为非阻塞。这个不是接口上的要求, 而是 “工程实践” 上的要求。
(1)假设这样的场景:服务器接受到一个10k的请求,会向客户端返回一个应答数据,如果客户端收不到应答,不会发送第二个10k请求。
(2)如果服务端写的代码是阻塞式的read,并且一次只 read 1k 数据的话(read不能保证一次就把所有的数据都读出来,参考 man 手册的说明。可能被信号打断),剩下的9k数据就会待在缓冲区中。
此时由于 epoll 是ET模式,并不会认为文件描述符读就绪,epoll_wait 就不会再次返回,剩下的 9k 数据会一直在缓冲区中,直到下一次客户端再给服务器写数据,epoll_wait 才能返回。但是问题来了:
所以,为了解决上述问题(阻塞read不一定能一下把完整的请求读完),于是就可以使用非阻塞轮训的方式来读缓冲区,保证一定能把完整的请求都读出来。
而如果是LT没这个问题。只要缓冲区中的数据没读完,epoll_wait 返回文件描述符读就绪。
(1)Poller.hpp:
#pragma once
#include <cerrno>
#include <cstring>
#include <sys/epoll.h>
#include "Log.hpp"
#include <unistd.h>
class Poller
{
public:
Poller()
{
_epfd = epoll_create(128);
if(_epfd == -1)
{
LOG(FATAL, "epoll_create error");
}
else
{
LOG(NORMAL, "epoll_create success");
}
}
int EpllerUpdate(int oper, int sock, int event)
{
int n = 0;
if(oper == EPOLL_CTL_DEL)
{
n = epoll_ctl(_epfd, oper, sock, nullptr);
if(n != 0)
{
LOG(FATAL, "epoll_ctl delete error!");
}
}
else
{
epoll_event ev;
ev.events = event;
ev.data.fd = sock;
n = epoll_ctl(_epfd, oper, sock, &ev);
if(n != 0)
{
LOG(FATAL, "epoll_ctl error!");
}
}
return n;
}
int EpollerWait(epoll_event revents[], int num)
{
int n = epoll_wait(_epfd, revents, num, _timeout);
return n;
}
~Poller()
{
close(_epfd);
}
private:
int _epfd;
int _timeout{3000};
};
(2)EpollServer.hpp:
#pragma once
#include <iostream>
#include <memory>
#include <sys/epoll.h>
#include "Socket.hpp"
#include "Poller.hpp"
#include "Log.hpp"
class EpollServer
{
static const int num = 64;
public:
EpollServer(uint16_t port)
: _port(port),
_listsocket_ptr(new Sock()),
_epoller_ptr(new Poller())
{}
void Init()
{
_listsocket_ptr->Socket();
_listsocket_ptr->Bind(_port);
_listsocket_ptr->Listen();
LOG(NORMAL, "create listen socket success");
}
void Start()
{
int listsockfd = _listsocket_ptr->Getsock();
// 将listsockfd添加到epoll中 -> listensock和他关心的事件,添加到内核epoll模型中rb_tree.
_epoller_ptr->EpllerUpdate(EPOLL_CTL_ADD, listsockfd, EPOLLIN);
epoll_event revs[num];
while(1)
{
int n = _epoller_ptr->EpollerWait(revs, num);
if(n > 0)
{
// 有事件就绪
LOG(DEBUG, "event happened");
Dispatcher(revs, n);
}
else if (n == 0)
{
LOG(NORMAL, "time out ...");
}
else
{
LOG(FATAL, "epll wait error");
}
}
}
void Dispatcher(epoll_event revs[], int num)
{
for(int i = 0; i < num; i++)
{
uint32_t events = revs[i].events;
int fd = revs[i].data.fd;
if(events & EPOLLIN)
{
if(fd == _listsocket_ptr->Getsock())
{
Accepter();
}
else
{
// 其他fd上面的普通读取事件就绪
Recver(fd);
}
}
else if(events & EPOLLOUT)
{
;
}
else
{
;
}
}
}
void Accepter()
{
int sockfd = _listsocket_ptr->Accept();
if(sockfd > 0)
{
// 我们不能直接读取吗
_epoller_ptr->EpllerUpdate(EPOLL_CTL_ADD, sockfd, EPOLLIN);
LOG(NORMAL, "get a new link");
}
}
void Recver(int fd)
{
char buffer[1024];
ssize_t n = read(fd, buffer, sizeof(buffer) - 1);
if(n > 0)
{
buffer[n] = 0;
std::cout << "get a messge: " << buffer << std::endl;
// wrirte
std::string echo_str = "server echo $ ";
echo_str += buffer;
write(fd, echo_str.c_str(), echo_str.size());
}
else if(n == 0)
{
LOG(NORMAL, "client quit");
//细节
_epoller_ptr->EpllerUpdate(EPOLL_CTL_DEL, fd, 0);
close(fd);
}
else
{
LOG(FATAL, "recv error");
_epoller_ptr->EpllerUpdate(EPOLL_CTL_DEL, fd, 0);
close(fd);
}
}
private:
std::shared_ptr<Sock> _listsocket_ptr;
std::shared_ptr<Poller> _epoller_ptr;
uint16_t _port;
};
(3)test.cpp:
#include <iostream>
#include <memory>
#include "EpollServer.hpp"
int main()
{
std::unique_ptr<EpollServer> svr(new EpollServer(8080));
svr->Init();
svr->Start();
return 0;
}
(1)基于 LT 版本稍加修改即可:
注意:此代码考虑 listen_sock ET 的情况,如果将 listen_sock 设为 ET,则需要非阻塞轮询的方式 accept,否则会导致同一时刻大量的客户端同时连接的时候,只能 accept 一次的问题。
(2)由于ET模式的特性;我们需要每次把数据一次性读完,避免丢失;这就需要每个描述符sock都有一个收发、错误、缓冲区;并能执行相应的回调方法。
已经描述了;就需要我们用unordered_map统一组织起来。(便于我们再回调函数中实现)
(3)对于读取操作:
(4)对于写操作:
(5)TcpServer.hpp:
#pragma once
#include <iostream>
#include <string>
#include <memory>
#include <cerrno>
#include <functional>
#include <unordered_map>
#include <unistd.h>
#include <fcntl.h>
#include "Log.hpp"
#include "Poller.hpp"
#include "Socket.hpp"
void SetNonBlockOrDie(int sock)
{
int fl = fcntl(sock, F_GETFL);
if(fl < 0)
{
exit(-1);
}
fcntl(sock, F_SETFL, fl | O_NONBLOCK);
}
class Connection;
class TcpServer;
uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);
const static int g_buffer_size = 128;
using func_t = std::function<void(std::weak_ptr<Connection>)>;
class Connection
{
public:
Connection(int sockfd)
:_sockfd(sockfd)
{}
int Getsock()
{
return _sockfd;
}
void SetHander(func_t recv_cb, func_t send_cb, func_t except_cb)
{
_recv_cb = recv_cb;
_send_cb = send_cb;
_except_cb = except_cb;
}
void AppendInBuffer(const std::string& in)
{
_in_buffer += in;
}
void AppendOutBuffer(const std::string& out)
{
_out_buffer += out;
}
std::string& Inbuffer()
{
return _in_buffer;
}
std::string& Outbuffer()
{
return _out_buffer;
}
void SetWeakPtr(TcpServer* tcp_server_ptr)
{
_tcp_server_ptr = tcp_server_ptr;
}
~Connection()
{}
public:
func_t _recv_cb;
func_t _send_cb;
func_t _except_cb;
// 添加一个回指指针
TcpServer* _tcp_server_ptr;
std::string _ip;
uint16_t _port;
private:
int _sockfd;
std::string _in_buffer;
std::string _out_buffer;
};
// enable_shared_from_this:可以提供返回当前对象的this对应的shared_ptr
class TcpServer : public std::enable_shared_from_this<TcpServer>
{
static const int num = 64;
public:
TcpServer(uint16_t port, func_t OnMessage)
:_port(port)
,_OnMessage(OnMessage)
,_quit(true)
,_epoller_ptr(new (Poller))
,_listensock_ptr(new (Sock))
{}
void Init()
{
_listensock_ptr->Socket();
SetNonBlockOrDie(_listensock_ptr->Getsock());
_listensock_ptr->Bind(_port);
_listensock_ptr->Listen();
LOG(NORMAL, "create listen socket success\n");
AddConnection(_listensock_ptr->Getsock(),
EVENT_IN, std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);
}
void AddConnection(int sockfd, uint32_t event, func_t recv_cb, func_t send_cb, func_t except_cb, const std::string& ip = "0.0.0.0", uint16_t port = 0)
{
// 1. 给sock也建立一个connection对象,将listensock添加到Connection中,同时,listensock和Connecion放入_connections
std::shared_ptr<Connection> new_connection(new Connection(sockfd));
new_connection->SetWeakPtr(this); // shared_from_this(): 返回当前对象的shared_ptr
//std::shared_ptr<Connection> new_connection = std::make_shared<Connection>(sockfd, std::shared_ptr<TcpServer>(this));
new_connection->SetHander(recv_cb, send_cb, except_cb);
new_connection->_ip = ip;
new_connection->_port = port;
//2. 添加到unordered_map
_connections.insert(std::make_pair(sockfd, new_connection));
// 3. 我们添加对应的事件,除了要加到内核中,fd, event
_epoller_ptr->EpllerUpdate(EPOLL_CTL_ADD, sockfd, event);
}
void Accepter(std::weak_ptr<Connection> conn)
{
auto connection = conn.lock();
while(1)
{
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
int sockfd = accept(connection->Getsock(), (struct sockaddr*)&peer, &len);
if(sockfd > 0)
{
uint16_t peerport = ntohs(peer.sin_port);
char ipbuf[128];
inet_ntop(AF_INET, &peer.sin_addr.s_addr, ipbuf, sizeof(ipbuf));
printf("get a new client, get info-> [%s:%d], sockfd : %d\n", ipbuf, peerport, sockfd);
SetNonBlockOrDie(sockfd);
// listensock只需要设置_recv_cb, 而其他sock,读,写,异常
AddConnection(sockfd, EVENT_IN,
std::bind(&TcpServer::Recver, this, std::placeholders::_1),
std::bind(&TcpServer::Sender, this, std::placeholders::_1),
std::bind(&TcpServer::Excepter, this, std::placeholders::_1),
ipbuf, peerport);
}
else
{
if (errno == EWOULDBLOCK)
{
break;
}
else if(errno == EINTR)
{
continue;
}
else
{
break;
}
}
}
}
void Recver(std::weak_ptr<Connection> conn)
{
if(conn.expired())
{
return;
}
auto connection = conn.lock();
int sockfd = connection->Getsock();
while(1)
{
char buffer[g_buffer_size];
memset(buffer, 0, sizeof(buffer));
ssize_t n = recv(sockfd, buffer, sizeof(buffer) - 1, 0); // 非阻塞读取
if(n > 0)
{
connection->AppendInBuffer(buffer);
}
else if(n == 0)
{
printf("sockfd: %d, client info %s:%d quit...\n", sockfd, connection->_ip.c_str(), connection->_port);
connection->_except_cb(connection);
return;
}
else
{
if(errno == EWOULDBLOCK)
{
break;
}
else if(errno == EINTR)
{
continue;
}
else
{
printf("sockfd: %d, client info %s:%d recv error...\n", sockfd, connection->_ip.c_str(), connection->_port);
connection->_except_cb(connection);
return;
}
}
}
// 数据有了,但是不一定全,1. 检测 2. 如果有完整报文,就处理
_OnMessage(connection); // 你读到的sock所有的数据connection
}
void Sender(std::weak_ptr<Connection> conn)
{
if(conn.expired())
{
return;
}
auto connection = conn.lock();
auto& outbuffer = connection->Outbuffer();
while(1)
{
ssize_t n = send(connection->Getsock(), outbuffer.c_str(), outbuffer.size(), 0);
if(n > 0)
{
outbuffer.erase(0, n);
if(outbuffer.empty())
{
break;
}
}
else if(n == 0)
{
return;
}
else
{
if(errno == EWOULDBLOCK)
{
break;
}
else if(errno == EINTR)
{
continue;
}
else
{
printf("sockfd: %d, client info %s:%d send error...\n", connection->Getsock(), connection->_ip.c_str(), connection->_port);
connection->_except_cb(connection);
return;
}
}
}
if(!outbuffer.empty())
{
// 开启对写事件的关心
EnableEvent(connection->Getsock(), true, true);
}
else
{
// 关闭对写事件的关心
EnableEvent(connection->Getsock(), true, false);
}
}
void Excepter(std::weak_ptr<Connection> conn)
{
if(conn.expired())
{
return;
}
auto connection = conn.lock();
int fd = connection->Getsock();
printf("Excepter hander sockfd: %d, client info %s:%d excepter handler\n",
connection->Getsock(), connection->_ip.c_str(), connection->_port);
// 1. 移除对特定fd的关心
_epoller_ptr->EpllerUpdate(EPOLL_CTL_DEL, fd, 0);
// 2. 关闭异常的文件描述符
printf("close %d done...\n", fd);
close(fd);
// 3. 从unordered_map中移除
printf("remove %d from _connections...\n", fd);
_connections.erase(fd);
}
void EnableEvent(int sockfd, bool readable, bool writeable)
{
uint32_t events = 0;
events |= ((readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0) | EPOLLET);
_epoller_ptr->EpllerUpdate(EPOLL_CTL_MOD, sockfd, events);
}
bool IsConnectionSafe(int fd)
{
auto iter = _connections.find(fd);
if(iter == _connections.end())
{
return false;
}
return true;
}
void Start()
{
_quit = false;
while(1)
{
Dispatcher();
PrintConnection();
}
_quit = true;
}
void Dispatcher()
{
int n = _epoller_ptr->EpollerWait(revs, num);
for(int i = 0; i < n; i++)
{
uint32_t events = revs[i].events;
int sockfd = revs[i].data.fd;
if((events & EPOLLIN) && IsConnectionSafe(sockfd))
{
if(_connections[sockfd]->_recv_cb)
{
_connections[sockfd]->_recv_cb(_connections[sockfd]);
}
}
if((events & EPOLLOUT) && IsConnectionSafe(sockfd))
{
if(_connections[sockfd]->_send_cb)
{
_connections[sockfd]->_send_cb(_connections[sockfd]);
}
}
}
}
void PrintConnection()
{
std::cout << "_connections fd list: ";
for (auto &connection : _connections)
{
std::cout << connection.second->Getsock() << ", ";
std::cout << "inbuffer: " << connection.second->Inbuffer().c_str();
}
std::cout << std::endl;
}
~TcpServer()
{}
private:
std::shared_ptr<Poller> _epoller_ptr;
std::shared_ptr<Sock> _listensock_ptr; // 监听socket, 可以把他移除到外部
std::unordered_map<int, std::shared_ptr<Connection>> _connections;
struct epoll_event revs[num];
uint16_t _port;
bool _quit;
// 让上层处理信息
func_t _OnMessage;
};
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。