赞
踩
网络编程的目的就是实现直接或间接地通过网络协议与其他计算机进行通讯。网络编程中
有两个主要的问题,一个是如何准确的定位网络上一台或多台主机,另一个就是找到主机后
如何可靠高效的进行数据传输。在TCP/IP协议中IP层主要负责网络主机的定位,数据传输的
路由,由IP地址可以唯一地确定Internet上的一台主机。而TCP层则提供面向应用的可靠的
或非可靠的数据传输机制。
目前较为流行的网络编程模型是客户机/服务器(C/S)结构。即通信双方一方作为服务
器等待客户提出请求并予以响应。客户则在需要服务时向服务器提出申请。服务器一般作为
守护进程始终运行,监听网络端口,一旦有客户请求,就会启动一个服务进程来响应该客
户,同时自己继续监听服务端口,使后来的客户也能及时得到服务。
通常一台主机上总是有很多个进程需要网络资源进行网络通讯。网络通讯的实体准确的讲
不是主机,而应该是主机中运行的进程。这时候光有主机名或IP地址来标识这么多个进程显然
是不够的。端口号就是为了在一台主机上提供更多的网络资源而采取得一种手段,也是TCP层
提供的一种机制。只有通过主机名或IP地址和端口号的组合才能唯一的确定网络通讯中的对象:
进程。
综上所述:IP地址和端口能够在广袤的互联网中定位到要通信的程序(进程),协议和数据传输方式规定了如何传输数据,有了这些,两台计算机就可以通信了。
03-协议_哔哩哔哩_bilibili03-协议是黑马程序员-Linux网络编程的第3集视频,该合集共计151集,视频收藏或关注UP主,及时了解更多相关视频内容。https://www.bilibili.com/video/BV1iJ411S7UA?p=3&vd_source=f443c8140671d1c361aa817ad11933121.09 端口_哔哩哔哩_bilibili1.09 端口是基于C的计算机网络编程的第9集视频,该合集共计135集,视频收藏或关注UP主,及时了解更多相关视频内容。https://www.bilibili.com/video/BV1pX4y1N7T4?p=9&spm_id_from=pageDriver&vd_source=f443c8140671d1c361aa817ad1193312
网络编程(1) --- 网络基础
https://blog.csdn.net/ghost_him/article/details/125708265
网络编程(2) --- socket编程
https://blog.csdn.net/ghost_him/article/details/125708294
网络编程(3) --- 高并发服务器
https://blog.csdn.net/ghost_him/article/details/125708299
特点:独占,通信效率低
TCP/IP协议族
TCP/IP 协议栈是一系列网络协议的总和,是构成网络通信的核心骨架,它定义了电子设备如何连入因特网,以及数据如何在它们之间进行传输。TCP/IP 协议采用4层结构,分别是应用层、传输层、网络层和链路层,每一层都呼叫它的下一层所提供的协议来完成自己的需求。这四层,每一层都由特定的协议与对方进行通信,而协议之间的通信最终都要转化为 0 和 1 的电信号,通过物理介质进行传输才能到达对方的电脑,因此物理介质是网络通信的基石。
协议:一组数据传输规则
各类协议的功能:
TCP协议 (程序员网络编程关键协议):传输控制协议, 面向连接, 可靠, 基于字节流的传输层通信协议
UDP协议:用户数据报协议, OSI参考模型中一种无连接(不需要建立连接,直接传)的传输层协议, 提供面向事物的简单不可靠信息传送服务
应用于:微信语音,视频通话
HTTP协议:超文本传输协议, 应用最广泛(通过浏览器访问网站)
FTP协议:文件传输协议 (APP中文件的上传下载会用到这个协议)
ICMP协议:inernet控制报文协议, 是TCP/IP协议族的一个协议,用于在IP主机,路由器之间传递控制信息
IGMP协议:是Internet组管理协议, 是因特网协议家族中的一个组播协议,用在主机和组播路由器之间
ARP协议:正向地址解析协议,用已知的ip, 寻找对应主机的mac地址
RARP协议:是反向地址转换协议, 通过mac地址确定ip地址
IP协议:
为了实现不同计算机/操作系统 终端进行通信,使用 TCP/IP协议通信,将 计算机网络传输过程的过程进行层次划分和层次统一。每一层都会有一些协议,数据到了这一层,就需要遵守这一层的协议。
OSI
七层模型(从理论角度):物,数,网,传,会,表,应【精选】Tcp/Ip四层模型---应用层、传输层、网络层和链路层以及他们的各自的主要工作_应用层网络层-CSDN博客文章浏览阅读6.2k次,点赞12次,收藏23次。Tcp/Ip四层模型TCP/IP 协议栈是一系列网络协议的总和,是构成网络通信的核心骨架,它定义了电子设备如何连入因特网,以及数据如何在它们之间进行传输。TCP/IP 协议采用4层结构,分别是应用层、传输层、网络层和链路层,每一层都呼叫它的下一层所提供的协议来完成自己的需求。这四层,每一层都由特定的协议与对方进行通信,而协议之间的通信最终都要转化为 0 和 1 的电信号,通过物理介质进行传输才能到达对方的电脑,因此物理介质是网络通信的基石。..._应用层网络层https://blog.csdn.net/weixin_44436675/article/details/114941553?ops_request_misc=&request_id=&biz_id=102&utm_term=%E5%9B%9B%E5%B1%82%E7%BD%91%E7%BB%9C%E6%A8%A1%E5%9E%8B%E6%AF%8F%E4%B8%80%E5%B1%82%E9%83%BD%E5%81%9A%E4%BA%86%E4%BB%80%E4%B9%88&utm_medium=distribute.pc_search_result.none-task-blog-2~all~sobaiduweb~default-2-114941553.142%5Ev96%5Epc_search_result_base7&spm=1018.2226.3001.4187OSI七层模型、TCP/IP四层模型(超详细!!!!!)-CSDN博客文章浏览阅读5.1w次,点赞117次,收藏651次。OSI七层模型七层模型,亦称OSI(Open System Interconnection)。参考模型是国际标准化组织(ISO)制定的一个用于计算机或通信系统间互联的标准体系,一般称为OSI参考模型或七层模型。它是一个七层的、抽象的模型体,不仅包括一系列抽象的术语或概念,也包括具体的协议。分层7. 应用层网络服务与最终用户的一个接口各种应用程序协议协议有:HTTP(超文本传输协议) FTP(文本传输协议) TFTP(简单文件传输协议) SMTP(简单邮件传输协议) SNMP(简单网络管_tcp/ip四层模型https://blog.csdn.net/wwy0324/article/details/109310658?spm=1001.2101.3001.6650.17&utm_medium=distribute.pc_relevant.none-task-blog-2~default~BlogCommendFromBaidu~Rate-17-109310658-blog-126115016.235%5Ev38%5Epc_relevant_anti_vip_base&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2~default~BlogCommendFromBaidu~Rate-17-109310658-blog-126115016.235%5Ev38%5Epc_relevant_anti_vip_base&utm_relevant_index=21
ip是网络层面级别的通信,mac是设备层面级别的通信。
TCP通信就是进程到进程(不同主机)
应用层 HTTP(通过浏览器访问服务器), FTP协议
计算机网络知识点(应用层)_计算机网络应用层知识点-CSDN博客
传输层 TCP/UDP协议
网络层 IP, IGMP协议
链路层 ARP, RARP协议
传输层、网络层、链路层 的工作全是在内核实现
网卡的身份证号:
ipv4的是4个字节,人为点分十进制,字符串类型;计算机中是32bit
优点:1. 安全性好;2. 跨平台方便;3. 开发工作量小 (在线应用)
缺点:1. 不能缓存大量数据;2. 必须严格遵守http协议
优点:1. 可以缓存大量数据(需要下载应用);2. 可以自定义协议, 协议选择灵活(腾讯);3. 速度快
缺点:1. 安全性差;2. 开发工作量大;3. 跨平台难
————————————————
原文链接:https://blog.csdn.net/DanielSYC/article/details/118500642
小端法:int的低位字节存在内存低地址(本地字节序)
大端法:低位存在高地址(网络字节序)
想把本地数据发到网络上,或者接收,往往调用库函数对 ip和端口数据 做网络字节序和主机字节序的转换
- #include<arpa/inet.h>
- uint32_t htonl(uint32_t hostlong); //主要针对IP
- uint16_t htons(uint16_t hostshort); //主要针对port
- uint32_t ntohl(uint32_t netlong);
- uint16_t ntohs(uint16_t netshort);
- int inet_pton(int af,const char* src,void* dst); //p表示点分十进制的ip,n网络上的二进制ip
- //应用场景:客户端conect入参 绑定服务器ip地址的时候会用到
-
- //参数:
- af:AF_INET或AF_INET6
- src:传入参数,待转换的点分十进制的IP地址
- dst:传出参数,转换后符合网络字节序的IP地址
-
- //返回值:
- 成功返回1
- 若参2无效ip返回0(异常)
- 失败返回-1
-
-
- const char* inet_ntop(int af,const char* src,char* dst,socklen_t size);
- //参数:
- af:AF_INET或AF_INET6
- src:传入参数,待转换的网络字节序的IP地址
- dst:传出参数,转换后的点分十进制IP地址,是一块缓冲区
- size指定了缓冲区的大小
-
- //返回值:
- 成功返回dst指针
- 失败返回NULL指针,设置errorno
注意:只有应用层是用户空间
0X:0806 根据ip地址获取mac地址,这样传输数据时,不仅能找到子网,还能精确找到对应主机。
以太网帧:根据mac地址,完成数据传输
版本:ipv4/ipv6;
TTL生存期--:设置数据包在路由节点中的跳转结点上限。数据包进入到网络中,在限定时间到达不了目的地,到时间了所在路由器有义务把这个数据包丢弃。
目的ip:通过TCP或者UDP建立了长连接之后对方主机socket通信传过来的
- TCP(Transmission Control Protocol)是面向连接的传输层协议;UDP(User Datagram Protocol)是面向无连接的传输层协议。
- TCP是使用流量控制和拥塞控制的全双工可靠传输;UDP是不使用流量控制和拥塞控制的不可靠传输。
- TCP传输效率慢,所需资源多;UDP传输效率快,所需资源少。
- TCP面向字节流;UDP是面向报文的。
- TCP连接只能是一对一通信;UDP支持一对一,一对多,多对一和多对多的交互通信。
- TCP首部开销20字节;UDP的首部开销小,只有8个字节。
- TCP适用于可靠传输的应用,如:文件、邮件传输;UDP要求通讯速度快,如:音、视频。
————————————————
原文链接:https://blog.csdn.net/weixin_52386948/article/details/126996894
所谓socket通常也称作"套接字",在linux中是个 绑定了IP地址和一个端口的 伪文件,是计算机网络海洋中一个通信链的句柄。应用程序通常通过"套接字"向网络发出请求或者应答网络请求。
socket编程 的典型应用就是 Web 服务器和pc浏览器之间的通信:浏览器获取用户输入的URL,向服务器发起请求,服务器分析接收到的URL,将对应的网页内容返回给浏览器,浏览器再经过解析和渲染,就将文字、图片、视频等元素呈现给用户。
学习 socket,也就是学习计算机之间如何通信,并编写出实用的程序。
网络套接字(socket):在通信过程中,套接字(伪文件)一定是成对出现的
回顾:Linux的特殊文件类型(伪文件):管道;套接字;块设备;字符设备。对于套接字:一个fd可以索引读写2个缓冲区
一个文件描述符指向一个套接字(该套接字内部由内核借助两个缓冲区(读/写)实现)
- /*相关结构体定义,在man 7 ip*/
- struct sockaddr_in{
- sa_family_t sin_family;
- in_port_t sin_port;
- struct in_addr sin_addr;
- };
- struct in_addr{
- uint32_t s_addr;
- };
初始化结构体:
- addr.sin_family=AF_INET/AF_INET6;
- addr.sin_port=htons(9527); //端口号为short类型(16bit)
-
- int dst;
- inet_pton(AF_INET,"192.168.10.2",(void*)&dst);
- addr.sin_addr.s_addr=dst;
-
- /*或者采取下面的方法*/
- addr.sin_addr.s_addr=htonl(INADDR_ANY) //取出系统中任意有效的IP地址(检测你的联网网卡,取有效的ip地址)
- //注意取出的宏ip是 本地二进制,需要做字节序转换
结构体类型传参使用时,注意细化强转:
/*struct sockaddr是早已废弃的数据结构,已不再使用,用新的时注意强转一下*/
- /*struct sockaddr是早已废弃的数据结构,已不再使用,用新的时注意强转一下*/
- struct sockaddr_in addr;
- int ret = bind(sockfd,(struct sockaddr*)&addr,size);
-
- int bind(int sockfd,const struct sockaddr* addr,socklen_t addrlen);
模型中需要3个套接字(客户端/服务器端一对儿socket;服务器监听一个socket)
服务器上来建立的socket是迎宾小姐的角色。
即服务器两个socket:一个监听socket,一个通信socket
socket():创建一个套接字, 用fd索引
bind():绑定IP和port
listen():设置监听上限(同时与Server建立连接数)
accpet():阻塞监听客户端连接(传入一个上面创建的套接字, 传出一个连接的套接字)
在客户端中的connect()中绑定IP和port,并建立连接(阻塞)
创建一个套接字文件
- int socket(int domain,int type,int protocol);
- //入参:
- 1.domain指定使用的协议(IPv4或IPv6)
- AF_INET
- AF_INET6
- AF_UNIX或AF_LOCAL
-
- 2.type指定数据传输协议(流式或报式)
- SOCK_STREAM
- SOCK_DGRAM
-
- 3.指定代表协议(2参选择的协议的代表协议)
- 流式以TCP为代表
- 报式以UDP为代表
-
- //返回值
- 成功返回新套接字的fd,失败返回-1并设置errno
给socket绑定服务器的有效地址结构(IP+port)
- #include<sys/types.h>
- #include<sys/socket.h>
- int bind(int sockfd,const struct sockaddr* addr,socklen_t addrlen);
-
- struct sockaddr_in addr;
-
- addr.sin_family=AF_INET/AF_INET6;
- addr.sin_port=htons(9527); //端口号为short类型(16bit)
-
- //
- int dst;
- inet_pton(AF_INET,"192.168.10.2",(void*)&dst);
- addr.sin_addr.s_addr=dst;
- /*或者采取下面的方法*/
- addr.sin_addr.s_addr=htonl(INADDR_ANY) //取出系统中任意有效的IP地址
-
- /*struct sockaddr是早已废弃的数据结构,已不再使用,用新的时注意强转一下*/
- int bind(int sockfd,(struct sockaddr*)&addr,sizeof(addr));
listen设置同一时间与服务器的最大连接数 或者说 能同时进行三次握手的最大连接数
- int listen(int sockfd,int backlog);
-
- //sockfd:仍然传入socket函数的返回值
-
- backlog:上限数值, 最大128
-
- 成功返回0,失败返回-1并设置errno
accept阻塞等待客户端建立连接,成功的话返回一个与客户端成功连接的socket文件描述符(通信套接字)
- int accept(int sockfd,sockaddr* addr, socklen_t* addrlen);
-
- //入参:
- sockfd:socket函数的返回值(监听套接字)
- addr:传出参数,成功与Sever建立连接的那个客户端的地址结构
- addrlen:传入传出参数
- 入:传入addr的大小:socklen_t clit_addr_len=sizeof(addr)
- 出:客户端addr的实际大小
-
-
- //返回值:
- 成功,则返回能与Server进行通信的socket对应的文件描述符; 失败返回-1并设置errno
客户端调用connect使用现有的socket与服务器建立连接
- int connect(int sockfd,const struct sockaddr* addr,socklen_t addrlen);
-
- //入参:
- sockfd:socket函数返回值
-
- addr:传入参数,服务器的地址结构
-
- addrlen:服务器地址结构的长度
-
- //返回值
- 成功返回0,失败返回-1并设置errno
-
- 如果不使用bind()函数绑定客户端的地址结构,会采用"隐式绑定"
服务器程序实现
Server:socket():创建socket
bind():绑定Server地址结构 ( 服务器的ip可设置为:INADDR_ANY;端口自定义)
listen():设置服务器同一时间监听上限
accept():阻塞监听客户端建立连接
read():读socket获取客户端数据
toupper():事务处理
write():写回数据到客户端
close()
Client:socket():创建socket 客户端的地址结构由系统隐式绑定
connect():与服务器建立连接
write():向socket(Server)写入数据
read():读出处理后的数据
close()
- #include <stdio.h>
- #include <unistd.h>
- #include <sys/types.h>
- #include <sys/socket.h>
- #include <strings.h>
- #include <string.h>
- #include <ctype.h>
- #include <arpa/inet.h>
-
- #define SERVER_PORT 9527
-
- void perr_exit(const char* str) {
- perror(str);
- exit(1);
- }
-
- int main() {
- //创建监听套接字
- int sockfd = socket(AF_INET, SOCK_STREAM, 0);
- if (sockfd == -1) {
- perr_exit("socket error");
- }
-
- struct sockaddr_in server_addr;
- server_addr.sin_family = AF_INET;
- server_addr.sin_port = htons(SERVER_PORT);
- server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
-
- //绑定地址结构
- int ret = bind(sockfd, (struct sockaddr*)&server_addr, sizeof(server_addr));
- if (ret == -1) {
- perr_exit("bind error");
- }
-
- //设置并发监听上限
- ret = listen(sockfd, 128);
- if (ret == -1) {
- perr_exit("listen error");
- }
-
- struct sockaddr_in client_addr;
- socklen_t client_addr_len = sizeof(client_addr);
-
- //本机阻塞监听指定端口,等待其他主机的TCP连接请求
- int clientfd = accept(sockfd, (struct sockaddr*)&client_addr, &client_addr_len);
- if (clientfd == -1) {
- perr_exit("accept error");
- }
- char client_ip[128];
- printf("client ip = %s, client port = %d\n", inet_ntop(AF_INET, &(client_addr.sin_addr.s_addr), client_ip, sizeof(client_ip)), ntohs(client_addr.sin_port));
- char buf[128];
- while (1) {
- ssize_t len = read(clientfd, buf, sizeof(buf));
- for (int i = 0; i < len; ++i) {
- buf[i] = toupper(buf[i]);
- }
- write(clientfd, buf, len);
- sleep(1);
- }
- close(sockfd);
- close(clientfd);
-
- return 0;
- }
关于服务器的socket绑定自己的ip的宏:INADDR_ANY
当这个代码在某一台服务器上跑起来之后,这个宏会自动获取服务器可用的ip初始化这个宏,不用人为手动指定了。
nc工具可以模拟客户端,对已经写好的服务器代码进行测试。
1、一个shell窗口作服务器 ./server, 阻塞启监听,等待客户端连接。
2、另一个shell窗口是客户端,用nc工具去连接服务器。使用命令:nc 客户端ip(存疑?) 服务器端口号;
连接成功后,服务器端 accept拿到客户端地址结构(可以打印出来看),代码往下进入业务逻辑。
业务逻辑:客户端给个小写字符串,服务器写回成为大写。
返回客户端的ip地址和端口号
- #include <stdio.h>
- #include <unistd.h>
- #include <stdlib.h>
- #include <string.h>
- #include <sys/socket.h>
- #include <arpa/inet.h>
-
- #define SERV_IP "127.0.0.1"
- #define SERV_PORT 9527
-
- void perr_exit(const char* str) {
- perror(str);
- exit(1);
- }
-
- int main() {
- //创建客户端socket
- int clientfd = socket(AF_INET, SOCK_STREAM, 0);
- if (clientfd < 0) {
- perr_exit("socket error");
- }
-
- //创建目的地即服务器的地址结构 ?ip可以随意指定吗
- struct sockaddr_in server_addr;
- server_addr.sin_family = AF_INET;
- server_addr.sin_port = htons(SERVER_PORT);
- int server_ip;
- inet_pton(AF_INET, "192.168.163.128", &server_ip);
- server_addr.sin_addr.s_addr = server_ip;
- //发起连接
- int ret = connect(clientfd, (struct sockaddr*)&server_addr, sizeof(server_addr));
- if (ret < 0) {
- perr_exit("connect error");
- }
- const char* str = "hello, world";
- char buf[128];
- for (int i = 0; i < 10; ++i) {
- //向服务器写(发送)数据
- write(clientfd, str, strlen(str));
- //读(接受)服务器处理后的数据;注意都是自己的文件描述符。存疑?服务器处理这么快的吗??
- read(clientfd, buf, sizeof(buf));
- //读完显示在标准输出
- printf("%s\n", buf);
- sleep(1);
- }
- //最后关闭socket文件描述符
- close(clientfd);
- return 0;
- }
建立连接:
男:喂,听得到吗?
女:听得到,你听得到吗?
男:我也听得到
数据通信:
关闭连接:
男:我说完了,你还有的话你继续说
女:我知道了(男方继续听女方balabala) (这一步不能和下一步逻辑连接,因为不确定服务端是否还有话说)
女:我说完了,我也要关了
男:我知道了
电话挂断
关于MSS:
TCP的报文头部解析:
注意:一行一个int(4个字节),六行。
1、端口号
16位源端口(Source Port):16位(最大2^16 ,65536)的源端口字段包含初始化通信的端口号。源端口和IP地址的作用是标识报文的返回地址。
16位目的端口(Destination Port):16位的目的端口字段定义传输的目的地。这个端口指明接收方计算机上的应用程序接口。
TCP报头中的源端口号和目的端口号同IP数据报中的源IP与目的IP唯一确定一条TCP连接。
2、序列号和确认号
这两个字段是保证TCP可靠传输服务的关键部分。当报文被分解成多个报文段时,序列号就是报文段首字节基于整个报文的字节级的偏移量。确定序列号(接收方发给发送方)下一个期待的字节序列号。
32位数据序列号(Sequence Number):序列号是该报文段首字节的字节流编号(TCP把数据看成是有序的字节流,TCP隐式地对数据流的每个字节进行编号)。首序列号不会从零开始,而是在建立连接时通过计算机随机生成一个数,由SYN包传给接收端主机。
例如,假设主机A和主机B进行TCP通信,A发送给B的第一个TCP报文段中,序号值被系统初始化为某个随机值ISN(Initial Sequence Number,初始序号值)。那么在该传输方向上(从A到B),后续的TCP报文段中序号值将被系统设置成ISN加上该报文段所携带数据的第一个字节在整个字节流中的偏移。例如,某个TCP报文段传送的数据是字节流中的第1025~2048字节,那么该报文段的序号值就是ISN+1025.另外一个传输方向(从B到A)的TCP报文段的序号值也具有相同的含义。
32位确认数据序列号(Acknowledge Number)(接收方 发给 发送方):标识期望收到的下一个段的第一个字节的序列号,并声明此前的所有数据已经正确无误地收到。因此,确认号应该是上一次seq+数据包大小偏移(特别的,SYN和FIN标志位算1字节)。收到确认号的源计算机会知道特定的段已经被收到。确认号只有当ACK标志为1配合时才有效。
3、首部长度(数据偏移)
4位头部长度(header length):由于首部可能含有可选项内容,因此TCP报头的长度是不确定的,报头不包含任何任选字段则长度为20字节,4位首部长度字段所能表示的最大值为1111,转化为10进制,报头最大长度为60字节,因此,TCP报头的长度为20~60字节。首部长度也叫数据偏移,是因为首部长度实际上指示了数据区在报文段中的起始偏移值。
4、保留
保留(6位):为将来定义新的用途保留,现在一般置0。
5、控制位(6bits):URG、 ACK、PSH 、 RST 、 SYN、 FIN
1)URG:紧急指针标志,为1时表示紧急指针有效,该报文应该优先传送,为0则忽略紧急指针。
2)ACK:确认序号标志,为1时表示确认号有效,为0表示报文中不含确认信息,忽略确认号字段。我们称携带ACK标识的TCP报文段为确认报文段。
3)PSH:push标志,为1表示是带有push标志的数据,指示接收方在接收到该报文段以后,应优先将这个报文段交给应用程序,而不是在缓冲区排队。
4)RST:重置连接标志,用于重置由于主机崩溃或其他原因而出现错误的连接。或者用于拒绝非法的报文段和拒绝连接请求。我们称携带RST标志的TCP报文段为复位报文段。
5)SYN:表示请求建立一个连接。在连接请求中,SYN=1和ACK=0表示该数据段没有使用捎带的确认域,而连接应答捎带一个确认,即SYN=1和ACK=1。我们称携带SYN标志的TCP报文段为同步报文段。
6)FIN:finish标志,用于释放连接,为1时表示发送方已经没有数据发送了,即关闭本方数据流。我们称携带FIN标志的TCP报文段为结束报文段。
6、16位窗口
窗口大小(window size):是TCP流量控制的一个手段。这里说的窗口,指的是接收通告窗口(Receiver Window,RWND)。它告诉对方本端的TCP接收缓冲区还能容纳多少字节的数据,这样对方就可以控制发送数据的速度,从而达到流量控制。窗口大小为一个16bit字段,因而窗口大小最大为65535字节。
7、校验和
16位校验和(TCP check sum):由发送端填充,接收端对TCP报文段执行CRC算法以检验TCP报文段在传输过程中是否损坏。注意,这个校验不仅包括TCP头部,也包括数据部分。这也是TCP可靠传输的一个重要保障。
8、紧急指针
16位紧急指针(urgent pointer):只有当 URG 标志置 1 时紧急指针才有效。紧急指针是一个正的偏移量,和顺序号字段中的值相加表示紧急数据最后一个字节的序号。 TCP 的紧急方式是发送端向另一端发送紧急数据的一种方式。
9、选项和填充
TCP头部选项:TCP头部的最后一个选项字段(options)是可变长的可选信息。这部分最多包含40字节,因为TCP头部最长是60字节(其中还包含前面讨论的20字节的固定部分)。
最常见的可选字段是最长报文大小,又称为MSS(Maximum Segment Size),每个连接方通常都在通信的第一个报文段(为建立连接而设置SYN标志为1的那个段)中指明这个选项,它表示本端所能接受的最大报文段的长度。选项长度不一定是32位的整数倍,所以要加填充位,即在这个字段中加入额外的零,以保证TCP头是32的整数倍。
10、数据部分
TCP 报文段中的数据部分是可选的。在一个连接建立和一个连接终止时,双方交换的报文段仅有 TCP 首部。如果一方没有数据要发送,也使用没有任何数据的首部来确认收到的数据。在处理超时的许多情况中,也会发送不带任何数据的报文段。
————————————————
原文链接:https://blog.csdn.net/weixin_43142797/article/details/105647255
主动发起连接请求端:发送SYN=1标志位,请求建立连接。携带ISN(初始随机报文序号),数据字节数(0),滑动窗口大小
被动接受连接请求端:发送ACK=1标志位(确认收到),携带SYN=1请求标志位,携带接收端ISN(初始随机报文序号),确认序列号ack,数据字节数(0),滑动窗口大小
主动发起连接请求端:发送ACK=1标志位,应答服务器连接请求,携带确认序号
3次握手由内核完成,体现在用户态的是
accpt()
函数和connect()
函数调用成功
答:三次握手是最少能帮助我们保证双方发送/接收都没问题的次数。
问:SYN报文只是TCP头吗?
答:SYN报文不仅仅是TCP头,它是TCP三次握手中的第一个报文,用于建立TCP连接。SYN报文除了包含TCP头之外,还包含了一些TCP选项,如MSS、窗口缩放因子等。在TCP连接建立过程中,SYN报文的携带数据是不被处理的,因为SYN报文的主要目的是用于建立连接,而不是传输数据。
参照上图,假设连接成功后,客户端先发起数据传输
客户端:接着连接成功后的序列号,发出序列1001数据(20字节),ACK=1,确认序列号8001
服务器:ACK=1,确认序列号1021,并发出数据ack=8001(10字节)
客户端:ACK=1,确认序列号 8011
注意1:数据传输并非一定是一问一答,只要服务器能确认一句前面都收到了。
发送端性能好,短时间给接收端内核缓冲区发送了大量数据,有充慢缓冲区的风险。
因此数据发送端,也会告知,自己剩余缓冲区还有多大。以便于发送端控制发送节奏。
- 主动关闭连接请求端:发送FIN=1标志位、序列号 (客户端 写缓冲区关闭)
- 被动关闭连接请求端:发送ACK=1标志位(发起关闭段,半关闭完成,开始不说只听)
- 被动关闭连接请求端:发送FIN=1标志位,序列号
- 主动关闭连接请求端:发送ACK=1标志位,确认序列号(假如此时突然断网,客户端应答ACK没有发出去,服务器会周期重复发送FIN)
答:一方关闭也叫半关闭,需要两次挥手;不管另一方是否还有数据要发,最后关闭也是两次挥手。
可以查看客户端和服务器,在进行TCP通信时,处于哪个状态,协助TCP通信调试。
下图:粗线路,表示主动方一路状态;细线路,表示被动方对应一路状态。
图中按键形状:表征客户所处于的状态。
使用netstat -apn | grep client或netstat -apn | grep 9527查看TCP连接状态:
上图中是上一节并发线程服务器的状态查看。图中,服务器开启多线程,主动连接后,1个线程监听状态,2个线程通信状态。两个客户端处于通信状态。
ps:如果网络通常,主动方,SYN_SENT状态就是一瞬间,一般查不到;被动方,SYN_收到 一般也查不到。
主动关闭一个客户端,查看他的状态变为TIME_WAIT
注意:只有主动关闭连接的一方才会处于 TIME_WAIT 和等待2MSL时长。(关闭一个客户端,紧接着关闭服务器,服务器并不会经历)
对应现象:先关闭服务器,再关闭客户端。紧接着再打开服务器,是起不来的(反之,服务器能马上起来)。因为服务器在等待2MSL,地址结构还在被占用。
2MSL存在的意义(为什么主动关闭的一方需要等待2MSL时长?):保证最后一个ACK确实被被动方Server收到了;如果ACK丢失,短时间内,Server还可以在40S内重发FIN,保证最后一个ACK被妥善处理了(站好最后一班岗)
发送方知道对方收到的唯一方法就是接收方给发送方发收到的消息,但是最后一个ACK接收方不管接受没接收到,就不再回复了,因此最后一个ACK理论上发送方是没法知道对方收到ACK了没的。因此,设置40S的意义是等一个差不多的时间,如果在这个时间段内,一直没发SYN,就默认最后一个ACK对方收到了。
2MSL时常一定出现在主动关闭连接请求一端
其他状态:建立连接时,如果最后一个ACK没有收到,RST重启
服务器经过socket, bind, listen, accept
后,阻塞监听。
客户端用socket
建立套接字后,调用connect
函数,对应到内核就是发送SYN标志位,开始三次握手。
无论是谁调用close(fd),对应到kernel就是发了一个FIN数据包。
为了保证程序的健壮性,错误处理是必要的,但如果用以前的sys_err()函数会很零散,打乱了代码的整体逻辑,高频书写也降低了代码书写效率,于是提出错误处理函数封装:将原函数首字母大写进行错误处理,这样依然可以保证跳转到linuxmanPage的原函数。
不仅限于socket通信接口函数,read/write也可以封装,在原来的入参形式下,实现更强大的功能。
问:c库f开头的文件操作函数可以用来写socket通信吗?
答:不能。因为它的传参不是文件描述符,而是文件类型的指针。
常用封装:
- //wrap.c
- #include "wrap.h"
-
- void perr_exit(const char* str) {
- perror(str);
- exit(1);
- }
-
- int Accept(int sockfd, struct sockaddr* addr, socklen_t* addrlen) {
- int n;
-
- again:
- if ((n = accept(sockfd, addr, addrlen)) < 0) {
- /*If the error is caused by a singal,not due to the accept itself,try again*/
- if ((errno == EINTR) || (errno == ECONNABORTED)) {
- goto again;
- } else {
- perr_exit("accept error");
- }
- }
- return n;
- }
-
- int Bind(int sockfd, const struct sockaddr* addr, socklen_t addrlen) {
- int n = bind(sockfd, addr, addrlen);
- if (n < 0) {
- perr_exit("bind error");
- }
- return n;
- }
-
- int Connect(int sockfd, const struct sockaddr* addr, socklen_t addrlen) {
- int n = connect(sockfd, addr, addrlen);
- if (n < 0) {
- perr_exit("connect error");
- }
- return n;
- }
-
- int Listen(int sockfd, int backlog) {
- int n = listen(sockfd, backlog);
- if (n < 0) {
- perr_exit("listen error");
- }
- return n;
- }
-
- int Socket(int domain, int type, int protocol) {
- int n = socket(domain, type, protocol);
- if (n < 0) {
- perr_exit("listen error");
- }
- return n;
- }
-
- ssize_t Readn(int fd, void* vptr, size_t n) {
- size_t nleft = n;
- ssize_t nread;
- char* ptr = vptr;
-
- while (nleft > 0) {
- if ((nread = read(fd, ptr, nleft)) < 0) {
- if (errno == EINTR) {
- nread = 0;
- } else {
- return -1;
- }
- } else if (nread == 0) {
- break;
- }
- nleft = nleft - nread;
- ptr = ptr - nread;
- }
- return n - nleft;
- }
-
- ssize_t Writen(int fd, const void* vptr, size_t n) {
- size_t nleft = n;
- ssize_t nwritten;
- const char* ptr = vptr;
-
- while (nleft > 0) {
- if ((nwritten = write(fd, ptr, nleft)) <= 0) {
- if ((errno == EINTR) && (nwritten < 0)) {
- nwritten = 0;
- } else {
- return -1;
- }
- }
- nleft = nleft - nwritten;
- ptr = ptr - nwritten;
- }
- return n;
- }
- //wrap.h
- #ifndef __WRAP_H_
- #define __WRAP_H_
-
- void perr_exit(const char *s);
- int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr);
- int Bind(int fd, const struct sockaddr *sa, socklen_t salen);
- int Connect(int fd, const struct sockaddr *sa, socklen_t salen);
- int Listen(int fd, int backlog);
- int Socket(int family, int type, int protocol);
- ssize_t Read(int fd, void *ptr, size_t nbytes);
- ssize_t Write(int fd, const void *ptr, size_t nbytes);
- int Close(int fd);
- ssize_t Readn(int fd, void *vptr, size_t n);
- ssize_t Writen(int fd, const void *vptr, size_t n);
- ssize_t my_read(int fd, char *ptr);
- ssize_t Readline(int fd, void *vptr, size_t maxlen);
-
- #endif
1、两端都创建好自己的套接字,并绑定好地址结构
2、客户端直接写
3、服务端直接读
对比TCP:
服务器程序实现
Server:socket():创建socket
bind():绑定Server地址结构 ( 服务器的ip可设置为:INADDR_ANY;端口自定义)
listen():设置服务器同一时间监听上限
accept():阻塞监听客户端建立连接
read():读socket获取客户端数据
toupper():事务处理
write():写回数据到客户端
close()
Client:socket():创建socket 客户端的地址结构由系统隐式绑定
connect():与服务器建立连接
write():向socket(Server)写入数据
read():读出处理后的数据
close()
由于UDP无连接的特性,accept()和connect()过程被舍弃,流程分析如下:
Server:
- int sockfd = socket(AF_INET, SOCK_DGRAM, 0);//UDP的socket函数的参2传入SOCK_DGRAM,表示报式协议
- bind();
-
- while(1){
- recvfrom();
- 小写->大写;
- sendto();
- //ps: recv()和send()只能用于TCP通信,代替read()和write()
- }
- close(sockfd);
Client:
- int sockfd = socket(AF_INET, SOCK_DGRAM, 0);
- sendto();//地址直接由sendto去发
- recvfrom();
- 写到屏幕;
- close();
recvfrom:
ssize_t recvfrom(int sockfd, void* buf, size_t len, int flags,struct sockaddr* src_addr, socklen_t* addrlen); //参数: recvfrom涵盖accpet传出地址结构的作用 src_addr是传出参数:传出对端地址结构 addrlen是传入传出参数:传入本端地址结构大小,传出对端地址结构大小 //返回值: 成功返回接受数据字节数,失败返回-1并设置errnosendto:
ssize_t sendto(int sockfd,const void* buf,size_t len,int flags,const struct sockaddr* dest_addr,socklen_t addrlen); //参数: dest_addr是传入参数:传入对端的地址结构 flags:默认传0 //返回值: 成功返回写出的字节数,失败返回-1并设置errno收发两个函数,都考虑的是对端的地址结构(一个传出,一个传入)
UDP服务器本身就是并发的,不需要accept()阻塞监听多客户端连接,因为它无需建立连接
- #include <string.h>
- #include <stdio.h>
- #include <unistd.h>
- #include <arpa/inet.h>
- #include <ctype.h>
-
- #define SERV_PORT 9527
-
- int main(void)
- {
- //直接创建服务器的套接字,绑定地址结构,端口设置为9527
- struct sockaddr_in serv_addr, clie_addr;
- socklen_t clie_addr_len;
- int sockfd;
- char buf[BUFSIZ];
- char str[INET_ADDRSTRLEN];
- int i, n;
-
- sockfd = socket(AF_INET, SOCK_DGRAM, 0);
-
- bzero(&serv_addr, sizeof(serv_addr));
- serv_addr.sin_family = AF_INET;
- serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
- serv_addr.sin_port = htons(SERV_PORT);
-
- bind(sockfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr));
- printf("Accepting connections ...\n");
-
- //轮询,等待客户端发送数据
- while (1) {
- //读数据,包括传来的客户端的地址结构
- clie_addr_len = sizeof(clie_addr);
- n = recvfrom(sockfd, buf, BUFSIZ,0, (struct sockaddr *)&clie_addr, &clie_addr_len);
- if (n == -1)
- perror("recvfrom error");
-
- printf("received from %s at PORT %d\n",
- inet_ntop(AF_INET, &clie_addr.sin_addr, str, sizeof(str)),
- ntohs(clie_addr.sin_port));
- //把读到缓冲区的数据,按照字节小写转大写
- for (i = 0; i < n; i++)
- buf[i] = toupper(buf[i]);
- //把buffer中的数据发回客户端,发回参数包括客户端地址。
- n = sendto(sockfd, buf, n, 0, (struct sockaddr *)&clie_addr, sizeof(clie_addr));
- if (n == -1)
- perror("sendto error");
- }
-
- close(sockfd);
-
- return 0;
- }
-
- #include <stdio.h>
- #include <string.h>
- #include <unistd.h>
- #include <arpa/inet.h>
- #include <ctype.h>
-
- #define SERV_PORT 9527
-
- int main(int argc, char *argv[])
- { //直接创建套接字,创建客户端的地址结构,(客户端自己的地址结构隐式绑定)
- struct sockaddr_in servaddr;
- int sockfd, n;
- char buf[BUFSIZ];
-
- sockfd = socket(AF_INET, SOCK_DGRAM, 0);
-
- bzero(&servaddr, sizeof(servaddr));
- servaddr.sin_family = AF_INET;
- inet_pton(AF_INET, "127.0.0.1", &servaddr.sin_addr);
- servaddr.sin_port = htons(SERV_PORT);
-
- //客户端自己的地址还是隐式绑定,下面这句不生效
- //bind(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
-
- //轮询,跟客户端收发数据
- while (fgets(buf, BUFSIZ, stdin) != NULL) {
-
- //发的时候带对端地址
- n = sendto(sockfd, buf, strlen(buf), 0, (struct sockaddr *)&servaddr, sizeof(servaddr));
- if (n == -1)
- perror("sendto error");
-
- //读的时候不关心对端地址,服务器地址本来就知道。所以直接传NULL,不存
- n = recvfrom(sockfd, buf, BUFSIZ, 0, NULL, 0); //NULL:不关心对端信息
- if (n == -1)
- perror("recvfrom error");
-
- write(STDOUT_FILENO, buf, n);
- }
-
- close(sockfd);
-
- return 0;
- }
-
跟网络套接字相比,东西基本上还是一套。传输对象由网络主机之间变为单机的进程之间。因此主要传参整体改变。
- int socket(int domain, int type, int protocol);
- //入参:
- 此时domain传入AF_UNIX或AF_LOCAL
- type传入SOCK_STREAM或SOCK_DGRAM都可
- protocol传0
本地套接字的地址结构就是个文件
本地套接字绑定的地址结构变为sockaddr_un:
- struct sockaddr_un {
- __kernel_sa_family sun_family;//地址类型
- char sun_path[UNIX_PATH_MAX];//"sockt文件名 含路径"
- }
- servaddr.sun_family = AF_UNIX;
- strcpy(servaddr.sun_path, SERV_ADDR);
这个初始化不太明白???
1)地址结构体大小的求法:
=16位地址类型的大小(用地址偏移求)+路径名成员的大小(用strlen求)
2)调用bind之前调用,unlink(删除文件(硬链接,等所有打开这个文件的进程关闭后,文件彻底被释放)) ,保证文件不存在
如果当前目录下有这个文件,起服务器会失败
3)bind调用成功后,会把套接字文件"serv.socket"创建出来(本质上文件还是socket创建,bind让这个文件最终版生效),由于是伪文件,大小是0
- #include <stdio.h>
- #include <unistd.h>
- #include <sys/socket.h>
- #include <strings.h>
- #include <string.h>
- #include <ctype.h>
- #include <arpa/inet.h>
- #include <sys/un.h>
- #include <stddef.h>
-
- #include "wrap.h"
-
- #define SERV_ADDR "serv.socket"
-
- int main(void)
- {
- int lfd, cfd, len, size, i;
- struct sockaddr_un servaddr, cliaddr;
- char buf[4096];
- //1、socket创建套接字lfd
- lfd = Socket(AF_UNIX, SOCK_STREAM, 0);
-
- bzero(&servaddr, sizeof(servaddr));
- //2、初始化服务器地址结构
- servaddr.sun_family = AF_UNIX;
- strcpy(servaddr.sun_path, SERV_ADDR);
-
- len = offsetof(struct sockaddr_un, sun_path) + strlen(servaddr.sun_path); /* servaddr total len */
-
- unlink(SERV_ADDR); /* 确保bind之前serv.sock文件不存在,bind会创建该文件 */
- //3、绑定服务器地址结构
- Bind(lfd, (struct sockaddr *)&servaddr, len); /* 参3不能是sizeof(servaddr) */
- //4、设置监听上限;没什么用,但连接建立过程跟网络一样,所以要加
- Listen(lfd, 20);
-
- printf("Accept ...\n");
- while (1) {
- len = sizeof(cliaddr); //AF_UNIX大小+108B
- //5、循环中Accept监听创建cfd
- cfd = Accept(lfd, (struct sockaddr *)&cliaddr, (socklen_t *)&len);
-
- len -= offsetof(struct sockaddr_un, sun_path); /* 得到文件名的长度 */
- cliaddr.sun_path[len] = '\0'; /* 确保打印时,没有乱码出现 */
-
- printf("client bind filename %s\n", cliaddr.sun_path);
- //6、业务读写cfd
- while ((size = read(cfd, buf, sizeof(buf))) > 0) {
- for (i = 0; i < size; i++)
- buf[i] = toupper(buf[i]);
- write(cfd, buf, size);
- }
- close(cfd);
- }
- close(lfd);
-
- return 0;
- }
- #include <stdio.h>
- #include <unistd.h>
- #include <sys/types.h>
- #include <sys/socket.h>
- #include <strings.h>
- #include <string.h>
- #include <ctype.h>
- #include <arpa/inet.h>
- #include <sys/un.h>
- #include <stddef.h>
-
- #include "wrap.h"
-
- #define SERV_ADDR "serv.socket"
- #define CLIE_ADDR "clie.socket"
-
- int main(void)
- {
- int cfd, len;
- struct sockaddr_un servaddr, cliaddr;
- char buf[4096];
- //1、socket创建cfd
- cfd = Socket(AF_UNIX, SOCK_STREAM, 0);
-
- //2、初始化并绑定客户端地址结构
- bzero(&cliaddr, sizeof(cliaddr));
- cliaddr.sun_family = AF_UNIX;
- strcpy(cliaddr.sun_path,CLIE_ADDR);
- len = offsetof(struct sockaddr_un, sun_path) + strlen(cliaddr.sun_path); /* 计算客户端地址结构有效长度 */
- unlink(CLIE_ADDR);
- Bind(cfd, (struct sockaddr *)&cliaddr, len); /* 客户端也需要bind, 不能依赖自动绑定*/
-
- //3、初始化服务端地址结构
- bzero(&servaddr, sizeof(servaddr)); /* 构造server 地址 */
- servaddr.sun_family = AF_UNIX;
- strcpy(servaddr.sun_path, SERV_ADDR);
-
- len = offsetof(struct sockaddr_un, sun_path) + strlen(servaddr.sun_path); /* 计算服务器端地址结构有效长度 */
- //4、connect申请连接
- Connect(cfd, (struct sockaddr *)&servaddr, len);
- //5、业务读写cfd
- while (fgets(buf, sizeof(buf), stdin) != NULL) {
- write(cfd, buf, strlen(buf));
- len = read(cfd, buf, sizeof(buf));
- write(STDOUT_FILENO, buf, len);
- }
-
- close(cfd);
-
- return 0;
- }
试着把之前写的 单服务器-客户端 重新开启,然后针对客户端代码再开一个shell进程,然后运行客户端,nc 127.0.0.1,发现并不能直接再和服务器通信。
但是我们之前说,并发机制是存在的,如下。现在考虑实现并发tcp连接服务器。回顾“socket模型创建流程”和“服务器代码”,我们知道。想要新建一条tcp通信,关键在于accept()再新建成功一条套接字(子进程)。
即accept()之后,拿着返回的新的套接字,就可以重新起进程了。
- Socket(); //创建监听套接字lfd
- Bind(); //绑定服务器地址结构
- Listen(); //设置监听上限
- while(1){
- cfd=Accept();
- pid=fork();
- if(pid==0){
- close(lfd); //子进程用不到lfd
- read(cfd);
- 数据处理;
- write(cfd);
- }else if(pid>0){
- close(cfd); //父进程用不到cfd
- }
- }
-
-
- //还要考虑父进程回收子进程:
- 注册信号捕捉函数:SIGCHLD
- 在回调函数中完成子进程回收:while(waitpid())
-
- //server-multiprocess.c
- #include <stdio.h>
- #include <ctype.h>
- #include <stdlib.h>
- #include <sys/wait.h>
- #include <string.h>
- #include <strings.h>
- #include <unistd.h>
- #include <errno.h>
- #include <signal.h>
- #include <sys/socket.h>
- #include <arpa/inet.h>
- #include <pthread.h>
-
- #include "wrap.h"
-
- #define SERVER_PORT 9527
-
- //死循环非阻塞收集运行终止的子进程。(注意,如果阻塞,基于就卡在者这儿不能监听accept了)
- void catch_child(int signum) {
- while (waitpid(0, NULL, WNOHANG) > 0)
- ;
- }
-
- int main(int argc, char* argv[]) {
-
- //创建sockfd
- int sockfd = Socket(AF_INET, SOCK_STREAM, 0);
- struct sockaddr_in serveraddr;
- serveraddr.sin_family = AF_INET;
- serveraddr.sin_port = htons(SERVER_PORT);
- serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
-
- //绑定服务器地址结构
- Bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(serveraddr));
-
- //设置服务器监听上限
- Listen(sockfd, 128);
-
- //循环监听连接
- pid_t pid;
- int clientfd;
- while (1) {
- struct sockaddr_in clientaddr;
- socklen_t clientaddr_len = sizeof(clientaddr);
- clientfd = Accept(sockfd, (struct sockaddr*)&clientaddr, &clientaddr_len);
-
- //监听到之后起子进程
- pid = fork();
-
- //父子进程分道扬镳
- if (pid < 0) {
- perr_exit("fork error");
- } else if (pid == 0) {
- close(sockfd);
- break;//注意,子进程就跳出循环了
- } else {
- //父进程关闭刚新增的套接字
- close(clientfd);
-
- //父进程用信号收集子进程
- struct sigaction act;
- act.sa_handler = catch_child;
- act.sa_flags = 0;
- sigemptyset(&act.sa_mask);
- int ret = sigaction(SIGCHLD, &act, NULL);
- if (ret < 0) {
- perr_exit("sigemptyset error");
- }
- }
- }
-
- //子进程创建break之后,专属执行流
- if (pid == 0) {
- char buf[128];
- while (1) {
- ssize_t n = read(clientfd, buf, sizeof(buf));
- if (n == 0) {
- close(clientfd);
- exit(1);
- }
- for (int i = 0; i < n; ++i) {
- buf[i] = toupper(buf[i]);
- }
- write(clientfd, buf, n);
- }
- }
- return 0;
- }
还是使用nc工具测试,两个客户端 - 一个服务器:
两个客户端都是本机,地址结构隐式绑定,访问服务器(还是本机)同一地址(127.1)和端口 9527
真多机客户端-远程服务器。需要将服务器代码上传到远程某个服务器(可以租个云服务器,做这件事情)运行起来。
- Socket(); //创建监听套接字lfd
- Bind(); //绑定服务器地址结构
- Listen(); //设置监听上限
- while(1){
- cfd=Accept(lfd,);
- pthread_create(&tid,NULL,&tfn,NULL);
- /*
- *detach设置线程分离,但是这样不能获取线程退出状态
- *如果想获取子线程退出状态,用pthread_join()函数,但是这样会造成主线程阻塞
- *解决方案:create出一个新的子线程调用pthread_join()专门用于回收
- //注意:兄弟线程之间是可以进行回收的,但是兄弟进程之间不可以进行回收,爷孙也不行
- */
- pthread_detach(tid);
- }
-
- //子线程:
- void* tfn(void* arg){
- close(lfd);
- read(cfd,);
- 数据处理;
- write(cfd,);
- pthread_exit((void*)out); //线程退出状态
- }
- //server-multithread.c
- #include <stdio.h>
- #include <ctype.h>
- #include <stdlib.h>
- #include <sys/wait.h>
- #include <string.h>
- #include <strings.h>
- #include <unistd.h>
- #include <errno.h>
- #include <signal.h>
- #include <sys/socket.h>
- #include <arpa/inet.h>
- #include <pthread.h>
- #include "wrap.h"
-
- #define SERVER_PORT 9527
-
- struct AddrFd {
- struct sockaddr_in clientaddr;
- int clientfd;
- };
- typedef struct AddrFd AddrFd;
-
- //线程回调函数
- void* work(void* arg) {
- AddrFd* p_addr_fd = (AddrFd*)arg;
- char client_ip[128];
- printf("new connection: ip = %s, port = %d\n", inet_ntop(AF_INET, &(p_addr_fd->clientaddr.sin_addr), client_ip, sizeof(client_ip)), htons(p_addr_fd->clientaddr.sin_port));
- char buf[128];
- while (1) {
- ssize_t n = read(p_addr_fd->clientfd, buf, sizeof(buf));
- if (n == 0) {
- printf("connection closed\n");
- break;
- }
- for (int i = 0; i < n; ++i) {
- buf[i] = toupper(buf[i]);
- }
- //把转的大写回写给客户端
- write(p_addr_fd->clientfd, buf, n);
- }
- close(p_addr_fd->clientfd);
- pthread_exit(0);
- }
-
- int main() {
- //创建服务器迎宾socket,绑定好地址结构,并listen
- int sockfd = Socket(AF_INET, SOCK_STREAM, 0);
- struct sockaddr_in serveraddr;
- serveraddr.sin_family = AF_INET;
- serveraddr.sin_port = htons(SERVER_PORT);
- serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
- Bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(serveraddr));
- Listen(sockfd, 128);
-
- pthread_t tid;
-
- //最大监听128个客户端线程,直接把结构体数组定义好
- AddrFd addr_fd[128];
- int i = 0;
- while (1) {
- socklen_t clientaddr_len = sizeof(addr_fd[i].clientaddr);
- //将新连接的服务器的cfd和传出的客户端地址结构并成一个结构体指针传给线程回调函数
- addr_fd[i].clientfd = Accept(sockfd, (struct sockaddr*)&(addr_fd[i].clientaddr), &clientaddr_len);
- //注意参4:向线程回调函数传递 服务器新socket的cfd,以及客户端地址(用于在回调函数中打印出来)
- pthread_create(&tid, NULL, work, (void*)&addr_fd[i]);
- pthread_detach(tid);
- i = (i + 1) % 128;
- }
- close(sockfd);
- return 0;
- }
写代码注意点:
上面线程回调函数中,判断读结束的方法!!!
运行服务器,运行两个客户端
查看 server进程起了三个线程,一主两子
使用 单进程/线程 完成 服务器与多客户端通信
前面学习的 服务器-客户端模型,想完成这样一个模型通信,就要写一份 服务器-客户端 .c的代码。但是这样类似模型的需求应该有很多。内核能不能提供这样一个服务?
模型优化:把服务器中accept阻塞监听和等待IO请求的任务交给内核select帮忙完成,服务器只掌握核心功能。
这种思想很像CPU对IO的处理的发展历程,select的地位就像中断管理器,IO设备有中断请求时才通知CPU。对应的,只有当客户端有连接请求时才会通知server进行处理。也就是说只要server收到通知,就一定有数据待处理或连接待响应,不会再被阻塞而浪费资源了。
select函数参数简介:
#include <sys/time.h> #include <sys/types.h> #include <unistd.h> int select(int nfds, fd_set* readfds, fd_set* writefds,fd_set* exceptfds, struct timeval* timeout);入参:
nfds:所监听的最大套接字文件描述符+1,在select内部用作一个for循环的上限
fd_set*:都是传入传出参数;都是(文件描述符)位图(类似信号屏蔽字), 每个二进制位代表了一个文件描述符的状态。
- readfds:读文件描述符监听集合
- writefds:写文件描述符监听集合
- exceptfds:异常文件描述符监听集合
三种集合分别对应 读 写 监听 三种事件,都是作为入参,监听的事件对象。三种集合中的内容自定义,如下例子。
传入的是你想监听的文件描述符集合(对应位置一),传出来的时候意义变了,是实际有事件发生的文件描述符集合(当然也会将没有事件发生的位置零)
重点在于readfds:当客户端有数据发到服务器上时,触发服务器的可读事件,后面两个一般传NULL(空集合)
返回值:
- 所有你所监听的文件描述符当中有事件发生的总个数(读写异常三个参数综合考虑)
- -1说明发生异常,设置errno
select函数会在以下三种情况下返回:
- 监视的文件描述符中有数据可读,即文件描述符状态发生变化。
- 超时时间到,即指定的时间已经过去。
- 函数调用被信号中断,即收到了信号。
关于timeout:传入参数,定义阻塞监控的时间(函数执行时间)
1、阻塞
2、定时等待
3、非阻塞
构造三个集合的位图入参:
//判断客户端断开以后,可以用 void FD_CLR(int fd,fd_set* set); //将给定的套接字fd从位图set中清除出去 void FD_SET(int fd,fd_set* set); //将给定的套接字fd设置到位图set中 void FD_ZERO(fd_set* set); //将整个位图set置零 int FD_ISSET(int fd,fd_set* set); //检查给定的套接字fd是否在位图里面,返回0或1
- //accept()之前的内容相同
- listenFd = Socket(); // 创建套接字
- Bind(); // 绑定地址结构
- Listen(); // 设置监听上限
-
- // 创建读监听集合,准备入参
- fd_set rset;
- fd_set allset;
- FD_ZERO(&allset); // 将读监听集合清空
- FD_SET(listenFd, &allset); // 将listenFd添加到所有读集合当中
-
- //
- while (1) {
- //rset的传入传出参数性质,导致需要再定义一个备份变量
- rset = allset; // 保存监听集合
- ret = select(listenFd + 1, &rset, NULL, NULL, NULL); // 监听文件描述符集合对应事件
- if (ret > 0) {
- //此时的rset是传出的
- if (FD_ISSET(listenFd, &rset)) { //用这个来判断是否有新连接申请
- cfd = accept();
- FD_SET(cfd, &allset); // 将新连接 添加到监听通信描述符集合中
- }
- for (i = listenFd + 1; i <= cfd; ++i) {
- FD_ISSET(i, &rset); // 有read,write事件
- read();
- toupper();
- write();
- }
- }
- }
体会:没有起多进程、多线程,实现多客户端连接
- //server-select.c
- #include <stdio.h>
- #include <stdlib.h>
- #include <unistd.h>
- #include <string.h>
- #include <arpa/inet.h>
- #include <ctype.h>
- #include "wrap.h"
-
- #define SERVER_PORT 9527
-
- int main(int argc, char* argv[]) {
- //服务器起监听前面这坨一样
- int sockfd = Socket(AF_INET, SOCK_STREAM, 0);
- int opt = 1;
- //Setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (void*)&opt, sizeof(opt));
- struct sockaddr_in serveraddr;
- serveraddr.sin_family = AF_INET;
- serveraddr.sin_port = htons(SERVER_PORT);
- serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
-
- Bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(serveraddr));
- Listen(sockfd, 128);
-
- //定义两个读集合,一个用,一个备份
- fd_set rdset, allset;
- FD_ZERO(&allset);
- //监听fd,先放到读集合里
- FD_SET(sockfd, &allset);
- //定义最大文件描述符,用于后面循环判断有没有来读的申请
- int maxfd = sockfd;
-
- while (1) {
- //每次循环开始,先重置rset
- rdset = allset;
- int N = select(maxfd + 1, &rdset, NULL, NULL, NULL);
- if (N == -1) {
- perr_exit("select error");
- }
- //循环中先查看select调用返回的rdset中的sockfd,是否有读请求事件(新连接请求)
- if (FD_ISSET(sockfd, &rdset)) {
- struct sockaddr_in clientaddr;
- socklen_t clientaddr_len = sizeof(clientaddr);
- //不会阻塞等待,因为确认新申请连接来了
- int clientfd = Accept(sockfd, (struct sockaddr*)&clientaddr, &clientaddr_len);
- FD_SET(clientfd, &allset);
- maxfd = (maxfd > clientfd) ? maxfd : clientfd;
- //顺便判断一下,是不是来的除了这个申请连接事件,没有别的读之类的事件了
- if (N == 1) {
- continue;//如果是,就不用往下执行处理读请求的东西了
- }
- }
- //上面如果判断rdset里面有读请求,下面挨个判断是谁请求的 和处理它
- for (int i = sockfd; i <= maxfd; ++i) {
- if (FD_ISSET(i, &rdset)) {
- char buf[128];
- ssize_t n = read(i, buf, sizeof(buf));
- if (n == 0) {
- printf("connection closed\n");
- FD_CLR(i, &allset);
- } else {
- write(STDOUT_FILENO, buf, n);
- for (int j = 0; j < n; ++j) {
- buf[j] = toupper(buf[j]);
- }
- write(i, buf, n);
- }
- }
- }
- }
- close(sockfd);//最后记得关闭监听cfd
- return 0;
- }
这个版本的缺陷:当你只需要监听几个指定的套接字时,需要对整个1024的数组进行轮询,效率降低
主要包括,添加一个自己定义数组挨个存新建的文件描述符,提高效率:
- //server-select.c
- #include "wrap.h"
-
- #define SERVER_PORT 9527
- #define MAX(a, b) ((a) > (b)) ? (a) : (b)
-
- int main(int argc, char *argv[]) {
- int i, j, n, maxi, maxfd;
- int client[FD_SETSIZE];
- char buf[BUFSIZ], str[INET_ADDRSTRLEN];
-
- struct sockaddr_in serveraddr, clientaddr;
- socklen_t clientaddr_len;
-
- int listen_fd = Socket(AF_INET, SOCK_STREAM, 0);
-
- fd_set rset, allset;
- FD_ZERO(&allset);
- FD_SET(listen_fd, &allset);
-
- /*Set the address can be reused*/
- int opt = 1;
- setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, (void *)&opt, sizeof(opt));
-
- bzero(&serveraddr, sizeof(serveraddr));
- serveraddr.sin_family = AF_INET;
- serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
- serveraddr.sin_port = htons(SERVER_PORT);
-
- Bind(listen_fd, (struct sockaddr *)&serveraddr, sizeof(serveraddr));
- Listen(listen_fd, 128);
-
- maxfd = listen_fd;
- maxi = -1;
-
- for (i = 0; i < FD_SETSIZE; ++i)
- client[i] = -1;
-
- while (1) {
- /*Initialize the rset by allset*/
- rset = allset;
- int N = select(maxfd + 1, &rset, NULL, NULL, NULL);
- if (N == -1)
- perr_exit("select error");
-
- /*If there is a new connection request*/
- if (FD_ISSET(listen_fd, &rset)) {
- clientaddr_len = sizeof(clientaddr);
- int connect_fd = Accept(listen_fd, (struct sockaddr *)&clientaddr, &clientaddr_len);
- printf("client address: %s port: %d\n", inet_ntop(AF_INET, &(clientaddr.sin_addr.s_addr), str, sizeof(str)), ntohs(clientaddr.sin_port));
-
- for (i = 0; i < FD_SETSIZE; ++i) {
- if (client[i] < 0) {
- client[i] = connect_fd;
- break;
- }
- }
-
- if (i == FD_SETSIZE) {
- fputs("too many clients\n", stderr);
- exit(1);
- }
-
- FD_SET(connect_fd, &allset);
-
- /*Renewal the maxfd*/
- maxfd = MAX(maxfd, connect_fd);
- maxi= MAX(maxi, i);
-
- /*Only the connection request,no data read request(no connect_fd is valid)*/
- if(--N == 0) continue;
- }
- int socket_fd = 0;
- /*There is data read request(s),traverse the fd set to corresponding them*/
- for (i = 0; i <= maxi; ++i) {
- if ((socket_fd = client[i]) < 0)
- continue;
-
- if (FD_ISSET(socket_fd, &rset)) {
- if ((n = read(socket_fd, buf, sizeof(buf))) == 0) {
- close(socket_fd);
- FD_CLR(socket_fd, &allset);
- client[i] = -1;
- } else if (n > 0) {
- for (j = 0; j < n; ++j)
- buf[j] = toupper(buf[j]);
- write(socket_fd, buf, n);
- //write(STDOUT_FILENO, buf, n);
- }
- if (--N == 0) break;
- }
- }
- }
- close(listen_fd);
- return 0;
- }
缺点:
1、监听上限受单进程文件描述符限制,最大1024个,要监听更多客户端,需要起多进程;
2、要检测for循环满足条件的fd,要自己优化算法
优点:
跨平台,各种系统都能支持。(如果后续确定使用linux,就用后面的epoll)
对select做出的小改进,没有新思想,函数稍微改了改,但是比较鸡肋,不如epoll
- #include <poll.h>
- //fds监听的文件描述符结构体数组
- int poll(struct pollfd* fds, nfds_t nfds, int timeout);
-
- //入参
- fds:监听的文件描述符结构体数组
- nfds:监听数组的实际有效监听个数
- timeout:含义同select,但是单位是ms;>0 0 -1 ;-1表示阻塞
-
- //返回值
- 返回满足对应监听事件的文件描述符总个数
-
- struct pollfd {
- int fd; /*待监听的文件描述符*/
- short events; /*requested events:待监听的文件描述符对应的监听事件->POLLIN,POLLOUT,POLLERR*/
- short revents; /*returned events:传入时给0,如果满足对应事件的话被置为非零->POLLIN,POLLOUT,POLLERR*/
- };
- /*自定义结构体数组并初始化*/
- struct pollfd pfds[1024];
-
- pfds[0].fd=lfd;
- pfds[0].events=POLLIN;
- pfds[0].revents=0;
-
- pfds[1].fd=cfd1;
- pfds[1].events=POLLIN;
- pfds[1].revents=0;
-
- pfds[2].fd=cfd2;
- pfds[2].events=POLLIN;
- pfds[2].revets=0;
- ...
- while(1){
- //pollfd=fds, nfds=5, timeout=-1
- int N=poll(pfds, 5, -1);//数据1024,实际上就用了5个
- /*轮询是否有POLLIN需求*/
- for(i = 0; i < 5; i++) {
- if(pfds[i].revents & POLLIN){ //挨个看每个结构体的revents成员
- if (i == 0) Accept();
- else Read()/Write();
- }
- }
- }
- //server-poll.c
- #include "wrap.h"
-
- #define MAXLINE 80
- #define SERVER_PORT 9527
- #define OPEN_MAX 1024
-
- int main(int argc, char* argv[]) {
- char buf[MAXLINE], str[INET_ADDRSTRLEN];
- //定义结构体数组
- struct pollfd client[OPEN_MAX];
-
- //前面这坨起socket到listen一样
- struct sockaddr_in clientaddr, serveraddr;
- int listen_fd = Socket(AF_INET, SOCK_STREAM, 0);
-
- int opt = 0;
- int ret = setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, (void*)&opt, sizeof(opt));
- if (ret == -1) perr_exit("setsockopt error");
-
- bzero(&serveraddr, sizeof(serveraddr));
- serveraddr.sin_family = AF_INET;
- serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
- serveraddr.sin_port = htons(SERVER_PORT);
-
- Bind(listen_fd, (const struct sockaddr*)&serveraddr, sizeof(serveraddr));
- Listen(listen_fd, 128);
-
- //把lfd初始化进去
- client[0].fd = listen_fd;
- client[0].events = POLLIN;
- int i = 0, j = 0, maxi = 0;
- for (i = 1; i < OPEN_MAX; ++i)
- client[i].fd = -1;
-
- while (1) {
- //直接开始传参、监听
- int N = poll(client, maxi + 1, -1);
- if (N == -1)
- perr_exit("poll error");
- //判断lfd是否被读(是否来新申请连接)
- if (client[0].revents & POLLIN) {
- //建立新cfd
- socklen_t clientaddr_len = sizeof(clientaddr);
- int connect_fd = Accept(listen_fd, (struct sockaddr*)&clientaddr, &clientaddr_len);
- printf("client address: %s, port: %d\n", inet_ntop(AF_INET, &(clientaddr.sin_addr.s_addr), str, sizeof(str)), ntohs(clientaddr.sin_port));
- //放到专门给cfd所在的数组里面
- for (i = 1; i < OPEN_MAX; ++i) {
- if (client[i].fd < 0) {
- client[i].fd = connect_fd;
- break;
- }
- }
- if (i == OPEN_MAX)
- perr_exit("too many clients, dying\n");
- client[i].events = POLLIN;
-
- if (i > maxi) maxi = i;
- if (--N <= 0) continue;
- }
- //处理通信申请
- for (i = 1; i <= maxi; ++i) {//挨个判断cfd中有没有读事件来了
- int socket_fd = 0, n = 0;
- if ((socket_fd = client[i].fd) < 0)
- continue;
- if (client[i].revents & POLLIN) {
- if ((n = read(socket_fd, buf, sizeof(buf))) < 0) {
- if (errno = ECONNRESET) {
- printf("client[%d] aborted connection\n", i);
- close(socket_fd);
- client[i].fd = -1;
- } else {
- perr_exit("read error");
- }
- } else if (n == 0) {
- printf("client[%d] closed connection\n", i);
- close(socket_fd);
- client[i].fd = -1;
- } else {
- for (j = 0; j < n; ++j)
- buf[j] = toupper(buf[j]);
- write(STDOUT_FILENO, buf, n);
- write(socket_fd, buf, n);
- }
- if (--N == 0)
- break;
- }
- }
- }
- close(listen_fd);
- return 0;
- }
优点:
1、自带数组结构(不用select里面的位图表示监听集合和有事件的集合)(相对于select的最大改变),突破1024文件描述符设置,进而拓展监听上限数(select不行);
2、可以将监听事件集合和返回事件集合分开;
缺点:不能跨平台,只适合于Linux系统;无法直接定位到满足监听事件的文件描述符,编码难度较大
poll/epoll修改监听客户端上限:突破1024文件描述符设置(修改系统配置文件):
cat /proc/sys/fs/file-max:查看当前计算机所能打开的最大文件个数,受硬件影响ulimit -a:当前用户进程所能打开的最大文件描述符个数(缺省为1024)
vi编辑器修改配置文件(可以改两个参数):
sudo vim /etc/security/limits.conf:
* soft nofile 65536:设置系统默认值(可以直接借助命令修改)
* hard nofile 100000:命令能修改的上限
借助命令修改:ulimit -n 17000:更改最大个数
epoll_create()
创建一个监听红黑树(根)
- #include <sys/epoll.h>
- int epoll_create(int size);
- //入参
- size:创建红黑树的监听节点数量(一个预计的规模数,仅供内核创建红黑树时参考)
- //返回值:
- 成功返回指向新创建的红黑树的根节点的文件描述符fd,失败返回-1并设置errno
epoll_ctl()
:操作监听红黑树用来增删改树上的 监听结点
- #include <sys/epoll.h>
- int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event);
-
- typedef union epoll_data {
- void* ptr;
- int fd;
- uint32_t u32;
- uint64_t u64;
- } epoll_data_t;
-
- struct epoll_event {
- uint32_t events; /* Epoll events */
- epoll_data_t data; /* User data variable */
- };
-
- //入参:
- 1、epfd:epoll_create的返回值
-
- 2、op:对该监听红黑树所做的操作选项宏:
- > EPOLL_ CTL_ADD:添加fd到监听红黑树
- > EPOLL_CTL_MOD:修改fd在监听红黑树上的监听事件
- > EPOLL_CTL_DEL:将一个fd从监听红黑树上摘下(取消监听)
-
- 3、fd:待操作的fd
-
- 4、event:struct poll_event* 传入参数
- 1)uint_3 events:EPOLLIN,EPOLLOUT,EPOLLERR //新增结点希望监听的事件
- 2) data:联合体:
- > int fd:对应监听事件的fd
- > void* ptr
- > uint_32 u32 (不用)
- > uint_64 u64 (不用)
实际监听者,保存并返回监听到的信息,交给 epoll_ctl()做处理
。
- int epoll_wait(int epfd, struct epoll_event* events,int maxevents, int timeout);
- //入参:
- 1、epfd:
- 2、events:传出参数,是一个数组,存放的是满足监听条件(有事件来了)的那些fd结构体
- 3、maxevents:数组中元素的总个数,例如如果定义了struct epoll_events events[1024],那么他就是1024,也就是数组大小
- 4、timeout:
- -1阻塞
- 0非阻塞
- >0超时时长(ms)
-
- //返回值:
- >0:满足监听的总个数,可以作为后面的循环上限
- 0:没有fd满足监听事件
- -1:出错
疑问:epoll监听的文件描述符读事件具体是怎么发生的?
答:epoll监听的文件描述符读事件是在文件描述符上有数据可读时发生的。当文件描述符上有数据可读时,内核会将该文件描述符标记为可读,并将其添加到就绪队列中,等待应用程序处理。应用程序可以通过调用epoll_wait()函数来获取就绪的文件描述符,并进行相应的处理。在ET模式下,只有在文件描述符上有新的数据到达时才会触发读事件,而在LT模式下,只要文件描述符上还有数据可读,就会一直触发读事件。
问:TCP连接上后,如果想写数据,写事件是怎么被触发的?
答:TCP连接上后,如果想写数据,写事件是由操作系统内核触发的。当应用程序调用write()函数向TCP连接写入数据时,数据被先写入到操作系统内核的发送缓冲区中,然后内核会监视该连接的发送缓冲区,当发送缓冲区变为可写状态时,内核会触发写事件,通知应用程序可以继续向该连接写入数据。
写事件的触发方式有两种:边缘触发和水平触发。边缘触发是指只有在发送缓冲区从不可写状态变为可写状态时才会触发写事件,而水平触发是指只要发送缓冲区可写,就会一直触发写事件,直到发送缓冲区被写满。
在实际应用中,一般使用边缘触发方式,因为水平触发可能会导致频繁的写事件触发,增加系统开销。
- //server-epoll.c
- #include "wrap.h"
-
- #define SERVER_PORT 9527
- #define MAXLINE 80
- #define OPEN_MAX 1024
-
- int main(int argc, char* argv[]) {
- int i = 0;
- char buf[MAXLINE], str[INET_ADDRSTRLEN];
-
- struct sockaddr_in serveraddr, clientaddr;
-
- int listen_fd = Socket(AF_INET, SOCK_STREAM, 0);
- int opt = 1;
- setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, (void*)&opt, sizeof(opt));
-
- bzero(&serveraddr, sizeof(serveraddr));
- serveraddr.sin_family = AF_INET;
- serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
- serveraddr.sin_port = htons(SERVER_PORT);
-
- Bind(listen_fd, (const struct sockaddr*)&serveraddr, sizeof(serveraddr));
- Listen(listen_fd, 128);
-
- //创建epoll红黑树
- int efd = Epoll_create(OPEN_MAX);
-
- //将listenFd加入监听红黑树中
- struct epoll_event node; // 临时节点
- node.events = EPOLLIN;
- node.data.fd = listen_fd; // 构造临时节点
- int res = Epoll_ctl(efd, EPOLL_CTL_ADD, listen_fd, &node);
- //创建epoll 监听客户端 数组
- struct epoll_event ep[OPEN_MAX];//记住:结构体是给ctrl用;结构体数组是给wait用
- while (1) {
- //阻塞监听,将新事件的类型(新连接申请? 或者是 数据处理申请 ?),并且返回满足的监听总个数
- int N = Epoll_wait(efd, ep, OPEN_MAX, -1); // 阻塞监听(类似 select/poll)
-
- for (i = 0; i < N; ++i) {
- if (!(ep[i].events & EPOLLIN)) continue;//代码健壮性,只关注读事件
-
- if (ep[i].data.fd == listen_fd) { //lfd被请求读,即建立连接请求
- socklen_t clientaddr_len = sizeof(clientaddr);
- int connect_fd = Accept(listen_fd, (struct sockaddr*)&clientaddr, &clientaddr_len);
-
- node.events = EPOLLIN;
- node.data.fd = connect_fd;
- res = Epoll_ctl(efd, EPOLL_CTL_ADD, connect_fd, &node);
- } else { //数据处理请求
- int socket_fd = ep[i].data.fd;
- int n = read(socket_fd, buf, sizeof(buf));
-
- if (n == 0) {
- res = Epoll_ctl(efd, EPOLL_CTL_DEL, socket_fd, NULL);//摘
- close(socket_fd);
- printf("client[%d] closed connection\n", socket_fd);
- } else if (n < 0) {
- perr_exit("read n < 0 error");
- res = Epoll_ctl(efd, EPOLL_CTL_DEL, socket_fd, NULL);
- close(socket_fd);
- } else {
- for (i = 0; i < n; ++i) buf[i] = toupper(buf[i]);
- write(STDOUT_FILENO, buf, n);
- write(socket_fd, buf, n);
- }
- }
- }
- }
-
- close(listen_fd);//两个关闭
- close(efd);
- return 0;
- }
实际上是将(select/poll)做的必须边缘非函数操作,epoll用三个函数整合了出来,且(用红黑树)优化了 结点存储结构,会提升操作效率。
epoll有两种事件模型:
EdgeTriggered:边沿触发,只有数据到来才触发,不管缓冲区中是否还有数据
LevelTriggered:水平触发,只要有数据都会触发(默认模式)
写端 aaaa\n,紧接着bbbb\n,睡5s;下轮循环,写端 cccc\n,紧接着dddd\n,睡5s;依次类推,,,
- //epoll-ETLT-demo.c
- #include "wrap.h"
-
- #define MAXLINE 10
-
- int main() {
- char buf[MAXLINE], ch = 'a';
- memset(buf, 0, sizeof(buf));
-
- int pfd[2];
- pipe(pfd);
- pid_t pid = fork();
-
- int i = 0;
- if (pid == 0) { //child process, write
- close(pfd[0]);
- while (1) {
- //aaaa\n
- for (i = 0; i < (MAXLINE >> 1); ++i) buf[i] = ch;
- buf[i - 1] = '\n';
- ch++;
- //bbbb\n
- for (; i < MAXLINE; ++i) buf[i] = ch;
- buf[i - 1] = '\n';
- ch++;
- //aaaa\nbbbb\n
- write(pfd[1], buf, sizeof(buf));
- sleep(3);
- }
- close(pfd[1]);
- } else if (pid > 0) { // parent process, read
- struct epoll_event resevent[10];
-
- close(pfd[1]);
- int efd = epoll_create(10);
-
- struct epoll_event event;
- //两种模式切换
- //event.events = EPOLLIN | EPOLLET; // edge triger
- event.events = EPOLLIN; // level triger, by default
- event.data.fd = pfd[0];
- epoll_ctl(efd, EPOLL_CTL_ADD, pfd[0], &event);
-
- while (1) {
- int N = epoll_wait(efd, resevent, 10, -1);
- //printf("N = %d\n", N); //N = 1
-
- if (resevent[0].data.fd == pfd[0]) {
- int len = read(pfd[0], buf, MAXLINE / 2);
- write(STDOUT_FILENO, buf, len);
- }
- }
-
- close(pfd[0]);
- close(efd);
- } else {
- perr_exit("fork error");
- }
- return 0;
- }
现象:
当设置水平触发时,只要管道中有数据,epoll_wait就会返回,触发父进程读取数据,所以虽然父进程虽然每次只读取一半的数据,但读完一半后剩下的一半又会马上触发父进程wait读取,所以10个字节的数据基本上一下子显示出来。
当设置边沿触发时,父进程阻塞读取,而只有当子进程向管道中进行一次写入时才会触发父进程进行读取,所以每次只会打印一半的数据。节奏慢一半。
总结:
边沿触发:文件缓冲区未读尽的数据不会导致epoll_wait返回,新的数据写入才会触发。
水平触发:文件缓冲区未读尽的数据会导致epoll_wait返回。
顺便纠正认识误区:epoll本质是监听文件描述符,因此并不仅限于用在网络编程中的socket文件,也可监听系统编程中 的文件描述符,比如管道文件。
服务器:
- //server-epollET.c
- #include "wrap.h"
-
- #define SERVER_PORT 9527
- #define MAXLINE 10
-
- int main(int argc, char* argv[]) {
- struct sockaddr_in serveraddr, clientaddr;
- char buf[MAXLINE], str[INET_ADDRSTRLEN];
-
- int listen_fd = Socket(AF_INET, SOCK_STREAM, 0);
-
- bzero(&serveraddr, sizeof(serveraddr));
- serveraddr.sin_family = AF_INET;
- serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
- serveraddr.sin_port = htons(SERVER_PORT);
-
- Bind(listen_fd, (const struct sockaddr*)&serveraddr, sizeof(serveraddr));
- Listen(listen_fd, 128);
-
- struct epoll_event event;
- struct epoll_event resevent[10];
-
- int efd = epoll_create(10);
- event.events = EPOLLIN | EPOLLET; //edge triger
- //event.events=EPOLLIN; //level triger, by default
-
- socklen_t clientaddr_len = sizeof(clientaddr);
- int connect_fd = Accept(listen_fd, (struct sockaddr*)&clientaddr, &clientaddr_len);
- printf("accepting connections...\n");
-
- event.data.fd = connect_fd;
- epoll_ctl(efd, EPOLL_CTL_ADD, connect_fd, &event);
-
- while (1) {
- int res = epoll_wait(efd, resevent, 10, -1); //res = 1
-
- if (resevent[0].data.fd = connect_fd) {
- int len = read(connect_fd, buf, MAXLINE / 2);
- write(STDOUT_FILENO, buf, len);
- }
- }
- return 0;
- }
客户端:
- //client-epollET.c
- #include "wrap.h"
-
- #define SERVER_PORT 9527
- #define MAXLINE 10
-
- int main(int argc, char* argv[]) {
- struct sockaddr_in serveraddr;
- char buf[MAXLINE];
- char ch = 'a';
-
- int connect_fd = Socket(AF_INET, SOCK_STREAM, 0);
- bzero(&serveraddr, sizeof(serveraddr));
- serveraddr.sin_family = AF_INET;
- serveraddr.sin_port = htons(SERVER_PORT);
- inet_pton(AF_INET, "192.168.93.11", &serveraddr.sin_addr);
-
- Connect(connect_fd, (struct sockaddr*)&serveraddr, sizeof(serveraddr));
- int i = 0;
- while (1) {
- for (i = 0; i < (MAXLINE >> 1); ++i) buf[i] = ch;
- buf[i - 1] = '\n';
- ch++;
-
- for (; i < MAXLINE; ++i) buf[i] = ch;
- buf[i - 1] = '\n';
- ch++;
-
- write(connect_fd, buf, sizeof(buf));
- sleep(5);
- }
- close(connect_fd);
- return 0;
- }
如果epoll_wait设置为阻塞模式,则当你调用Readn/Readline这种自己会阻塞的函数时,会出大问题:阻塞在了Read函数上,不会被唤醒了
- //server-epollET-nonblock.c
- #include "wrap.h"
-
- #define SERVER_PORT 9527
- #define MAXLINE 10
-
- int main(int argc, char* argv[]) {
- struct sockaddr_in serveraddr, clientaddr;
- char buf[MAXLINE], str[INET_ADDRSTRLEN];
-
- int listen_fd = Socket(AF_INET, SOCK_STREAM, 0);
-
- bzero(&serveraddr, sizeof(serveraddr));
- serveraddr.sin_family = AF_INET;
- serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
- serveraddr.sin_port = htons(SERVER_PORT);
-
- Bind(listen_fd, (const struct sockaddr*)&serveraddr, sizeof(serveraddr));
- Listen(listen_fd, 128);
-
- struct epoll_event event;
- struct epoll_event resevent[10];
-
- int efd = epoll_create(10);
- event.events = EPOLLIN | EPOLLET;
- // event.events=EPOLLIN;
-
- socklen_t clientaddr_len = sizeof(clientaddr);
- int connect_fd = Accept(listen_fd, (struct sockaddr*)&clientaddr, &clientaddr_len);
- printf("accepting connections...\n");
-
- int flag = fcntl(connect_fd, F_GETFL);
- flag = flag | O_NONBLOCK;
- fcntl(connect_fd, F_SETFL, flag);
-
- event.data.fd = connect_fd;
- epoll_ctl(efd, EPOLL_CTL_ADD, connect_fd, &event);
-
- while (1) {
- int res = epoll_wait(efd, resevent, 10, -1); // res = 1
-
- int len = 0;
- if (resevent[0].data.fd = connect_fd) {
- while (len = read(connect_fd, buf, MAXLINE / 2) > 0) {
- write(STDOUT_FILENO, buf, len);
- }
- }
- }
- return 0;
- }
LT(默认): Level Triggered是缺省的工作方式,同时支持block和none-block模式。
ET:Edge Triggered是高速工作方式,只支持none-block模式。
在ET模式下,只有在文件描述符上有新的数据到达时才会触发读事件,而在LT模式下,只要文件描述符上还有数据可读,就会一直触发读事件。
使用ET模式的两个要求:
要求一:必须要一次读完/写入所有的数据(当然,只想要第一次读发那部分,那没事,尤其是大文件)。因为ET模式只会通知一次,下一次读取只能是缓冲区接收到了新的数据。要求二:必须设为非阻塞模式。循环读取的时候,如果缓冲区没有数据或者低于水位线,recv/read就会阻塞等待读事件就绪(循环阻塞在其他地方,epoll wait不能实现监听),这会影响到epoll模型中其他文件描述符的操作。
优点:高效;能突破1024文件描述符限制
缺点:只支持Linux,不能跨平台
在前面,我们所写的服务器对客户端发来数据进行处理后直接写会客户端,但这样的做法时不对的。我们并不能保证,数据每次都已可以写回,客户端使用的是滑动窗口机制,当客户端的缓冲区已经满了,就会发生死锁。epoll 反应堆在接收到客户端的数据后,会把红黑树中值为 fd 的节点去掉,重新设置 fd 的事件为写事件,当接收到客户端的可写时,在进行数据处理和回写。之后在把红黑树中写的这个节点去掉,加上检测读的节点。这样会使程序变得更加的严谨。
原来的步骤:
int listen_fd = socket(); bind(); listen(); int epfd = epoll_create(size); struct epoll_event node; epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &node); struct epoll_event ep[size]; while(1){ int N = epoll_wait(epfd, ep, size, -1); for(int i = 0; i < N; ++i) { if(ep[i].data.fd == listen_fd) { int connect_fd = accept(); epoll_ctl(epfd, EPOLL_CTL_ADD, connect_fd, &node); } else { read(); toupper(); //原来后面是这么直接往客户端写 write(); } } }考虑到实际的网络情况,对端可能半关闭或滑动窗口已满,可能写失败,所以在写之前有必要检查可写与否
步骤健壮优化:
int listen_fd = socket(); bind(); listen(); int epfd = epoll_create(size); struct epoll_event node; epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &node); struct epoll_event ep[size]; while(1){ int N = epoll_wait(epfd, ep, size, -1); for(int i = 0; i < N; ++i) { if(ep[i].data.fd == listen_fd) { int connect_fd = accept(); epoll_ctl(epfd, EPOLL_CTL_ADD, connect_fd, &node); } else { read(); toupper(); //现在是先别直接往回写,先判断是否可写 epoll_ctl(epfd, EPOLL_CTL_DEL, connect_fd, &node); //将cfd从监听红黑树上摘下 node.events = EPOLLOUT; //POLLIN改为EPOLLOUT监听写事件 //执行节点的回调函数,执行自定义写动作 node.data.call_back(); epoll_ctl(epfd, EPOLL_CTL_ADD, connect_fd, &node); //重新放到红黑上监听写事件 epoll_wait(epfd, ep, size, -1); //等待cfd可写 //再写 write(); //写完恢复到读监听 epoll_ctl(epfd, EPOLL_CTL_DEL, connect_fd, &node); //再将cfd从监听红黑树上摘下 node.events = EPOLLIN; //POLLIN改为EPOLLIN监听写事件 epoll_ctl(epfd, EPOLL_CTL_ADD, connect_fd, &node); //重新放到红黑上监听读事件 } }优化疑问解答:
1、如此频繁的增加删除不是浪费CPU资源吗?
答:对于同一个socket而言,完成收发至少占用两个树上的位置。而交替只需要一个。任何一种设计方式都会有浪费CPU资源的时候,关键看你浪费得值不值,此处的耗费能否换来更大的收益才是衡量是否浪费的标准。和第二个问题综合来看,这里不算浪费。2、接上面问题,那么不是能通过EPOLL_CTL_MOD直接修改节点监听的事件类型吗?
答:epoll反应堆模型需要反复将读监听节点摘下挂上写监听节点,是因为在Linux内核中,修改监听节点的事件类型需要重新注册,而重新注册会导致性能下降。因此,epoll反应堆模型采用了摘下节点再挂上节点的方式,避免了重新注册的性能损耗。同时,这种方式也可以避免在多线程环境下出现竞态条件的问题。
[附:EPOLL_CTL_MOD内核是怎么实现的?
EPOLL_CTL_MOD是用来修改已经注册的fd的监听事件的函数。当我们需要修改一个已经注册的fd的监听事件时,可以使用EPOLL_CTL_MOD函数。内核实现这个函数的过程是:首先,内核会根据fd找到对应的epitem,然后将epitem的events成员修改为新的监听事件类型。最后,内核会根据epitem的状态,将其从旧的红黑树中删除,并重新插入到新的红黑树中。这样就完成了对已经注册的fd的监听事件的修改。]
3、 为什么要可读以后设置可写,然后一直交替?
答:服务器的基本工作无非数据的收发,epoll反应堆模型准从TCP模式,一问一答。服务器收到了数据,再给与回复,是目前绝大多数服务器的情况。但是服务器能收到数据并不是一定能写数据:
假设一 :服务器接收到客户端数据,刚好此时客户端的接收滑动窗口满,我们假设不进行可写事件设置,并且客户端是有意让自己的接收滑动窗口满的情况(黑客)。那么,当前服务器将随客户端的状态一直阻塞在可写事件,除非你自己在写数据时设置非阻塞+错误处理
假设二 :客户端在发送完数据后突然由于异常原因停止,这将导致一个FIN发送至服务器,如果服务器不设置可写事件监听,那么在接收数据后写入数据会引发异常SIGPIPE,最终服务器进程终止。
————————————————参考:
【精选】epoll的两种模式和epoll反应堆_epoll模式和非epoll有什么区别-CSDN博客
网络高并发服务器之epoll接口、epoll反应堆模型详解及代码实现-CSDN博客【精选】epoll原理详解及epoll反应堆模型_epoll模型-CSDN博客
epoll反应堆模型:ET模式 + 非阻塞 + 回调函数void* ptr
- struct myevent_s {
- int fd; //要监听的文件描述符
- int events; //对应的监听事件
- void *arg; //泛型参数
- void (*call_back)(int fd, int events, void *arg); //回调函数
- int status; //是否在监听:1->在红黑树上(监听), 0->不在(不监听)
- char buf[BUFLEN];
- int len;
- long last_active; //记录每次加入红黑树 g_efd 的时间值
- }*ptr;
给监听结点结构体中联合体中的泛型指针ptr初始化一个结构体指针,这个结构体指针中封装很多成员,其中也包括监听节点的fd和一个回调函数。这样,当判断这个结点有读写事件来临时,结点本身可以自动回调,进行一些自定义动作(用C实现面向对象)
- /*
- *epoll基于非阻塞I/O事件驱动
- */
- #include <stdio.h>
- #include <sys/socket.h>
- #include <sys/epoll.h>
- #include <arpa/inet.h>
- #include <fcntl.h>
- #include <unistd.h>
- #include <errno.h>
- #include <string.h>
- #include <stdlib.h>
- #include <time.h>
-
- #define MAX_EVENTS 1024 //监听上限数
- #define BUFLEN 4096
- #define SERV_PORT 8080
-
- void recvdata(int fd, int events, void *arg);
- void senddata(int fd, int events, void *arg);
-
- /* 描述就绪文件描述符相关信息 */
-
- struct myevent_s {
- int fd; //要监听的文件描述符
- int events; //回调动作的事件,不一定和监听事件events是同一类型
- void *arg; //泛型参数,指向自己,将来用于给回调函数传参
- void (*call_back)(int fd, int events, void *arg); //回调函数
- int status; //是否在监听:1->在红黑树上(监听), 0->不在(不监听)
- char buf[BUFLEN]; //缓冲区
- int len; //缓冲区大小
- long last_active; //记录每次加入红黑树 g_efd 的时间值
- };
- //红黑树树根
- int g_efd; //全局变量, 保存epoll_create返回的文件描述符
- struct myevent_s g_events[MAX_EVENTS+1]; //自定义结构体类型数组,为放回调函数等的ev结构体服务, +1的含义-->listen fd
-
-
- /*将结构体 myevent_s 成员变量 初始化*/
-
- void eventset(struct myevent_s *ev, int fd, void (*call_back)(int, int, void *), void *arg)
- {
- ev->fd = fd;
- ev->call_back = call_back;
- ev->events = 0;
- ev->arg = arg;
- ev->status = 0;
- memset(ev->buf, 0, sizeof(ev->buf));
- ev->len = 0;
- ev->last_active = time(NULL); //调用eventset函数的时间
-
- return;
- }
-
- /* 向 epoll监听的红黑树 添加一个 文件描述符 */
-
- //eventadd(efd, EPOLLIN, &g_events[MAX_EVENTS]);
- void eventadd(int efd, int events, struct myevent_s *ev)
- {
- struct epoll_event epv = {0, {0}};
- int op;
- epv.data.ptr = ev;
- epv.events = ev->events = events; //EPOLLIN 或 EPOLLOUT
-
- if (ev->status == 0) { //已经在红黑树 g_efd 里
- op = EPOLL_CTL_ADD; //将其加入红黑树 g_efd, 并将status置1
- ev->status = 1;
- }
-
- if (epoll_ctl(efd, op, ev->fd, &epv) < 0) //实际添加/修改
- printf("event add failed [fd=%d], events[%d]\n", ev->fd, events);
- else
- printf("event add OK [fd=%d], op=%d, events[%0X]\n", ev->fd, op, events);
-
- return ;
- }
-
- /* 从epoll 监听的 红黑树中删除一个 文件描述符*/
-
- void eventdel(int efd, struct myevent_s *ev)
- {
- struct epoll_event epv = {0, {0}};
-
- if (ev->status != 1) //不在红黑树上
- return ;
-
- //epv.data.ptr = ev;
- epv.data.ptr = NULL;
- ev->status = 0; //修改状态
- epoll_ctl(efd, EPOLL_CTL_DEL, ev->fd, &epv); //从红黑树 efd 上将 ev->fd 摘除
-
- return ;
- }
-
- /* 当有文件描述符(包括lfd和其他cfd)就绪, epoll返回, 调用该函数 与客户端建立链接 */
-
- void acceptconn(int lfd, int events, void *arg)
- {
- struct sockaddr_in cin;
- socklen_t len = sizeof(cin);
- int cfd, i;
- //新建cfd
- if ((cfd = accept(lfd, (struct sockaddr *)&cin, &len)) == -1) {
- if (errno != EAGAIN && errno != EINTR) {
- /* 暂时不做出错处理 */
- }
- printf("%s: accept, %s\n", __func__, strerror(errno));//__func__是当前函数的名字
- return ;
- }
-
- do {
- for (i = 0; i < MAX_EVENTS; i++) //从全局数组g_events中找一个空闲坑,放新的cfd
- if (g_events[i].status == 0) //类似于select中找值为-1的元素
- break; //跳出 for
- //下面是健壮处理,新建连接超了
- if (i == MAX_EVENTS) {
- printf("%s: max connect limit[%d]\n", __func__, MAX_EVENTS);
- break; //跳出do while(0) 不执行后续代码
- }
-
- int flag = 0;
- //非常重要,将cfd文件属性设置为非阻塞
- if ((flag = fcntl(cfd, F_SETFL, O_NONBLOCK)) < 0) { //将cfd也设置为非阻塞
- printf("%s: fcntl nonblocking failed, %s\n", __func__, strerror(errno));
- break;
- }
-
- /* 给cfd设置一个 myevent_s 结构体, 回调函数 设置为 recvdata */
- //ev结构体里面成员的设置是通过下面两个函数先后设置完的
- eventset(&g_events[i], cfd, recvdata, &g_events[i]);
- eventadd(g_efd, EPOLLIN, &g_events[i]); //将cfd添加到红黑树g_efd中,监听读事件
-
- } while(0);
-
- printf("new connect [%s:%d][time:%ld], pos[%d]\n",
- inet_ntoa(cin.sin_addr), ntohs(cin.sin_port), g_events[i].last_active, i);
- return ;
- }
-
- void recvdata(int fd, int events, void *arg)
- {
- struct myevent_s *ev = (struct myevent_s *)arg;
- int len;
-
- len = recv(fd, ev->buf, sizeof(ev->buf), 0); //读文件描述符,读到ev的buffer中, 一口气读完,数据存入myevent_s成员buf中
-
- eventdel(g_efd, ev); //将该节点从红黑树上摘除
-
- if (len > 0) {
-
- ev->len = len;
- ev->buf[len] = '\0'; //手动添加字符串结束标记
- printf("C[%d]:%s\n", fd, ev->buf);
- //读函数负责读完、摘除当前节点、新挂上写监听节点、写节点回调函数设置为senddata;senddata则做相反动作
- //将摘下来的读监听节点(文件),改下属性,再挂上去用来写
- eventset(ev, fd, senddata, ev); //设置该 fd 对应的回调函数为 senddata
- eventadd(g_efd, EPOLLOUT, ev); //将fd加入红黑树g_efd中,监听其写事件
-
- } else if (len == 0) {
- close(ev->fd);
- /* ev-g_events 地址相减得到偏移元素位置 */
- printf("[fd=%d] pos[%ld], closed\n", fd, ev-g_events);
- } else {
- close(ev->fd);
- printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno));
- }
-
- return;
- }
-
- void senddata(int fd, int events, void *arg)
- {
- struct myevent_s *ev = (struct myevent_s *)arg;
- int len;
-
- len = send(fd, ev->buf, ev->len, 0); //直接将数据 回写给客户端。未作处理
-
- eventdel(g_efd, ev); //从红黑树g_efd中移除
-
- if (len > 0) {
-
- printf("send[fd=%d], [%d]%s\n", fd, len, ev->buf);
- eventset(ev, fd, recvdata, ev); //将该fd的 回调函数改为 recvdata
- eventadd(g_efd, EPOLLIN, ev); //从新添加到红黑树上, 设为监听读事件
-
- } else {
- close(ev->fd); //关闭链接
- printf("send[fd=%d] error %s\n", fd, strerror(errno));
- }
-
- return ;
- }
-
- /*创建 socket, 初始化lfd */
-
- void initlistensocket(int efd, short port)
- {
- struct sockaddr_in sin;
-
- int lfd = socket(AF_INET, SOCK_STREAM, 0);
- //非常重要,设置文件属性为非阻塞
- fcntl(lfd, F_SETFL, O_NONBLOCK); //将socket设为非阻塞
-
- memset(&sin, 0, sizeof(sin)); //bzero(&sin, sizeof(sin))
- sin.sin_family = AF_INET;
- sin.sin_addr.s_addr = INADDR_ANY;
- sin.sin_port = htons(port);
-
- bind(lfd, (struct sockaddr *)&sin, sizeof(sin));
-
- listen(lfd, 20);
-
- /* void eventset(struct myevent_s *ev, int fd, void (*call_back)(int, int, void *), void *arg); */
- //注意第一个参数,给lfd放在g_events的最后一个
- //注意第三个参数,回调函数lfd的回调函数自然定义为acceptconn
- eventset(&g_events[MAX_EVENTS], lfd, acceptconn, &g_events[MAX_EVENTS]);
-
- /* void eventadd(int efd, int events, struct myevent_s *ev) */
- eventadd(efd, EPOLLIN, &g_events[MAX_EVENTS]);
-
- return ;
- }
-
- int main(int argc, char *argv[])
- {//初始化监听服务器(包括监听端口、建立树根、挂lfd)
- unsigned short port = SERV_PORT;//端口号默认8080,也可以传一个自定义端口号
- if (argc == 2)
- port = atoi(argv[1]); //使用用户指定端口.如未指定,用默认端口
- g_efd = epoll_create(MAX_EVENTS+1); //创建红黑树,返回给全局 g_efd
- if (g_efd <= 0)
- printf("create efd in %s err %s\n", __func__, strerror(errno));
-
- initlistensocket(g_efd, port); //初始化监听socket,挂lfd
- printf("server running:port[%d]\n", port);
-
- //其他
- struct epoll_event events[MAX_EVENTS+1]; //给epoll_wait服务,保存已经满足就绪事件的文件描述符数组
- int checkpos = 0, i;
- while (1) {
- /* 超时验证,每次测试100个链接,不测试listenfd 当客户端60秒内没有和服务器通信,则关闭此客户端链接 */
- long now = time(NULL); //当前时间
- for (i = 0; i < 100; i++, checkpos++) { //一次循环检测100个。 使用checkpos记住下一次检测段的起点
- if (checkpos == MAX_EVENTS)
- checkpos = 0;
- if (g_events[checkpos].status != 1) //不在红黑树 g_efd 上
- continue;
- //last_active只要通信就会调用evenset,从而当前节点的最后一次通信活跃时间被更新
- long duration = now - g_events[checkpos].last_active; //客户端不活跃的世间
-
- if (duration >= 60) {
- close(g_events[checkpos].fd); //关闭与该客户端链接
- printf("[fd=%d] timeout\n", g_events[checkpos].fd);
- eventdel(g_efd, &g_events[checkpos]); //将该客户端 从红黑树 g_efd移除
- }
- }
-
- /*监听红黑树g_efd, 将满足的事件的文件描述符加至events数组中*/
- //写申请,对方滑动窗口没满,wait会收到可写信号
- int nfd = epoll_wait(g_efd, events, MAX_EVENTS+1, 1000);//定时1秒没有事件满足, 返回0
- if (nfd < 0) {
- printf("epoll_wait error, exit\n");
- break;
- }
- //检查events,包括检查lfd和其他cfd
- for (i = 0; i < nfd; i++) {
- /*使用自定义结构体myevent_s类型局部变量指针, 接收 联合体data的void *ptr成员,这个ptr指针开始发挥威力*/
- struct myevent_s *ev = (struct myevent_s *)events[i].data.ptr;
- //这个里面把原来读事件中的lfd和cfd合并,抽象为一个分支,处理动作则由节点(lfd的回调函数默认为accept)的回调函数做不同反应(读或者新建cfd)。
- if ((events[i].events & EPOLLIN) && (ev->events & EPOLLIN)) { //想监听读事件,且读事件就绪
- ev->call_back(ev->fd, events[i].events, ev->arg);//回调函数和参数都在ptr结构体装着,在这里都倒出来用了;注意参3:传的还是ptr结构体
- //lfd EPOLLIN
- }
- if ((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT)) { //写就绪事件
- ev->call_back(ev->fd, events[i].events, ev->arg);
- }
- }
- }
-
- /* 退出前释放所有资源 */
- return 0;
- }
回顾前面健壮之后的代码:
1、反应堆写法主要是for循环检查events部分,大括号里面的内容写法不一样了
原来是自己写处理,根据来的是什么事件,分情况自己写执行
1)把对节点的处理动作绑在节点本身属性上(ev),节点根据提前监听的事件做相应的处理动作;做完相应动作,把自身监听事件和对应处理动作切换(读->写;写->读)。
- int listen_fd = socket();
- bind();
- listen();
- int epfd = epoll_create(size);
- struct epoll_event node;
- epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &node);
-
- struct epoll_event ep[size];
-
- while(1){
- int N = epoll_wait(epfd, ep, size, -1);
- for(int i = 0; i < N; ++i) {
- if(ep[i].data.fd == listen_fd) {
- int connect_fd = accept();
- epoll_ctl(epfd, EPOLL_CTL_ADD, connect_fd, &node);
- } else {
- read();
- toupper();
- //现在是先别直接往回写,先判断是否可写
- epoll_ctl(epfd, EPOLL_CTL_DEL, connect_fd, &node); //将cfd从监听红黑树上摘下
- node.events = EPOLLOUT; //POLLIN改为EPOLLOUT监听写事件
- //执行节点的回调函数,执行自定义写动作
- node.data.call_back();
- epoll_ctl(epfd, EPOLL_CTL_ADD, connect_fd, &node); //重新放到红黑上监听写事件
- epoll_wait(epfd, ep, size, -1); //等待cfd可写
- //再写
- write();
- //写完恢复到读监听
- epoll_ctl(epfd, EPOLL_CTL_DEL, connect_fd, &node); //再将cfd从监听红黑树上摘下
- node.events = EPOLLIN; //POLLIN改为EPOLLIN监听写事件
- epoll_ctl(epfd, EPOLL_CTL_ADD, connect_fd, &node); //重新放到红黑上监听读事件
- }
- }
并发服务器的的服务模式(多线程、多进程):当有一个Client建立连接,服务器就Create一个线程进行数据处理,处理完后就销毁
//多路IO转接相比多线程/多进程模型的优势:需要创建的线程数大大减小,节省系统资源
由于每次线程创建和销毁的开销很大,而维护成本较低,于是提出一次创建适量个线程备着(等一堆客户端先后来了就能分配使用了), 构成线程池(线程聚集的虚拟的地方,预线程化)
刚开始所有线程没任务时候,这些初始活跃线程阻塞在任务队列为空的状态(没饼吃),这刚好也是条件变量的消费者生产者模型。服务器的任务队列上挂着一个条件变量,有任务到来需要处理时Server唤醒一个线程进行数据处理,处理完毕后让他回到线程池,等待下次调用。
回顾:条件变量的消费者生产者模型
1)上来先创建线程池,线程们阻塞在任务队列为空的条件变量上
2)当客户端产生一个事件,比如申请链接、读、写,任务队列就有任务产生了,就会有条件变量被唤醒的信号
3)线程被唤醒就会执行其回调函数,执行完就会再回到线程池,等着(继续阻塞在条件变量上)
4)除上面以外,还有一个管理者线程,根据当前线程池忙现成的 比例对线程池进行扩容/瘦身/销毁等。
Firefox例子:一打开浏览器,啥活也不干,直接查浏览器的线程,就有这么一堆,这就是用了线程池。
回顾知识关系:其实前面关注的多路IO转接关注的是上图右半部分,实际上后端怎么去处理这些来访的客户端,我们可以用线程池。两个知识点是结合的关系。
线程池规划必要概念:
线程池初始线程数:thread_init_num=38
线程池最大线程数:thread_max_num=500(进程地址空间有限,线程在里面共享资源自然有上限)
线程池中忙的线程数:thread_busy_num
线程池中存活的线程数:thread_live_num
当忙线程数和存活线程数的比例达到一定范围时,要对线程池进行"扩容(发现忙起来了)"和"瘦身(发现过了高峰期)"
扩容或瘦身的步长:thread_step,不能一个一个的起线程或销毁线程了,效率太低上面的这些任务("扩容"和"瘦身)都是专门的管理者线程完成线程池管理和销毁
引入线程池(其实服务器的任务队列也包括)描述结构体:
- typedef struct {
- void *(*function)(void *); /* 函数指针,回调函数 */ //表示一个任务
- void *arg; /* 上面函数的参数 */
- } threadpool_task_t; /* 各子线程任务结构体 */
-
- struct threadpool_t {
- pthread_mutex_t lock; /* 用于锁住本结构体 */
- pthread_mutex_t thread_counter; /* 记录忙状态线程个数de琐 -- busy_thr_num */
-
- pthread_cond_t queue_not_full; /* 当任务队列满时,添加任务的线程阻塞,等待此条件变量 */
- pthread_cond_t queue_not_empty; /* 任务队列里不为空时,通知等待任务的线程 */
-
- pthread_t *threads; /* 存放线程池中每个线程的tid。数组 */
- pthread_t adjust_tid; /* 存管理线程tid */
- threadpool_task_t *task_queue; /* 任务队列(数组首地址) */
-
- int min_thr_num; /* 线程池最小线程数 */
- int max_thr_num; /* 线程池最大线程数 */
- int live_thr_num; /* 当前存活线程个数 */
- int busy_thr_num; /* 忙状态线程个数 */
- int wait_exit_thr_num; /* 要销毁的线程个数 */
-
- //任务队列借助环形队列数据结构打造
- int queue_front; /* task_queue队头下标 */
- int queue_rear; /* task_queue队尾下标 */
- int queue_size; /* task_queue队中实际任务数 */
- int queue_max_size; /* task_queue队列可容纳任务数上限 */
-
- int shutdown; /* 标志位,线程池使用状态,true或false ,使用就是false*/
- };
线程池模块分析:
1. main();
创建线程池。
向线程池中添加任务。 借助回调处理任务。
销毁线程池。
2. pthreadpool_create();
创建线程池结构体 指针。
初始化线程池结构体 { N 个成员变量 }
创建 N 个任务线程。
创建 1 个管理者线程。
失败时,销毁开辟的所有空间。(释放)
3. threadpool_thread()
进入子线程回调函数。
接收参数 void *arg --》 pool 结构体
加锁 --》lock --》 整个结构体锁
判断条件变量 --》 wait -------------------170
4. adjust_thread()
循环 10 s 执行一次。
进入管理者线程回调函数
接收参数 void *arg --》 pool 结构体
加锁 --》lock --》 整个结构体锁
获取管理线程池要用的到 变量。 task_num, live_num, busy_num
根据既定算法,使用上述3变量,判断是否应该 创建、销毁线程池中 指定步长的线程。
5. threadpool_add ()
总功能:
模拟产生任务。 num[20]
设置回调函数, 处理任务。 sleep(1) 代表处理完成。
内部实现:
加锁初始化 任务队列结构体成员。 回调函数 function, arg
利用环形队列机制,实现添加任务。 借助队尾指针挪移 % 实现环形队列。
唤醒阻塞在 条件变量上的线程。
解锁6. 从 3. 中的wait之后继续执行,处理任务。
加锁
获取 任务处理回调函数,及参数利用环形队列机制,实现处理任务。 借助队头指针挪移 % 实现。
唤醒阻塞在 条件变量 上的 server。
解锁
加锁
改忙线程数++
解锁
执行处理任务的线程
加锁
改忙线程数——
解锁
7. 创建 销毁线程
管理者线程根据 task_num, live_num, busy_num
根据既定算法,使用上述3变量,判断是否应该 创建、销毁线程池中 指定步长的线程。
如果满足 创建条件
pthread_create(); 回调 任务线程函数。 live_num++
如果满足 销毁条件
wait_exit_thr_num = 10;
signal 给 阻塞在条件变量上的线程 发送 假条件满足信号
跳转至 --170 wait阻塞线程会被 假信号 唤醒。判断: wait_exit_thr_num > 0 pthread_exit();
- #include <stdlib.h>
- #include <pthread.h>
- #include <unistd.h>
- #include <assert.h>
- #include <stdio.h>
- #include <string.h>
- #include <signal.h>
- #include <errno.h>
- #include "threadpool.h"
-
- #define DEFAULT_TIME 10 /*10s检测一次*/
- #define MIN_WAIT_TASK_NUM 10 /*如果queue_size > MIN_WAIT_TASK_NUM 添加新的线程到线程池*/
- #define DEFAULT_THREAD_VARY 10 /*每次创建和销毁线程的个数*/
- #define true 1
- #define false 0
-
- typedef struct {
- void *(*function)(void *); /* 函数指针,回调函数 */
- void *arg; /* 上面函数的参数 */
- } threadpool_task_t; /* 各子线程任务结构体 */
-
- /* 描述线程池相关信息 */
-
- struct threadpool_t {
- pthread_mutex_t lock; /* 用于锁住本结构体 */
- pthread_mutex_t thread_counter; /* 记录忙状态线程个数de琐 -- busy_thr_num */
-
- pthread_cond_t queue_not_full; /* 当任务队列满时,添加任务的线程阻塞,等待此条件变量 */
- pthread_cond_t queue_not_empty; /* 任务队列里不为空时,通知等待任务的线程 */
-
- pthread_t *threads; /* 存放线程池中每个线程的tid。数组 */
- pthread_t adjust_tid; /* 存管理线程tid */
- threadpool_task_t *task_queue; /* 任务队列(数组首地址) */
-
- int min_thr_num; /* 线程池最小线程数 */
- int max_thr_num; /* 线程池最大线程数 */
- int live_thr_num; /* 当前存活线程个数 */
- int busy_thr_num; /* 忙状态线程个数 */
- int wait_exit_thr_num; /* 要销毁的线程个数 */
-
- int queue_front; /* task_queue队头下标 */
- int queue_rear; /* task_queue队尾下标 */
- int queue_size; /* task_queue队中实际任务数 */
- int queue_max_size; /* task_queue队列可容纳任务数上限 */
-
- int shutdown; /* 标志位,线程池使用状态,true或false */
- };
-
- void *threadpool_thread(void *threadpool);
-
- void *adjust_thread(void *threadpool);
-
- int is_thread_alive(pthread_t tid);
- int threadpool_free(threadpool_t *pool);
-
- //threadpool_create(3,100,100);
- /*threadpool_create
- 1、创建线程池(包括任务队列池)结构体
- 2、初始化线程结构体(包括传入的三个参数、线程数组指针、任务队列指针、两个线程池锁两个条件变量)
- 3、创建N个任务线程
- 4、创建1个管理者线程
- 5、失败时,销毁开辟的所有空间
- */
- threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
- {
- int i;
- threadpool_t *pool = NULL; /* 线程池 结构体 */
-
- do {
- if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) {
- printf("malloc threadpool fail");
- break; /*跳出do while*/
- }
-
- pool->min_thr_num = min_thr_num;
- pool->max_thr_num = max_thr_num;
- pool->busy_thr_num = 0;
- pool->live_thr_num = min_thr_num; /* 活着的线程数 初值=最小线程数 */
- pool->wait_exit_thr_num = 0;
- pool->queue_size = 0; /* 有0个产品 */
- pool->queue_max_size = queue_max_size; /* 最大任务队列数 */
- pool->queue_front = 0;
- pool->queue_rear = 0;
- pool->shutdown = false; /* 不关闭线程池 */
-
- /* 根据最大线程上限数, 给工作线程数组开辟空间, 并清零 */
- pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num);
- if (pool->threads == NULL) {
- printf("malloc threads fail");
- break;
- }
- memset(pool->threads, 0, sizeof(pthread_t)*max_thr_num);
-
- /* 给 任务队列 开辟空间 */
- pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);
- if (pool->task_queue == NULL) {
- printf("malloc task_queue fail");
- break;
- }
-
- /* 初始化互斥琐、条件变量 */
- if (pthread_mutex_init(&(pool->lock), NULL) != 0
- || pthread_mutex_init(&(pool->thread_counter), NULL) != 0
- || pthread_cond_init(&(pool->queue_not_empty), NULL) != 0
- || pthread_cond_init(&(pool->queue_not_full), NULL) != 0)
- {
- printf("init the lock or cond fail");
- break;
- }
-
- /* 启动 min_thr_num 个 work thread */
- for (i = 0; i < min_thr_num; i++) {
- pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);//回调函数threadpool_thread,参数pool /*pool指向当前线程池*/
- printf("start thread 0x%x...\n", (unsigned int)pool->threads[i]);
- }
- pthread_create(&(pool->adjust_tid), NULL, adjust_thread, (void *)pool);//每个线程的回调函数adjust_thread,参数pool /* 创建管理者线程 */
-
- return pool;
-
- } while (0);//顶替goto语句,一旦括号中的语句break了,能跳到这个位置
-
- threadpool_free(pool); /* 前面代码调用失败时,释放poll存储空间 */
-
- return NULL;
- }
-
- /* 向线程池中 添加一个任务 */
- //threadpool_add(thp, process, (void*)&num[i]); /* 向线程池中添加任务 process: 小写---->大写*/
-
- //threadpool_addtask
- 1、加锁
- 2、初始化任务队列结构体成员
- 3、利用环形队列机制,实现添加任务
- 4、唤醒阻塞在条件变量上的线程
- 5、解锁
- int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg)
- {
- pthread_mutex_lock(&(pool->lock));//还是先
-
- /* ==为真,队列已经满, 调wait阻塞 */
- while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)) {
- pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));
- }
-
- if (pool->shutdown) {
- pthread_cond_broadcast(&(pool->queue_not_empty));
- pthread_mutex_unlock(&(pool->lock));
- return 0;
- }
-
- /* 清空 工作线程 调用的回调函数 的参数arg */
- if (pool->task_queue[pool->queue_rear].arg != NULL) {
- pool->task_queue[pool->queue_rear].arg = NULL;
- }
-
- /*添加任务到任务队列里*/
- pool->task_queue[pool->queue_rear].function = function;//真正的任务,传进来的process
- pool->task_queue[pool->queue_rear].arg = arg;//后续线程再从任务队列结构体里面拿任务
- pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size; /* 队尾指针移动, 模拟环形 */
- pool->queue_size++;
-
- /*添加完任务后,队列不为空,唤醒线程池中 等待处理任务的线程*/
- pthread_cond_signal(&(pool->queue_not_empty));
- pthread_mutex_unlock(&(pool->lock));
-
- return 0;
- }
-
- /* 线程池中各个工作线程的 回调函数 */
- void *threadpool_thread(void *threadpool)
- {
- threadpool_t *pool = (threadpool_t *)threadpool;
- threadpool_task_t task;//每个线程都有个任务结构体
-
- while (true) {
- /* Lock must be taken to wait on conditional variable */
- /*刚创建出线程,等待任务队列里有任务,否则阻塞等待任务队列里有任务后再唤醒接收任务*/
- pthread_mutex_lock(&(pool->lock));//先对整个任务结构体加锁,看看筐里有没有饼
-
- /*queue_size == 0 说明没有任务,调 wait 阻塞在条件变量上, 若有任务,跳过该while*/
- while ((pool->queue_size == 0) && (!pool->shutdown)) {
- printf("thread 0x%x is waiting\n", (unsigned int)pthread_self());
- pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));//来了说一声,待会儿要来吃饼
-
- /*清除指定数目的空闲线程,如果要结束的线程个数大于0,结束线程*/
- if (pool->wait_exit_thr_num > 0) {
- pool->wait_exit_thr_num--;
-
- /*如果线程池里线程个数大于最小值时可以结束当前线程*/
- if (pool->live_thr_num > pool->min_thr_num) {
- printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());
- pool->live_thr_num--;
- pthread_mutex_unlock(&(pool->lock));
-
- pthread_exit(NULL);//线程本质上是自己退出的,而不是被销毁的
- }
- }
- }
-
- /*如果指定了true,要关闭线程池里的每个线程,自行退出处理---销毁线程池*/
- if (pool->shutdown) {
- pthread_mutex_unlock(&(pool->lock));
- printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());
- pthread_detach(pthread_self());
- pthread_exit(NULL); /* 线程自行结束 */
- }
-
- /*从任务队列里获取任务(任务具象化就是:一个待执行的函数和要给这个函数传的参数。因此,线程就是一个待执行函数的资源体), 是一个出队操作*/
- task.function = pool->task_queue[pool->queue_front].function;
- task.arg = pool->task_queue[pool->queue_front].arg;
-
- pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size; /* 出队,模拟环形队列 */
- pool->queue_size--;
-
- /*通知可以有新的任务添加进来*/
- pthread_cond_broadcast(&(pool->queue_not_full));
-
- /*任务取出后,立即将 线程池琐 释放*/
- pthread_mutex_unlock(&(pool->lock));
-
- /*执行任务*/
- printf("thread 0x%x start working\n", (unsigned int)pthread_self());
- pthread_mutex_lock(&(pool->thread_counter)); /*忙状态线程数变量琐*/
- pool->busy_thr_num++; /*忙状态线程数+1*/
- pthread_mutex_unlock(&(pool->thread_counter));
-
- (*(task.function))(task.arg); /*执行回调函数任务*/
- //task.function(task.arg); /*执行回调函数任务*/
-
- /*任务结束处理*/
- printf("thread 0x%x end working\n", (unsigned int)pthread_self());
- pthread_mutex_lock(&(pool->thread_counter));
- pool->busy_thr_num--; /*处理掉一个任务,忙状态数线程数-1*/
- pthread_mutex_unlock(&(pool->thread_counter));
- }
-
- pthread_exit(NULL);
- }
-
- /* 管理线程 */
- void *adjust_thread(void *threadpool)
- {
- int i;
- threadpool_t *pool = (threadpool_t *)threadpool;
- while (!pool->shutdown) {
-
- sleep(DEFAULT_TIME);//隔一段时间管理一下 /*定时 对线程池管理*/
-
- pthread_mutex_lock(&(pool->lock));//线程们都在阻塞等待任务,结构体锁已经解了
- int queue_size = pool->queue_size; /* 拿到关注 任务数 */
- int live_thr_num = pool->live_thr_num; /* 拿到存活的线程数 */
- pthread_mutex_unlock(&(pool->lock));
-
- pthread_mutex_lock(&(pool->thread_counter)); //拿到忙着的线程数
- int busy_thr_num = pool->busy_thr_num; /* 忙着的线程数 */
- pthread_mutex_unlock(&(pool->thread_counter));
-
- /* 创建新线程 算法: 任务数大于最小线程池个数, 且存活的线程数少于最大线程个数时 如:30>=10 && 40<100*/
- if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num) {
- pthread_mutex_lock(&(pool->lock));
- int add = 0;
-
- /*一次增加 DEFAULT_THREAD 个线程*/
- for (i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY
- && pool->live_thr_num < pool->max_thr_num; i++) {
- if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i])) {
- pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);
- add++;
- pool->live_thr_num++;
- }
- }
-
- pthread_mutex_unlock(&(pool->lock));
- }
-
- /* 销毁多余的空闲线程 算法:忙线程X2 小于 存活的线程数 且 存活的线程数 大于 最小线程数时*/
- if ((busy_thr_num * 2) < live_thr_num && live_thr_num > pool->min_thr_num) {
-
- /* 一次销毁DEFAULT_THREAD个线程, 隨機10個即可 */
- pthread_mutex_lock(&(pool->lock));
- pool->wait_exit_thr_num = DEFAULT_THREAD_VARY; /* 要销毁的线程数 设置为10 */
- pthread_mutex_unlock(&(pool->lock));
-
- for (i = 0; i < DEFAULT_THREAD_VARY; i++) {
- /* 通知处在空闲状态的线程, 他们会自行终止*/
- pthread_cond_signal(&(pool->queue_not_empty));
- }
- }
- }
-
- return NULL;
- }
-
- int threadpool_destroy(threadpool_t *pool)
- {
- int i;
- if (pool == NULL) {
- return -1;
- }
- pool->shutdown = true;
-
- /*先销毁管理线程*/
- pthread_join(pool->adjust_tid, NULL);
-
- for (i = 0; i < pool->live_thr_num; i++) {
- /*通知所有的空闲线程*/
- pthread_cond_broadcast(&(pool->queue_not_empty));
- }
- for (i = 0; i < pool->live_thr_num; i++) {
- pthread_join(pool->threads[i], NULL);
- }
- threadpool_free(pool);
-
- return 0;
- }
-
- int threadpool_free(threadpool_t *pool)
- {
- if (pool == NULL) {
- return -1;
- }
-
- if (pool->task_queue) {
- free(pool->task_queue);
- }
- if (pool->threads) {
- free(pool->threads);
- pthread_mutex_lock(&(pool->lock));
- pthread_mutex_destroy(&(pool->lock));
- pthread_mutex_lock(&(pool->thread_counter));
- pthread_mutex_destroy(&(pool->thread_counter));
- pthread_cond_destroy(&(pool->queue_not_empty));
- pthread_cond_destroy(&(pool->queue_not_full));
- }
- free(pool);
- pool = NULL;
-
- return 0;
- }
-
- int threadpool_all_threadnum(threadpool_t *pool)
- {
- int all_threadnum = -1; // 总线程数
-
- pthread_mutex_lock(&(pool->lock));
- all_threadnum = pool->live_thr_num; // 存活线程数
- pthread_mutex_unlock(&(pool->lock));
-
- return all_threadnum;
- }
-
- int threadpool_busy_threadnum(threadpool_t *pool)
- {
- int busy_threadnum = -1; // 忙线程数
-
- pthread_mutex_lock(&(pool->thread_counter));
- busy_threadnum = pool->busy_thr_num;
- pthread_mutex_unlock(&(pool->thread_counter));
-
- return busy_threadnum;
- }
-
- int is_thread_alive(pthread_t tid)
- {
- int kill_rc = pthread_kill(tid, 0); //发0号信号,测试线程是否存活
- if (kill_rc == ESRCH) {
- return false;
- }
- return true;
- }
-
- /*测试*/
-
- #if 1
-
- /* 线程池中的线程,模拟处理业务 */
- void *process(void *arg)
- {
- printf("thread 0x%x working on task %d\n ",(unsigned int)pthread_self(),(int)arg);
- sleep(1); //模拟 小---大写
- printf("task %d is end\n",(int)arg);
-
- return NULL;
- }
-
- int main(void)
- {
- /*threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);*/
-
- threadpool_t *thp = threadpool_create(3,100,100); /*创建线程池,池里最小3个线程,最大100,队列最大100*/
- printf("pool inited");
-
- //int *num = (int *)malloc(sizeof(int)*20);
- int num[20], i;
- for (i = 0; i < 20; i++) {
- num[i] = i;
- printf("add task %d\n",i);
-
- /*int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg) */
-
- threadpool_add(thp, process, (void*)&num[i]); /* 向线程池中添加任务 */ //当然这个函数也负责通知任务不空的条件变量成熟,让模型运作起来
- }
-
- sleep(10);//模拟服务器的其他事务,除了让线程池工作起来 ,比如跟客户端连接等等 /* 等子线程完成任务 */
- threadpool_destroy(thp);
-
- return 0;
- }
-
- #endif
CSDNhttps://mp.csdn.net/mp_blog/creation/editor/134241870?not_checkout=1
CSDNhttps://mp.csdn.net/mp_blog/creation/editor/134260942?not_checkout=1
RPC
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。