赞
踩
Socket本身有“插座”的意思,在Linux环境下,用于表示进程间网络通信的特殊文件类型。本质为内核借助缓冲区形成的伪文件。
网络数据流的地址应这样规定:先发出的数据是低地址,后发出的数据是高地址。
发送主机通常将发送缓冲区中的数据按内存地址从低到高的顺序发出,
接收主机把从网络上接到的字节依次保存在接收缓冲区中,也是按内存地址从低到高的顺序保存。
TCP/IP协议规定,网络数据流应采用大端字节序,即低地址高字节。计算机用的小端存储。
原因:在UNIX年代诞生的TCP/IP,也就是IBM公司的大型机主要用的大端存储。此后在windows后才流行小端存储,intel架构。
网络字节序和主机字节序的转换
#include <arpa/inet.h>
uint32_t htonl(uint32_t hostlong); # host主机 to net网络 long型 ip,4字节
uint16_t htons(uint16_t hostshort); # host主机 to net网络 short型 端口号,2字节
uint32_t ntohl(uint32_t netlong);
uint16_t ntohs(uint16_t netshort);
htonl(INADDR_ANY): 网络地址为INADDR_ANY,这个宏表示本地的任意IP地址。
IP地址转换函数
#include <arpa/inet.h>
# p字符串ip(点分十进制式) to转 net网络ip
int inet_pton(int af, const char *src, void *dst);
参数:
af:指定ip地址版本,AF_INET -> ipv4 ; AF_INET6 -> ipv6
src:点分十进制 ip,192.168.1.24
dst:传出参数
const char *inet_ntop(int af, const void *src, char *dst, socklen_t size);
参数:
size: sizeof(dst)
IPv4和IPv6的地址格式定义在netinet/in.h中
IPv4地址用sockaddr_in结构体表示,包括16位端口号和32位IP地址
IPv6地址用sockaddr_in6结构体表示,包括16位端口号、128位IP地址和一些控制字段。
UNIX Domain Socket的地址格式定义在sys/un.h中,用sock-addr_un结构体表示。
早期ipv4用的是 strcut sockaddr类型。 现在 sockaddr类型已经退化为(void *)的作用,用于给函数传递地址。 定义时为:sockaddr_in 或者 sockaddr_in6 调用函数时:强转为 (struct sockaddr *)&addr。 bind、accept、connect需要强转。 #include <arpa/inet.h> struct sockaddr { sa_family_t sa_family; /* address family, AF_xxx */ char sa_data[14]; /* 14 bytes of protocol address */ }; struct sockaddr_in { sa_family_t sin_family; /* Address family */ 协议AF_INET in_port_t sin_port; /* Port number */ 端口号 struct in_addr sin_addr; /* Internet address */ IP地址 }; struct in_addr{ uint32_t s_addr; # IP地址 }
#include <sys/types.h> /* See NOTES */ #include <sys/socket.h> 1、创建socket int socket(int domain, int type, int protocol); 返回值: 成功:socket文件描述符。 失败:-1,设置errno 参数: domain: ip地址协议。AF_INET、AF_INET6、AF_UNIX本地套接字 type: 通信协议。 SOCK_STREAM:流式协议。按序、可靠、数据完整、基于字节流。 使用TCP SOCK_DGRAM: 报式协议。无连接、固定长度、不可靠。 使用UDP 其他:SOCK_SEQPACKET、SOCK_RAW、SOCK_RDM protocol: 传 0 表示默认协议。 2、将sockfd 绑定 ip和端口 int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen); 返回值: 成功返回0,失败返回-1, 设置errno 参数: sockfd: socket文件描述符 addr: 构造出IP地址加端口号 addrlen: sizeof(addr)长度 3、指定监听上限数,同时客户端建立连接(处于和刚建立三次握手的数量和)。 int listen(int sockfd, int backlog); 参数: sockfd: socket文件描述符 backlog: 排队建立3次握手队列和刚刚建立3次握手队列的链接数和 4、服务端调用,阻塞等待客户端连接 int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen); 返回值: 成功返回一个 新的 socket文件描述符,用于和客户端通信。失败返回-1,设置errno 参数: sockdf: socket文件描述符 addr: 传出参数,返回客户端socket,服务端不用初始化。 addrlen: 传入传出参数。传入sizeof(addr)大小,传出接收到地址结构体的大小。不关心填NULL 5、客户端调用,建立连接 int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen); 返回值: 成功返回0。 失败返回-1,设置errno 参数: sockdf: socket文件描述符 addr: 传入参数,指定服务器端地址信息,含IP地址和端口号C++ addrlen:传入参数,传入sizeof(addr)大小
由于客户端不需要固定的端口号,因此不必调用bind(),客户端的端口号由内核自动分配。
客户端不是不允许调用bind(),只是没有必要调用bind()固定一个端口号,服务器也不是必须调用bind(),但如果服务器不调用bind(),内核会自动给服务器分配监听端口,每次启动服务器时端口号都不一样,客户端要连接服务器就会遇到麻烦。
客户端和服务器启动后可以使用netstat命令查看链接情况:
netstat -apn|grep 6666
#include <sys/socket.h>
int shutdown(int sockfd, int how);
sockfd: 需要关闭的socket的描述符
how: 允许为shutdown操作选择以下几种方式:
SHUT_RD(0): 关闭sockfd上的读功能,此选项将不允许sockfd进行读操作。
该套接字不再接受数据,任何当前在套接字接受缓冲区的数据将被无声的丢弃掉。
SHUT_WR(1): 关闭sockfd的写功能,此选项将不允许sockfd进行写操作。进程不能在对此套接字发出写操作。
SHUT_RDWR(2): 关闭sockfd的读写功能。相当于调用shutdown两次:首先是以SHUT_RD,然后以SHUT_WR。
使用close中止一个连接,但它只是减少描述符的引用计数,并不直接关闭连接,只有当描述符的引用计数为0时才关闭连接。
shutdown不考虑描述符的引用计数,直接关闭描述符。
注意:
如果有多个进程共享一个套接字,close每被调用一次,计数减1,直到计数为0时,也就是所用进程都调用了close,套接字将被释放。
在多进程中如果一个进程调用了shutdown(sfd, SHUT_RDWR)后,其它的进程将无法进行通信。但,如果一个进程close(sfd)将不会影响到其它进程。
(1)让4次握手关闭流程更加可靠;4次握手的最后一个ACK是是由主动关闭方发送出去的,若这个ACK丢失,被动关闭方会再次发一个FIN过来。若主动关闭方能够保持一个2MSL的TIME_WAIT状态,则有更大的机会让丢失的ACK被再次发送出去。
(2)防止lost duplicate对后续新建正常链接的传输造成破坏。lost uplicate在实际的网络中非常常见,经常是由于路由器产生故障,路径无法收敛,导致一个packet在路由器A,B,C之间做类似死循环的跳转。IP头部有个TTL,限制了一个包在网络中的最大跳数,因此这个包有两种命运,要么最后TTL变为0,在网络中消失;要么TTL在变为0之前路由器路径收敛,它凭借剩余的TTL跳数终于到达目的地。但非常可惜的是TCP通过超时重传机制在早些时候发送了一个跟它一模一样的包,并先于它达到了目的地,因此它的命运也就注定被TCP协议栈抛弃。
另外一个概念叫做incarnation connection,指跟上次的socket pair一摸一样的新连接,叫做incarnation of previous connection。lost uplicate加上incarnation connection,则会对我们的传输造成致命的错误。
TCP是流式的,所有包到达的顺序是不一致的,依靠序列号由TCP协议栈做顺序的拼接;假设一个incarnation connection这时收到的seq=1000, 来了一个lost duplicate为seq=1000,len=1000, 则TCP认为这个lost duplicate合法,并存放入了receive buffer,导致传输出现错误。通过一个2MSL TIME_WAIT状态,确保所有的lost duplicate都会消失掉,避免对新连接造成错误。
(1)发最后ACK的是主动关闭一方。
(2)只要有一方保持TIME_WAIT状态,就能起到避免incarnation connection在2MSL内的重新建立,不需要两方都有。
如何正确对待2MSL TIME_WAIT?
RFC要求socket pair在处于TIME_WAIT时,不能再起一个incarnation connection。但绝大部分TCP实现,强加了更为严格的限制。在2MSL等待期间,socket中使用的本地端口在默认情况下不能再被使用。
若A 10.234.5.5 : 1234和B 10.55.55.60 : 6666建立了连接,A主动关闭,那么在A端只要port为1234,无论对方的port和ip是什么,都不允许再起服务。这甚至比RFC限制更为严格,RFC仅仅是要求socket pair不一致,而实现当中只要这个port处于TIME_WAIT,就不允许起连接。这个限制对主动打开方来说是无所谓的,因为一般用的是临时端口;但对于被动打开方,一般是server,就悲剧了,因为server一般是熟知端口。比如http,一般端口是80,不可能允许这个服务在2MSL内不能起来。
解决方案是给服务器的socket设置SO_REUSEADDR选项,这样的话就算熟知端口处于TIME_WAIT状态,在这个端口上依旧可以将服务启动。当然,虽然有了SO_REUSEADDR选项,但sockt pair这个限制依旧存在。比如上面的例子,A通过SO_REUSEADDR选项依旧在1234端口上起了监听,但这时我们若是从B通过6666端口去连它,TCP协议会告诉我们连接失败,原因为Address already in use.
RFC 793中规定MSL为2分钟,实际应用中常用的是30秒,1分钟和2分钟等。
RFC (Request For Comments),是一系列以编号排定的文件。收集了有关因特网相关资讯,以及UNIX和因特网社群的软件文件。
服务端主动关闭后,属于TIME_WAIT 状态,端口不可用,2MSL后才能用。
在server的TCP连接没有完全断开之前不允许重新监听是不合理的。因为,TCP连接没有完全断开指的是connfd(127.0.0.1:6666)没有完全断开,而我们重新监听的是lis-tenfd(0.0.0.0:6666),虽然是占用同一个端口,但IP地址不同,connfd对应的是与某个客户端通讯的一个具体的IP地址,而listenfd对应的是wildcard address。
解决这个问题的方法是使用setsockopt()设置socket描述符的选项SO_REUSEADDR为1,表示允许创建端口号相同但IP地址不同的多个socket描述符。
服务器仍是 TIME_WAIT ,但是端口允许复用。
在server代码的socket()和bind()调用之间插入如下代码:
int opt = 1;
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
有关setsockopt可以设置的其它选项请参考UNP第7章。
在TCP网络通信中,经常会出现客户端和服务器之间的非正常断开,需要实时检测查询链接状态。
keepAlive = 1;
setsockopt(listenfd, SOL_SOCKET, SO_KEEPALIVE, (void*)&keepAlive, sizeof(keepAlive));
暂略
TCP如何建立链接
TCP如何通信
TCP如何关闭链接
什么是滑动窗口
什么是半关闭
局域网内两台机器如何利用TCP/IP通信
internet上两台主机如何进行通信
如何在internet上识别唯一一个进程
答:通过“IP地址+端口号”来区分不同的服务
为什么说TCP是可靠的链接,UDP不可靠
路由器和交换机的区别
点到点,端到端
多路IO转接服务器:也叫做多任务IO服务器。该类服务器实现的主旨思想是,不再由应用程序自己监视客户端连接,取而代之由内核替应用程序监视文件。主要使用的方法有三种
内核发送反馈。常用的accept函数会阻塞。如果是内核的话,直接反馈建立连接。 读写数据时,也是一样的,不用阻塞等待客户端写入数据。
select的问题:
select能监听的个数 受限于 文件描述符个数上限 FD_SETSIZE,一般为1024,单纯改变进程打开的文件描述符个数并不能改变select监听文件个数
解决1024以下客户端时使用select是很合适的,但如果链接客户端过多,select采用的是轮询模型,会大大降低服务器响应效率,不应在select上投入更多精力
因为select会修改传入参数,所以需要额外保存监听的集合。
#include <sys/select.h> /* According to earlier standards */ #include <sys/time.h> #include <sys/types.h> #include <unistd.h> int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout); 返回值:监听的所有集合中,满足条件的总数。 失败 -1,设置errno 参数: nfds: 监控的文件描述符集里最大文件描述符加1,告诉内核检测前多少个文件描述符的状态 readfds: 监控有读数据到达文件描述符集合,传入传出参数 writefds: 监控写数据到达文件描述符集合,传入传出参数 exceptfds: 监控异常发生达文件描述符集合,如带外数据到达异常,传入传出参数 timeout: 定时阻塞监控时间,3种情况 1.NULL,永远等下去 2.设置timeval,等待固定时间 3.设置timeval里时间均为0,检查描述字后立即返回,轮询 struct timeval { long tv_sec; /* seconds */ long tv_usec; /* microseconds */ }; 文件描述符集合fd_set 的操作函数: void FD_ZERO(fd_set *set); //把文件描述符集合里所有位清0 void FD_CLR(int fd, fd_set *set); //把文件描述符集合里fd清0 int FD_ISSET(int fd, fd_set *set); //测试文件描述符集合里fd是否置1 void FD_SET(int fd, fd_set *set); //把文件描述符集合里fd位置1
优点:
缺点:不能跨平台
如果不再监控某个文件描述符时,可以把pollfd中,fd设置为-1,poll不再监控此pollfd,下次返回时,把revents设置为0。
#include <poll.h> int poll(struct pollfd *fds, nfds_t nfds, int timeout); 参数: fds:结构体数组的首地址 struct pollfd { int fd; /* 文件描述符 */ short events; /* 监控的事件 */ short revents; /* 监控事件中满足条件返回的事件 */ }; # POLLIN 普通或带外优先数据可读,即POLLRDNORM | POLLRDBAND POLLRDNORM 数据可读 POLLRDBAND 优先级带数据可读 POLLPRI 高优先级可读数据 POLLOUT 普通或带外数据可写 POLLWRNORM 数据可写 POLLWRBAND 优先级带数据可写 POLLERR 发生错误 POLLHUP 发生挂起 POLLNVAL 描述字不是一个打开的文件 nfds 监控数组中有多少文件描述符需要被监控 timeout 毫秒级等待 -1:阻塞等,#define INFTIM -1 Linux中没有定义此宏 0:立即返回,不阻塞进程 >0:等待指定毫秒数,如当前系统时间精度不够毫秒,向上取值
epoll是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率,因为它会复用文件描述符集合来传递结果而不用迫使开发者每次等待事件之前都必须重新准备要被侦听的文件描述符集合,另一点原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。
epoll除了提供select/poll那种IO事件的电平触发(Level Triggered)外,还提供了边沿触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率。
可以使用cat命令查看一个进程可以打开的socket描述符上限。
cat /proc/sys/fs/file-max # 硬件上限,
如有需要,可以通过修改配置文件的方式修改该上限值。
sudo vi /etc/security/limits.conf
# 在文件尾部写入以下配置,soft软限制,hard硬限制。如下图所示。
* soft nofile 65536
* hard nofile 100000
基本API(3个)
#include <sys/epoll.h> 1. 创建一个epoll句柄,参数size用来告诉内核监听的文件描述符的个数,跟内存大小有关。 int epoll_create(int size) # size:监听数目,建议值 返回值: 文件描述符(句柄)、指向内核中红黑树的树根节点(二分法查找) 2. 控制某个epoll监控的文件描述符上的事件:注册、修改、删除。 int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) 返回值:成功返回0,不成功返回-1 参数: epfd: 为epoll_creat的句柄 op: 表示动作,3: EPOLL_CTL_ADD (注册新的fd到epfd), EPOLL_CTL_MOD (修改已经注册的fd的监听事件), EPOLL_CTL_DEL (从epfd删除一个fd); fd: 监控的文件描述符 event: 告诉内核需要监听的事件,结构体 struct epoll_event { uint32_t events; /* Epoll events */ EPOLLIN EPOLLOUT EPOLLERR... epoll_data_t data; /* User data variable */ }; typedef union epoll_data { void *ptr; # 泛型指针 int fd; # 监控的文件描述符 uint32_t u32; # 32位无符号整数 uint64_t u64; # 64位无符号整数 } epoll_data_t; events: EPOLLIN : 表示对应的文件描述符可以读(包括对端SOCKET正常关闭) EPOLLOUT: 表示对应的文件描述符可以写 EPOLLPRI: 表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来) EPOLLERR: 表示对应的文件描述符发生错误 EPOLLHUP: 表示对应的文件描述符被挂断; EPOLLET: 将EPOLL设为边缘触发模式,这是相对于水平触发而言的 EPOLLONESHOT:只监听一次事件,如果还需要继续监听这个socket, 需要再次把这个socket加入到EPOLL队列里 3. 等待所监控文件描述符上有事件的产生,类似于select()调用。 int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout) 返回值: 成功返回有多少文件描述符就绪,时间到时返回0,出错返回-1 参数: epfd: 为epoll_creat的句柄 events: 传出参数,从内核得到epoll_event的集合,数组。 maxevents: 告之内核这个events有多大,其值不能大于创建epoll_create()时的size timeout: 是超时时间 -1: 阻塞 0: 立即返回,非阻塞 >0: 指定毫秒
文件描述符不是指针,而是用fd找到一个指针,再找结构体地址
epoll使用过程:
1、创建一个红黑树树根
int epoll_create(10);
2、读监听 lfd
struct epoll_event evt; // 创建epoll_event 结构体
evt.events = EPOPLLIN; // 设置读监听
evt.data.fd = lfd; // 可以泛型,这里直接传fd。联合体,只能一个有效。
epoll_ctl(epfd, EPOLL_STL_ADD, lfd, &evt); //增加节点到红黑树
// 红黑树节点自带一个结构体,存储了 EPOLLIN和lfd。读操作 和 对应fd
3、取回结果
struct epoll_event evts[10]; // 用来返回监听的 fd,传出参数
int ret = epoll_wait(epfd, evts, 10, -1); // 返回 满足事件的fd个数
循环ret次,判断 fd == lfd 监听 accept。 如果 fd == cfd 读取 read。
总的server代码 epoll_concurrent
struct epoll_event tep, ep[5000]; // 创建epoll变量 int listenfd, connfd, sockfd; listenfd = Socket(AF_INET, SOCK_STREAM, 0); // 创建socket int opt = 1; // 端口复用,server关闭后输入TIME_WAIT,端口马上创新使用 setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof opt); Bind(listenfd, (struct sockaddr *) &servaddr, sizeof servaddr); Listen(listenfd, 20); efd = epoll_create(OPEN_MAX); // 红黑树树根 res = epoll_ctl(efd , EPOLL_STL_ADD, listenfd, &tep); //增加节点到红黑树 while(1){ int ret = epoll_wait(epfd, evts, 10, -1); // 返回 满足事件的fd个数 遍历 ret, if(!(ep[i].events & EPOLLIN)) continue; // 不是读事件,直接返回 if(ep[i].data.fd == listenfd) // 监听 accept,并且添加cfd到红黑树 else{ sockfd = ep[i].data.fd; n = read(sockfd, buf, MAXLINE); // n = 0,客户端关闭了 删除cfd,close sockfd // n小于0,读出错 // n大于0,转大写,write(STDOUT_FILENO, buf, n); writen(sockfd, buf, n); } }
为什么要ET模式:
epoll非阻塞IO
1、边缘触发
2、用循环的方式读取数据
while((len=read(fd, buf, size))>0)
3、设置文件描述符属性为O_NONBLOCK
flag = fcntl(connfd, F_GETFL); # F_GETFL获取文件描述符属性
flag |= O_NONBLOCK; # 修改文件描述符属性
fcntl(connfd, F_SETFL, flag); # F_SETFL设置文件描述符属性
使用水平触发的阻塞IO可能造成死锁
1、比如需要500B的数据,但是只有200B的数据。此时阻塞在read函数上。
2、但是又无法去用epoll监听客户端新写的数据。造成了死锁。
使用 libevent 跨平台开发库。
epoll – 服务器 – 监听 – cfd --可读 – epoll返回 – read – 小写转大写 – write – epoll继续监听。
epoll 反应堆模型:
客户端不一定可写,比如"滑动窗口",所以需要判断
epoll – 服务器 – 监听 – cfd – 可读 – epoll返回 – read – cfd从树上摘下 – 设置监听cfd写事件 — 小写转大写 – 等待epoll_wait 返回可写 — 回写客户端 – cfd从树上摘下 – 设置监听cfd读事件 – epoll继续监听。
epoll事件不再传fd而是传泛型参数,用结构体把fd、参数指针、回调函数指针一起传过去。
存储fd加入到红黑树的时间,如果长时间不干事就剔除。 last_active = time(NULL) 绝对时间
/* 描述就绪文件描述符相关信息 */ struct myevent_s { int fd; //要监听的文件描述符 int events; //对应的监听事件 void *arg; //泛型参数 void (*call_back)(int fd, int events, void *arg); //回调函数 int status; //是否在监听:1->在红黑树上(监听), 0->不在(不监听) char buf[BUFLEN]; int len; long last_active; //记录每次加入红黑树 g_efd 的时间值 }; int g_efd; // 全局变量, 保存epoll_create返回的红黑树树根节点 struct myevent_s g_events[MAX_EVENTS+1]; //结构体数组. +1-->listen fd main内部: 初始化 0、g_efd = epoll_create(MAX_EVENTS+1) 创建红黑树树根 epoll_event 数组,节点 1、initlistensocket(g_efd, port); 创建listen fd,并设置为非阻塞。 socket创建、fcntl非阻塞、 设置 lfd 的回调函数和参数 // void eventset(struct myevent_s *ev, int fd, void (*call_back)(int, int, void *), void *arg); eventset(&g_events[MAX_EVENTS], lfd, acceptconn, &g_events[MAX_EVENTS]); // g_events数组的最后一个给lfd用。 并设置回调函数 acceptconn,参数为结构体本身。 // 会初始化 last_active 时间,表示 lfd 运行开始 添加 lfd 到红黑树上 // void eventadd(int efd, int events, struct myevent_s *ev) eventadd(efd, EPOLLIN, &g_events[MAX_EVENTS]); // 会设置 staus,表示节点是否在树上 bind、listen、 循环: 1、如果一直不干活就剔除 获取当前事件 一次查看一百个连接,判断 last_active 时间是不是大于60s,是则 eventdel 每次 eventset 都会更新 last_active 2、int nfd = epoll_wait(g_efd, events, MAX_EVENTS+1, 1000); // 阻塞监听1s 循环nfd 。 // 用 events[i].data.ptr泛型指针保持 myevent_s 结构体 struct myevent_s *ev = (struct myevent_s *)events[i].data.ptr; // 实际上由两个判断,源码带有一些相关的琐碎操作。call_back函数不同 判断读写事件: // 调用回调函数 ev->call_back(ev->fd, events[i].events, ev->arg); 3、lfd 回调函数 acceptconn: void acceptconn(int lfd, int events, void *arg) cfd = accept(lfd, (struct sockaddr *)&cin, &len) // 获取cfd 遍历g_events,找到第一个 status = 0 的位置加入cfd cfd 设置非阻塞 eventset 设置回调函数 recvdata、eventadd 加到树上 4、cfd 回调函数 recvdata: - 读取数据,recv(scoket_fd, buf, len, flag=0); // 读套接字 - 删除读监听,eventdel(g_efd, ev); // status状态变成0 - 如果读取数据大于0,转大写,ev->buf[len] - 设置写监听,epoll_set 回调函数 senddata、epoll_add 加树 - 出错,关闭fd。 5、fd 回调函数 senddata,从树上摘下写事件,写数据,挂读事件 补充:do-while实现goto语句: do{ // do sth1 break; // do sth2 }while(0); - 如果sth1后执行break,sth2的语句不会执行。 - 由于while是0,所以不会循环。
回射服务器
// 任务队列 中存的 单个任务信息 typedef struct { void *(*function)(void *); /* 函数指针,回调函数 */ void *arg; /* 上面函数的参数 */ } threadpool_task_t; /* 各子线程任务结构体 */ /* 描述线程池相关信息 */ struct threadpool_t { pthread_mutex_t lock; /* 用于锁住本结构体 */ pthread_mutex_t thread_counter; /* 记录忙状态线程个数的琐 -- busy_thr_num */ pthread_cond_t queue_not_full; /* 当任务队列满时,添加任务的线程阻塞,等待此条件变量 */ pthread_cond_t queue_not_empty; /* 任务队列里不为空时,通知等待任务的线程 */ pthread_t *threads; /* 存放线程池中每个线程的tid。数组 */ pthread_t adjust_tid; /* 存管理线程tid */ threadpool_task_t *task_queue; /* 任务队列 */ int min_thr_num; /* 线程池最小线程数 */ int max_thr_num; /* 线程池最大线程数 */ int live_thr_num; /* 当前存活线程个数 */ int busy_thr_num; /* 忙状态线程个数 */ int wait_exit_thr_num; /* 要销毁的线程个数 */ int queue_front; /* task_queue队头下标 */ int queue_rear; /* task_queue队尾下标 */ int queue_size; /* task_queue队中实际任务数 */ int queue_max_size; /* task_queue队列可容纳任务数上限 */ int shutdown; /* 标志位,线程池使用状态,true或false */ }; 0、main任务 /*创建线程池 thp ,池里最小3个线程,最大100,队列最大100*/ threadpool_t *thp = threadpool_create(3,100,100); /* 向线程池中添加任务 */ num[i] 为 process的参数 threadpool_add(thp, process, (void*)&num[i]); 。。。 threadpool_destroy(thp); // 销毁线程池 1、初始化线程池: threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size) 初始化结构体的参数。 threadpool_t *pool malloc并menset最大线程id空间。数组下标访问,需要是连续的。 malloc最大任务队列空间。 初始化锁 lock、thread_counter。条件变量 queue_not_empty,queue_not_full 创建线程,pthread_create。 工作线程,其对应的回调函数 threadpool_thread 管理者线程,其对应的回调函数 adjust_thread 返回线程池结构体 pool 。 中途出错,一个个free掉 线程池结构体 内的成员。 2、工作线程: void *threadpool_thread(void *threadpool) 加结构体锁。 while 没有任务 将线程阻塞在条件变量queue_not_empty上。等待 一旦有任务,条件变量 queue_not_empty 唤醒,跳出阻塞。 判断是否需要砍线程。 有任务了 跳出 while 任务队列出队,设置回调函数和参数。 广播条件变量 queue_not_full,表示可以增加任务。 释放结构体锁。 加thread_counter锁。设置工作线程数加一。解thread_counter锁。 调用回调函数。 加thread_counter锁。设置工作线程数减一。解thread_counter锁。 3、管理者线程: void *adjust_thread(void *threadpool) 睡眠固定时间,不需要一直控制线程池。 加结构体锁。 变量另存 当前线程数、存活线程数。 释放结构体锁。 加 thread_counter 锁 另存 忙着的线程数。 解 thread_counter 锁 线程池增加算法:任务数 大于 最小线程池个数, 且 存活的线程数 少于 最大线程个数时 销毁空闲线程算法:忙线程X2 小于 存活的线程数 且 存活的线程数 大于 最小线程数时 销毁线程:发送 queue_not_empty 条件变量,通知处在空闲状态的线程自行终止 4、向线程增加任务。 int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg) 加结构体锁。 任务队满,阻塞等待 条件变量 queue_not_full 。 清空工作线程 调用的回调函数 的参数arg 添加任务到任务队列里 任务队列不为空,唤醒一个线程池中的任务线程。 pthread_cond_signal - queue_not_empty 解结构体锁。 4、线程池退出: int threadpool_destroy(threadpool_t *pool) 结构体shutdown = true。 先销毁管理线程。pthread_join(pool->adjust_tid, NULL); 通知所有的空闲线程销毁。pthread_cond_broadcast - queue_not_empty 销毁工作线程:pthread_join(pool->threads[i], NULL); 释放线程池空间(队列、锁、条件变量等)
TCP: 面向连接的可靠数据包传递 —完全弥补
优点:稳定:
数据稳定 — 丢包回传(回执机制)(丢包率97‰)
速率稳定
流量稳定 “滑动窗口”
缺点: 效率低、速度慢
使用场景:大文件、重要文件传输
UDP: 无连接的不可靠报文传递。----完全不弥补
缺点:不稳定:数据、速率、流量
优点:效率高、速度快
使用场景:对实时性要求较高,视频会议、视频电话、广播、飞秋
实际运用: TCP — TCP+UDP — UDP + 应用层自定义协议弥补UDP的丢包。
与TCP类似的,UDP也有可能出现缓冲区被填满后,再接收数据时丢包的现象。由于它没有TCP滑动窗口的机制,通常采用如下两种方法解决:
服务器应用层设计流量控制,控制发送数据速度。
借助setsockopt函数改变接收缓冲区大小。如:
#include <sys/socket.h>
int setsockopt(int sockfd, int level, int optname, const void *optval, socklen_t optlen);
int n = 220x1024
setsockopt(sockfd, SOL_SOCKET, SO_RCVBUF, &n, sizeof(n));
由于UDP不需要维护连接,程序逻辑简单了很多,但是UDP协议是不可靠的,保证通讯可靠性的机制需要在应用层实现。
ssize_t recv(int sockfd, void *buf, size_t len, int flags); # flags默认传0
# 用这个,接收socket传来的数据
ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags,
struct sockaddr *src_addr, socklen_t *addrlen); # src_addr传出,addrlen传入传出
ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags);
ssize_t send(int sockfd, void *buf, size_t len, int flags);
# 用这个,回传数据
ssize_t sendto(int sockfd, void *buf, size_t len, int flags
const struct sockaddr *dest_addr, socklen_t *addrlen); # dest_addr传入
ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags);
IP:192.168.42.255(广播) --32位 255 255.255.255.255
IP:192.168.42.1(网关)
默认不可以广播,需要给sockfd开放广播权限。
int setsockopt(int sockfd, int level, int optname, const void *optval, socklen_t optlen); #define SERVER_PORT 8000 /* 无关紧要 */ #define MAXLINE 1500 #define BROADCAST_IP "192.168.42.255" #define CLIENT_PORT 9000 # 重要 # 给sockfd开放广播权限。 int flag = 1; # 表示允许 setsockopt(sockfd, SOL_SOCKET, SO_BROADCAST, &flag, sozeof(flag)); # 构造 client 地址 IP+端口 192.168.7.255+9000 bzero(&clientaddr, sizeof(clientaddr)); clientaddr.sin_family = AF_INET; inet_pton(AF_INET, BROADCAST_IP, &clientaddr.sin_addr.s_addr); clientaddr.sin_port = htons(CLIENT_PORT); sendto(sockfd, buf, strlen(buf), 0, (struct sockaddr *)&clientaddr, sizeof(clientaddr));
组播组可以是永久的也可以是临时的。组播组地址中,有一部分由官方分配的,称为永久组播组。永久组播组保持不变的是它的ip地址,组中的成员构成可以发生变化。永久组播组中成员的数量都可以是任意的,甚至可以为零。那些没有保留下来供永久组播组使用的ip组播地址,可以被临时组播组利用。
224.0.0.0~224.0.0.255 为预留的组播地址(永久组地址),地址224.0.0.0保留不做分配,其它地址供路由协议使用; 224.0.1.0~224.0.1.255 是公用组播地址,可以用于Internet;欲使用需申请。 224.0.2.0~238.255.255.255 为用户可用的组播地址(临时组地址),全网范围内有效; 239.0.0.0~239.255.255.255 为本地管理组播地址,仅在特定的本地范围内有效。 获取网卡编号, 1、ip ad命令 2、if_nametoindex 命令可以根据网卡名,获取网卡序号。 unsigned int if_nametoindex(const char *ifname); # eth0 int setsockopt(int sockfd, int level, int optname, const void *optval, socklen_t optlen); server:获取组播权限。 setsockopt(sockfd, IPPROTO_IP, IP_MULTICAST_IF, &group, sizeof(group)); client:将本客户端加入组播。 setsockopt(confd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group));
setsockopt作用:
1. 端口复用。
2. 设置缓冲区大小
3. 开放广播权限
4. 开放组播权限
5. 加入组播组。
实现基本思路:
1. 屏幕截图模块。 24帧
2. 截取帧数 8-12帧
3. 压缩图片 M --> K
4. 压缩数据包
5. 传递 - 多播
6. 解压缩 --- 算法。
7. 成像
1. Pipe fifo 实现最简单
2. mmap 非血缘关系进程间
3. 信号 开销小
4. domain 稳定性最好
网络socket也可用于同一台主机的进程间通讯(通过loopback地址127.0.0.1),但是UNIX Domain Socket用于IPC更有效率:不需要经过网络协议栈,不需要打包拆包、计算校验和、维护序号和应答等。
IPC机制本质上是可靠的通讯,而网络协议是为不可靠的通讯设计的。UNIX Domain Socket也提供面向流和面向数据包两种API接口,类似于TCP和UDP,但是面向消息的UNIX Domain Socket也是可靠的,消息既不会丢失也不会顺序错乱。
UNIX Domain Socket是全双工的,API接口语义丰富,相比其它IPC机制有明显的优越性,目前已成为使用最广泛的IPC机制,比如X Window服务器和GUI程序之间就是通过UNIX Domain Socket通讯的。
使用UNIX Domain Socket的过程和网络socket十分相似,也要先调用socket()创建一个socket文件描述符,address family指定为AF_UNIX,type无所谓,可以选择SOCK_DGRAM或SOCK_STREAM,protocol参数仍然指定为0即可。
UNIX Domain Socket与网络socket编程最明显的不同在于地址格式不同,用结构体sockaddr_un表示,网络编程的socket地址是IP地址加端口号,而UNIX Domain Socket的地址是一个socket类型的文件在文件系统中的路径,这个socket文件由bind()调用创建,如果调用bind()时该文件已存在,则bind()错误返回。
对比网络套接字地址结构和本地套接字地址结构: struct sockaddr_in { __kernel_sa_family_t sin_family; /* Address family */ 地址结构类型 __be16 sin_port; /* Port number */ 端口号 struct in_addr sin_addr; /* Internet address */ IP地址 }; struct sockaddr_un { __kernel_sa_family_t sun_family; /* AF_UNIX */ 地址结构类型 char sun_path[UNIX_PATH_MAX]; /* pathname */ socket文件名(含路径) }; 将UNIX Domain socket绑定到一个地址 size = offsetof(struct sockaddr_un, sun_path) + strlen(un.sun_path); # offsetof(type, member) --- ((int)&((type *)0)->MEMBER) socket(AF_UNIX, SOCK_STREAM, 0); struct sockaddr_un serv_addr; serv_addr.sun_family = AF_UNIX; strcpy(serv_addr.sun_path, "mysocket") int len = offsetof(struct sockaddr_un, sun_path) + strlen(serv_addr.sun_path); unlink("mysocket"); bind(sfd, (struct sockaddr *)&serv_addr, len); len长度说明 对于结构体: struct sockaddr_un{ sun_family; # 18bits=2B path; # 108B,并不都是有效数据 } 如果直接sizeof(sockaddr_un) = 110B 服务端: 用 offsetof(struct sockaddr_un, sun_path) = 2B,应该用函数得到 strlen(un.sun_path); # 总共是108B,实际不是。如果path是 “mysocket”,则是8B 总 len = 2+8 = 10B accept(sfd, &client_addr, &len_c); 客户端: len_c -= offsetof(); # 得到客户端socket的文件名长度 buf[len_c] = '\0';
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。