赞
踩
目录
在前面,我们学习了五种IO模型,对IO有了基本的认识,知道了select效率很高,可以等待多个文件描述符,那他是如何等待的呢?我们又该如何使用呢?
系统提供select函数来实现多路复用输入/输出模型
- select系统调用是用来让我们的程序监视多个文件描述符的状态变化的
- 程序会停在select这里等待,直到被监视的文件描述符有一个或多个发生了状态改变
select只负责等待,不负责拷贝,一次可以等待多个文件描述符。他的作用是让read和write不再阻塞。
select的调用接口如下
参数 1 int nfds:值最大的文件描述符+1。
参数 2 fd_set* readfds:fd_set本质是一张位图。代表select需要关心的读事件
参数 3 fd_set* writefds:代表select需要关心的读事件
参数 4 fd_set* execptfdsfds:代表select需要关心的异常事件,我们暂时不考虑
参数 5 struct timeval* timeout:时间结构体,成员有秒和微秒,代表等待的时间
{n,m}为阻塞等待n秒m微秒,时间结束后返回
{0,0}为非阻塞等待
nullptr为阻塞等待
参数2,3,4类似,都是输入输出型参数,参数5也是输入输出型参数,输出的是剩余时间
以readfds为例
输入时:比特位的位置,表示文件描述符的值,比特位的内容(0/1),用户关心内核,是否关心这个fd的读事件。
输出时:比特位的位置,表示文件描述符的值,比特位的内容(0/1),内核告诉用户,哪些文件fd上的读事件是否就绪
返回值:
- ret > 0 :select等待的多个fd中,已经就需要的fd个数
- ret == 0 :select超时返回
- ret < 0 :select出错
同时,fd_set 是特定的类型,我们对其赋值时,是不方便赋值的,因此库里面也给提供的一个函数,方便我们处理。
FD_CLR 从文件描述符集合
set
中清除文件描述符fd。
FD_ISSET 检查文件描述符
fd
是否在文件描述符集合set
中。FD_SET 将文件描述符
fd
添加到文件描述符集合set
中。FD_ZERO 清空文件描述符集合
set
,将其所有位都设置为零。
Log.hpp
- #pragma once
-
- #include <iostream>
- #include <cstdarg>
- #include <unistd.h>
- #include <sys/stat.h>
- #include <sys/types.h>
- #include <pthread.h>
- using namespace std;
-
- enum
- {
- Debug = 0,
- Info,
- Warning,
- Error,
- Fatal
- };
-
- enum
- {
- Screen = 10,
- OneFile,
- ClassFile
- };
-
- string LevelToString(int level)
- {
- switch (level)
- {
- case Debug:
- return "Debug";
- case Info:
- return "Info";
- case Warning:
- return "Warning";
- case Error:
- return "Error";
- case Fatal:
- return "Fatal";
-
- default:
- return "Unknown";
- }
- }
-
- const int default_style = Screen;
- const string default_filename = "Log.";
- const string logdir = "log";
-
- class Log
- {
- public:
- Log(int style = default_style, string filename = default_filename)
- : _style(style), _filename(filename)
- {
- if (_style != Screen)
- mkdir(logdir.c_str(), 0775);
- }
-
- // 更改打印方式
- void Enable(int style)
- {
- _style = style;
- if (_style != Screen)
- mkdir(logdir.c_str(), 0775);
- }
-
- // 时间戳转化为年月日时分秒
- string GetTime()
- {
- time_t currtime = time(nullptr);
- struct tm *curr = localtime(&currtime);
- char time_buffer[128];
- snprintf(time_buffer, sizeof(time_buffer), "%d-%d-%d %d:%d:%d",
- curr->tm_year + 1900, curr->tm_mon + 1, curr->tm_mday, curr->tm_hour, curr->tm_min, curr->tm_sec);
- return time_buffer;
- }
-
- // 写入到文件中
- void WriteLogToOneFile(const string &logname, const string &message)
- {
- FILE *fp = fopen(logname.c_str(), "a");
- if (fp == nullptr)
- {
- perror("fopen failed");
- exit(-1);
- }
- fprintf(fp, "%s\n", message.c_str());
-
- fclose(fp);
- }
-
- // 打印日志
- void WriteLogToClassFile(const string &levelstr, const string &message)
- {
- string logname = logdir;
- logname += "/";
- logname += _filename;
- logname += levelstr;
- WriteLogToOneFile(logname, message);
- }
-
- pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
- void WriteLog(const string &levelstr, const string &message)
- {
- pthread_mutex_lock(&lock);
- switch (_style)
- {
- case Screen:
- cout << message << endl; // 打印到屏幕中
- break;
- case OneFile:
- WriteLogToClassFile("all", message); // 给定all,直接写到all里
- break;
- case ClassFile:
- WriteLogToClassFile(levelstr, message); // 写入levelstr里
- break;
- default:
- break;
- }
- pthread_mutex_unlock(&lock);
- }
-
- // 提供接口给运算符重载使用
- void _LogMessage(int level, const char *file, int line, char *rightbuffer)
- {
- char leftbuffer[1024];
- string levelstr = LevelToString(level);
- string currtime = GetTime();
- string idstr = to_string(getpid());
-
- snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%s][%s][%s:%d]", levelstr.c_str(), currtime.c_str(), idstr.c_str(), file, line);
-
- string messages = leftbuffer;
- messages += rightbuffer;
- WriteLog(levelstr, messages);
- }
-
- // 运算符重载
- void operator()(int level, const char *file, int line, const char *format, ...)
- {
- char rightbuffer[1024];
- va_list args; // va_list 是指针
- va_start(args, format); // 初始化va_list对象,format是最后一个确定的参数
- vsnprintf(rightbuffer, sizeof(rightbuffer), format, args); // 写入到rightbuffer中
- va_end(args);
- _LogMessage(level, file, line, rightbuffer);
- }
-
- ~Log()
- {
- }
-
- private:
- int _style;
- string _filename;
- };
-
- Log lg;
-
- class Conf
- {
- public:
- Conf()
- {
- lg.Enable(Screen);
- }
- ~Conf()
- {
- }
- };
-
- Conf conf;
-
- // 辅助宏
- #define lg(level, format, ...) lg(level, __FILE__, __LINE__, format, ##__VA_ARGS__)

Socket.hpp
- #pragma once
-
- #include <iostream>
- #include <string>
- #include <sys/types.h>
- #include <sys/socket.h>
- #include <netinet/in.h>
- #include <arpa/inet.h>
- #include <cstring>
- #include <unistd.h>
- using namespace std;
- namespace Net_Work
- {
- static const int default_backlog = 5;
- static const int default_sockfd = -1;
- using namespace std;
-
- enum
- {
- SocketError = 1,
- BindError,
- ListenError,
- ConnectError,
- };
-
- // 封装套接字接口基类
- class Socket
- {
- public:
- // 封装了socket相关方法
- virtual ~Socket() {}
- virtual void CreateSocket() = 0;
- virtual void BindSocket(uint16_t port) = 0;
- virtual void ListenSocket(int backlog) = 0;
- virtual bool ConnectSocket(string &serverip, uint16_t serverport) = 0;
- virtual Socket *AcceptSocket(string *peerip, uint16_t *peerport) = 0;
- virtual int GetSockFd() = 0;
- virtual void SetSockFd(int sockfd) = 0;
- virtual void CloseSocket() = 0;
- virtual bool Recv(string *buff, int size) = 0;
- virtual void Send(string &send_string) = 0;
-
- // 方法的集中在一起使用
- public:
- void BuildListenSocket(uint16_t port, int backlog = default_backlog)
- {
- CreateSocket();
- BindSocket(port);
- ListenSocket(backlog);
- }
-
- bool BuildConnectSocket(string &serverip, uint16_t serverport)
- {
- CreateSocket();
- return ConnectSocket(serverip, serverport);
- }
-
- void BuildNormalSocket(int sockfd)
- {
- SetSockFd(sockfd);
- }
- };
-
- class TcpSocket : public Socket
- {
- public:
- TcpSocket(int sockfd = default_sockfd)
- : _sockfd(sockfd)
- {
- }
- ~TcpSocket() {}
-
- void CreateSocket() override
- {
- _sockfd = socket(AF_INET, SOCK_STREAM, 0);
- if (_sockfd < 0)
- exit(SocketError);
- }
- void BindSocket(uint16_t port) override
- {
- int opt = 1;
- setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
-
- struct sockaddr_in local;
- memset(&local, 0, sizeof(local));
- local.sin_family = AF_INET;
- local.sin_port = htons(port);
- local.sin_addr.s_addr = INADDR_ANY;
-
- int n = bind(_sockfd, (struct sockaddr *)&local, sizeof(local));
- if (n < 0)
- exit(BindError);
- }
- void ListenSocket(int backlog) override
- {
- int n = listen(_sockfd, backlog);
- if (n < 0)
- exit(ListenError);
- }
- bool ConnectSocket(string &serverip, uint16_t serverport) override
- {
- struct sockaddr_in addr;
- memset(&addr, 0, sizeof(addr));
- addr.sin_family = AF_INET;
- addr.sin_port = htons(serverport);
- // addr.sin_addr.s_addr = inet_addr(serverip.c_str());
- inet_pton(AF_INET, serverip.c_str(), &addr.sin_addr);
- int n = connect(_sockfd, (sockaddr *)&addr, sizeof(addr));
-
- if (n == 0)
- return true;
- return false;
- }
- Socket *AcceptSocket(string *peerip, uint16_t *peerport) override
- {
- struct sockaddr_in addr;
- socklen_t len = sizeof(addr);
- int newsockfd = accept(_sockfd, (sockaddr *)&addr, &len);
- if (newsockfd < 0)
- return nullptr;
-
- // *peerip = inet_ntoa(addr.sin_addr);
-
- // INET_ADDRSTRLEN 是一个定义在头文件中的宏,表示 IPv4 地址的最大长度
- char ip_str[INET_ADDRSTRLEN];
- inet_ntop(AF_INET, &addr.sin_addr, ip_str, INET_ADDRSTRLEN);
- *peerip = ip_str;
-
- *peerport = ntohs(addr.sin_port);
-
- Socket *s = new TcpSocket(newsockfd);
- return s;
- }
- int GetSockFd() override
- {
- return _sockfd;
- }
- void SetSockFd(int sockfd) override
- {
- _sockfd = sockfd;
- }
- void CloseSocket() override
- {
- if (_sockfd > default_sockfd)
- close(_sockfd);
- }
-
- bool Recv(string *buff, int size) override
- {
- char inbuffer[size];
- ssize_t n = recv(_sockfd, inbuffer, size - 1, 0);
- if (n > 0)
- {
- inbuffer[n] = 0;
- *buff += inbuffer;
- return true;
- }
- else
- return false;
- }
-
- void Send(string &send_string) override
- {
- send(_sockfd, send_string.c_str(),send_string.size(),0);
- }
-
- private:
- int _sockfd;
- string _ip;
- uint16_t _port;
- };
- }

select只负责等待,不负责处理,最初我们有一个listen_sock需要交给select去管理,当有新链接到来是,listen_sock要去接受新链接,但是接受后,不能立刻read或者write,因为不确定当前事件是否就绪,需要将新链接也交给select管理。
如何将新链接交给select呢?我们得有一个数据结构(这里用的数组),把所有的fd都管理起来,新链接到来时,都可以往这个数组里面添加文件描述符fd。后面select遍历数组,就可以找到需要管理的fd了,但这样,我们需要经常遍历这个数组
- 添加时需要遍历找到空再插入
- select传参,需要遍历查找最大的文件描述符
- select等待成功后调用处理函数时,也需遍历查找就绪的文件描述符
同时,由于select的事件参数是一个输入输出型参数,因此我们每次都得重新对该参数重新赋值。
如下是SelectServer.hpp的核心代码
SelectServer.hpp
- #pragma once
- #include <iostream>
- #include <string>
- #include <sys/select.h>
- #include "Log.hpp"
- #include "Socket.hpp"
-
- using namespace Net_Work;
- const static int gdefaultport = 8888;
- const static int gbacklog = 8;
- const static int num = sizeof(fd_set) * 8;
-
- class SelectServer
- {
- public:
- SelectServer(int port) : _port(port), _listensock(new TcpSocket())
- {
- }
- void HandlerEvent(fd_set rfds)
- {
- for (int i = 0; i < num; i++)
- {
- if (_rfds_array[i] == nullptr)
- continue;
-
- int fd = _rfds_array[i]->GetSockFd();
- // 判断事件是否就绪
- if (FD_ISSET(fd, &rfds))
- {
- // 读事件分两类,一类是新链接到来,一类是新数据到来
- if (fd == _listensock->GetSockFd())
- {
- // 新链接到来
- lg(Info, "get a new link");
- // 获取连接
- std::string clientip;
- uint16_t clientport;
- Socket *sock = _listensock->AcceptSocket(&clientip, &clientport);
- if (!sock)
- {
- lg(Error, "accept error");
- return;
- }
- lg(Info, "get a client,client info is# %s:%d,fd: %d", clientip.c_str(), clientport, sock->GetSockFd());
- // 此时获取连接成功了,但是不能直接read write,sockfd仍需要交给select托管 -- 添加到数组_rfds_array中
- int pos = 0;
- for (; pos < num; pos++)
- {
- if (_rfds_array[pos] == nullptr)
- {
- _rfds_array[pos] = sock;
- lg(Info, "get a new link, fd is : %d", sock->GetSockFd());
- break;
- }
- }
- if (pos == num)
- {
- sock->CloseSocket();
- delete sock;
- lg(Warning, "server is full, be carefull...");
- }
- }
- else
- {
- // 普通的读事件就绪
- std::string buffer;
- bool res = _rfds_array[i]->Recv(&buffer, 1024);
- if (res)
- {
- lg(Info,"client say# %s",buffer.c_str());
- buffer+=": 你好呀,同志\n";
- _rfds_array[i]->Send(buffer);
- buffer.clear();
- }
- else
- {
- lg(Warning,"client quit ,maybe close or error,close fd: %d",fd);
- _rfds_array[i]->CloseSocket();
- delete _rfds_array[i];
- _rfds_array[i] = nullptr;
- }
- }
- }
- }
- }
- void InitServer()
- {
- _listensock->BuildListenSocket(_port, gbacklog);
- for (int i = 0; i < num; i++)
- {
- _rfds_array[i] = nullptr;
- }
- _rfds_array[0] = _listensock.get();
- }
-
- void Loop()
- {
- _isrunning = true;
- // 循环重置select需要的rfds
- while (_isrunning)
- {
- // 不能直接获取新链接,因为accpet可能阻塞
- // 所有的fd,都要交给select,listensock上面新链接,相当于读事件
- // 因此需要将listensock交给select
-
- // 遍历数组, 1.找最大的fd 2. 合法的fd添加到rfds集合中
- fd_set rfds;
- FD_ZERO(&rfds);
- int max_fd = _listensock->GetSockFd();
- for (int i = 0; i < num; i++)
- {
- if (_rfds_array[i] == nullptr)
- {
- continue;
- }
- else
- {
- // 添加fd到集合中
- int fd = _rfds_array[i]->GetSockFd();
- FD_SET(fd, &rfds);
- if (max_fd < fd) // 更新最大值
- {
- max_fd = fd;
- }
- }
- }
-
- // 定义时间
- struct timeval timeout = {0, 0};
-
- PrintDebug();
-
- // rfds是输入输出型参数,rfds是在select调用返回时,不断被修改,所以每次需要重置rfds
- int n = select(max_fd + 1, &rfds, nullptr, nullptr, /*&timeout*/ nullptr);
- switch (n)
- {
- case 0:
- lg(Info, "select timeout...,last time: %u.%u", timeout.tv_sec, timeout.tv_usec);
- break;
- case -1:
- lg(Error, "select error!!!");
- default:
- // 正常就绪的fd
- lg(Info, "select success,begin event handler,last time: %u.%u", timeout.tv_sec, timeout.tv_usec);
- HandlerEvent(rfds);
- break;
- }
- }
- _isrunning = false;
- }
-
- void Stop()
- {
- _isrunning = false;
- }
-
- void PrintDebug()
- {
- std::cout << "current select rfds list is :";
- for (int i = 0; i < num; i++)
- {
- if (_rfds_array[i] == nullptr)
- continue;
- else
- std::cout << _rfds_array[i]->GetSockFd() << " ";
- }
- std::cout << std::endl;
- }
-
- private:
- std::unique_ptr<Socket> _listensock;
- int _port;
- bool _isrunning;
-
- // select 服务器要被正确设计,需要程序员定义数据结构,来吧所有的fd管理起来
- Socket *_rfds_array[num];
- };

Main.cc
- #include <iostream>
- #include <memory>
- #include "SelectServer.hpp"
-
- void Usage(char* argv)
- {
-
- std::cout<<"Usage: \n\t"<<argv<<" port\n"<<std::endl;
- }
- // ./select_server 8080
- int main(int argc,char* argv[])
- {
- if(argc!=2)
- {
- Usage(argv[0]);
- return -1;
- }
- uint16_t localport = std::stoi(argv[1]);
- std::unique_ptr<SelectServer> svr = std::make_unique<SelectServer>(localport);
- svr->InitServer();
- svr->Loop();
-
- return 0;
- }

优点:select只负责等待,可以等待多个fd,IO的时候,效率会比较高一些。
缺点:
- 由于select是输入输出型参数,因此我们每次都要对select的参数重新设置。
- 编写代码时,select因为要使用第三方数组,充满了遍历,这可能会影响select的效率。
- 用户到内核,内核到用户,每次select调用和返回,都要对位图重新设置,用户和内核之间,要一直进行数据拷贝。
- select让OS在底层遍历需要关心所有的fd,这也会造成效率低下,这也是为何第一个参数需要传入max_fd + 1,就是因为select的底层需要遍历。
- fd_set 是系统提供的类型,fd_set大小是固定的,就意味着位图的个数是固定的,也就是select最多能够检测到fd的总数是有上限的。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。