赞
踩
这节需要我们明白为什么non_blocking网络编程中应用层的buffer是必要的。这节的很多内容都是陈硕《Linux多线程服务器编程》书中的内容原话。
在使用epoll中,我们一般是搭配非阻塞IO一起使用,而非阻塞IO的核心思想是避免阻塞在read()或write()或其他IO系统调用上,这样可以最大限度的复用thread-of-control。IO线程只能阻塞在IO multiplexing函数上,如select/poll/epoll_wait。这样一来,应用层的缓冲是必要的,每个 TCP sockt都需要有stateful的input buffer和output buffer。
假设程序想要发送100KB的数据,但是在调用write中,操作系统只接受了80KB(受TCP advertised window控制),那这时调用者肯定不想原地等待。如果有buffer,调用者只管将数据放入buffer,其它由网络库处理即可。对于应用程序而言,它不应该关心数据到底是一次性发送还是分成几次发送的,这些都由网络库来操心。
TCP是一个无边界的字节流协议,接收方必须要处理“收到的数据尚不构成一条完整的消息”和“一次收到两条消息的数据”等情况。如果有buffer,网络库收到数据之后,先放到input buffer,等构成一条完整的消息再通知程序进行业务逻辑的操作。
- class Buffer
- {
- public:
- static const size_t KCheapPprepend = 8; //数据包长度8字节
- static const size_t KInitailSize = 1024;//缓冲区初始的大小
- explicit Buffer(size_t initialSize=KInitailSize)
- :buffer_(KCheapPprepend+initialSize)
- ,readerIndex_(KCheapPprepend)
- ,writerIndex_(KCheapPprepend)
- {}
-
- //待读取的数据大小
- const size_t readableBytes()const {
- return writerIndex_ - readerIndex_;
- }
-
- //可写的空闲空间大小
- const size_t writeableBytes()const {
- return buffer_.size() - writerIndex_;
- }
-
- const size_t prependableBytes()const {
- return readerIndex_;
- }
-
- private:
- std::vector<char> buffer_;
- size_t readerIndex_;
- size_t writerIndex_;
-
- };
就是返回缓冲区中可读取的起始地址
- //缓冲区中可读取的起始位置
- const char* peek()const {
- return begin() + readerIndex_;
- }
-
- //返回Buffer底层的数据首地址
- char* begin() {
- return &*buffer_.begin();
- }
- //常对象只能调用常方法,不能调用普通的,而这里是一定要使用const char*,用char*会出现 "错误:从“const char*”到“char*”的无效转换[-fpermissive]"
- const char* begin()const {
- return &*buffer_.begin();
- }
用在读取了buffer中的可读取的数据后,更新readerIndex_,即是把readerIndex_往后移动len长度。
-
- //读取了数据后,更新readerIndex_
- void retrieve(size_t len) {
- if (len < readableBytes()) {
- readerIndex_ += len;
- }
- else {
- readerIndex_ = KCheapPprepend;
- writerIndex_ = KCheapPprepend;
- }
- }
不管是从fd中读取数据到缓冲区inputBuffer_,还是发送数据也要写入outputBuffer_,都是要往writeable区间内添加数据
- void append(const char* data, size_t len) {
- if (writeableBytes() < len) {
- makeSpace(len); //扩容
- }
- std::copy(data, data + len, beginWirte());
- writerIndex_ += len;
- }
-
- char* beginWirte() {
- return begin() + writerIndex_;
- }
- const char* beginWirte()const {
- return begin() + writerIndex_;
- }
-
- void makeSpace(size_t len) {
- if (writeableBytes() + prependableBytes() < len + KCheapPprepend) {
- buffer_.resize(writerIndex_ + len);
- }
- else {
- auto readable = readableBytes();
-
- //挪动
- std::copy(begin() + readerIndex_, begin() + writerIndex_, begin() + KCheapPprepend);
- //更新readerIndex_, writerIndex_
- readerIndex_ = KCheapPprepend;
- writerIndex_ = readerIndex_ + readable;
- }
- }
从fd上读取数据,存放到writeIndex_指向的地址,返回实际读取的数据大小
- ssize_t Buffer::readFd(int fd, int* saveErrno)
- {
- char extrabuffer[65535];
- struct iovec vec[2];
-
- auto writable = writeableBytes();
-
- vec[0].iov_base = begin() + writerIndex_; //第一块缓冲区
- vec[0].iov_len = writable; //iov_base缓冲区可写的空间大小
-
- vec[1].iov_base = extrabuffer; //第二快缓冲区
- vec[1].iov_len = sizeof(extrabuffer);
-
- //若Buffer有65535字节的空间空间,就不适用栈上的缓冲区
- auto iovcnt = (writable < sizeof(extrabuffer)) ? 2 : 1;
- auto n = ::readv(fd, vec, iovcnt);
- if (n < 0) {
- *saveErrno = errno;
- }
- else if (static_cast<size_t>(n) <= writable) {
- writerIndex_ += n;
- }
- else {
- //Buffer底层的可写空间不够存放n字节数据,
- writerIndex_ = buffer_.size(); //更新writerIndex_为末尾,再使用append
- append(extrabuffer, n - writable);
- }
-
- return n;
- }
Buffer缓冲区是有大小的(占用堆区内存),但是我们无法知道fd上的流式数据有多少,如果我们将缓冲区开的非常大,大到肯定是能容纳所有读取的数据,这就太浪费空间了,muduo使用readv(2)结合栈上空间巧妙解决了这种问题。
1.如果读取的数据不多并且Buffer写空间大小满足,则直接写入Buffer。
2。如果读取的数据长度超过Buffer写空间大小,超过的部分会读取到栈空间extrabuf,
然后程序再把extrabuf里的数据append()到buffer中。
添加了Buffer类后使用,在这节中用法变化不大,在Server类中添加了成员变量Buffer inputBuffer_,并且在成员函数handleEvent()中使用inputBuffer_。
完整源代码:https://github.com/liwook/CPPServer/tree/main/code/server_v8
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。