赞
踩
我们将TCP服务器封装成一个类,当我们定义出一个服务器对象后需要马上对服务器进行初始化,而初始化TCP服务器要做的第一件事就是创建套接字。
TCP服务器在调用socket
函数创建套接字时,参数设置如下:
AF_INET
,因为我们要进行的是网络通信。SOCK_STREAM
,因为我们编写的是TCP服务器,SOCK_STREAM
提供的就是一个有序的、可靠的、全双工的、基于连接的流式服务。如果创建套接字后获得的文件描述符是小于0的,说明套接字创建失败,此时也就没必要进行后续操作了,直接终止程序即可。
class TcpServer { public: void InitServer() { // 1. 创建流式套接字 _sock = ::socket(AF_INET, SOCK_STREAM, 0); if (_sock < 0) { LOG(FATAL, "socket error"); exit(SOCKET_ERROR); } LOG(DEBUG, "socket create success,sockfd is : %d", _sock); } ~TcpServer() { if (_sock >= 0){ close(_sock); } } private: int _sock; //套接字 };
套接字创建完成,此时的_sock
只是一个文件描述符,并未与网络进行关联,所以我们需要调用bind
函数将该套接字绑定对应的协议家族、IP和PORT等信息。
而协议家族、IP和PORT信息存放在struct sockaddr_in
这样的结构体中,所以我们需要创建一个该结构体,并将对应的数据进行填充。
class TcpServer { public: TcpServer(int port) : _sock(-1), _port(port) {} void InitServer() { // 1. 创建流式套接字 _sock = ::socket(AF_INET, SOCK_STREAM, 0); if (_sock < 0) { LOG(FATAL, "socket error"); exit(SOCKET_ERROR); } LOG(DEBUG, "socket create success,sockfd is : %d", _sock); // 2. 绑定 struct sockaddr_in local; // struct sockaddr_in 系统提供的数据类型。local是变量,用户栈上开辟空间。 bzero(&local, sizeof(local)); // 将从&local开始的sizeof(local)大小的内存区域置零 local.sin_family = AF_INET; // 设置网络通信方式 local.sin_port = htons(_port); // port要经过网络传输给对面,所有需要从主机序列转换为网络序列 local.sin_addr.s_addr = INADDR_ANY; int n = bind(_sock, (struct sockaddr *)&local, sizeof(local)); if (n < 0) { LOG(FATAL, "bind error"); exit(BIND_ERROR); } LOG(DEBUG, "bind success,sockfd is : %d", _sock); } ~TcpServer() { if (_sock >= 0){ close(_sock); } } private: int _sock; //监听套接字 uint16_t _port; //端口号 };
具体细节在网络变成套接字(一)已经讲解过,这里就不重复了。
以上过程TcpServer与UdpServer唯一的区别就在于创建套接字时TcpServer是字节流式SOCK_STREAM
,而UdpServer是数据报式SOCK_DGRAM
。
UDP服务器的初始化操作只有两步,第一步就是创建套接字,第二步就是绑定。
而TCP服务器是面向连接的,客户端在正式向TCP服务器发送数据之前,需要先与TCP服务器建立连接,然后才能与服务器进行通信。
因此TCP服务器需要时刻注意是否有客户端发来连接请求,此时就需要将TCP服务器创建的套接字设置为监听状态。
int listen(int sockfd, int backlog);
参数说明:
sockfd
:需要设置为监听状态的套接字对应的文件描述符。backlog
:全连接队列的最大长度。如果有多个客户端同时发来连接请求,此时未被服务器处理的连接就会放入连接队列,该参数代表的就是这个全连接队列的最大长度,一般不要设置太大,设置为5或10即可。返回值说明:
const static int gbacklog = 5; class TcpServer { public: TcpServer(int port) : _listensockfd(-1), _port(port) {} void InitServer() { // 1. 创建流式套接字 _listensockfd = ::socket(AF_INET, SOCK_STREAM, 0); if (_listensockfd < 0) { LOG(FATAL, "socket error"); exit(SOCKET_ERROR); } LOG(DEBUG, "socket create success,sockfd is : %d", _listensockfd); // 2. 绑定 struct sockaddr_in local; // struct sockaddr_in 系统提供的数据类型。local是变量,用户栈上开辟空间。 bzero(&local, sizeof(local)); // 将从&local开始的sizeof(local)大小的内存区域置零 local.sin_family = AF_INET; // 设置网络通信方式 local.sin_port = htons(_port); // port要经过网络传输给对面,所有需要从主机序列转换为网络序列 local.sin_addr.s_addr = INADDR_ANY; int n = bind(_listensock, (struct sockaddr *)&local, sizeof(local)); if (n < 0) { LOG(FATAL, "bind error"); exit(BIND_ERROR); } LOG(DEBUG, "bind success,sockfd is : %d", _listensockfd); // 3. tcp是面向连接的,所以通信之前,必须先建立连接,服务器是被链接的 // tcpserver启动,未来首先要一直等待客户端的连接,listen n = listen(_listensockfd, gbacklog); if (n < 0) { LOG(FATAL, "listen error"); exit(LISTEN_ERROR); } LOG(DEBUG, "listen success,sockfd is : %d", _listensockfd); } ~TcpServer() { if (_listensockfd >= 0){ close(_listensockfd); } } private: int _listensockfd; //监听套接字 uint16_t _port; //端口号 };
初始化TCP服务器时创建的套接字并不是普通的套接字,而应该叫做监听套接字。为了表明寓意,我们将代码中套接字的名字由_sock
改为_listensockfd
。
在初始化TCP服务器时,只有创建套接字成功、绑定成功、监听成功,此时TCP服务器的初始化才算完成。
TCP服务器初始化后就可以开始运行了,但TCP服务器在与客户端进行网络通信之前,服务器需要先获取到客户端的连接请求。
int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
参数说明:
sockfd
:特定的监听套接字,表示从该监听套接字中获取连接。addr
:对端网络相关的属性信息,包括协议家族、IP地址、端口号等。addrlen
:调用时传入期望读取的addr
结构体的长度,返回时代表实际读取到的addr
结构体的长度,这是一个输入输出型参数。返回值说明:
注意:我们发现accept函数返回的也是套接字的文件描述符,那么监听套接字和该套接字的区别在哪呢?
调用accept函数获取连接时,是从监听套接字当中获取的。如果accept函数获取连接成功,此时会返回接收到的套接字对应的文件描述符。
监听套接字与accept函数返回的套接字的作用:
- 监听套接字:用于获取客户端发来的连接请求。accept函数会不断从监听套接字当中获取新连接。
- accept函数返回的套接字:用于为本次accept获取到的连接提供服务。监听套接字的任务只是不断获取新连接,而真正为这些连接提供服务的套接字是accept函数返回的套接字,而不是监听套接字。
accept函数获取连接时可能会失败,但TCP服务器不会因为获取某个连接失败而退出,因此服务端获取连接失败后应该继续获取连接。
void Loop() { _isrunning = true; // 4. 不能直接接收数据,先获取连接 while (_isrunning) { struct sockaddr_in peer; socklen_t len = sizeof(peer); // accept会阻塞等待,直到有客户端连接 int sockfd = ::accept(_listensockfd, (struct sockaddr *)&peer, &len); if (sockfd < 0) { LOG(WARNING, "accept error"); continue; // 失败了就继续获取就行,不需要退出 }; // 处理请求 } _isrunning = false; }
获取链接后,我们就可以拿着该套接字sockfd
进行数据传输了。
此时为客户端提供服务的不是监听套接字,因为监听套接字获取到一个连接后会继续获取下一个请求连接,为对应客户端提供服务的套接字实际是accept函数返回的套接字,即服务套接字。
为了让通信双方都能看到对应的现象,我们这里就实现一个简单的回声TCP服务器,服务端在为客户端提供服务时就简单的将客户端发来的数据进行输出,并且将客户端发来的数据重新发回给客户端即可。当客户端拿到服务端的响应数据后再将该数据进行打印输出,此时就能确保服务端和客户端能够正常通信了。
由于TCP面向字节流,所以通信接口我们可以使用read、write,或者使用recv、send。
ssize_t read(int fd, void *buf, size_t count);
参数说明:
fd
:特定的文件描述符,表示从该文件描述符中读取数据。buf
:数据的存储位置,表示将读取到的数据存储到该位置。count
:数据的个数,表示从该文件描述符中读取数据的字节数。返回值说明:
或者使用recv函数。
ssize_t recv(int sockfd, void *buf, size_t len, int flags);
参数说明:
sockfd
:套接字描述符,表示要接收数据的套接字。buf
:指向缓冲区的指针,接收到的数据将被存储在这个缓冲区中。len
:缓冲区的大小,即最多可以接收多少字节的数据。flags
:指定接收操作的行为,通常是0,但在某些情况下可以指定特殊的行为,如非阻塞模式。返回值说明:
recv
返回接收到的字节数。如果连接正常关闭,且没有数据可读,则返回0。当服务端调用read函数收到客户端的数据后,就可以再调用write函数将该数据再响应给客户端。
ssize_t write(int fd, const void *buf, size_t count);
参数说明:
fd
:特定的文件描述符,表示将数据写入该文件描述符对应的套接字。buf
:需要写入的数据。count
:需要写入数据的字节个数。返回值说明:
或者使用send函数。
ssize_t send(int sockfd, const void *buf, size_t len, int flags);
参数说明:
sockfd
:套接字描述符,表示要发送数据的套接字。buf
:指向包含要发送数据的缓冲区的指针。len
:要发送的字节数。flags
:指定发送操作的行为,通常是0,但在某些情况下可以指定特殊的行为,如非阻塞模式。返回值说明:
send
返回实际发送的字节数。这通常等于请求发送的字节数,但在某些情况下(如非阻塞套接字且缓冲区已满时),它可能小于请求发送的字节数。需要注意的是,服务端读取数据是服务套接字中读取的,而写入数据的时候也是写入进服务套接字的。也就是说这里为客户端提供服务的套接字,既可以读取数据也可以写入数据,这就是TCP全双工的通信的体现。
在从服务套接字中读取客户端发来的数据时,注意及时关闭服务套接字对应的文件描述符。因为文件描述符本质就是数组的下标,因此文件描述符的资源是有限的,如果我们一直占用,那么可用的文件描述符就会越来越少,因此服务完客户端后要及时关闭对应的文件描述符,否则会导致文件描述符泄漏。
void Service(int sockfd, InetAddr client) { LOG(DEBUG, "get a new link ,info %s:%d,fd:%d", client.Ip(), client.Port(), sockfd); std::string clientaddr = "[" + client.Ip() + ":" + std::to_string(client.Port()) + "]#"; while (true) { // tcp连接面向字节流,可以使用文件接口:read,write char inbuffer[1024]; ssize_t n = read(sockfd, inbuffer, sizeof(inbuffer) - 1); if (n > 0) { inbuffer[n] = 0; std::cout << clientaddr << inbuffer << std::endl; std::string echo_string = "[server echo]# "; echo_string += inbuffer; write(sockfd, echo_string.c_str(), echo_string.size()); } else if (n == 0) // read返回值如果为0,表示读到了文件结尾,即client退出&&关闭连接了 { LOG(INFO, "%s quit", clientaddr.c_str()); break; } else { LOG(ERROR, "read error"); break; } } close(sockfd); // 文件描述符泄露 } void Loop() { _isrunning = true; // 4. 不能直接接收数据,先获取连接 while (_isrunning) { struct sockaddr_in peer; socklen_t len = sizeof(peer); // accept会阻塞等待,直到有客户端连接 int sockfd = ::accept(_listensockfd, (struct sockaddr *)&peer, &len); if (sockfd < 0) { LOG(WARNING, "accept error"); continue; // 失败了就继续获取就行,不需要退出 }; // version 0 :一次只能处理一个请求 Service(sockfd, InetAddr(peer)); } _isrunning = false; }
简单些,我们这里不对客户端进行封装了。
上篇文章我们提到过:客户端是否需要绑定的问题:
客户端要不要绑定?
答案是肯定的,因为网络通信的前提就是需要客户端的IP和PORT,服务端的IP和PORT,通过他们两个网络中的进程才可以进行通信。但是客户端不能像服务端一样显式的bind,设想一个场景,淘宝写了一个客户端,显示绑定了端口号8080,而微信写的客户端也显示绑定的8080端口号,那此时就会因为端口冲突导致你只能使用一项服务,这很明显是不现实的,所以客户端绑定端口的操作由操作系统自动完成,就是为了防止客户端端口号冲突,一般在首次发送数据的时候绑定。
客户端必须要知道它要连接的服务端的IP地址和端口号,因此客户端除了要有自己的套接字之外,还需要知道服务端的IP地址和端口号,这样客户端才能够通过套接字向指定服务器进行通信。
void Usage(std::string proc) { std::cout << "Usage:\n\t" << proc << " serverip serverport\n" << std::endl; } // ./tcpclient serverip serverport int main(int argc, char *argv[]) { if (argc != 3) { Usage(argv[0]); exit(1); } std::string serverip = argv[1]; uint16_t serverport = std::stoi(argv[2]); int sockfd = socket(AF_INET, SOCK_STREAM, 0); if (sockfd < 0) { std::cerr << "socket error" << std::endl; exit(2); } close(sockfd); return 0; }
客户端不需要显式绑定,也不需要监听,因此当客户端创建完套接字后就可以向服务端发起连接请求。
int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
参数说明:
sockfd
:特定的套接字,表示通过该套接字发起连接请求。addr
:对端网络相关的属性信息,包括协议家族、IP地址、端口号等。addrlen
:传入的addr
结构体的长度。返回值说明:
需要注意的是,客户端不是不需要进行绑定,而是不需要我们自己进行绑定操作,当客户端向服务端发起连接请求时,系统会给客户端随机指定一个端口号进行绑定。因为通信双方都必须要有IP地址和端口号,否则无法唯一标识通信双方。也就是说,如果connect函数调用成功了,客户端本地会随机给该客户端绑定一个端口号发送给对端服务器。
此外,调用connect函数向服务端发起连接请求时,需要传入服务端对应的网络信息,否则connect函数也不知道该客户端到底是要向哪一个服务端发起连接请求。
void Usage(std::string proc) { std::cout << "Usage:\n\t" << proc << " serverip serverport\n" << std::endl; } // ./tcpclient serverip serverport int main(int argc, char *argv[]) { if (argc != 3) { Usage(argv[0]); exit(1); } std::string serverip = argv[1]; uint16_t serverport = std::stoi(argv[2]); int sockfd = socket(AF_INET, SOCK_STREAM, 0); if (sockfd < 0) { std::cerr << "socket error" << std::endl; exit(2); } // 构建目标主机的socket信息 struct sockaddr_in server; memset(&server, 0, sizeof(server)); // bzero server.sin_family = AF_INET; server.sin_port = htons(serverport); server.sin_addr.s_addr = inet_addr(serverip.c_str()); int n = connect(sockfd, (struct sockaddr *)&server, sizeof(server)); if (n < 0) { std::cerr << "connect error" << std::endl; exit(3); } close(sockfd); return 0; }
由于我们实现的是一个简单的回声服务器,因此当客户端连接到服务端后,客户端就可以向服务端发送数据了,这里我们可以让客户端将用户输入的数据发送给服务端,发送时调用send
函数向套接字当中写入数据即可。
当客户端将数据发送给服务端后,由于服务端读取到数据后还会进行回显,因此客户端在发送数据后还需要调用recv
函数读取服务端的响应数据,然后将该响应数据进行打印,以确定双方通信无误。
void Usage(std::string proc) { std::cout << "Usage:\n\t" << proc << " serverip serverport\n" << std::endl; } // ./tcpclient serverip serverport int main(int argc, char *argv[]) { if (argc != 3) { Usage(argv[0]); exit(1); } std::string serverip = argv[1]; uint16_t serverport = std::stoi(argv[2]); int sockfd = socket(AF_INET, SOCK_STREAM, 0); if (sockfd < 0) { std::cerr << "socket error" << std::endl; exit(2); } // 构建目标主机的socket信息 struct sockaddr_in server; memset(&server, 0, sizeof(server)); // bzero server.sin_family = AF_INET; server.sin_port = htons(serverport); server.sin_addr.s_addr = inet_addr(serverip.c_str()); int n = connect(sockfd, (struct sockaddr *)&server, sizeof(server)); if (n < 0) { std::cerr << "connect error" << std::endl; exit(3); } while (true) { std::cout << "Please Enter# "; std::string outstring; std::getline(std::cin, outstring); ssize_t s = send(sockfd, outstring.c_str(), outstring.size(), 0); // write if (s > 0) { char inbuffer[1024]; ssize_t m = recv(sockfd, inbuffer, sizeof(inbuffer) - 1, 0); if (m > 0) { inbuffer[m] = 0; std::cout << inbuffer << std::endl; } else break; } else { break; } } close(sockfd); return 0; }
经过测试,服务正常运行。
当我们仅用一个客户端连接服务端时,这一个客户端能够正常享受到服务端的服务。
但在这个客户端正在享受服务端的服务时,我们让另一个客户端也连接服务器,此时虽然在客户端显示连接是成功的,但这个客户端发送给服务端的消息既没有在服务端进行打印,服务端也没有将该数据回显给该客户端。
只有当第一个客户端退出后,服务端才会将第二个客户端发来是数据进行打印,并回显该第二个客户端。
通过实验现象可以看到,这服务端只有服务完一个客户端后才会服务另一个客户端。因为我们目前所写的是一个单执行流版的服务器,这个服务器一次只能为一个客户端提供服务。
当服务端调用accept函数获取到连接后就给该客户端提供服务,但在服务端提供服务期间可能会有其他客户端发起连接请求,但由于当前服务器是单执行流的,只能服务完当前客户端后才能继续服务下一个客户端。
实际上当服务端在给第一个客户端提供服务期间,第二个客户端向服务端发起的连接请求时是成功的,只不过服务端没有调用accept函数将该连接获取上来罢了。
在底层会为我们维护一个连接队列,服务端没有accept的新连接就会放到这个连接队列当中,而这个连接队列的最大长度就是通过
listen
函数的第二个gbacklog
来指定的,因此服务端虽然没有获取第二个客户端发来的连接请求,但是在第二个客户端那里显示是连接成功的。
所以我们需要多执行流,即多进程 | 多线程。
当服务端调用accept函数获取到新连接后不是由当前执行流为该连接提供服务,而是当前执行流调用fork函数创建子进程,然后让子进程为父进程获取到的连接提供服务。
由于父子进程是两个不同的执行流,当父进程调用fork创建出子进程后,父进程就可以继续从监听套接字当中获取新连接,而不用关心获取上来的连接是否服务完毕。
需要注意的问题:
(1)有关套接字
当创建了子进程后,子进程继承父进程的文件描述符表,注意这个文件描述符表父子是独立拥有的,即父子进程现在都持有监听套接字对应的文件描述符_listensockfd
和服务套接字sockfd
。
那么此时我们建议:
_listensockfd
,防止子进程误写_listensockfd
。要求:
sockfd
,如果服务进程不及时关掉不用的文件描述符,最终服务进程中可用的文件描述符就会越来越少。(2)有关等待子进程
当父进程创建出子进程后,父进程是需要等待子进程退出的,否则子进程会变成僵尸进程,进而造成内存泄漏。因此服务端创建子进程后需要调用wait
或waitpid
函数对子进程进行等待。
阻塞式等待与非阻塞式等待:
总之,服务端要等待子进程退出,无论采用阻塞式等待还是非阻塞式等待,都不尽人意。此时我们可以考虑让服务端不等待子进程退出。
不等待子进程退出的方式:
首先是捕捉SIGCHLD信号,将其处理动作设置为忽略。
当子进程退出时会给父进程发送SIGCHLD信号,如果父进程将SIGCHLD信号进行捕捉,并将该信号的处理动作设置为忽略,此时父进程就只需专心处理自己的工作,不必关心子进程了。
实现方式很简单,我们也推荐这样做:
void Loop() { signal(SIGCHLD, SIG_IGN); //忽略SIGCHLD信号 _isrunning = true; // 4. 不能直接接收数据,先获取连接 while (_isrunning) { struct sockaddr_in peer; socklen_t len = sizeof(peer); // accept会阻塞等待,直到有客户端连接 int sockfd = ::accept(_listensockfd, (struct sockaddr *)&peer, &len); if (sockfd < 0) { LOG(WARNING, "accept error"); continue; // 失败了就继续获取就行,不需要退出 }; // version 1 :多进程版 pid_t id = fork(); if (id == 0) { // child :关心sockfd,不关心listensockfd close(_listensockfd); // 建议关闭,防止子进程误写_listensockfd Service(sockfd, InetAddr(peer)); exit(0); } // father :关心listensockfd,不关心sockfd,因为父进程已经将sockfd交给了子进程 close(sockfd);// 必须关闭,防止父进程打开过多的文件描述符而不关闭 } _isrunning = false; }
其次是让父进程创建子进程,子进程再创建孙子进程,最后让孙子进程为客户端提供服务。。
让孙子进程为客户端提供服务, 此时我们就不用等待孙子进程退出了。
我先将代码贴出来:
void Loop() { signal(SIGCHLD, SIG_IGN); //忽略SIGCHLD信号 _isrunning = true; // 4. 不能直接接收数据,先获取连接 while (_isrunning) { struct sockaddr_in peer; socklen_t len = sizeof(peer); // accept会阻塞等待,直到有客户端连接 int sockfd = ::accept(_listensockfd, (struct sockaddr *)&peer, &len); if (sockfd < 0) { LOG(WARNING, "accept error"); continue; // 失败了就继续获取就行,不需要退出 }; // version 1 :采用多进程 pid_t id = fork(); if (id == 0) { // child :关心sockfd,不关心listensockfd close(_listensockfd); // 建议关闭,防止子进程误写_listensockfd if (fork() > 0) exit(0);// 子进程直接退出,留下孙子进程(孤儿),被系统领养,执行完service自动回收 Service(sockfd, InetAddr(peer)); // Service由孙子进程执行 exit(0); } // father :关心listensockfd,不关心sockfd,因为父进程已经将sockfd交给了子进程 close(sockfd); // 必须关闭,防止父进程打开过多的文件描述符而不关闭 waitpid(id, nullptr, 0); // 子进程直接退出,所以这里直接瞬间等待成功,所以可以继续accept } _isrunning = false; }
在子进程中创建孙子进程,然后让孙子进程执行服务,子进程直接退出,此时孙子进程就变成了孤儿进程,会被系统领养,等待的问题也不需要我们考虑了。
多进程版本的TCP网络程序在面对大量请求时也会显得力不从心,因为创建进程的成本是很高的,创建进程时需要创建该进程对应的进程控制块task_struct
、进程地址空间mm_struct
、页表等数据结构。
而创建线程的成本比创建进程的成本会小得多,因为线程本质是在进程地址空间内运行,创建出来的线程会共享该进程的大部分资源,因此在实现多执行流的服务器时最好采用多线程进行实现。
当服务进程调用accept
函数获取到一个新连接后,就可以直接创建一个线程,让该线程为对应客户端提供服务。
当然,主线程(服务进程)创建出新线程后,也是需要等待新线程退出的,否则也会造成类似于僵尸进程这样的问题。但对于线程来说,如果不想让主线程等待新线程退出,可以让创建出来的新线程调用pthread_detach
函数进行线程分离,当这个线程退出时系统会自动回收该线程所对应的资源。此时主线程(服务进程)就可以继续调用accept
函数获取新连接,而让新线程去服务对应的客户端。
参数传递的问题:
新线程在为客户端提供服务时就是调用Service
函数,而调用Service
函数时是需要传入两个参数的,分别是客户端对应的套接字和InetAddr
。因此主线程创建新线程时需要给新线程传入两个参数,但实际在调用pthread_create
函数创建新线程时,只能传入一个类型为void*
的参数。
所以根据我们之前学习线程创建的知识,我们一般是定义一个ThreadData
结构体,用来存放需要传递给执行函数的参数。
当主线程创建新线程时就可以定义一个ThreadData
对象,将客户端对应的套接字、InetAddr
设计进这个ThreadData
对象当中,然后将ThreadData
对象的地址作为新线程执行例程的参数进行传入。
此时新线程在执行例程当中再将这个void*
类型的参数强转为ThreadData*
类型,然后就能够拿到客户端对应的套接字,InetAddr
,进而调用Service
函数为对应客户端提供服务。
注意:新线程的执行例程是一个参数为void*
,返回值为void*
的函数。但由于执行例程函数我们放在了类内,它隐藏的第一个参数为this指针,所以我们需要将线程例程函数HandlerSock
设置为静态成员函数,即在函数前方加static
修饰,但是如果加static
修饰后,就无法调用类内的Service
函数了(因为没有this
指针了),所以我们需要将this
指针设为ThreadData
的类内成员,再通过这个this
调用Service
,或者将Service
也设置为静态成员函数。
class ThreadData { public: ThreadData(int fd, InetAddr addr, TcpServer *s) : sockfd(fd), clientaddr(addr), self(s) {} public: int sockfd; InetAddr clientaddr; TcpServer *self; }; class TcpServer { public: TcpServer(int port) : _port(port), _listensockfd(sockfddefault), _isrunning(false) {} void InitServer() { //略 } void Service(int sockfd, InetAddr client) { LOG(DEBUG, "get a new link ,info %s:%d,fd:%d", client.Ip(), client.Port(), sockfd); std::string clientaddr = "[" + client.Ip() + ":" + std::to_string(client.Port()) + "]#"; while (true) { // tcp连接面向字节流,可以使用文件接口:read,write char inbuffer[1024]; ssize_t n = read(sockfd, inbuffer, sizeof(inbuffer) - 1); if (n > 0) { inbuffer[n] = 0; std::cout << clientaddr << inbuffer << std::endl; std::string echo_string = "[server echo]# "; echo_string += inbuffer; write(sockfd, echo_string.c_str(), echo_string.size()); } else if (n == 0) // read返回值如果为0,表示读到了文件结尾,即client退出&&关闭连接了 { LOG(INFO, "%s quit", clientaddr.c_str()); break; } else { LOG(ERROR, "read error"); break; } } close(sockfd); // 文件描述符泄露 } static void *HandlerSock(void *args) { pthread_detach(pthread_self()); // 线程分离 ThreadData *td = static_cast<ThreadData *>(args); // 需要调用Service函数,但是Service函数是类内函数,静态成员函数没有this指针无法调用,如何解决? // 将this指针设为ThreadData的类内成员,再通过这个this调用Service td->self->Service(td->sockfd, td->clientaddr); delete td; return nullptr; } void Loop() { _isrunning = true; // 4. 不能直接接收数据,先获取连接 while (_isrunning) { struct sockaddr_in peer; socklen_t len = sizeof(peer); // accept会阻塞等待,直到有客户端连接 int sockfd = ::accept(_listensockfd, (struct sockaddr *)&peer, &len); if (sockfd < 0) { LOG(WARNING, "accept error"); continue; // 失败了就继续获取就行,不需要退出 }; // version 2 :采用多线程 pthread_t t; ThreadData *td = new ThreadData(sockfd, InetAddr(peer), this); pthread_create(&t, nullptr, HandlerSock, td); } _isrunning = false; } ~TcpServer() {//略} private: uint16_t _port; int _listensockfd; bool _isrunning; };
每当有新连接到来时,服务端的主线程都会重新为该客户端创建为其提供服务的新线程,而当服务结束后又会将该新线程销毁。这样做不仅麻烦,而且效率低下,每当连接到来的时候服务端才创建对应提供服务的线程。
如果有大量的客户端连接请求,此时服务端要为每一个客户端创建对应的服务线程。计算机当中的线程越多,CPU的压力就越大,因为CPU要不断在这些线程之间来回切换,此时CPU在调度线程的时候,线程和线程之间切换的成本就会变得很高。
此外,一旦线程太多,每一个线程再次被调度的周期就变长了,而线程是为客户端提供服务的,线程被调度的周期变长,客户端也迟迟得不到应答。
所以为了解决以上问题,我们引入线程池,线程池的存在就是为了避免处理短时间任务时创建与销毁线程的代价,此外,线程池还能够保证内核充分利用,防止过分调度。
在线程池设计中我们维护了一个任务队列,并利用vector容器管理了多个线程,这些线程轮询查看任务队列中是否存在任务,并执行任务。
在TcpServer中我们只需要将Service作为任务入队到线程池的任务队列中即可。
所以我们使用bind将Service需要的参数绑定给Service构建一个无参无返回值的函数对象。入队时将该函数对象入队即可。
线程池代码:
using namespace ThreadModule; const static int DefaultThreadNum = 5; template <typename T> class ThreadPool { private: void LockQueue() { pthread_mutex_lock(&_mutex); } void UnlockQueue() { pthread_mutex_unlock(&_mutex); } void ThreadSleep() { pthread_cond_wait(&_cond, &_mutex); } void ThreadWake() { pthread_cond_signal(&_cond); } void ThreadWakeAll() { pthread_cond_broadcast(&_cond); } ThreadPool(int threadnum = DefaultThreadNum) : _threadnum(threadnum), _waitnum(0), _isrunning(false) { pthread_mutex_init(&_mutex, nullptr); pthread_cond_init(&_cond, nullptr); LOG(INFO, "ThreadPool Construct()"); } void initThreadPool() { for (int num = 0; num < _threadnum; num++) { std::string name = "thread -" + std::to_string(num + 1); // _threads.emplace_back(Print, name) _threads.emplace_back(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1), name); // 绑定 LOG(INFO, "Init Thread %s done", name.c_str()); } _isrunning = true; } void Start() { for (auto &thread : _threads) { thread.Start(); } } // 类的成员方法也可以成为另一个类(Thread)的回调方法 void HandlerTask(std::string name) // 包含this指针 { LOG(INFO, "Thread %s is running...", name.c_str()); while (true) { // 1.保证队列安全 LockQueue(); // 2.队列中不一定有数据 while (_task_queue.empty() && _isrunning) { _waitnum++; ThreadSleep(); _waitnum--; } // 2.1如果任务队列为空并且线程池已经退出 if (_task_queue.empty() && !_isrunning) { UnlockQueue(); break; } // 2.2如果任务队列非空并且线程池未退出 // 2.3如果任务队列非空并且线程池已退出 --处理完任务再退出 // 3.到这一定有任务,处理任务 T t = _task_queue.front(); _task_queue.pop(); UnlockQueue(); LOG(DEBUG, "%s get a task", name.c_str()); // 4.处理任务,这个任务属于线程私有(独占)任务,所以不放到加锁解锁之间 t(); // LOG(DEBUG, "%s handler a task,result is %s", name.c_str(), t.ResultToString().c_str()); } } // 禁止赋值拷贝 ThreadPool(const ThreadPool<T> &) = delete; ThreadPool<T> &operator=(const ThreadPool<T> &) = delete; public: static ThreadPool<T> *GetInstance() { // 只有第一次会创建对象,后续都是获取 // 双判断的方式,可以有效减少获取单例的加锁成本,而且保证线程安全 if (_instance == nullptr) { LockGuard lockguard(&_lock); if (_instance == nullptr) { _instance = new ThreadPool<T>(); _instance->initThreadPool(); _instance->Start(); LOG(DEBUG, "创建线程池实例"); return _instance; } } LOG(DEBUG, "获取线程池实例"); return _instance; } void Stop() { LockQueue(); _isrunning = false; ThreadWakeAll(); UnlockQueue(); } void Wait() { for (auto &thread : _threads) { thread.Join(); LOG(INFO, "Thread %s is quit...", thread.name().c_str()); } } bool Enqueue(const T &t) { bool ret = false; LockQueue(); if (_isrunning) { _task_queue.push(t); if (_waitnum > 0) { ThreadWake(); } LOG(DEBUG, "enqueue task success"); ret = true; } UnlockQueue(); return ret; } ~ThreadPool() { pthread_mutex_destroy(&_mutex); pthread_cond_destroy(&_cond); } private: int _threadnum; std::vector<Thread> _threads; // 管理线程 std::queue<T> _task_queue; // 任务队列 pthread_mutex_t _mutex; pthread_cond_t _cond; int _waitnum; bool _isrunning; // 添加单例模式 static ThreadPool<T> *_instance; static pthread_mutex_t _lock; }; template <typename T> ThreadPool<T> *ThreadPool<T>::_instance = nullptr; template <typename T> pthread_mutex_t ThreadPool<T>::_lock = PTHREAD_MUTEX_INITIALIZER;
TcpServer代码:
using task_t = std::function<void()>; class TcpServer { public: TcpServer(int port) : _port(port), _listensockfd(sockfddefault), _isrunning(false) { } void InitServer() { //略 } void Service(int sockfd, InetAddr client) { //略 } void Loop() { _isrunning = true; // 4. 不能直接接收数据,先获取连接 while (_isrunning) { struct sockaddr_in peer; socklen_t len = sizeof(peer); // accept会阻塞等待,直到有客户端连接 int sockfd = ::accept(_listensockfd, (struct sockaddr *)&peer, &len); if (sockfd < 0) { LOG(WARNING, "accept error"); continue; // 失败了就继续获取就行,不需要退出 }; // version 3 : 采用线程池 task_t t = std::bind(&TcpServer::Service, this, sockfd, InetAddr(peer)); ThreadPool<task_t>::GetInstance()->Enqueue(t); } _isrunning = false; } ~TcpServer() { //略 } private: uint16_t _port; int _listensockfd; bool _isrunning; };
选择多线程实现还是线程池实现,取决于具体的应用场景和需求。
对于需要处理大量并发连接但每个连接处理时间较短的场景,线程池通常是一个更好的选择。而对于连接数较少或每个连接处理时间较长的场景,直接使用多线程可能更简单直接。
路漫漫其修远兮,吾将上下而求索。 —屈原
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。