当前位置:   article > 正文

高级IO和5种IO模型

高级IO和5种IO模型

1. 高级IO

1.1 IO的基本概念

  • 概念:内存和外设进行沟通的动作叫做IO,IO也就是输入和输出.
  • 在冯诺依曼体系当中:将数据从输入设备拷贝到内存就叫做输入,将数据从内存拷贝到输出设备就叫做输出。

  • 对文件进行的读写操作本质就是一种IO,文件IO对应的外设就是磁盘。
  • 对网络进行的读写操作本质也是一种IO,网络IO对应的外设就是网卡。
  • 在网络层面,数据往网络里写的本质是将数据从内存拷贝到网卡上,从网络里读的本质是将数据从网卡拷贝到内存。

我们之前学到的所有关于IO的接口都可以称作拷贝接口,以send为例子

  • send:把数据拷贝到TCP的发送缓冲区里,读数据的时候是从缓冲区里面去拿,有数据才能拿,没数据只能等。

以接收为例子,如果缓冲区满了,就通告对方,然后对方就不发数据了,我的缓冲区为什么会满呢?无非就是上层来不及取或者取的太慢了,数据来的太快导致缓冲区被打满了。

当缓冲区满了,如何让发送方尽快发呢?肯定就是通知上层尽快把数据取走。我们可以认为是一种生产消费者模型,其中缓冲区就是交易场所:

1.2 OS如何得知外设当中有数据可读取

输入就是操作系统将数据从外设拷贝到内存的过程,操作系统一定要通过某种方法得知特定外设上是否有数据就绪:

  • 首先需要明确的是:并不是操作系统想要从外设读取数据时外设上就一定有数据。

    • 比如用户正在访问某台服务器,当用户的请求报文发出后就需要等待从网卡当中读取服务器发来的响应数据。
    • 但此时对方服务器可能还没有收到我们发出的请求报文,或是正在对我们的请求报文进行数据分析,也有可能服务器发来的响应数据还在网络中路由。
  • 但操作系统不会主动去检测外设上是否有数据就绪,这种做法一定会降低操作系统的工作效率。

    • 因为大部分情况下外设当中都是没有数据的,因此操作系统所做的大部分检测工作其实都是徒劳的。
  • 操作系统实际采用的是中断的方式来得知外设上是否有数据就绪的,当某个外设上面有数据就绪时,该外设就会向CPU当中的中断控制器发送中断信号,中断控制器再根据产生的中断信号的优先级按顺序发送给CPU。

  • 每一个中断信号都有一个对应的中断处理程序,存储中断信号和中断处理程序映射关系的表叫做中断向量表,当CPU收到某个中断信号时就会自动停止正在运行的程序,然后根据该中断向量表执行该中断信号对应的中断处理程序,处理完毕后再返回原被暂停的程序继续运行。

需要注意的是,CPU不直接和外设打交道指的是在数据层面上,而外设其实是可以直接将某些控制信号发送给CPU当中的某些控制器的。

1.3 OS如何处理从网卡中读取到的数据包

操作系统任何时刻都可能会收到大量的数据包,因此操作系统必须将这些数据包管理起来,所谓的管理就是“先描述,再组织”。在内核当中有一个结构叫做sk_buff,该结构就是用来管理和控制接收或发送数据包的信息的。

(1)如下是一个简化版的sk_buff结构:

struct sk_buff
{
    char* transport_header;
    char* network_header;
    char* mac_header;
    char* data;
    struct sk_buff* next;
    struct sk_buff* prev;
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

(2)当操作系统从网卡当中读取到一个数据包后,会将该数据依次交给链路层、网络层、传输层、应用层进行解包和分用,最终将数据包中的数据交给了上层用户,如下是sk_buff结构是如何进行数据包的解包和分用的过程:

  • 当操作系统从网卡中读取到一个数据包后,就会定义出一个sk_buff结构,然后用sk_buff结构当中的data指针指向这个读取到的数据包,并将定义出来的这个sk_buff结构与其他sk_buff结构以双链表的形式组织起来,此时操作系统对各个数据包的管理就变成了对双链表的增删查改等操作。
  • 接下来我们需要将读取上来的数据包交给最底层的链路层处理,进行链路层的解包和分用,此时就是让sk_buff结构当中的mac_header指针指向最初的数据包,然后向后读取链路层的报头,剩下的就是需要交给网络层处理的有效载荷了,此时便完成了链路层的解包。
  • 这时链路层就需要将有效载荷向上交付给网络层进行解包和分用了,这里所说的向上交付只是形象的说法,实际向上交付并不是要将数据从链路层的缓冲区拷贝到网络层的缓冲区,我们只需要让sk_buff结构当中的network_header指针,指向数据包中链路层报头之后的数据即可,然后继续向后读取网络层的报头,便完成了网络层的解包。
  • 紧接着就是传输层对数据进行处理了,同样的道理,让sk_buff结构当中的transport_header指针,指向数据包中网络层报头之后的数据,然后继续向后读取传输层的报头,便完成了传输层的解包。
  • 传输层解包后就可以根据具体使用的传输层协议,对应将剩下的数据拷贝到TCP或UDP的接收缓冲区供用户读取即可。


(3)发送数据时对数据进行封装也是同样的道理,就是依次在数据前面拷贝上对应的报头,最后再将数据发送出去(UDP)或拷贝到发送缓冲区(TCP)即可。也就是说,数据包在进行封装和解包的过程中,本质数据的存储位置是没有发生变化的,我们实际只是在用不同的指针对数据进行操作而已。

但内核中的sk_buff并不像不是那样简单:

  • 一方面,为了保证高效的网络报文处理效率,这就要求sk_buff的结构也必须是高效的。
  • 另一方面,sk_buff结构需要被内核协议中的各个协议共同使用,因此sk_buff必须能够兼容所有的网络协议。

1.4 IO的步骤

当程序运行到IO函数时一般都在阻塞式等待,这也算作IO过程中的一个环节,但真正意义上的IO就是讲数据拷贝到缓冲区中,任何IO过程都要包含两个步骤:等待和拷贝 即: IO = 等待+数据拷贝。

要提高IO的效率主要分为两步:

  • 第一步是等,即等待IO条件就绪。
  • 第二步是拷贝,也就是当IO条件就绪后将数据拷贝到内存或外设。

任何IO的过程,都包含“等”和“拷贝”这两个步骤,在实际的应用场景中,等待消耗的时间往往都远远高于拷贝的时间所以提高IO效率的本质是:尽可能地减少单位时间内“等待”的比重。

提高IO效率的方式有两种改变等待的方式和减少等待的比重高效的IO本质就是减少单位时间内“等待”的比重

2. 五种IO模型

2.1 利用钓鱼来理解

(1)我们以钓鱼为例子:IO的过程其实和钓鱼是非常类似的。

  • 钓鱼的过程同样分为“等”和“拷贝”两个步骤,只不过这里的“等”指的是等鱼上钩,“拷贝”指的是当鱼上钩后将鱼从河里“拷贝”到我们的鱼桶当中。
  • IO时“等”消耗的时间往往比“拷贝”消耗的时间多,钓鱼也恰好符合这个特点。钓鱼时我们大部分时间都在等鱼上钩,而当鱼上钩后只需要一瞬间就能将鱼“拷贝”上来。

(2)在谈论高效的IO之前,我们先来看看什么样的钓鱼方式才是高效的,下面给出五个人的钓鱼方式:

  • 张三:拿了1个鱼竿,将鱼钩抛入水中后就死死的盯着浮漂,什么也不做,当有鱼上钩后就挥动鱼竿将鱼钓上来。
  • 李四:拿了1个鱼竿,将鱼钩抛入水中后就去做其他事情,然后定期观察浮漂,如果有鱼上钩则挥动鱼竿将鱼钓上来,否则继续去做其他事情。
  • 王五:拿了1个鱼竿,将鱼钩抛入水中后在鱼竿顶部绑一个铃铛,然后就去做其他事情,如果铃铛响了就挥动鱼竿将鱼钓上来,否则就根本不管鱼竿。
  • 赵六:拿了100个鱼竿,将100个鱼竿抛入水中后就定期观察这100个鱼竿的浮漂,如果某个鱼竿有鱼上钩则挥动对应的鱼竿将鱼钓上来。
  • 田七:田七是一个有钱的老板,他给了自己的司机一个桶、一个电话、一个鱼竿,让司机去钓鱼,当鱼桶装满的时候再打电话告诉田七来拿鱼,而田七自己则开车去做其他事情去了。

(3)那么张三、李四、王五的钓鱼效率是否一样呢?为什么?

其实张三、李四、王五的钓鱼效率本质上是一样的。

  • 首先他们的钓鱼方式都是一样的,都是先等鱼上钩,然后再将鱼钓上来。
  • 其次,因为他们每个人都是拿的一根鱼竿,当河里有鱼来咬鱼钩时,这条鱼咬哪一个鱼钩的概率都是相等的。

因此张三、李四、王五他们三个人的钓鱼的效率是一样的,他们只是等鱼上钩的方式不同而已,张三是死等,李四是定期检测浮漂,而王五是通过铃铛来判断是否有鱼上钩。

注意这里问的是他们的钓鱼效率是否是一样的,而不是问他们整体谁做的事最多,如果说整体做事情的量的话,那一定是王五做得最多,李四次之,张三最少。

(4)张三、李四、王五它们三个人分别和赵六比较,谁的钓鱼效率更高?

赵六毫无疑问是这四个人当中钓鱼效率最高的,因为赵六同时在等多个鱼竿上有鱼上钩,因此在单位时间内,赵六的鱼竿有鱼上钩的概率是最大的。

  • 为了方便计算,我们假设赵六拿了97个鱼竿,加上张三、李四、王五的鱼竿一共就有100个鱼竿。
  • 当河里有鱼来咬鱼钩时,这条鱼咬张三、李四、王五的鱼钩的概率都是百分之一,而咬赵六的鱼钩的概率就是百分之九十七。
  • 因此在单位时间内,赵六的鱼竿上有鱼的概率是张三、李四、王五的97倍,

高效的钓鱼就是要减少单位时间内“等”的时间,增加“拷贝”的时间,所以说赵六的钓鱼效率是这四个人当中最高的。赵六的钓鱼效率之所以高,是因为赵六一次等待多个鱼竿上的鱼上钩,此时就可以将“等”的时间进行重叠。

(5)如何看待田七的这种钓鱼方式

  • 田七让自己的司机帮自己钓鱼,自己开车去做其他事情去了,此时这个司机具体怎么钓鱼已经不重要了,他可以模仿张三、李四、王五、赵六任何一个人的钓鱼方式进行钓鱼,最重要的是田七本人并没有参与整个钓鱼的过程,他只是发起了钓鱼的任务,而真正钓鱼的是司机,田七在司机钓鱼期间可能在做任何其他事情,如果将钓鱼看作是一种IO的话,那田七的这种钓鱼方式就叫做异步IO,它本身是没有参与IO的等待和拷贝的过程。
  • 而对于张三、李四、王五、赵六来说,他们都需要自己等鱼上钩,当鱼上钩后又需要自己把鱼从河里钓上来,对应到IO当中就是需要自己进行数据的拷贝,也需要自己进行等待,因此他们四个人的钓鱼方式都叫做同步IO。

(6)这五个人的钓鱼方式分别对应的就是五种IO模型:

  • 张三这种死等的钓鱼方式对应就是阻塞IO。
  • 李四这种定时检测是否有鱼上钩的方式就是非阻塞IO。
  • 王五这种通过设置铃铛得知事件是否就绪的方式就是信号驱动IO。
  • 王五这种一次等待多个鱼竿上有鱼的钓鱼方式就是IO多路转接。
  • 田七这种让别人帮自己钓鱼的钓鱼方式就是异步IO。
IO模型简单对比解释
阻塞IO一心一意地等待数据到来
非阻塞IO三心二意地轮询式等待
信号驱动信号递达时再来读取或写入数据
多路转接让大批线程等待,自身读取数据
异步通信IO让其他进程或线程进行等待和读取,自身获取结果

阻塞IO,非阻塞IO和信号驱动IO本质上是不能提高IO的效率的,但非阻塞IO和信号驱动IO能提高整体做事的效率。

上述钓鱼的例子中:钓鱼的河对应就是内核,这里的每一个人都是进程或线程,鱼竿对应的就是文件描述符fd或套接字,装鱼的桶对应的就是用户缓冲区。

(7)如何区分同步IO和异步IO

  • 判断一个IO过程是同步的还是异步的,本质就是看当前进程或线程是否需要参与IO过程,如果要参与等待或者拷贝那就是同步IO,否则就是异步IO。

2.2 阻塞IO

阻塞IO就是在内核还没将数据准备好,系统调用会一直等待,阻塞IO是最常见的IO模型,所有的套接字默认就是阻塞等待:

  • 比如当调用recvfrom函数从某个套接字上读取数据时,可能底层数据还没有准备好,此时就需要等待数据就绪,当数据就绪后再将数据从内核拷贝到用户空间,最后recvfrom函数才会返回。
  • 在recvfrom函数等待数据就绪期间,在用户看来该进程或线程就阻塞住了(卡住了),本质就是操作系统将该进程或线程的状态设置为S状态,然后将其进程PCB放入等待队列当中,当数据就绪后操作系统再将该进程PCB其从等待队列当中唤醒,将该进程或线程的状态设置为R状态,把进程PCB放到运行队列,然后该进程或线程再将数据从内核拷贝到用户空间。

以阻塞方式进行IO操作的进程或线程,在“等”和“拷贝”没有结束期间都不会返回,在用户看来就像是阻塞住了,因此我们称之为阻塞IO。

2.3 非阻塞IO

(1)非阻塞IO:如果内核还未将数据准备好,系统调用仍然会直接返回,并且返回EWOULDBLOCK错误码。

非阻塞IO往往需要程序员以循环的方式反复尝试读写文件描述符,这个过程称为轮询,这对CPU来说是较大的浪费,一般只有特定场景下才使用:

  • 比如当调用recvfrom函数以非阻塞方式从某个套接字上读取数据时,如果底层数据还没有准备好,那么recvfrom函数会立马返回,而不会让该进程或线程进行阻塞等待。
  • 因为没有读取的数据,因此该进程或线程后续还需要继续调用recvfrom函数,检测底层数据是否就绪,如果没有就绪则继续返回,直到某次检测到底层数据就绪后,再将数据从内核拷贝到用户空间然后进行成功返回。
  • 每次调用recvfrom函数读取数据时,就算底层数据没有就绪,recvfrom函数也会立马返回,在用户看来该进程或线程就没有被阻塞住,因此我们称之为非阻塞IO。

(2)阻塞IO和非阻塞IO的区别:

  • 阻塞和非阻塞的共同点:都是同步IO。
  • 唯一的区别:等待的方式不同 拷贝数据还是得他们去拷,一个是阻塞等,一个是没有就绪就在应用层立马返回。
  • 阻塞IO:当数据没有就绪时,后续检测数据是否就绪的工作是由操作系统发起的。
  • 非阻塞IO:当数据没有就绪时,后续检测数据是否就绪的工作是由用户发起的。

2.4 信号驱动IO

(1)信号驱动IO就是当内核将数据准备好的时候,使用SIGIO信号通知应用程序进行IO操作:


当底层数据就绪的时候会向当前进程或线程递交SIGIO信号(29号信号----可以通告kill -l进行查看所有的信号),因此可以通过signal或sigaction函数进行自定义捕捉,将SIGIO的信号处理程序自定义为需要进行的IO操作,当底层数据就绪时就会自动执行对应的IO操作。

  • 例如我们需要调用recvfrom函数从某个套接字上读取数据,那么就可以将该操作定义为SIGIO的信号处理程序,
  • 当底层数据就绪时,操作系统就会递交SIGIO信号,此时就会自动执行我们定义的信号处理程序,进程将数据从内核拷贝到用户空间。

(2)信号的产生是异步的,但信号驱动IO是同步IO的一种。

  • 信号的产生异步的是因为信号在任何时刻都可能产生。
  • 但信号驱动IO是同步IO的一种,因为当底层数据就绪时,当前进程或线程需要停下正在做的事情,转而进行数据的拷贝操作,因此当前进程或线程仍然需要参与IO过程。

信号驱动只使用了recv的拷贝功能,等待数据的过程是异步的(信号的产生是异步的),但拷贝数据是同步的。

2.5 IO多路转接

IO多路转接也叫做IO多路复用,能够同时等待多个文件描述符的就绪状态:


IO多路转接的思想:

  • 因为IO过程分为“等”和“拷贝”两个步骤,因此我们使用的recvfrom等接口的底层实际上都做了两件事,第一件事就是当数据不就绪时需要进行等待,第二件事就是当数据就绪后需要进行拷贝。
  • 虽然recvfrom等接口也有“等”的能力,但这些接口一次只能“等”一个文件描述符上的数据或空间就绪,这样IO效率太低了。
  • 因此系统为我们提供了三组接口,分别叫做select、poll和epoll,他们一次可以等待一批文件描述符就绪,这些接口的核心工作就是“等”,我们可以将所有“等”的工作都交给这些多路转接接口。
  • 因为这些多路转接接口是一次“等”多个文件描述符的,因此能够将“等”的时间进行重叠,当数据就绪后再调用对应的recvfrom等函数进行数据的拷贝,此时这些函数就能够直接进行拷贝,而不需要进行“等”操作了。

IO多路转接就像现实生活中的黄牛一样,只不过IO多路转接更像是帮人排队的黄牛,因为多路转接接口实际并没有帮我们进行数据拷贝的操作,这些排队黄牛可以一次帮多个人排队,此时就将多个人排队的时间进行了重叠。

2.6 异步IO

异步IO就是由内核在数据拷贝完成时,通知应用程序 (异步IO不参与等待和拷贝的过程):

  • 进行异步IO需要调用一些异步IO的接口,异步IO接口调用后会立马返回,因为异步IO不需要你进行“等”和“拷贝”的操作这两个动作都由操作系统来完,,你要做的只是发起IO调用。
  • 当异步IO完成后,操作系统会通知应用程序,因此进行异步IO的进程或线程并不参与IO的所有细节。

我们将缓冲区变量提供给异步接口,接口会等待并将数据放到缓冲区中,并通知进程,进程可以直接处理数据,并不会参与任何IO过程,所以是异步的。

异步IO系统提供有一些对应的系统接口,但大多使用复杂,也不建议使用,异步IO也有更好的替代方案。

3. 高级IO的概念

3.1 同步通信 VS 异步通信

(1)同步和异步关注的是消息通信机制:

  • 所谓同步:就是在发出一个调用时,在没有得到结果之前,该调用就不返回,但是一旦调用返回,就得到返回值了;换句话说就是由调用者主动等待这个调用的结果。
  • 异步则是相反调用在发出之后,这个调用就直接返回了,所有没有返回结果;换句话说就是当一个异步过程调用发出后,调用者不会立刻得到结果;而是在调用发出后,被调用者通过状态来通知来通知调用,或通过回调函数处理这个调用。

(2)为什么非阻塞IO在没有得到结果之前就返回了

  • IO是分为“等待”和“拷贝”两步的,当调用recvfrom进行非阻塞IO时,如果数据没有就绪,那么调用会直接返回,此时这个调用返回时并没有完成一个完整的IO过程,即便调用返回了那也是属于"错误"的返回。
  • 因此该进程或线程后续还需要继续调用recvfrom,轮询检测数据是否就绪,当数据就绪后最后再把数据从内核拷贝到用户空间,这才是一次完整的IO过程。

因此在进行非阻塞IO时,在没有得到结果之前,虽然这个调用会返回,但后续还需要继续进行轮询检测,因此可以理解成调用还没有返回,而只有当某次轮询检测到数据就绪,并且完成数据拷贝后才认为该调用返回了。

(3)同步IO通信 VS 同步与互斥

在多进程和多线程当中有同步与互斥的概念,但是这里的同步通信和进程或线程之间的同步是完全不相干的概念。

  • 进程/线程同步指的是,在保证数据安全的前提下,让进程/线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,谈论的是进程/线程间的一种工作关系。
  • 同步IO指的是进程/线程与操作系统之间的关系谈论的是进程/线程是否需要主动参与IO过程。

因此当看到“同步”这个词的时候,一定要先明确这个同步是同步通信的同步,还是同步与互斥的同步。

3.2 阻塞 VS 非阻塞

(1)阻塞和非阻塞关注的是程序在等待调用结果(消息、返回值)时的状态:

  • 阻塞调用是指调用结果返回之前,当前线程会被挂起,调用线程只有在得到结果之后才会返回。
  • 非阻塞调用指的是:在不能立刻得到结果之前,该调用不会阻塞当前线程。

注意如果非阻塞调用没有获取到数据时,是以出错的形式返回的,但并不算真正的错误,通过错误码errno区分出错和条件未就绪。

4. 阻塞IO

(1)系统中大部分的接口都是阻塞式接口。比如我们可以用read函数从标准输入(键盘)当中读取数据,然后使用write函数输出到标准输出(屏幕)当中:

#include<iostream>
#include<unistd.h>
using namespace std;

int main()
{
   while(1)
   {
      char buffer[1024] = {0};
      size_t s = read(0,buffer,sizeof(buffer)-1);
      if(s > 0)
      {
        buffer[s] = 0;
        write(1,buffer,sizeof(buffer));
      }
      else 
      {
        cout << "read errno"<<endl;
        break;
      }
   }
   
   return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

(2)程序运行后,如果我们不进行输入操作,此时该进程就会阻塞住,根本原因就是因为此时底层数据不就绪,因此read函数需要进行阻塞等待:


(3)我们也可以发现当前进程的状态是S状态,也就是放到了等待队列当中。

(4)一旦我们进行了输入操作,此时read函数就会检测到底层数据就绪,然后立马将数据读取到从内核拷贝到我们传入的buffer数组当中,并且调用write函数,将读取到的数据输出到显示器上面,最后我们就看到了我们输入的字符串。

5. 非阻塞IO

打开文件时默认都是以阻塞的方式打开的,如果要以非阻塞的方式打开某个文件,需要在使用open函数打开文件时携带O_NONBLOCK或O_NDELAY选项,此时就能够以非阻塞的方式打开文件。

如果要将已经打开的某个文件或套接字设置为非阻塞,此时就需要用到fcntl函数。

5.1 fcntl函数

(1)函数原型:

#include <unistd.h>
#include <fcntl.h>
int fcntl(int fildes, int cmd, ...); //可变参数列表
  • 1
  • 2
  • 3

(2)参数说明:

  • fd:已经打开的文件描述符。
  • cmd:需要进行的操作。
  • …:可变参数,传入的cmd值不同,后面追加的参数也不同。

(3)fcntl函数常用的5种功能与其对应的cmd取值如下:

  • 复制一个现有的描述符(cmd=F_DUPFD)。
  • 获得/设置文件描述符标记(cmd=F_GETFD或F_SETFD)。
  • 获得/设置文件状态标记(cmd=F_GETFL或F_SETFL)。
  • 获得/设置异步I/O所有权(cmd=F_GETOWN或F_SETOWN)。
  • 获得/设置记录锁(cmd=F_GETLK, F_SETLK或F_SETLKW)。

(4)返回值:

  • 如果函数调用成功,则返回值取决于具体进行的操作。
  • 如果函数调用失败,则返回-1,同时错误码会被设置。

(5)使用例子:我们可以定义一个函数,该函数就用于将指定的文件描述符设置为非阻塞状态:

void SetNonBlock(int fd)
{
    int f1 = fcntl(fd,F_GETFL);//获取文件描述符对应的文件状态标记
    if(f1 < 0)
    {
        perror("fcntl");
        return ;
    }
    
    fcntl(fd,F_SETFL,f1 | O_NONBLOCK);//添加非阻塞标记 O_NONBLOCK,对文件状态标记进行设置
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

此时就将该文件描述符设置为了非阻塞状态。

(6)以非阻塞轮询方式读取标准输入err版本:

此时在调用read函数读取标准输入之前,调用SetNonBlock函数将0号文件描述符设置为非阻塞就行了。

int main()
{
   SetNonBlock(0);//将0号描述符设置为非阻塞状态
   while(1)
   {
      char buffer[1024] = {0};
      size_t s = read(0,buffer,sizeof(buffer)-1);//少读取一个,是为了在最后的位置放\0
      if(s > 0)
      {
          buffer[s] = 0;
          write(1,buffer,sizeof(buffer));
          printf("read success: s:%d errno:%d \n",s,errno);
      }
      else 
      {
          printf("read failed: s:%d errno:%d \n",s,errno); 
      }
      
      sleep(1);
   }
   
   return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

注意:上面的是错误的!如果非阻塞调用没有获取到数据时,是以出错的形式返回的,但并不算真正的错误,通过错误码errno区分出错和条件未就绪。

  • 我们读取数据,如果数据没有就绪和真正的发生错误都是使用同样的方式进行标识,如何进一步区分呢? ->使用错误码errno!来区别真正的出错和数据没有就绪的"出错"。

(7)区别真正的出错和数据没有就绪的"出错":

  • ①当read函数以非阻塞方式读取标准输入时,如果底层数据不就绪,那么read函数就会立即返回,但当底层数据不就绪时,read函数是以出错的形式返回的,此时的错误码会被设置为EAGAIN或EWOULDBLOCK。

  • ②因此在以非阻塞方式读取数据时,如果调用read函数时得到的返回值是-1,此时还需要通过错误码进一步进行判断,如果错误码的值是EAGAIN或EWOULDBLOCK,说明本次调用read函数出错是因为底层数据还没有就绪,因此后续还应该继续调用read函数进行轮询检测数据是否就绪,当数据继续时再进行数据的读取。

  • ③调用read函数在读取到数据之前可能会被其他信号中断,此时read函数也会以出错的形式返回,此时的错误码会被设置为EINTR,此时应该重新执行read函数进行数据的读取。

  • ④因此在以非阻塞的方式读取数据时,如果调用read函数读取到的返回值为-1,此时并不应该直接认为read函数在底层读取数据时出错了,而应该继续判断错误码,如果错误码的值为EAGAIN、EWOULDBLOCK或EINTR则应该继续调用read函数再次进行读取。

(8)正确的非阻塞轮询的代码:

int main()
{
    SetNonBlock(0);//将0号描述符设置为非阻塞状态
    while (true)
    {
        char buffer[1024] = {0};
        ssize_t s = read(0, buffer, sizeof(buffer)-1);//少读取一个,是为了在最后的位置放\0
        if(s > 0)
        {
            buffer[s] = 0;
            write(1,buffer,sizeof(buffer));
            printf("read success: s:%d errno:%d \n",s,errno);
        }
        else
        {
            if (errno == EAGAIN || errno == EWOULDBLOCK)//底层数据没有就绪
            { 
                printf("数据没有准备好,再试一下吧!\n");
                printf("read failed: s:%d errno:%d \n",s,errno);
                sleep(1);
                continue;
            }
            else if (errno == EINTR)//在读取数据之前被信号中断
            { 
                printf("数据被信号中断了,再试一下吧!\n");
                sleep(1);
                continue;
            }
            else
            {
                printf("读取错误!\n");
                break;
            }
        }
    }
    
    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 运行代码后,当我们没有输入数据时,程序就会不断调用read函数检测底层数据是否就绪。
  • 一旦我们进行了输入操作,此时read函数就会在轮询检测时检测到,紧接着立马将数据读取到从内核拷贝到我们传入的buffer数组当中,并且调用write函数,读取到的数据输出到显示器上面。

采用0号描述符进行操作是因为0号描述符必须要用户参与,必须得有数据输入,条件才能就绪。

6. IO事件就绪

  1. IO事件就绪可以理解为两方面:一是读事件就绪,一是写事件就绪,可以认为接收缓冲区有数据就是读事件就绪,发送缓冲区里无数据就是写事件就绪。一旦等的事件就绪,我们就可以进行拷贝/读取了。

  2. 但事实上,频繁读写系统内核中的发送接收缓冲区会进行状态切换,期间会进行一系列的处理工作,会带来效率的下降,所以一般接收缓冲区中设有高水位,发送缓冲区中设有低水位的概念。

  3. 当超过"水位"才进行发送/通知上层读取。

7. I/O多路转接之Select

7.1 初识select

系统提供select函数来实现多路复用输入/输出模型:

  • select系统调用是用来让我们的程序监视多个文件描述符的状态变化的。
  • 程序会停在select这里等待,直到被监视的文件描述符有一个或多个发生了状态改变。

7.2 Select函数原型

int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);
  • 1

(1)参数解释:

  • 参数nfds是需要监视的最大的文件描述符值+1;
  • rdset、wrset、exset分别对应于需要检测的可读文件描述符的集合,可写文件描述符的集合及异常文件描述符的集合;
  • 参数timeout为结构timeval,用来设置select()的等待时间;

(2)参数timeout的设置:

  • NULL:则表示select()没有timeout,select将一直被阻塞,直到某个文件描述符上发生了事件;
  • 0:仅检测描述符集合的状态,然后立即返回,并不等待外部事件的发生。
  • 特定的时间值:如果在指定的时间段里没有事件发生,select将超时返回。

struct timeval timeout = {0, 0} //表示非阻塞
struct timeval timeout = {5, 0} //5秒以内阻塞式,超过5秒,非阻塞返回一次 
  • 1
  • 2

(4)fd_set类型解析

①在使用select函数时,就免不了用到fd_set结构体。那fd_set就是怎么样的?下面我们先看在man手册中关于select:

②那么fd_set究竟是什么?

typedef long int __fd_mask;


/* It's easier to assume 8-bit bytes than to get CHAR_BIT. */
#define __NFDBITS (8 * (int) sizeof (__fd_mask))
#define __FDELT(d) ((d) / __NFDBITS)
#define __FDMASK(d) ((__fd_mask) 1 << ((d) % __NFDBITS))

/* fd_set for select and pselect. */
typedef struct
  {
    /* XPG4.2 requires this member name. Otherwise avoid the name
       from the global namespace. */
#ifdef __USE_XOPEN
    __fd_mask fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->fds_bits)
#else
    __fd_mask __fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->__fds_bits)
#endif
  } fd_set;

/* Maximum number of file descriptors in `fd_set'. */
#define FD_SETSIZE __FD_SETSIZE   //__FD_SETSIZE等于1024

/* Access macros for `fd_set'.  */
#define FD_SET(fd, fdsetp)      __FD_SET (fd, fdsetp)
#define FD_CLR(fd, fdsetp)      __FD_CLR (fd, fdsetp)
#define FD_ISSET(fd, fdsetp)    __FD_ISSET (fd, fdsetp)
#define FD_ZERO(fdsetp)         __FD_ZERO (fdsetp)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

③根据分析,我么可以把这个结构理解为一个整数数组,更严格的说是一个 “位图”。使用位图中对应的位来表示要监视的文件描述符。并且还提供了一组操作fd_set的接口函数,来方便的操作该位图!

void FD_CLR(int fd, fd_set *set); // 用来清除描述词组set中相关fd 的位
int FD_ISSET(int fd, fd_set *set); // 用来测试描述词组set中相关fd 的位是否为真
void FD_SET(int fd, fd_set *set); // 用来设置描述词组set中相关fd的位
void FD_ZERO(fd_set *set); // 用来清除描述词组set的全部位
  • 1
  • 2
  • 3
  • 4

(5)函数的返回值

  • 执行成功则返回文件描述词状态已改变的个数。
  • 如果返回0代表在描述词状态改变前已超过timeout时间,没有返回。
  • 当有错误发生时则返回-1,错误原因存于errno,此时参数readfds,writefds, exceptfds和timeout的值变成不可预测。

(6)其中,错误值可能为:

  • EBADF 文件描述词为无效的或该文件已关闭。
  • EINTR 此调用被信号所中断。
  • EINVAL 参数n 为负值。
  • ENOMEM 核心内存不足。

7.3 理解Select的执行过程

理解select模型的关键在于理解fd_set,为说明方便,取fd_set长度为1字节,fd_set中的每一bit可以对应一个文件描述符fd。则1字节长的fd_set最大可以对应8个fd。

  1. 执行fd_set set; FD_ZERO(&set);则set用位表示是0000,0000。
  2. 若fd=5,执行FD_SET(fd,&set);后set变为0001,0000(第5位置为1) 。
  3. 若再加入fd=2,fd=1,则set变为0001,0011 。
  4. 执行select(6,&set,0,0,0)阻塞等待 。
  5. 若fd=1,fd=2上都发生可读事件,则select返回,此时set变为0000,0011。注意:没有事件发生的fd=5被清空。

7.4 socket就绪条件

7.4.1 读就绪

  • socket内核中,接收缓冲区中的字节数,大于等于低水位标记SO_RCVLOWAT。此时可以无阻塞的读该文件描述符,并且返回值大于0。
  • socket TCP通信中,对端关闭连接,此时对该socket读,则返回0。
  • 监听的socket上有新的连接请求。
  • socket上有未处理的错误。

7.4.2 写就绪

  • socket内核中,发送缓冲区中的可用字节数(发送缓冲区的空闲位置大小),大于等于低水位标记SO_SNDLOWAT,此时可以无阻塞的写,并且返回值大于0。
  • socket的写操作被关闭(close或者shutdown),对一个写操作被关闭的socket进行写操作,会触发SIGPIPE信号。
  • socket使用非阻塞connect连接成功或失败之后。
  • socket上有未读取的错误。

7.4.3 异常就绪

  • socket上收到带外数据。关于带外数据,和TCP紧急模式相关(TCP协议头中,有一个紧急指针的字段)。

7.5 Select的特点

  1. 可监控的文件描述符个数取决与sizeof(fd_set)的值。我这边服务器上sizeof(fd_set)=512,每bit表示一个文件描述符,则我服务器上支持的最大文件描述符是512*8=4096。
  2. 将fd加入select监控集的同时,还要再使用一个数据结构array保存放到select监控集中的fd。

这里我们思考为什么需要额外的数据结构array保存放到select监控集中的fd?

  1. 一方面是用于再select 返回后,array作为源数据和fd_set进行FD_ISSET判断。
  2. 另一方面是select返回后会把以前加入的但并无事件发生的fd清空,则每次开始select前都要重新从array取得fd逐一加入(FD_ZERO最先),扫描array的同时取得fd最大值maxfd,用于select的第一个参数。

也正是因为以上两点,Select也有不得不说的缺点!

7.6 Select缺点

  • 每次调用select,都需要手动设置fd集合,从接口使用角度来说也非常不便。
  • 每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大。
  • 同时每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大。
  • select支持的文件描述符数量太小。

7.7 Select使用示例

(1)SelectServer.hpp:

#pragma once
#include <iostream>
#include <sys/select.h>
#include <sys/time.h>
#include "Socket.hpp"

static const uint16_t defaultport = 8080;
static const int fd_num_max = (sizeof(fd_set) * 8);
int defaultfd = -1;

class SelectServer
{
public:
    SelectServer(uint16_t port = defaultport)
        :_port(port)
    {
        for(int i = 0; i < fd_num_max; i++)
        {
            fd_array[i] = defaultfd;
        }
    }

    bool Init()
    {
        _listensock.Socket();
        _listensock.Bind(_port);
        _listensock.Listen();

        return true;
    }

    void Start()
    {
        int listensock = _listensock.Getsock();
        fd_array[0] = listensock;
        while(1)
        {
            fd_set rfds;
            FD_ZERO(&rfds);   // 用来清除描述词组set的全部位

            int maxfd = fd_array[0];
            for(int i = 0; i < fd_num_max; i++)
            {
                if(fd_array[i] == defaultfd)
                {
                    continue;
                }

                FD_SET(fd_array[i], &rfds);    // 用来设置描述词组set中相关fd的位
                if(fd_array[i] > maxfd)
                {
                    maxfd = fd_array[i];
                    LOG(NORMAL, "max fd update");
                }
            }

            timeval timeout = {3, 0};
            int n = select(maxfd + 1, &rfds, nullptr, nullptr, &timeout);
            switch (n)
            {
            case 0:
                std::cout << "time out, timeout: " << timeout.tv_sec << "." << timeout.tv_usec << std::endl;
                break;
            case -1:
                std::cerr << "select error" << std::endl;
                break;
            default:
                // 有事件就绪了,TODO
                std::cout << "get a new link!!!!!" << std::endl;
                Dispatcher(rfds);   // 就绪的事件和fd你怎么知道只有一个呢???
                break;
            }
        }
    }

    void Dispatcher(fd_set rfds)
    {
        for(int i = 0; i < fd_num_max; i++)
        {
            int fd = fd_array[i];
            if (fd == defaultfd)
            {
                continue;
            }

            if(FD_ISSET(fd, &rfds))  // 用来测试描述词组set中相关fd 的位是否为真
            {
                if (fd == _listensock.Getsock())   //说明是监听套接字就绪
                {
                    Accepter(); // 连接管理器
                }
                else // non listenfd
                {
                    Recver(fd, i);
                }
            }
        }
    }

    void Accepter()
    {
        // 连接事件就绪了
        int sockfd = _listensock.Accept();
        if(sockfd < 0)
        {
            return;
        }

        LOG(NORMAL, "accept success");
        int pos = 0;
        for(pos = 1; pos < fd_num_max; pos++)
        {
            if(fd_array[pos] != defaultfd)
            {
                continue;
            }
            else
            {
                break;
            }
        }

        if(pos == fd_num_max)
        {
            LOG(WARNING, "server is full");
            close(sockfd);
        }
        else
        {
            fd_array[pos] = sockfd;
            PrintFd();
        }
    }

    void Recver(int fd, int pos)
    {
        char buffer[1024];
        int n = read(fd, buffer, sizeof(buffer) - 1);
        if(n > 0)
        {
            buffer[n] = 0;
            std::cout << "get a messge: " << buffer << std::endl;
        }
        else if(n == 0)
        {
            LOG(NORMAL, "client quit");
            close(fd);
            fd_array[pos] = defaultfd; // 这里本质是从select中移除
        }
        else
        {
            LOG(WARNING, "client quit");
            close(fd);
            fd_array[pos] = defaultfd; // 这里本质是从select中移除
        }
    }

    void PrintFd()
    {
        std::cout << "online fd list: ";
        for (int i = 0; i < fd_num_max; i++)
        {
            if (fd_array[i] == defaultfd)
            {
                continue;
            }

            std::cout << fd_array[i] << " ";
        }

        std::cout << std::endl;
    }

    ~SelectServer()
    {
        _listensock.Close();
    }

private:
    Sock _listensock;
    uint16_t _port;
    int fd_array[fd_num_max];   // 数组, 用户维护的!
    // int wfd_array[fd_num_max];
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184

(2)test.cpp:

#include "SelectServer.hpp"
#include <memory>

int main()
{
    std::unique_ptr<SelectServer> svr(new SelectServer());
    svr->Init();
    svr->Start();

    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

8. I/O多路转接之Poll

8.1 Poll函数接口

#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
  • 1
  • 2

(1)参数说明:

  • fds是一个poll函数监听的结构列表,每一个元素中,包含了三部分内容:文件描述符、监听的事件集合、返回的事件集合。
  • nfds表示fds数组的长度。
  • timeout表示poll函数的超时时间,单位是毫秒(ms)。

(2)pollfd结构体的定义:

// pollfd结构
struct pollfd 
{
	int fd;         /* file descriptor */
    short events;     /* requested events */
    short revents;    /* returned events */
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • fd:特定的文件描述符,若设置为负值,则忽略events字段,并且revents字段返回。
  • events:需要监视的文件描述符上的哪些事件。
  • revents:poll 函数返回时告知用户该文件描述符上的哪些事件已经就绪。

(3)events 和 revents 的取值:

事件描述是否可作为输入是否可作为输出
POLLIN数据(包括普通数据和优先数据)可读
POLLRDNORM普通数据可读
POLLRDBAND优先级带数据可读(Linux不支持)
POLLPRI高优先级数据可读,比如TCP带外数据
POLLOUT数据(包括普通数据和优先数据)可读
POLLWRNORM普通数据可写
POLLWRBAND优先级带数据可写
POLLRDHUPTCP连接被对方关闭,或者对方关闭了写操作,它由GNU引入
POLLERR错误
POLLHUP挂起,比如管道的写端被关闭后,读端描述符上将收到POLLHUP事件
POLLNVAL文件描述符没有打开

这些取值实际上都是以宏的方式进行定义的,二进制序列当中只要一个比特位是1,且1的位置各不相同:

(4)返回结果:

  • 返回值小于0,表示出错。
  • 返回值等于0,表示poll函数等待超时。
  • 返回值大于0,表示poll由于监听的文件描述符就绪而返回。

8.2 Poll的优点

  • struct pollfd 结构当中包含了 events 和 revents,相当于select 函数的输入输出型参数进行分离,不再适用 select 参数 - 值 传递的方式,接口适用更简单
  • poll 并没有最大数量限制(但是数量过大后性能也会下降)

8.3 Poll的缺点

  • poll 中监听的文件描述符数量太多时和 select函数一样,poll 返回后,需要轮询 pollfd 来获取就绪的描述符。
  • 每次调用 poll 都需要把大量的pollfd 结构从用户态拷贝到内核中。
  • 同时连接的大量客户端在一起时刻,可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降。

8.4 Poll使用示例

(1)PollServer.hpp:

#pragma once
#include <iostream>
#include <poll.h>
#include <sys/time.h>
#include "Socket.hpp"

static const uint16_t defaultport = 8080;
static const int fd_num_max = (sizeof(fd_set) * 8);
int defaultfd = -1;
int non_event = 0;

class PollServer
{
public:
    PollServer(uint16_t port = defaultport)
        :_port(port)
    {
        for(int i = 0; i < fd_num_max; i++)
        {
            _event_fds[i].fd = defaultfd;
            _event_fds[i].events = non_event;
            _event_fds[i].revents = non_event;
        }
    }

    bool Init()
    {
        _listensock.Socket();
        _listensock.Bind(_port);
        _listensock.Listen();

        return true;
    }

    void Start()
    {
        int listensock = _listensock.Getsock();
        _event_fds[0].fd = listensock;
        _event_fds[0].events = POLLIN;
        int timeout = 3000; // 3s
        while(1)
        {
            int n = poll(_event_fds, fd_num_max, timeout);
            switch (n)
            {
            case 0:
                std::cout << "time out, timeout: " << std::endl;
                break;
            case -1:
                std::cerr << "select error" << std::endl;
                break;
            default:
                // 有事件就绪了,TODO
                std::cout << "get a new link!!!!!" << std::endl;
                Dispatcher();   // 就绪的事件和fd你怎么知道只有一个呢???
                break;
            }
        }
    }

    void Dispatcher()
    {
        for(int i = 0; i < fd_num_max; i++)
        {
            int fd = _event_fds[i].fd;
            if (fd == defaultfd)
            {
                continue;
            }

            if(_event_fds[i].events & POLLIN)  // 用来测试描述词组set中相关fd 的位是否为真
            {
                if (fd == _listensock.Getsock())   //说明是监听套接字就绪
                {
                    Accepter(); // 连接管理器
                }
                else // non listenfd
                {
                    Recver(fd, i);
                }
            }
        }
    }

    void Accepter()
    {
        // 连接事件就绪了
        int sockfd = _listensock.Accept();
        if(sockfd < 0)
        {
            return;
        }

        LOG(NORMAL, "accept success");
        int pos = 0;
        for(pos = 1; pos < fd_num_max; pos++)
        {
            if(_event_fds[pos].fd != defaultfd)
            {
                continue;
            }
            else
            {
                break;
            }
        }

        if(pos == fd_num_max)
        {
            LOG(WARNING, "server is full");
            close(sockfd);
        }
        else
        {
            _event_fds[pos].fd = sockfd;
            _event_fds[pos].events = POLLIN;
            _event_fds[pos].revents = non_event;
            PrintFd();
        }
    }

    void Recver(int fd, int pos)
    {
        char buffer[1024];
        int n = read(fd, buffer, sizeof(buffer) - 1);
        if(n > 0)
        {
            buffer[n] = 0;
            std::cout << "get a messge: " << buffer << std::endl;
        }
        else if(n == 0)
        {
            LOG(NORMAL, "client quit");
            close(fd);
            _event_fds[pos].fd = defaultfd; // 这里本质是从select中移除
        }
        else
        {
            LOG(WARNING, "client quit");
            close(fd);
            _event_fds[pos].fd = defaultfd; // 这里本质是从select中移除
        }
    }

    void PrintFd()
    {
        std::cout << "online fd list: ";
        for (int i = 0; i < fd_num_max; i++)
        {
            if (_event_fds[i].fd == defaultfd)
            {
                continue;
            }

            std::cout << _event_fds[i].fd << " ";
        }

        std::cout << std::endl;
    }

    ~PollServer()
    {
        _listensock.Close();
    }

private:
    Sock _listensock;
    uint16_t _port;
    pollfd _event_fds[fd_num_max]; // 数组, 用户维护的!
    // int wfd_array[fd_num_max];
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171

(2)test.cpp:

#include "PollServer.hpp"
#include <memory>

int main()
{
    std::unique_ptr<PollServer> svr(new PollServer());
    svr->Init();
    svr->Start();

    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

9. I/O多路转接之epoll

9.1 epoll初识

按照man手册的说法:是为处理大批量句柄而作了改进的poll。
它是在2.5.44内核中被引进的(epoll(4) is a new API introduced in Linux kernel 2.5.44)它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。

9.2 epoll的相关系统调用

epoll 有3个相关的系统调用,这一点和之前的poll是完全不一样的,还有一点是epoll还是一样只负责等待。

9.2.1 epoll_create

#include <sys/epoll.h>

int epoll_create(int size);
  • 1
  • 2
  • 3


创建一个epoll的句柄:

  • 自从linux2.6.8之后, size参数是被忽略的。
  • 用完之后,必须调用close()关闭。

9.2.2 epoll_ctl

#include <sys/epoll.h>

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
  • 1
  • 2
  • 3

(1)epoll的事件注册函数:

  • 它不同于select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。
  • 第一个参数是epoll_create()的返回值(epoll的句柄)。
  • 第二个参数表示动作,用三个宏来表示。
  • 第三个参数是需要监听的fd。
  • 第四个参数是告诉内核需要监听什么事。

(2)第二个参数的取值:

  • EPOLL_CTL_ADD:注册新的fd到epfd中;
  • EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
  • EPOLL_CTL_DEL:从epfd中删除一个fd;

(3)struct epoll_event 类型数据定义如下,该类型包含两个成员,其中一个为uint32_t类型成员events,用于控制该事件的属性是可读、可写还是异常等。events可以为表示4.2中宏的集合,还有一个为联合自定义类型数据,其中该联合类型可以传四种不同的数据表达不同的意义,但一般使用fd来指定文件描述符。

typedef union epoll_data 
{
    void      *ptr;
    int        fd;
    uint32_t   u32;
    uint64_t   u64;
} epoll_data_t;
 
struct epoll_event 
{
    uint32_t   events;   /* Epoll events */
    epoll_data_t data;   /* User data variable */
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

(4)events可以是以下几个宏的集合:

  • EPOLLIN:表示对应的文件描述符可以读 (包括对端SOCKET正常关闭)。
  • EPOLLOUT:表示对应的文件描述符可以写。
  • EPOLLPRI:表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外数据到来)。
  • EPOLLERR:表示对应的文件描述符发生错误。
  • EPOLLHUP:表示对应的文件描述符被挂断。
  • EPOLLET:将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
  • EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里。

9.2.3 epoll_wait

#include <sys/epoll.h>

int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
  • 1
  • 2
  • 3


函数功能是收集在epoll监控的事件中已经发送的事件:

  • 参数events是分配好的epoll_event结构体数组。
  • epoll将会把发生的事件赋值到events数组中 (events不可以是空指针,内核只负责把数据复制到这个events数组中,不会去帮助我们在用户态中分配内存)。
  • maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size。
  • 参数timeout是超时时间 (毫秒, 0会立即返回, -1是永久阻塞)。
  • 如果函数调用成功,返回对应I/O上已准备好的文件描述符数目,如返回0表示已超时, 返回小于0表示函数失败。

9.3 epoll底层原理


(1)当某一进程调用epoll_create方法时, Linux内核会创建一个eventpoll结构体,这个结构体中有两个成员与epoll的使用方式密切相关:

struct eventpoll
{
	....
	/*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/
	struct rb_root rbr;
	/*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/
	struct list_head rdlist;
	....
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  1. 每一个epoll对象都有一个独立的eventpoll结构体,用于存放通过epoll_ctl方法向epoll对象中添加进来的事件;
  2. 这些事件都会挂载在红黑树中,如此,重复添加的事件就可以通过红黑树而高效的识别出来(红黑树的插入时间效率是lgn,其中n为树的高度);
  3. 而所有添加到epoll中的事件都会与设备(网卡)驱动程序建立回调关系,也就是说,当响应的事件发生时会调用这个回调方法;
  4. 这个回调方法在内核中叫ep_poll_callback,它会将发生的事件添加到rdlist双链表中;
  5. 在epoll中,对于每一个事件,都会建立一个epitem结构体;
struct epitem
{
	struct rb_node rbn;//红黑树节点
	struct list_head rdllink;//双向链表节点
	struct epoll_filefd ffd; //事件句柄信息
	struct eventpoll *ep; //指向其所属的eventpoll对象
	struct epoll_event event; //期待发生的事件类型
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 当调用epoll_wait检查是否有事件发生时,只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可。
  • 如果rdlist不为空,则把发生的事件复制到用户态,同时将事件数量返回给用户,这个操作的时间复杂度是O(1)。

(2)总结一下,epoll的使用过程就是三部曲:

  • 调用epoll_create创建一个epoll句柄。
  • 调用epoll_ctl,将要监控的文件描述符进行注册。
  • 调用epoll_wait,等待文件描述符就绪。

9.4 epoll的优点

  • 接口使用方便。虽然拆分成了三个函数,但是反而使用起来更方便高效,不需要每次循环都设置关注的文件描述符,也做到了输入输出参数分离开。
  • 数据拷贝轻量: 只在合适的时候调用 EPOLL_CTL_ADD 将文件描述符结构拷贝到内核中,这个操作并不频繁(而select/poll都是每次循环都要进行拷贝)。
  • 事件回调机制:避免使用遍历,而是使用回调函数的方式,将就绪的文件描述符结构加入到就绪队列中。 epoll_wait 返回直接访问就绪队列就知道哪些文件描述符就绪,这个操作时间复杂度O(1)。即使文件描述符数目很多,效率也不会受到影响。
  • 没有数量限制:文件描述符数目无上限。

9.5 epoll工作方式

epoll有2种工作方式:水平触发(LT)和边缘触发(ET)

你妈喊你吃饭的例子:当你正在玩吃鸡的时候,眼看进入了决赛圈,你妈饭做好了,喊你吃饭的时候有两种方式:

  • 如果你妈喊你一次,你没动,那么你妈会继续喊你第二次,第三次…(亲妈,水平触发)。
  • 如果你妈喊你一次,你没动,你妈就不管你了(后妈,边缘触发)。

9.5.1 水平触发Level Triggered 工作模式

  • 当epoll检测到socket上事件就绪的时候,可以不立刻进行处理,或者只处理一部分。
  • 由于只读了1K数据,缓冲区中还剩1K数据,在第二次调用 epoll_wait 时,epoll_wait仍然会立刻返回并通知socket读事件就绪。
  • 直到缓冲区上所有的数据都被处理完,epoll_wait 才不会立刻返回。
  • 支持阻塞读写和非阻塞读写。

9.5.2 边缘触发Edge Triggered工作模式

如果我们在第1步将socket添加到epoll描述符的时候使用了EPOLLET标志, epoll进入ET工作模式。

  • 当epoll检测到socket上事件就绪时,必须立刻处理。
  • 虽然只读了1K的数据,缓冲区还剩1K的数据,在第二次调用 epoll_wait 的时候,epoll_wait 不会再返回了。
  • 也就是说ET模式下,文件描述符上的事件就绪后,只有一次处理机会。
  • ET的性能比LT性能更高( epoll_wait 返回的次数少了很多)。Nginx默认采用ET模式使用epoll。
  • 只支持非阻塞的读写。

9.5.3 epoll的使用场景

epoll的高性能,是有一定的特定场景的,如果场景选择的不适宜,epoll的性能可能适得其反。

  • 对于多连接,且多连接中只有一部分连接比较活跃时,比较适合使用epoll。
  • 例如:典型的一个需要处理上万个客户端的服务器,例如各种互联网APP的入口服务器,这样的服务器就很适合epoll。如果只是系统内部,服务器和服务器之间进行通信,只有少数的几个连接,这种情况下用epoll就并不合适,具体要根据需求和场景特点来决定使用哪种IO模型。

9.6 对比LT和ET

  • LT是 epoll 的默认行为。使用 ET 能够减少 epoll 触发的次数,但是代价就是强逼着程序猿一次响应就绪过程中就把所有的数据都处理完。
  • 相当于一个文件描述符就绪之后,不会反复被提示就绪,看起来就比 LT 更高效一些,但是在 LT 情况下如果也能做到每次就绪的文件描述符都立刻处理,不让这个就绪被重复提示的话,其实性能也是一样的。
  • 另一方面,ET 的代码复杂程度更高了。

9.7 理解ET模式和非阻塞文件描述符

使用 ET 模式的 epoll需要将文件描述设置为非阻塞。这个不是接口上的要求, 而是 “工程实践” 上的要求。

(1)假设这样的场景:服务器接受到一个10k的请求,会向客户端返回一个应答数据,如果客户端收不到应答,不会发送第二个10k请求。

(2)如果服务端写的代码是阻塞式的read,并且一次只 read 1k 数据的话(read不能保证一次就把所有的数据都读出来,参考 man 手册的说明。可能被信号打断),剩下的9k数据就会待在缓冲区中。

此时由于 epoll 是ET模式,并不会认为文件描述符读就绪,epoll_wait 就不会再次返回,剩下的 9k 数据会一直在缓冲区中,直到下一次客户端再给服务器写数据,epoll_wait 才能返回。但是问题来了:

  • 服务器只读到1k个数据,要10k读完才会给客户端返回响应数据。
  • 客户端要读到服务器的响应。
  • 客户端发送了下一个请求,epoll_wait 才会返回,才能去读缓冲区中剩余的数据。


所以,为了解决上述问题(阻塞read不一定能一下把完整的请求读完),于是就可以使用非阻塞轮训的方式来读缓冲区,保证一定能把完整的请求都读出来。

而如果是LT没这个问题。只要缓冲区中的数据没读完,epoll_wait 返回文件描述符读就绪。

9.8 epoll使用示例(LT模式)

(1)Poller.hpp:

#pragma once
#include <cerrno>
#include <cstring>
#include <sys/epoll.h>
#include "Log.hpp"
#include <unistd.h>

class Poller
{
public:
    Poller()
    {
        _epfd = epoll_create(128);
        if(_epfd == -1)
        {
            LOG(FATAL, "epoll_create error");
        }
        else
        {
            LOG(NORMAL, "epoll_create success");
        }
    }

    int EpllerUpdate(int oper, int sock, int event)
    {
        int n = 0;
        if(oper == EPOLL_CTL_DEL)
        {
            n = epoll_ctl(_epfd, oper, sock, nullptr);
            if(n != 0)
            {
                LOG(FATAL, "epoll_ctl delete error!");
            }
        }
        else
        {
            epoll_event ev;
            ev.events = event;
            ev.data.fd = sock;

            n = epoll_ctl(_epfd, oper, sock, &ev);
            if(n != 0)
            {
                LOG(FATAL, "epoll_ctl error!");
            }
        }

        return n;
    }

    int EpollerWait(epoll_event revents[], int num)
    {
        int n = epoll_wait(_epfd, revents, num, _timeout);
        return n;
    }

    ~Poller()
    {
        close(_epfd);
    }

private:
    int _epfd;
    int _timeout{3000};
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65

(2)EpollServer.hpp:

#pragma once
#include <iostream>
#include <memory>
#include <sys/epoll.h>
#include "Socket.hpp"
#include "Poller.hpp"
#include "Log.hpp"

class EpollServer
{
    static const int num = 64;

public:
    EpollServer(uint16_t port)
        : _port(port),
          _listsocket_ptr(new Sock()),
          _epoller_ptr(new Poller())
    {}

    void Init()
    {
        _listsocket_ptr->Socket();
        _listsocket_ptr->Bind(_port);
        _listsocket_ptr->Listen();

        LOG(NORMAL, "create listen socket success");
    }

    void Start()
    {
        int listsockfd = _listsocket_ptr->Getsock();
        // 将listsockfd添加到epoll中 -> listensock和他关心的事件,添加到内核epoll模型中rb_tree.
        _epoller_ptr->EpllerUpdate(EPOLL_CTL_ADD, listsockfd, EPOLLIN);
        epoll_event revs[num];
        while(1)
        {
            int n = _epoller_ptr->EpollerWait(revs, num);
            if(n > 0)
            {
                // 有事件就绪
                LOG(DEBUG, "event happened");
                Dispatcher(revs, n);
            }
            else if (n == 0)
            {
                LOG(NORMAL, "time out ...");
            }
            else
            {
                LOG(FATAL, "epll wait error");
            }
        }
    }

    void Dispatcher(epoll_event revs[], int num)
    {
        for(int i = 0; i < num; i++)
        {
            uint32_t events = revs[i].events;
            int fd = revs[i].data.fd;
            if(events & EPOLLIN)
            {
                if(fd == _listsocket_ptr->Getsock())
                {
                    Accepter();
                }
                else
                {
                    // 其他fd上面的普通读取事件就绪
                    Recver(fd);
                }
            }
            else if(events & EPOLLOUT)
            {
                ;
            }
            else
            {
                ;
            }
        } 
    }

    void Accepter()
    {   
        int sockfd = _listsocket_ptr->Accept();
        if(sockfd > 0)
        {
            // 我们不能直接读取吗
            _epoller_ptr->EpllerUpdate(EPOLL_CTL_ADD, sockfd, EPOLLIN);
            LOG(NORMAL, "get a new link");
        }
    }

    void Recver(int fd)
    {
        char buffer[1024];
        ssize_t n = read(fd, buffer, sizeof(buffer) - 1); 
        if(n > 0)
        {
            buffer[n] = 0;
            std::cout << "get a messge: " << buffer << std::endl;
            // wrirte
            std::string echo_str = "server echo $ ";
            echo_str += buffer;
            write(fd, echo_str.c_str(), echo_str.size());
        }
        else if(n == 0)
        {
            LOG(NORMAL, "client quit");
            //细节
            _epoller_ptr->EpllerUpdate(EPOLL_CTL_DEL, fd, 0);
            close(fd);
        }
        else
        {
            LOG(FATAL, "recv error");
            _epoller_ptr->EpllerUpdate(EPOLL_CTL_DEL, fd, 0);
            close(fd);
        }
    }

private:
    std::shared_ptr<Sock> _listsocket_ptr;
    std::shared_ptr<Poller> _epoller_ptr;
    uint16_t _port;
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127

(3)test.cpp:

#include <iostream>
#include <memory>
#include "EpollServer.hpp"

int main()
{ 
    std::unique_ptr<EpollServer> svr(new EpollServer(8080));
    svr->Init();
    svr->Start();

    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

9.9 epoll使用示例(ET模式)

(1)基于 LT 版本稍加修改即可:

  • 修改TcpServer.hpp,新增非阻塞读和非阻塞写接口。
  • 对于 accept 返回的sockfd加上 EPOLLET 这样的选项。

注意:此代码考虑 listen_sock ET 的情况,如果将 listen_sock 设为 ET,则需要非阻塞轮询的方式 accept,否则会导致同一时刻大量的客户端同时连接的时候,只能 accept 一次的问题。

(2)由于ET模式的特性;我们需要每次把数据一次性读完,避免丢失;这就需要每个描述符sock都有一个收发、错误、缓冲区;并能执行相应的回调方法。


已经描述了;就需要我们用unordered_map统一组织起来。(便于我们再回调函数中实现)

(3)对于读取操作:

  1. 当buffer由不可读状态变为可读的时候,即由空变为不空的时候。
  2. 当有新数据到达时,即buffer中的待读内容变多的时候。
  3. 当buffer中有数据可读(即buffer不空)且用户对相应fd进行epoll_mod修改为epol_IN事件时。

(4)对于写操作:

  1. 当buffer由不可写变为可写的时候,即由满状态变为不满状态的时候。
  2. 当有旧数据被发送走时,即buffer中待写的内容变少得时候。
  3. 当buffer中有可写空间(即buffer不满)且用户对相应fd进行epoll_mod修改为epol_OUT事件时

(5)TcpServer.hpp:

#pragma once
#include <iostream>
#include <string>
#include <memory>
#include <cerrno>
#include <functional>
#include <unordered_map>
#include <unistd.h>
#include <fcntl.h>
#include "Log.hpp"
#include "Poller.hpp"
#include "Socket.hpp"

void SetNonBlockOrDie(int sock)
{
    int fl = fcntl(sock, F_GETFL);
    if(fl < 0)
    {
        exit(-1);
    }

    fcntl(sock, F_SETFL, fl | O_NONBLOCK);
}

class Connection;
class TcpServer;

uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);
const static int g_buffer_size = 128;

using func_t = std::function<void(std::weak_ptr<Connection>)>;

class Connection
{
public:
    Connection(int sockfd)
        :_sockfd(sockfd)
    {}

    int Getsock()
    {
        return _sockfd;
    }
    
    void SetHander(func_t recv_cb, func_t send_cb, func_t except_cb)
    {
        _recv_cb = recv_cb;
        _send_cb = send_cb;
        _except_cb = except_cb;
    }

    void AppendInBuffer(const std::string& in)
    {
        _in_buffer += in;
    }

    void AppendOutBuffer(const std::string& out)
    {
        _out_buffer += out;
    }

    std::string& Inbuffer()
    {
        return _in_buffer;
    }

    std::string& Outbuffer()
    {
        return _out_buffer;
    }

    void SetWeakPtr(TcpServer* tcp_server_ptr)
    {
        _tcp_server_ptr = tcp_server_ptr;
    }

    ~Connection()
    {}

public:
    func_t _recv_cb;
    func_t _send_cb;
    func_t _except_cb;
    // 添加一个回指指针
    TcpServer* _tcp_server_ptr;

    std::string _ip;
    uint16_t _port;

private:
    int _sockfd;
    std::string _in_buffer;
    std::string _out_buffer;
};

// enable_shared_from_this:可以提供返回当前对象的this对应的shared_ptr
class TcpServer : public std::enable_shared_from_this<TcpServer>
{
    static const int num = 64;

public:
    TcpServer(uint16_t port, func_t OnMessage)
        :_port(port)
        ,_OnMessage(OnMessage)
        ,_quit(true)
        ,_epoller_ptr(new (Poller))
        ,_listensock_ptr(new (Sock))
    {}

    void Init()
    {
        _listensock_ptr->Socket();
        SetNonBlockOrDie(_listensock_ptr->Getsock());
        _listensock_ptr->Bind(_port);
        _listensock_ptr->Listen();

        LOG(NORMAL, "create listen socket success\n");
        AddConnection(_listensock_ptr->Getsock(), 
                      EVENT_IN, std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);
    }

    void AddConnection(int sockfd, uint32_t event, func_t recv_cb, func_t send_cb, func_t except_cb, const std::string& ip = "0.0.0.0", uint16_t port = 0)
    {
        // 1. 给sock也建立一个connection对象,将listensock添加到Connection中,同时,listensock和Connecion放入_connections
        std::shared_ptr<Connection> new_connection(new Connection(sockfd));
        new_connection->SetWeakPtr(this);  // shared_from_this(): 返回当前对象的shared_ptr
        //std::shared_ptr<Connection> new_connection = std::make_shared<Connection>(sockfd, std::shared_ptr<TcpServer>(this));
        new_connection->SetHander(recv_cb, send_cb, except_cb);
        new_connection->_ip = ip;
        new_connection->_port = port;

        //2. 添加到unordered_map
        _connections.insert(std::make_pair(sockfd, new_connection));

        // 3. 我们添加对应的事件,除了要加到内核中,fd, event
        _epoller_ptr->EpllerUpdate(EPOLL_CTL_ADD, sockfd, event);
    }

    void Accepter(std::weak_ptr<Connection> conn)
    {
        auto connection = conn.lock();
        while(1)
        {
            struct sockaddr_in peer;
            socklen_t len = sizeof(peer);
            int sockfd = accept(connection->Getsock(), (struct sockaddr*)&peer, &len);
            if(sockfd > 0)
            {
                uint16_t peerport = ntohs(peer.sin_port);
                char ipbuf[128];
                inet_ntop(AF_INET, &peer.sin_addr.s_addr, ipbuf, sizeof(ipbuf));
                printf("get a new client, get info-> [%s:%d], sockfd : %d\n", ipbuf, peerport, sockfd);

                SetNonBlockOrDie(sockfd);
                // listensock只需要设置_recv_cb, 而其他sock,读,写,异常
                AddConnection(sockfd, EVENT_IN,
                              std::bind(&TcpServer::Recver, this, std::placeholders::_1),
                              std::bind(&TcpServer::Sender, this, std::placeholders::_1),
                              std::bind(&TcpServer::Excepter, this, std::placeholders::_1),
                              ipbuf, peerport);
            }
            else
            {
                if (errno == EWOULDBLOCK)
                {
                    break;
                }
                else if(errno == EINTR)
                {
                    continue;
                }
                else
                {
                    break;
                }
            }
        }
    }

    void Recver(std::weak_ptr<Connection> conn)
    {
        if(conn.expired())
        {
            return;
        }

        auto connection = conn.lock();
        int sockfd = connection->Getsock();
        while(1)
        {
            char buffer[g_buffer_size];
            memset(buffer, 0, sizeof(buffer));
            ssize_t n = recv(sockfd, buffer, sizeof(buffer) - 1, 0); // 非阻塞读取
            if(n > 0)
            {
                connection->AppendInBuffer(buffer);
            }
            else if(n == 0)
            {
                printf("sockfd: %d, client info %s:%d quit...\n", sockfd, connection->_ip.c_str(), connection->_port);
                connection->_except_cb(connection);
                return;
            }
            else
            {
                if(errno == EWOULDBLOCK)
                {
                    break;
                }
                else if(errno == EINTR)
                {
                    continue;
                }
                else
                {
                    printf("sockfd: %d, client info %s:%d recv error...\n", sockfd, connection->_ip.c_str(), connection->_port);
                    connection->_except_cb(connection);
                    return;
                }
            }
        }

        // 数据有了,但是不一定全,1. 检测 2. 如果有完整报文,就处理
        _OnMessage(connection); // 你读到的sock所有的数据connection
    }

    void Sender(std::weak_ptr<Connection> conn)
    {
        if(conn.expired())
        {
            return;
        }

        auto connection = conn.lock();
        auto& outbuffer = connection->Outbuffer();
        while(1)
        {
            ssize_t n = send(connection->Getsock(), outbuffer.c_str(), outbuffer.size(), 0);
            if(n > 0)
            {
                outbuffer.erase(0, n);
                if(outbuffer.empty())
                {
                    break;
                }
            }
            else if(n == 0)
            {
                return;
            }
            else
            {
                if(errno == EWOULDBLOCK)
                {
                    break;
                }
                else if(errno == EINTR)
                {
                    continue;
                }
                else
                {
                    printf("sockfd: %d, client info %s:%d send error...\n", connection->Getsock(), connection->_ip.c_str(), connection->_port);
                    connection->_except_cb(connection);
                    return;
                }
            }
        }

        if(!outbuffer.empty())
        {
            // 开启对写事件的关心
            EnableEvent(connection->Getsock(), true, true);
        }
        else
        {
            // 关闭对写事件的关心
            EnableEvent(connection->Getsock(), true, false);
        }
    }

    void Excepter(std::weak_ptr<Connection> conn)
    {
        if(conn.expired())
        {
            return;
        }

        auto connection = conn.lock();
        int fd = connection->Getsock();
        printf("Excepter hander sockfd: %d, client info %s:%d excepter handler\n",
            connection->Getsock(), connection->_ip.c_str(), connection->_port);
        
        // 1. 移除对特定fd的关心
        _epoller_ptr->EpllerUpdate(EPOLL_CTL_DEL, fd, 0);

        // 2. 关闭异常的文件描述符
        printf("close %d done...\n", fd);
        close(fd);

        // 3. 从unordered_map中移除
        printf("remove %d from _connections...\n", fd);
        _connections.erase(fd);
    }

    void EnableEvent(int sockfd, bool readable, bool writeable)
    {
        uint32_t events = 0;
        events |= ((readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0) | EPOLLET);
        _epoller_ptr->EpllerUpdate(EPOLL_CTL_MOD, sockfd, events);
    }

    bool IsConnectionSafe(int fd)
    {
        auto iter = _connections.find(fd);
        if(iter == _connections.end())
        {
            return false;
        }

        return true;
    }

    void Start()
    {
        _quit = false;
        while(1)
        {
            Dispatcher();
            PrintConnection();
        }

        _quit = true;
    }

    void Dispatcher()
    {
        int n = _epoller_ptr->EpollerWait(revs, num);
        for(int i = 0; i < n; i++)
        {
            uint32_t events = revs[i].events;
            int sockfd = revs[i].data.fd;

            if((events & EPOLLIN) && IsConnectionSafe(sockfd))
            {
                if(_connections[sockfd]->_recv_cb)
                {
                    _connections[sockfd]->_recv_cb(_connections[sockfd]);
                }
            }

            if((events & EPOLLOUT) && IsConnectionSafe(sockfd))
            {
                if(_connections[sockfd]->_send_cb)
                {
                    _connections[sockfd]->_send_cb(_connections[sockfd]);
                }
            }
        }
    }

    void PrintConnection()
    {
        std::cout << "_connections fd list: ";
        for (auto &connection : _connections)
        {
            std::cout << connection.second->Getsock() << ", ";
            std::cout << "inbuffer: " << connection.second->Inbuffer().c_str();
        }

        std::cout << std::endl;
    }

    ~TcpServer()
    {}

private:
    std::shared_ptr<Poller> _epoller_ptr;
    std::shared_ptr<Sock> _listensock_ptr;  // 监听socket, 可以把他移除到外部
    std::unordered_map<int, std::shared_ptr<Connection>> _connections;

    struct epoll_event revs[num];
    uint16_t _port;
    bool _quit;
    // 让上层处理信息
    func_t _OnMessage;
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230
  • 231
  • 232
  • 233
  • 234
  • 235
  • 236
  • 237
  • 238
  • 239
  • 240
  • 241
  • 242
  • 243
  • 244
  • 245
  • 246
  • 247
  • 248
  • 249
  • 250
  • 251
  • 252
  • 253
  • 254
  • 255
  • 256
  • 257
  • 258
  • 259
  • 260
  • 261
  • 262
  • 263
  • 264
  • 265
  • 266
  • 267
  • 268
  • 269
  • 270
  • 271
  • 272
  • 273
  • 274
  • 275
  • 276
  • 277
  • 278
  • 279
  • 280
  • 281
  • 282
  • 283
  • 284
  • 285
  • 286
  • 287
  • 288
  • 289
  • 290
  • 291
  • 292
  • 293
  • 294
  • 295
  • 296
  • 297
  • 298
  • 299
  • 300
  • 301
  • 302
  • 303
  • 304
  • 305
  • 306
  • 307
  • 308
  • 309
  • 310
  • 311
  • 312
  • 313
  • 314
  • 315
  • 316
  • 317
  • 318
  • 319
  • 320
  • 321
  • 322
  • 323
  • 324
  • 325
  • 326
  • 327
  • 328
  • 329
  • 330
  • 331
  • 332
  • 333
  • 334
  • 335
  • 336
  • 337
  • 338
  • 339
  • 340
  • 341
  • 342
  • 343
  • 344
  • 345
  • 346
  • 347
  • 348
  • 349
  • 350
  • 351
  • 352
  • 353
  • 354
  • 355
  • 356
  • 357
  • 358
  • 359
  • 360
  • 361
  • 362
  • 363
  • 364
  • 365
  • 366
  • 367
  • 368
  • 369
  • 370
  • 371
  • 372
  • 373
  • 374
  • 375
  • 376
  • 377
  • 378
  • 379
  • 380
  • 381
  • 382
  • 383
  • 384
  • 385
  • 386
  • 387
  • 388
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/464197
推荐阅读
相关标签
  

闽ICP备14008679号