当前位置:   article > 正文

Linux——多路复用之select

Linux——多路复用之select

目录

前言

一、select的认识

二、select的接口

三、select的使用

四、select的优缺点


前言

在前面,我们学习了五种IO模型,对IO有了基本的认识,知道了select效率很高,可以等待多个文件描述符,那他是如何等待的呢?我们又该如何使用呢?

一、select的认识

系统提供select函数来实现多路复用输入/输出模型

  • select系统调用是用来让我们的程序监视多个文件描述符的状态变化的
  • 程序会停在select这里等待,直到被监视的文件描述符有一个或多个发生了状态改变

select只负责等待,不负责拷贝,一次可以等待多个文件描述符。他的作用是让read和write不再阻塞

二、select的接口

select的调用接口如下

参数 1 int nfds:值最大的文件描述符+1。

参数 2 fd_set* readfds:fd_set本质是一张位图。代表select需要关心的读事件

参数 3 fd_set* writefds:代表select需要关心的读事件

参数 4 fd_set* execptfdsfds:代表select需要关心的异常事件,我们暂时不考虑

参数 5 struct timeval* timeout:时间结构体,成员有秒和微秒,代表等待的时间

                                                  {n,m}为阻塞等待n秒m微秒,时间结束后返回

                                                  {0,0}为非阻塞等待

                                                  nullptr为阻塞等待

参数2,3,4类似,都是输入输出型参数,参数5也是输入输出型参数,输出的是剩余时间

以readfds为例

输入时:比特位的位置,表示文件描述符的值,比特位的内容(0/1),用户关心内核,是否关心这个fd的读事件。

输出时:比特位的位置,表示文件描述符的值,比特位的内容(0/1),内核告诉用户,哪些文件fd上的读事件是否就绪

返回值:

  1. ret  >  0 :select等待的多个fd中,已经就需要的fd个数
  2. ret == 0 :select超时返回
  3. ret  <  0 :select出错

同时,fd_set 是特定的类型,我们对其赋值时,是不方便赋值的,因此库里面也给提供的一个函数,方便我们处理。

FD_CLR                    从文件描述符集合 set 中清除文件描述符 fd。

FD_ISSET                 检查文件描述符 fd 是否在文件描述符集合 set 中。

FD_SET                    将文件描述符 fd 添加到文件描述符集合 set 中。

FD_ZERO                 清空文件描述符集合 set,将其所有位都设置为零。

三、select的使用

Log.hpp

  1. #pragma once
  2. #include <iostream>
  3. #include <cstdarg>
  4. #include <unistd.h>
  5. #include <sys/stat.h>
  6. #include <sys/types.h>
  7. #include <pthread.h>
  8. using namespace std;
  9. enum
  10. {
  11. Debug = 0,
  12. Info,
  13. Warning,
  14. Error,
  15. Fatal
  16. };
  17. enum
  18. {
  19. Screen = 10,
  20. OneFile,
  21. ClassFile
  22. };
  23. string LevelToString(int level)
  24. {
  25. switch (level)
  26. {
  27. case Debug:
  28. return "Debug";
  29. case Info:
  30. return "Info";
  31. case Warning:
  32. return "Warning";
  33. case Error:
  34. return "Error";
  35. case Fatal:
  36. return "Fatal";
  37. default:
  38. return "Unknown";
  39. }
  40. }
  41. const int default_style = Screen;
  42. const string default_filename = "Log.";
  43. const string logdir = "log";
  44. class Log
  45. {
  46. public:
  47. Log(int style = default_style, string filename = default_filename)
  48. : _style(style), _filename(filename)
  49. {
  50. if (_style != Screen)
  51. mkdir(logdir.c_str(), 0775);
  52. }
  53. // 更改打印方式
  54. void Enable(int style)
  55. {
  56. _style = style;
  57. if (_style != Screen)
  58. mkdir(logdir.c_str(), 0775);
  59. }
  60. // 时间戳转化为年月日时分秒
  61. string GetTime()
  62. {
  63. time_t currtime = time(nullptr);
  64. struct tm *curr = localtime(&currtime);
  65. char time_buffer[128];
  66. snprintf(time_buffer, sizeof(time_buffer), "%d-%d-%d %d:%d:%d",
  67. curr->tm_year + 1900, curr->tm_mon + 1, curr->tm_mday, curr->tm_hour, curr->tm_min, curr->tm_sec);
  68. return time_buffer;
  69. }
  70. // 写入到文件中
  71. void WriteLogToOneFile(const string &logname, const string &message)
  72. {
  73. FILE *fp = fopen(logname.c_str(), "a");
  74. if (fp == nullptr)
  75. {
  76. perror("fopen failed");
  77. exit(-1);
  78. }
  79. fprintf(fp, "%s\n", message.c_str());
  80. fclose(fp);
  81. }
  82. // 打印日志
  83. void WriteLogToClassFile(const string &levelstr, const string &message)
  84. {
  85. string logname = logdir;
  86. logname += "/";
  87. logname += _filename;
  88. logname += levelstr;
  89. WriteLogToOneFile(logname, message);
  90. }
  91. pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
  92. void WriteLog(const string &levelstr, const string &message)
  93. {
  94. pthread_mutex_lock(&lock);
  95. switch (_style)
  96. {
  97. case Screen:
  98. cout << message << endl; // 打印到屏幕中
  99. break;
  100. case OneFile:
  101. WriteLogToClassFile("all", message); // 给定all,直接写到all里
  102. break;
  103. case ClassFile:
  104. WriteLogToClassFile(levelstr, message); // 写入levelstr里
  105. break;
  106. default:
  107. break;
  108. }
  109. pthread_mutex_unlock(&lock);
  110. }
  111. // 提供接口给运算符重载使用
  112. void _LogMessage(int level, const char *file, int line, char *rightbuffer)
  113. {
  114. char leftbuffer[1024];
  115. string levelstr = LevelToString(level);
  116. string currtime = GetTime();
  117. string idstr = to_string(getpid());
  118. snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%s][%s][%s:%d]", levelstr.c_str(), currtime.c_str(), idstr.c_str(), file, line);
  119. string messages = leftbuffer;
  120. messages += rightbuffer;
  121. WriteLog(levelstr, messages);
  122. }
  123. // 运算符重载
  124. void operator()(int level, const char *file, int line, const char *format, ...)
  125. {
  126. char rightbuffer[1024];
  127. va_list args; // va_list 是指针
  128. va_start(args, format); // 初始化va_list对象,format是最后一个确定的参数
  129. vsnprintf(rightbuffer, sizeof(rightbuffer), format, args); // 写入到rightbuffer中
  130. va_end(args);
  131. _LogMessage(level, file, line, rightbuffer);
  132. }
  133. ~Log()
  134. {
  135. }
  136. private:
  137. int _style;
  138. string _filename;
  139. };
  140. Log lg;
  141. class Conf
  142. {
  143. public:
  144. Conf()
  145. {
  146. lg.Enable(Screen);
  147. }
  148. ~Conf()
  149. {
  150. }
  151. };
  152. Conf conf;
  153. // 辅助宏
  154. #define lg(level, format, ...) lg(level, __FILE__, __LINE__, format, ##__VA_ARGS__)

Socket.hpp 

  1. #pragma once
  2. #include <iostream>
  3. #include <string>
  4. #include <sys/types.h>
  5. #include <sys/socket.h>
  6. #include <netinet/in.h>
  7. #include <arpa/inet.h>
  8. #include <cstring>
  9. #include <unistd.h>
  10. using namespace std;
  11. namespace Net_Work
  12. {
  13. static const int default_backlog = 5;
  14. static const int default_sockfd = -1;
  15. using namespace std;
  16. enum
  17. {
  18. SocketError = 1,
  19. BindError,
  20. ListenError,
  21. ConnectError,
  22. };
  23. // 封装套接字接口基类
  24. class Socket
  25. {
  26. public:
  27. // 封装了socket相关方法
  28. virtual ~Socket() {}
  29. virtual void CreateSocket() = 0;
  30. virtual void BindSocket(uint16_t port) = 0;
  31. virtual void ListenSocket(int backlog) = 0;
  32. virtual bool ConnectSocket(string &serverip, uint16_t serverport) = 0;
  33. virtual Socket *AcceptSocket(string *peerip, uint16_t *peerport) = 0;
  34. virtual int GetSockFd() = 0;
  35. virtual void SetSockFd(int sockfd) = 0;
  36. virtual void CloseSocket() = 0;
  37. virtual bool Recv(string *buff, int size) = 0;
  38. virtual void Send(string &send_string) = 0;
  39. // 方法的集中在一起使用
  40. public:
  41. void BuildListenSocket(uint16_t port, int backlog = default_backlog)
  42. {
  43. CreateSocket();
  44. BindSocket(port);
  45. ListenSocket(backlog);
  46. }
  47. bool BuildConnectSocket(string &serverip, uint16_t serverport)
  48. {
  49. CreateSocket();
  50. return ConnectSocket(serverip, serverport);
  51. }
  52. void BuildNormalSocket(int sockfd)
  53. {
  54. SetSockFd(sockfd);
  55. }
  56. };
  57. class TcpSocket : public Socket
  58. {
  59. public:
  60. TcpSocket(int sockfd = default_sockfd)
  61. : _sockfd(sockfd)
  62. {
  63. }
  64. ~TcpSocket() {}
  65. void CreateSocket() override
  66. {
  67. _sockfd = socket(AF_INET, SOCK_STREAM, 0);
  68. if (_sockfd < 0)
  69. exit(SocketError);
  70. }
  71. void BindSocket(uint16_t port) override
  72. {
  73. int opt = 1;
  74. setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
  75. struct sockaddr_in local;
  76. memset(&local, 0, sizeof(local));
  77. local.sin_family = AF_INET;
  78. local.sin_port = htons(port);
  79. local.sin_addr.s_addr = INADDR_ANY;
  80. int n = bind(_sockfd, (struct sockaddr *)&local, sizeof(local));
  81. if (n < 0)
  82. exit(BindError);
  83. }
  84. void ListenSocket(int backlog) override
  85. {
  86. int n = listen(_sockfd, backlog);
  87. if (n < 0)
  88. exit(ListenError);
  89. }
  90. bool ConnectSocket(string &serverip, uint16_t serverport) override
  91. {
  92. struct sockaddr_in addr;
  93. memset(&addr, 0, sizeof(addr));
  94. addr.sin_family = AF_INET;
  95. addr.sin_port = htons(serverport);
  96. // addr.sin_addr.s_addr = inet_addr(serverip.c_str());
  97. inet_pton(AF_INET, serverip.c_str(), &addr.sin_addr);
  98. int n = connect(_sockfd, (sockaddr *)&addr, sizeof(addr));
  99. if (n == 0)
  100. return true;
  101. return false;
  102. }
  103. Socket *AcceptSocket(string *peerip, uint16_t *peerport) override
  104. {
  105. struct sockaddr_in addr;
  106. socklen_t len = sizeof(addr);
  107. int newsockfd = accept(_sockfd, (sockaddr *)&addr, &len);
  108. if (newsockfd < 0)
  109. return nullptr;
  110. // *peerip = inet_ntoa(addr.sin_addr);
  111. // INET_ADDRSTRLEN 是一个定义在头文件中的宏,表示 IPv4 地址的最大长度
  112. char ip_str[INET_ADDRSTRLEN];
  113. inet_ntop(AF_INET, &addr.sin_addr, ip_str, INET_ADDRSTRLEN);
  114. *peerip = ip_str;
  115. *peerport = ntohs(addr.sin_port);
  116. Socket *s = new TcpSocket(newsockfd);
  117. return s;
  118. }
  119. int GetSockFd() override
  120. {
  121. return _sockfd;
  122. }
  123. void SetSockFd(int sockfd) override
  124. {
  125. _sockfd = sockfd;
  126. }
  127. void CloseSocket() override
  128. {
  129. if (_sockfd > default_sockfd)
  130. close(_sockfd);
  131. }
  132. bool Recv(string *buff, int size) override
  133. {
  134. char inbuffer[size];
  135. ssize_t n = recv(_sockfd, inbuffer, size - 1, 0);
  136. if (n > 0)
  137. {
  138. inbuffer[n] = 0;
  139. *buff += inbuffer;
  140. return true;
  141. }
  142. else
  143. return false;
  144. }
  145. void Send(string &send_string) override
  146. {
  147. send(_sockfd, send_string.c_str(),send_string.size(),0);
  148. }
  149. private:
  150. int _sockfd;
  151. string _ip;
  152. uint16_t _port;
  153. };
  154. }

        select只负责等待,不负责处理,最初我们有一个listen_sock需要交给select去管理,当有新链接到来是,listen_sock要去接受新链接,但是接受后,不能立刻read或者write,因为不确定当前事件是否就绪,需要将新链接也交给select管理

        如何将新链接交给select呢?我们得有一个数据结构(这里用的数组),把所有的fd都管理起来,新链接到来时,都可以往这个数组里面添加文件描述符fd。后面select遍历数组,就可以找到需要管理的fd了,但这样,我们需要经常遍历这个数组

  1. 添加时需要遍历找到空再插入
  2. select传参,需要遍历查找最大的文件描述符
  3. select等待成功后调用处理函数时,也需遍历查找就绪的文件描述符

        同时,由于select的事件参数是一个输入输出型参数,因此我们每次都得重新对该参数重新赋值。

如下是SelectServer.hpp的核心代码 

SelectServer.hpp

  1. #pragma once
  2. #include <iostream>
  3. #include <string>
  4. #include <sys/select.h>
  5. #include "Log.hpp"
  6. #include "Socket.hpp"
  7. using namespace Net_Work;
  8. const static int gdefaultport = 8888;
  9. const static int gbacklog = 8;
  10. const static int num = sizeof(fd_set) * 8;
  11. class SelectServer
  12. {
  13. public:
  14. SelectServer(int port) : _port(port), _listensock(new TcpSocket())
  15. {
  16. }
  17. void HandlerEvent(fd_set rfds)
  18. {
  19. for (int i = 0; i < num; i++)
  20. {
  21. if (_rfds_array[i] == nullptr)
  22. continue;
  23. int fd = _rfds_array[i]->GetSockFd();
  24. // 判断事件是否就绪
  25. if (FD_ISSET(fd, &rfds))
  26. {
  27. // 读事件分两类,一类是新链接到来,一类是新数据到来
  28. if (fd == _listensock->GetSockFd())
  29. {
  30. // 新链接到来
  31. lg(Info, "get a new link");
  32. // 获取连接
  33. std::string clientip;
  34. uint16_t clientport;
  35. Socket *sock = _listensock->AcceptSocket(&clientip, &clientport);
  36. if (!sock)
  37. {
  38. lg(Error, "accept error");
  39. return;
  40. }
  41. lg(Info, "get a client,client info is# %s:%d,fd: %d", clientip.c_str(), clientport, sock->GetSockFd());
  42. // 此时获取连接成功了,但是不能直接read write,sockfd仍需要交给select托管 -- 添加到数组_rfds_array中
  43. int pos = 0;
  44. for (; pos < num; pos++)
  45. {
  46. if (_rfds_array[pos] == nullptr)
  47. {
  48. _rfds_array[pos] = sock;
  49. lg(Info, "get a new link, fd is : %d", sock->GetSockFd());
  50. break;
  51. }
  52. }
  53. if (pos == num)
  54. {
  55. sock->CloseSocket();
  56. delete sock;
  57. lg(Warning, "server is full, be carefull...");
  58. }
  59. }
  60. else
  61. {
  62. // 普通的读事件就绪
  63. std::string buffer;
  64. bool res = _rfds_array[i]->Recv(&buffer, 1024);
  65. if (res)
  66. {
  67. lg(Info,"client say# %s",buffer.c_str());
  68. buffer+=": 你好呀,同志\n";
  69. _rfds_array[i]->Send(buffer);
  70. buffer.clear();
  71. }
  72. else
  73. {
  74. lg(Warning,"client quit ,maybe close or error,close fd: %d",fd);
  75. _rfds_array[i]->CloseSocket();
  76. delete _rfds_array[i];
  77. _rfds_array[i] = nullptr;
  78. }
  79. }
  80. }
  81. }
  82. }
  83. void InitServer()
  84. {
  85. _listensock->BuildListenSocket(_port, gbacklog);
  86. for (int i = 0; i < num; i++)
  87. {
  88. _rfds_array[i] = nullptr;
  89. }
  90. _rfds_array[0] = _listensock.get();
  91. }
  92. void Loop()
  93. {
  94. _isrunning = true;
  95. // 循环重置select需要的rfds
  96. while (_isrunning)
  97. {
  98. // 不能直接获取新链接,因为accpet可能阻塞
  99. // 所有的fd,都要交给select,listensock上面新链接,相当于读事件
  100. // 因此需要将listensock交给select
  101. // 遍历数组, 1.找最大的fd 2. 合法的fd添加到rfds集合中
  102. fd_set rfds;
  103. FD_ZERO(&rfds);
  104. int max_fd = _listensock->GetSockFd();
  105. for (int i = 0; i < num; i++)
  106. {
  107. if (_rfds_array[i] == nullptr)
  108. {
  109. continue;
  110. }
  111. else
  112. {
  113. // 添加fd到集合中
  114. int fd = _rfds_array[i]->GetSockFd();
  115. FD_SET(fd, &rfds);
  116. if (max_fd < fd) // 更新最大值
  117. {
  118. max_fd = fd;
  119. }
  120. }
  121. }
  122. // 定义时间
  123. struct timeval timeout = {0, 0};
  124. PrintDebug();
  125. // rfds是输入输出型参数,rfds是在select调用返回时,不断被修改,所以每次需要重置rfds
  126. int n = select(max_fd + 1, &rfds, nullptr, nullptr, /*&timeout*/ nullptr);
  127. switch (n)
  128. {
  129. case 0:
  130. lg(Info, "select timeout...,last time: %u.%u", timeout.tv_sec, timeout.tv_usec);
  131. break;
  132. case -1:
  133. lg(Error, "select error!!!");
  134. default:
  135. // 正常就绪的fd
  136. lg(Info, "select success,begin event handler,last time: %u.%u", timeout.tv_sec, timeout.tv_usec);
  137. HandlerEvent(rfds);
  138. break;
  139. }
  140. }
  141. _isrunning = false;
  142. }
  143. void Stop()
  144. {
  145. _isrunning = false;
  146. }
  147. void PrintDebug()
  148. {
  149. std::cout << "current select rfds list is :";
  150. for (int i = 0; i < num; i++)
  151. {
  152. if (_rfds_array[i] == nullptr)
  153. continue;
  154. else
  155. std::cout << _rfds_array[i]->GetSockFd() << " ";
  156. }
  157. std::cout << std::endl;
  158. }
  159. private:
  160. std::unique_ptr<Socket> _listensock;
  161. int _port;
  162. bool _isrunning;
  163. // select 服务器要被正确设计,需要程序员定义数据结构,来吧所有的fd管理起来
  164. Socket *_rfds_array[num];
  165. };

 Main.cc

  1. #include <iostream>
  2. #include <memory>
  3. #include "SelectServer.hpp"
  4. void Usage(char* argv)
  5. {
  6. std::cout<<"Usage: \n\t"<<argv<<" port\n"<<std::endl;
  7. }
  8. // ./select_server 8080
  9. int main(int argc,char* argv[])
  10. {
  11. if(argc!=2)
  12. {
  13. Usage(argv[0]);
  14. return -1;
  15. }
  16. uint16_t localport = std::stoi(argv[1]);
  17. std::unique_ptr<SelectServer> svr = std::make_unique<SelectServer>(localport);
  18. svr->InitServer();
  19. svr->Loop();
  20. return 0;
  21. }

四、select的优缺点

优点:select只负责等待,可以等待多个fd,IO的时候,效率会比较高一些。

缺点:

  1. 由于select是输入输出型参数,因此我们每次都要对select的参数重新设置。
  2. 编写代码时,select因为要使用第三方数组,充满了遍历,这可能会影响select的效率。
  3. 用户到内核,内核到用户,每次select调用和返回,都要对位图重新设置,用户和内核之间,要一直进行数据拷贝。
  4. select让OS在底层遍历需要关心所有的fd,这也会造成效率低下,这也是为何第一个参数需要传入max_fd + 1,就是因为select的底层需要遍历。
  5. fd_set 是系统提供的类型,fd_set大小是固定的,就意味着位图的个数是固定的,也就是select最多能够检测到fd的总数是有上限的。
本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号