赞
踩
注:本文转自知乎一篇文章彻底搞懂websocket协议的原理与应用(二) - 知乎
WebSocket是HTML5出的东西(协议),也就是说HTTP协议没有变化,或者说没关系,但HTTP是不支持持久连接的(长连接,循环连接的不算)。
连接握手分为两个步骤:请求和应答。WebSocket利用了HTTP协议来建立连接,使用的是HTTP的协议升级机制。
一个标准的HTTP请求,格式如下:
请求头的具体格式定义参见Request-Line格式。
请求header中的字段解析:
协议升级机制
Origin
所有浏览器将会发送一个 Origin请求头。 你可以将这个请求头用于安全方面(检查是否是同一个域,白名单/ 黑名单等),如果你不喜欢这个请求发起源,你可以发送一个403 Forbidden。需要注意的是非浏览器只能发送一个模拟的 Origin。大多数应用会拒绝不含这个请求头的请求。
Sec-WebSocket-Key
由客户端随机生成的,提供基本的防护,防止恶意或者无意的连接。
返回字段解析:
Connection
可以参见 rfc7230 6.1 和 rfc7230 6.7。
Sec-WebSocket-Accept
它通过客户端发送的Sec-WebSocket-Key 计算出来。计算方法:
将 Sec-WebSocket-Key 跟 258EAFA5-E914-47DA-95CA-C5AB0DC85B11 拼接;
通过 SHA1 计算出摘要,并转成 base64 字符串。
Sec-WebSocket-Key / Sec-WebSocket-Accept 的主要作用还是为了避免一些网络通信过程中,一些非期待的数据包,”乱入“进来,导致一些错误的响应,并不能用于实现登录认证和数据安全,这些功能还需要应用层自己实现。
WebSocket 以 frame 为单位传输数据, frame 是客户端和服务端数据传输的最小单元。当一条消息过长时, 通信方可以将该消息拆分成多个 frame 发送, 接收方收到以后重新拼接、解码从而还原出完整的消息, 在 WebSocket 中, frame 有多种类型, frame 的类型由 frame 头部的 Opcode 字段指示, WebSocket frame 的结构如下所示:
该结构的字段语义如下:
FIN, 长度为 1 比特, 该标志位用于指示当前的 frame 是消息的最后一个分段, 因为 WebSocket 支持将长消息切分为若干个 frame 发送, 切分以后, 除了最后一个 frame, 前面的 frame 的 FIN 字段都为 0, 最后一个 frame 的 FIN 字段为 1, 当然, 若消息没有分段, 那么一个 frame 便包含了完成的消息, 此时其 FIN 字段值为 1。
RSV 1 ~ 3, 这三个字段为保留字段, 只有在 WebSocket 扩展时用, 若不启用扩展, 则该三个字段应置为 1, 若接收方收到 RSV 1 ~ 3 不全为 0 的 frame, 并且双方没有协商使用 WebSocket 协议扩展, 则接收方应立即终止 WebSocket 连接。
Opcode, 长度为 4 比特, 该字段将指示 frame 的类型, RFC 6455 定义的 Opcode 共有如下几种:
以下是一个客户端和服务端相互传递文本消息的示例
其中模拟了长消息被切分为多个帧(continuation frame)的例子。
关闭相对简单,由客户端或服务端发送关闭帧,即可完成关闭。
全网最详细epoll讲解,6种epoll的设计,让你吊打面试官
需要C/C++ Linux服务器架构师学习资料加qun812855908获取(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享
WebSocket是一种在单个TCP连接上进行全双工通信的协议。WebSocket通信协议于2011年被IETF定为标准RFC 6455,并由RFC7936补充规范。WebSocket API也被W3C定为标准。
1.WebSocket是什么
WebSocket 协议在2008年诞生,2011年成为国际标准。主流浏览器都已经支持。
WebSocket 是一种全新的协议。它将 TCP 的 Socket(套接字)应用在了网页上,从而使通信双方建立起一个保持在活动状态连接通道,并且属于全双工通信。
WebSocket 协议在2008年诞生,2011年成为国际标准。主流浏览器都已经支持。WebSocket 协议借用 HTTP协议 的 101 switch protocol 来达到协议转换,从HTTP协议切换WebSocket通信协议。它的最大特点就是,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息,是真正的双向平等对话。
轮询:最早的一种实现实时 Web 应用的方案。客户端以一定的时间间隔向服务端发出请求,以频繁请求的方式来保持客户端和服务器端的通信。
长轮询:长轮询也采用轮询的方式,不过采取的是阻塞模型,客户端发起连接后,如果没消息,就一直不返回Response给客户端。直到有消息才返回,返回完之后,客户端再次建立连接,周而复始。
其他方式:如xhr-streaming、隐藏iframe、ActiveX控件、SSE。
轮询技术非真正实时技术。使用 Ajax 方式模拟实时效果,每次客户端和服务器端交互,都是一次 HTTP 的请求和应答过程,且每次的 HTTP 请求和应答都带有完整 HTTP 头信息,增加传输的数据量。需构建两个http连接。客户端和服务器端编程实现比较复杂,为模拟真实的实时效果,需构造两个 HTTP 连接来模拟客户端和服务器的双向通信,一个连接用来处理客户端到服务器端的数据传输,一个连接用来处理服务器端到客户端的数据传输,增加了编程实现的复杂度、服务器端的负载,制约了应用系统的扩展性。
BS架构下的即时通讯、游戏等应用需要客户端与服务端间的双向通信,而HTTP的请求/响应模型并不适合这种场景。会存在一定的问题:
Websocket出现使得浏览器提供socket的支持成为可能,从而在浏览器和服务器之间建立一条基于tcp的双向连接通道,web开发人员可以很方便的利用websocket构建实时web应用。WebSocket适用于以下场景:
WebSocket是基于TCP的应用层协议。需要特别注意的是:虽然WebSocket协议在建立连接时会使用HTTP协议,但这并意味着WebSocket协议是基于HTTP协议实现的。
客户端到服务端:
服务端到客户端:
客户端发起连接的约定:
客户端检查服务端的响应:
服务端处理客户端连接:
WebSocket通信流程如下:
Websocket帧格式如下:
第一部分:
FIN:1位,用于描述消息是否结束,如果为1则该消息为消息尾部,如果为零则还有后续数据包。
RSV1,RSV2,RSV3:各1位,用于扩展定义,如果没有扩展约定的情况则必须为0。
OPCODE:4位,用于表示消息接收类型,如果接收到未知的opcode,接收端必须关闭连接。
OPCODE说明:
第二部分:
第三部分:
数据掩码:如果MASK设置位0,则该部分可以省略,如果MASK设置位1,则Masking-key是一个32位的掩码。用来解码客户端发送给服务端的数据帧。
第四部分:
数据:该部分,也是最后一部分,是帧真正要发送的数据,可以是任意长度。
控制帧可能插在一个Message的多个分片之间,但一个Message的分片不能交替传输(除非有扩展特别定义)。
控制帧不可分片。
分片需要按照分送方提交顺序传递给接收方,但由于IP路由特性,实际并不能保证顺序到达。
控制帧包括:
连接关闭时状态码说明:
Stomp:
STOMP是基于帧的协议,它的前身是TTMP协议(一个简单的基于文本的协议),专为消息中间件设计。是属于消息队列的一种协议, 和AMQP, JMS平级。它的简单性恰巧可以用于定义websocket的消息体格式. STOMP协议很多MQ都已支持, 比如RabbitMq, ActiveMq。生产者(发送消息)、消息代理、消费者(订阅然后收到消息)。
2. SockJs:
SockJS是一个浏览器JavaScript库,它提供了一个类似于网络的对象。SockJS提供了一个连贯的、跨浏览器的Javascript API,它在浏览器和web服务器之间创建了一个低延迟、全双工、跨域通信通道。
SockJS的一大好处在于提供了浏览器兼容性。优先使用原生WebSocket,如果在不支持websocket的浏览器中,会自动降为轮询的方式。 除此之外,spring也对socketJS提供了支持。
3. Socket.io:
http://Socket.io实际上是WebSocket的父集,http://Socket.io封装了WebSocket和轮询等方法,会根据情况选择方法来进行通讯。
http://Sockei.io最早由Node.js实现,Node.js提供了高效的服务端运行环境,但由于Browser对HTML5的支持不一,为了兼容所有浏览器,提供实时的用户体验,并为开发者提供客户端与服务端一致的编程体验,于是http://Socket.io诞生了。Java模仿Node.js实现了Java版的http://Netty-socket.io库。
http://Socket.io将WebSocket和Polling机制以及其它的实时通信方式封装成通用的接口,并在服务端实现了这些实时机制相应代码,包括:AJAX Long Polling、Adobe Flash Socket、AJAX multipart streaming、Forever Iframem、JSONP Polling。
工程师应该是以解决问题为主的,如果不会解决问题,只会伸手,必然不会长远,有思考,才会有突破,才能高效的处理事情,所以 websocket 到底解决了什么问题呢?它存在的价值是什么?
这还是得从HTTP说起,大家应该都很熟悉这门协议,我们简单说一下它的特点:
•三次握手、四次挥手 的方式建立连接和关闭连接
•支持长连接和短连接两种连接方式
•有同源策略的限制(端口,协议,域名)
•单次 请求-响应 机制,只支持单向通信
其中最鸡肋的就是最后一个特点,单向通信,什么意思呐? 就是说只能由一方发起请求(客户端),另一方响应请求(服务端),而且每一次的请求都是一个单独的事件,请求之间还无法具有关联性,也就是说我上个请求和下个请求完全是隔离的,无法具有连续性。
也许你觉得这样的说法比较难懂,我们来举一个栗子:
每个人都打过电话吧,电话打通后可以一直聊天是不是觉得很舒服啊,这是一种全双工的通信方式,双方都可以主动传递信息。彼此的聊天也具有连续性。我们简单把这种方式理解为 websocket 协议支持的方式。
如果打电话变成了 HTTP 那种方式呢? 那就不叫打电话了,而是联通爸爸的智能语音助手了,我们知道客户端和服务端本身的身份并不是固定的,只要你可以发起通信,就可以充当客户端,能响应请求,就可以当做服务端,但是在HTTP的世界里一般来说,客户端(大多数情况下是浏览器)和服务器一般是固定的,我们打电话 去查话费,会询问要人工服务还是智能助手,如果选了助手,你只要问她问题,她就会找对应的答案来回答你(响应你),一般都是简单的业务,你不问她也不会跟你闲聊,主动才有故事啊!
但是实际上有很多的业务是需要双方都有主动性的,半双工的模式肯定是不够用的,例如聊天室,跟机器人聊天没意思啊,又例如主动推送,我无聊的时候手都不想点屏幕,你能不能主动一点给我推一些好玩的信息过来。
只要做过前后端分离的同学应该都被跨域的问题折磨过。浏览器的这种同源策略,会导致 不同端口/不同域名/不同协议 的请求会有限制,当然这问题前后端都能处理,然而 websocket 就没有这种要求,他支持任何域名或者端口的访问(协议固定了只能是 ws/wss) ,所以它让人用的更加舒服
所以,上面 HTTP 存在的这些问题,websocket 都能解决!!!
主动是 websocket 的一大特点,像之前如果客户端想知道服务端对某个事件的处理进度,就只能通过轮训( Poll )的方式去询问,十分的耗费资源,会存在十分多的无效请求。下面我简单说推送技术的三种模型区别:
pull 和 poll 的唯一区别只在于周期性,但是很明显周期性的去询问,对业务来说清晰度很高,这也是为什么很多小公司都是基于轮训的方式去处理业务,因为简单嘛,能力不够机器来撑。这也是很多公司都会面临的问题,如果业务达到了瓶颈,使劲的堆机器,如果用新技术或者更高级的作法,开发成本和维护成本也会变高,还不如简单一点去增加机器配置。
如果两个人需要通话,首先需要建立一个连接,而且必须是一个长链接,大家都不希望讲几句话就得重新打吧,根据上面说的,websocket 会复用之前 HTTP 建立好的长链接,然后再进行升级,所以他和轮训的区别大致如下所示:
图片省去了建立连接的过程,我们可以发现,基于轮训的方式,必须由客户端手动去请求,才会有响应,而基于 websocket 协议,不再需要你主动约妹子了,妹子也可以主动去约你,这才是公平的世界。
为了更好的阐述这个连接的原理,可以使用swoole 自带的 创建websocket 的功能进行测试,服务端代码如下,如果连接不上,可以看看是不是检查一下端口开放情况(iptables/filewall)和网络的连通性,代码如下:
- //创建websocket服务器对象,监听0.0.0.0:9501端口
- $ws = new Swoole\WebSocket\Server("0.0.0.0", 9501);
-
- //监听WebSocket连接打开事件
- $ws->on('open', function ($ws, $request) {
- var_dump($request->fd, $request->get, $request->server); //request 对象包含请求的相关信息
- //$ws->push($request->fd, "hello, welcome\n");
- });
-
- //监听WebSocket消息事件
- $ws->on('message', function ($ws, $frame) { // frame 是存储信息的变量,也就是传输帧
- echo "Message: {$frame->data}\n";
- $ws->push($frame->fd, "server: {$frame->data}");
- });
-
- //监听WebSocket连接关闭事件
- $ws->on('close', function ($ws, $fd) { // fd 是客户端的标志
- echo "client-{$fd} is closed\n";
- });
-
- $ws->start(); // 启动这个进程

我们可以发现,相比于 HTTP 的头部,websocket 的数据结构十分的简单小巧,没有像 HTTP 协议一样老是带着笨重的头部,这一设计让websocket的报文可以在体量上更具优势,所以传输效率来说更高 。
当然,我们传输的文本也不能在大街上裸跑啊,既然 HTTP 都有衣服穿了(HTTPS),websocket(ws) 当然也有 (wss)。
在以前的文章我们也简单聊过 HTTPS 是个什么东西,大家不了解可以去翻一下之前的文章,总的来说就是使用了非对称加密算法进行了对称加密密钥的传输,后续采用对称加密解密的方式进行数据安全处理。
如果你的业务需要支撑双全工的通信,那么 websocket 便是一个很不错的选择。网上大多数关于 websocket 的文章,大多是基于前端学习者的角度,他们使用 Chrome 的console 的调试实验,本篇文章更多是基于后端开发者的角度。希望对你有所帮助。
1.websocket优点
2.websocket缺点
协议有两个部分:handshake(握手)和 data transfer(数据传输)。
客户端握手报文是在HTTP的基础上发送一次HTTP协议升级请求。
- GET /chat HTTP/1.1
- Host: server.example.com
- Upgrade: websocket
- Connection: Upgrade
- Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
- Origin: http://example.com
- Sec-WebSocket-Protocol: chat, superchat
- Sec-WebSocket-Version: 13
Sec-WebSocket-Key 是由浏览器随机生成的,提供基本的防护,防止恶意或者无意的连接。
Sec-WebSocket-Version 表示 WebSocket 的版本,最初 WebSocket 协议太多,不同厂商都有自己的协议版本,不过现在已经定下来了。如果服务端不支持该版本,需要返回一个 Sec-WebSocket-Versionheader,里面包含服务端支持的版本号。
服务端响应握手也是在HTTP协议基础上回应一个Switching Protocols。
- HTTP/1.1 101 Switching Protocols
- Upgrade: websocket
- Connection: Upgrade
- Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
- Sec-WebSocket-Protocol: chat
Linux下对应实现代码,注释在代码中。
- int websocket_handshake(struct qsevent *ev)
- {
- char linebuf[128];
- int index = 0;
- char sec_data[128] = {0};
- char sec_accept[32] = {0};
- do
- {
- memset(linebuf, 0, sizeof(linebuf));//清空以暂存一行报文
- index = readline(ev->buffer, index, linebuf);//获取一行报文
- if(strstr(linebuf, "Sec-WebSocket-Key"))//如果一行报文里面包括了Sec-WebSocket-Key
- {
- strcat(linebuf, GUID);//和GUID连接起来
- SHA1(linebuf+WEBSOCK_KEY_LENGTH, strlen(linebuf+WEBSOCK_KEY_LENGTH), sec_data);//SHA1
- base64_encode(sec_data, strlen(sec_data), sec_accept);//base64编码
- memset(ev->buffer, 0, MAX_BUFLEN);//清空服务端数据缓冲区
-
- ev->length = sprintf(ev->buffer,//组装握手响应报文到数据缓冲区,下一步有进行下发
- "HTTP/1.1 101 Switching Protocols\r\n"
- "Upgrade: websocket\r\n"
- "Connection: Upgrade\r\n"
- "Sec-websocket-Accept: %s\r\n\r\n", sec_accept);
- break;
- }
- }while(index != -1 && (ev->buffer[index] != '\r') || (ev->buffer[index] != '\n'));//遇到空行之前
- return 0;
- }

先看数据包格式。
FIN:指示这是消息中的最后一个片段。第一个片段也可能是最后的片段。
RSV1, RSV2, RSV3:一般情况下全为 0。当客户端、服务端协商采用 WebSocket 扩展时,这三个标志位可以非0,且值的含义由扩展进行定义。如果出现非零的值,且并没有采用 WebSocket 扩展,连接出错。
opcode:操作代码。
- %x0:表示一个延续帧。当 Opcode 为 0 时,表示本次数据传输采用了数据分片,当前收到的数据帧为其中一个数据分片;
- %x1:表示这是一个文本帧(frame);
- %x2:表示这是一个二进制帧(frame);
- %x3-7:保留的操作代码,用于后续定义的非控制帧;
- %x8:表示连接断开;
- %x9:表示这是一个 ping 操作;
- %xA:表示这是一个 pong 操作;
- %xB-F:保留的操作代码,用于后续定义的控制帧。
- 表示数据载荷的长度
- x 为 0~126:数据的长度为 x 字节;
- x 为 126:后续 2 个字节代表一个 16 位的无符号整数,该无符号整数的值为数据的长度;
- x 为 127:后续 8 个字节代表一个 64 位的无符号整数(最高位为 0),该无符号整数的值为数据的长度。
payload data:消息体。
下面是服务端的代码实现:
- #define GUID "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
- enum
- {
- WS_HANDSHAKE = 0, //握手
- WS_TANSMISSION = 1, //通信
- WS_END = 2, //end
- };
-
- typedef struct _ws_ophdr{
- unsigned char opcode:4,
- rsv3:1,
- rsv2:1,
- rsv1:1,
- fin:1;
- unsigned char pl_len:7,
- mask:1;
- }ws_ophdr;//协议前两个字节
-
- typedef struct _ws_head_126{
- unsigned short payload_lenght;
- char mask_key[4];
- }ws_head_126;//协议mask和消息体长度
-
-
- /*解码*/
- void websocket_umask(char *payload, int length, char *mask_key)
- {
- int i = 0;
- for( ; i<length; i++)
- payload[i] ^= mask_key[i%4];//异或
- }
-
- int websocket_transmission(struct qsevent *ev)
- {
- ws_ophdr *ophdr = (ws_ophdr*)ev->buffer;//协议前两个自己
- printf("ws_recv_data length=%d\n", ophdr->pl_len);
- if(ophdr->pl_len <126)//如果消息体长度小于126
- {
- char * payload = ev->buffer + sizeof(ws_ophdr) + 4;//获取消息地址
- if(ophdr->mask)//如果消息是掩码
- {
- websocket_umask(payload, ophdr->pl_len, ev->buffer+2);//解码,异或
- printf("payload:%s\n", payload);
- }
- printf("payload : %s\n", payload);//消息回显
- }
- else if (hdr->pl_len == 126) {
- ws_head_126 *hdr126 = ev->buffer + sizeof(ws_ophdr);
- } else {
- ws_head_127 *hdr127 = ev->buffer + sizeof(ws_ophdr);
- }
- return 0;
- }
-
- int websocket_request(struct qsevent *ev)
- {
- if(ev->status_machine == WS_HANDSHAKE)
- {
- websocket_handshake(ev);//握手
- ev->status_machine = WS_TANSMISSION;//设置标志位
- }else if(ev->status_machine == WS_TANSMISSION){
- websocket_transmission(ev);//通信
- }
- return 0;
- }

代码是基于reactor百万并发服务器框架实现的。
1.客户端结构体
- struct qsevent{
- int fd; //clientfd
- int events; //事件:读、写或异常
- int status; //是否位于epfd红黑监听树上
- void *arg; //参数
- long last_active; //上次数据收发的事件
-
- int (*callback)(int fd, int event, void *arg); //回调函数,单回调,后面修改成多回调
- unsigned char buffer[MAX_BUFLEN]; //数据缓冲区
- int length; //数据长度
-
- /*http param*/
- int method; //http协议请求头部
- char resource[MAX_BUFLEN]; //请求的资源
- int ret_code; //响应状态码
- };

2.int http_response(struct qsevent *ev)
当客户端发送tcp连接时,服务端的listenfd会触发输入事件会调用ev->callback即accept_cb回调函数响应连接并获得clientfd,连接之后,http数据报文发送上来,服务端的clientfd触发输入事件会调用ev->callback即recv_cb回调函数进行数据接收,并解析http报文。
- int http_request(struct qsevent *ev)
- {
- char linebuf[1024] = {0};//用于从buffer中获取每一行的请求报文
- int idx = readline(ev->buffer, 0, linebuf);//读取第一行请求方法,readline函数,后面介绍
- if(strstr(linebuf, "GET"))//strstr判断是否存在GET请求方法
- {
- ev->method = HTTP_METHOD_GET;//GET方法表示客户端需要获取资源
-
- int i = 0;
- while(linebuf[sizeof("GET ") + i] != ' ')i++;//跳过空格
- linebuf[sizeof("GET ") + i] = '\0';
- sprintf(ev->resource, "./%s/%s", HTTP_METHOD_ROOT, linebuf+sizeof("GET "));//将资源的名字以文件路径形式存储在ev->resource中
- printf("resource:%s\n", ev->resource);//回显
- }
- else if(strstr(linebuf, "POST"))//POST的请求方法,暂时没写,方法差不多
- {}
- return 0;
- }

服务器对客户端的响应报文数据进行http封装储存在buffer中,事件触发时在send_cb回调函数发送给客户端。详细解释请看代码注释。
- int http_response(struct qsevent *ev)
- {
- if(ev == NULL)return -1;
- memset(ev->buffer, 0, MAX_BUFLEN);//清空缓冲区准备储存报文
-
- printf("resource:%s\n", ev->resource);//resource:客户端请求的资源文件,通过http_reques函数获取
- int filefd = open(ev->resource, O_RDONLY);//只读方式打开获得文件句柄
- if(filefd == -1)//获取失败则发送404 NOT FOUND
- {
- ev->ret_code = 404;//404状态码
- ev->length = sprintf(ev->buffer,//将下面数据传入ev->buffer
- /***状态行***/
- /*版本号 状态码 状态码描述 */
- "HTTP/1.1 404 NOT FOUND\r\n"
- /***消息报头***/
- /*获取当前时间*/
- "date: Thu, 11 Nov 2021 12:28:52 GMT\r\n"
- /*响应正文类型; 编码方式*/
- "Content-Type: text/html;charset=ISO-8859-1\r\n"
- /*响应正文长度 空行*/
- "Content-Length: 85\r\n\r\n"
- /***响应正文***/
- "<html><head><title>404 Not Found</title></head><body><H1>404</H1></body></html>\r\n\r\n");
- }
- else
- {
- struct stat stat_buf; //文件信息
- fstat(filefd, &stat_buf); //fstat通过文件句柄获取文件信息
- if(S_ISDIR(stat_buf.st_mode)) //如果文件是一个目录
- {
-
- printf(ev->buffer, //同上,将404放入buffer中
- "HTTP/1.1 404 Not Found\r\n"
- "Date: Thu, 11 Nov 2021 12:28:52 GMT\r\n"
- "Content-Type: text/html;charset=ISO-8859-1\r\n"
- "Content-Length: 85\r\n\r\n"
- "<html><head><title>404 Not Found</title></head><body><H1>404</H1></body></html>\r\n\r\n" );
-
- }
- else if (S_ISREG(stat_buf.st_mode)) //如果文件是存在
- {
-
- ev->ret_code = 200; //200状态码
-
- ev->length = sprintf(ev->buffer, //length记录长度,buffer储存响应报文
- "HTTP/1.1 200 OK\r\n"
- "Date: Thu, 11 Nov 2021 12:28:52 GMT\r\n"
- "Content-Type: text/html;charset=ISO-8859-1\r\n"
- "Content-Length: %ld\r\n\r\n",
- stat_buf.st_size );//文件长度储存在stat_buf.st_size中
-
- }
- return ev->length;//返回报文长度
- }
- }

4.总代码
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <sys/socket.h>
- #include <sys/epoll.h>
- #include <arpa/inet.h>
-
- #include <fcntl.h>
- #include <unistd.h>
- #include <errno.h>
- #include <time.h>
-
- #include <sys/stat.h>
- #include <sys/sendfile.h>
-
- #define HTTP_METHOD_ROOT "html"
-
- #define MAX_BUFLEN 4096
- #define MAX_EPOLLSIZE 1024
- #define MAX_EPOLL_EVENTS 1024
-
- #define HTTP_METHOD_GET 0
- #define HTTP_METHOD_POST 1
-
- typedef int (*NCALLBACK)(int, int, void*);
-
- struct qsevent{
- int fd;
- int events;
- int status;
- void *arg;
- long last_active;
-
- int (*callback)(int fd, int event, void *arg);
- unsigned char buffer[MAX_BUFLEN];
- int length;
-
- /*http param*/
- int method;
- char resource[MAX_BUFLEN];
- int ret_code;
- };
-
- struct qseventblock{
- struct qsevent *eventsarrry;
- struct qseventblock *next;
- };
-
- struct qsreactor{
- int epfd;
- int blkcnt;
- struct qseventblock *evblk;
- };
-
- int recv_cb(int fd, int events, void *arg);
- int send_cb(int fd, int events, void *arg);
- struct qsevent *qsreactor_idx(struct qsreactor *reactor, int sockfd);
-
- int readline(char *allbuf, int idx, char *linebuf)
- {
- int len = strlen(allbuf);
- for( ; idx<len; idx++ )
- {
- if(allbuf[idx] == '\r' && allbuf[idx+1] == '\n')
- return idx+2;
- else
- *(linebuf++) = allbuf[idx];
- }
- return -1;
- }
-
- void qs_event_set(struct qsevent *ev, int fd, NCALLBACK callback, void *arg)
- {
- ev->events = 0;
- ev->fd = fd;
- ev->arg = arg;
- ev->callback = callback;
- ev->last_active = time(NULL);
- return;
- }
-
- int qs_event_add(int epfd, int events, struct qsevent *ev)
- {
- struct epoll_event epv = {0, {0}};;
- epv.events = ev->events = events;
- epv.data.ptr = ev;
-
- if(ev->status == 1)
- {
- if(epoll_ctl(epfd, EPOLL_CTL_MOD, ev->fd, &epv) < 0)
- {
- perror("EPOLL_CTL_MOD error\n");
- return -1;
- }
- }
- else if(ev->status == 0)
- {
- if(epoll_ctl(epfd, EPOLL_CTL_ADD, ev->fd, &epv) < 0)
- {
- perror("EPOLL_CTL_ADD error\n");
- return -2;
- }
- ev->status = 1;
- }
- return 0;
- }
-
- int qs_event_del(int epfd, struct qsevent *ev)
- {
- struct epoll_event epv = {0, {0}};
- if(ev->status != 1)
- return -1;
- ev->status = 0;
- epv.data.ptr = ev;
- if((epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &epv)))
- {
- perror("EPOLL_CTL_DEL error\n");
- return -1;
- }
- return 0;
- }
-
- int sock(short port)
- {
- int fd = socket(AF_INET, SOCK_STREAM, 0);
- fcntl(fd, F_SETFL, O_NONBLOCK);
-
- struct sockaddr_in ser_addr;
- memset(&ser_addr, 0, sizeof(ser_addr));
- ser_addr.sin_addr.s_addr = htonl(INADDR_ANY);
- ser_addr.sin_family = AF_INET;
- ser_addr.sin_port = htons(port);
-
- bind(fd, (struct sockaddr*)&ser_addr, sizeof(ser_addr));
-
- if(listen(fd, 20) < 0)
- perror("listen error\n");
-
- printf("listener[%d] lstening..\n", fd);
- return fd;
- }
-
- int http_request(struct qsevent *ev)
- {
- char linebuf[1024] = {0};
- int idx = readline(ev->buffer, 0, linebuf);
- if(strstr(linebuf, "GET"))
- {
- ev->method = HTTP_METHOD_GET;
-
- int i = 0;
- while(linebuf[sizeof("GET ") + i] != ' ')i++;
- linebuf[sizeof("GET ") + i] = '\0';
- sprintf(ev->resource, "./%s/%s", HTTP_METHOD_ROOT, linebuf+sizeof("GET "));
- printf("resource:%s\n", ev->resource);
- }
- else if(strstr(linebuf, "POST"))
- {}
- return 0;
- }
-
- int http_response(struct qsevent *ev)
- {
- if(ev == NULL)return -1;
- memset(ev->buffer, 0, MAX_BUFLEN);
-
- printf("resource:%s\n", ev->resource);
-
- int filefd = open(ev->resource, O_RDONLY);
- if(filefd == -1)
- {
- ev->ret_code = 404;
- ev->length = sprintf(ev->buffer,
- "HTTP/1.1 404 NOT FOUND\r\n"
- "date: Thu, 11 Nov 2021 12:28:52 GMT\r\n"
- "Content-Type: text/html;charset=ISO-8859-1\r\n"
- "Content-Length: 85\r\n\r\n"
- "<html><head><title>404 Not Found</title></head><body><H1>404</H1></body></html>\r\n\r\n");
- }
- else
- {
- struct stat stat_buf;
- fstat(filefd, &stat_buf);
- if(S_ISDIR(stat_buf.st_mode))
- {
-
- printf(ev->buffer,
- "HTTP/1.1 404 Not Found\r\n"
- "Date: Thu, 11 Nov 2021 12:28:52 GMT\r\n"
- "Content-Type: text/html;charset=ISO-8859-1\r\n"
- "Content-Length: 85\r\n\r\n"
- "<html><head><title>404 Not Found</title></head><body><H1>404</H1></body></html>\r\n\r\n" );
-
- }
- else if (S_ISREG(stat_buf.st_mode))
- {
-
- ev->ret_code = 200;
-
- ev->length = sprintf(ev->buffer,
- "HTTP/1.1 200 OK\r\n"
- "Date: Thu, 11 Nov 2021 12:28:52 GMT\r\n"
- "Content-Type: text/html;charset=ISO-8859-1\r\n"
- "Content-Length: %ld\r\n\r\n",
- stat_buf.st_size );
-
- }
- return ev->length;
- }
- }
-
- int qsreactor_init(struct qsreactor *reactor)
- {
- if(reactor == NULL)
- return -1;
- memset(reactor, 0, sizeof(struct qsreactor));
- reactor->epfd = epoll_create(1);
- if(reactor->epfd <= 0)
- {
- perror("epoll_create error\n");
- return -1;
- }
- struct qseventblock *block = (struct qseventblock*)malloc(sizeof(struct qseventblock));
- if(block == NULL)
- {
- printf("blockinit malloc error\n");
- close(reactor->epfd);
- return -2;
- }
- memset(block, 0, sizeof(block));
-
- struct qsevent *evs = (struct qsevent*)malloc(MAX_EPOLLSIZE * sizeof(struct qsevent));
- if(evs == NULL)
- {
- printf("evsnit malloc error\n");
- close(reactor->epfd);
- return -3;
- }
- memset(evs, 0, sizeof(evs));
-
- block->next = NULL;
- block->eventsarrry = evs;
-
- reactor->blkcnt = 1;
- reactor->evblk = block;
- return 0;
- }
-
- int qsreactor_alloc(struct qsreactor *reactor)
- {
- if(reactor == NULL)return -1;
- if(reactor->evblk == NULL)return -1;
- struct qseventblock *tailblock = reactor->evblk;
- while(tailblock->next != NULL)
- tailblock = tailblock->next;
- struct qseventblock *newblock = (struct qseventblock*)malloc(sizeof(struct qseventblock));
- if(newblock == NULL)
- {
- printf("newblock alloc error\n");
- return -1;
- }
- memset(newblock, 0, sizeof(newblock));
-
- struct qsevent *neweventarray = (struct qsevent*)malloc(sizeof(struct qsevent) * MAX_EPOLLSIZE);
- if(neweventarray == NULL)
- {
- printf("neweventarray malloc error\n");
- return -1;
- }
- memset(neweventarray, 0, sizeof(neweventarray));
-
- newblock->eventsarrry = neweventarray;
- newblock->next = NULL;
-
- tailblock->next = newblock;
- reactor->blkcnt++;
-
- return 0;
- }
-
- struct qsevent *qsreactor_idx(struct qsreactor *reactor, int sockfd)
- {
- int index = sockfd / MAX_EPOLLSIZE;
- while(index >= reactor->blkcnt)qsreactor_alloc(reactor);
- int i=0;
- struct qseventblock *idxblock = reactor->evblk;
- while(i++<index && idxblock != NULL)
- idxblock = idxblock->next;
-
- return &idxblock->eventsarrry[sockfd%MAX_EPOLLSIZE];
- }
-
- int qsreactor_destory(struct qsreactor *reactor)
- {
- close(reactor->epfd);
- free(reactor->evblk);
- reactor = NULL;
- return 0;
- }
-
- int qsreactor_addlistener(struct qsreactor *reactor, int sockfd, NCALLBACK acceptor)
- {
- if(reactor == NULL)return -1;
- if(reactor->evblk == NULL)return -1;
-
- struct qsevent *event = qsreactor_idx(reactor, sockfd);
- qs_event_set(event, sockfd, acceptor, reactor);
- qs_event_add(reactor->epfd, EPOLLIN, event);
-
- return 0;
- }
-
- int send_cb(int fd, int events, void *arg)
- {
- struct qsreactor *reactor = (struct qsreactor*)arg;
- struct qsevent *ev = qsreactor_idx(reactor, fd);
-
- http_response(ev);
- int ret = send(fd, ev->buffer, ev->length, 0);
- if(ret < 0)
- {
- qs_event_del(reactor->epfd, ev);
- printf("clent[%d] ", fd);
- perror("send error\n");
- close(fd);
- }
- else if(ret > 0)
- {
- if(ev->ret_code == 200)
- {
- int filefd = open(ev->resource, O_RDONLY);
- struct stat stat_buf;
- fstat(filefd, &stat_buf);
-
- sendfile(fd, filefd, NULL, stat_buf.st_size);
- close(filefd);
- }
- printf("send to client[%d]:%s", fd, ev->buffer);
- qs_event_del(reactor->epfd, ev);
- qs_event_set(ev, fd, recv_cb, reactor);
- qs_event_add(reactor->epfd, EPOLLIN, ev);
- }
- return ret;
- }
-
- int recv_cb(int fd, int events, void *arg)
- {
- struct qsreactor *reactor = (struct qsreactor*)arg;
- struct qsevent *ev = qsreactor_idx(reactor, fd);
-
- int len = recv(fd, ev->buffer, MAX_BUFLEN, 0);
- qs_event_del(reactor->epfd, ev);
- if(len > 0)
- {
- ev->length = len;
- ev->buffer[len] = '\0';
-
- printf("client[%d]:%s", fd, ev->buffer);
-
- http_request(ev);
-
- qs_event_del(reactor->epfd, ev);
- qs_event_set(ev, fd, send_cb, reactor);
- qs_event_add(reactor->epfd, EPOLLOUT, ev);
- }
- else if(len == 0)
- {
- qs_event_del(reactor->epfd, ev);
- close(fd);
- printf("client[%d] close\n", fd);
- }
- else
- {
- qs_event_del(reactor->epfd, ev);
- printf("client[%d]", fd);
- perror("reacv error,\n");
- close(fd);
- }
- return 0;
- }
-
- int accept_cb(int fd, int events, void *arg)
- {
- struct qsreactor *reactor = (struct qsreactor*)arg;
- if(reactor == NULL)return -1;
-
- struct sockaddr_in client_addr;
- socklen_t len = sizeof(client_addr);
-
- int clientfd;
-
-
- if((clientfd = accept(fd, (struct sockaddr*)&client_addr, &len)) == -1)
- {
- if(errno != EAGAIN && errno != EINTR)
- {}
- perror("accept error\n");
- return -1;
- }
-
- int flag = 0;
- if((flag = fcntl(clientfd, F_SETFL, O_NONBLOCK)) < 0)
- {
- printf("fcntl noblock error, %d\n",MAX_BUFLEN);
- return -1;
- }
- struct qsevent *event = qsreactor_idx(reactor, clientfd);
-
- qs_event_set(event, clientfd, recv_cb, reactor);
- qs_event_add(reactor->epfd, EPOLLIN, event);
-
- printf("new connect [%s:%d], pos[%d]\n",
- inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), clientfd);
-
- return 0;
- }
-
- int qsreactor_run(struct qsreactor *reactor)
- {
- if(reactor == NULL)
- return -1;
- if(reactor->evblk == NULL)
- return -1;
- if(reactor->epfd < 0)
- return -1;
-
- struct epoll_event events[MAX_EPOLL_EVENTS + 1];
- while(1)
- {
- int nready = epoll_wait(reactor->epfd, events, MAX_EPOLL_EVENTS, 1000);
-
- if(nready < 0)
- {
- printf("epoll_wait error\n");
- continue;
- }
- for(int i=0; i<nready; i++)
- {
- struct qsevent *ev = (struct qsevent*)events[i].data.ptr;
- if((events[i].events & EPOLLIN) && (ev->events & EPOLLIN))
- {
- ev->callback(ev->fd, events[i].events, ev->arg);
- }
- if((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT))
- {
- ev->callback(ev->fd, events[i].events, ev->arg);
- }
- }
- }
- }
-
- int main(int argc, char **argv)
- {
- unsigned short port = atoi(argv[1]);
-
- int sockfd = sock(port);
-
-
- struct qsreactor *reactor = (struct qsreactor*)malloc(sizeof(struct qsreactor));
- qsreactor_init(reactor);
-
- qsreactor_addlistener(reactor, sockfd, accept_cb);
- qsreactor_run(reactor);
-
- qsreactor_destory(reactor);
- close(sockfd);
- }

5.epoll反应堆模型下实现websocket协议
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <sys/socket.h>
- #include <sys/epoll.h>
- #include <arpa/inet.h>
-
- #include <fcntl.h>
- #include <unistd.h>
- #include <errno.h>
- #include <time.h>
-
- #include <sys/stat.h>
- #include <sys/sendfile.h>
-
- #include <openssl/sha.h>
- #include <openssl/pem.h>
- #include <openssl/bio.h>
- #include <openssl/evp.h>
-
- #define MAX_BUFLEN 4096
- #define MAX_EPOLLSIZE 1024
- #define MAX_EPOLL_EVENTS 1024
-
- #define GUID "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
-
- enum
- {
- WS_HANDSHAKE = 0,
- WS_TANSMISSION = 1,
- WS_END = 2,
- };
-
- typedef struct _ws_ophdr{
- unsigned char opcode:4,
- rsv3:1,
- rsv2:1,
- rsv1:1,
- fin:1;
- unsigned char pl_len:7,
- mask:1;
- }ws_ophdr;
-
- typedef struct _ws_head_126{
- unsigned short payload_lenght;
- char mask_key[4];
- }ws_head_126;
-
- typedef struct _ws_head_127{
- long long payload_lenght;
- char mask_key[4];
- }ws_head_127;
-
- typedef int (*NCALLBACK)(int, int, void*);
-
- struct qsevent{
- int fd;
- int events;
- int status;
- void *arg;
- long last_active;
-
- int (*callback)(int fd, int event, void *arg);
- unsigned char buffer[MAX_BUFLEN];
- int length;
-
- /*websocket param*/
- int status_machine;
- };
-
- struct qseventblock{
- struct qsevent *eventsarrry;
- struct qseventblock *next;
- };
-
- struct qsreactor{
- int epfd;
- int blkcnt;
- struct qseventblock *evblk;
- };
-
- int recv_cb(int fd, int events, void *arg);
- int send_cb(int fd, int events, void *arg);
- struct qsevent *qsreactor_idx(struct qsreactor *reactor, int sockfd);
-
- int readline(char *allbuf, int idx, char *linebuf)
- {
- int len = strlen(allbuf);
-
- for(;idx < len;idx ++) {
- if (allbuf[idx] == '\r' && allbuf[idx+1] == '\n') {
- return idx+2;
- } else {
- *(linebuf++) = allbuf[idx];
- }
- }
- return -1;
- }
-
- int base64_encode(char *in_str, int in_len, char *out_str)
- {
- BIO *b64, *bio;
- BUF_MEM *bptr = NULL;
- size_t size = 0;
-
- if (in_str == NULL || out_str == NULL)
- return -1;
-
- b64 = BIO_new(BIO_f_base64());
- bio = BIO_new(BIO_s_mem());
- bio = BIO_push(b64, bio);
-
- BIO_write(bio, in_str, in_len);
- BIO_flush(bio);
-
- BIO_get_mem_ptr(bio, &bptr);
- memcpy(out_str, bptr->data, bptr->length);
- out_str[bptr->length-1] = '\0';
- size = bptr->length;
-
- BIO_free_all(bio);
- return size;
- }
-
- #define WEBSOCK_KEY_LENGTH 19
-
- int websocket_handshake(struct qsevent *ev)
- {
- char linebuf[128];
- int index = 0;
- char sec_data[128] = {0};
- char sec_accept[32] = {0};
- do
- {
- memset(linebuf, 0, sizeof(linebuf));
- index = readline(ev->buffer, index, linebuf);
- if(strstr(linebuf, "Sec-WebSocket-Key"))
- {
- strcat(linebuf, GUID);
- SHA1(linebuf+WEBSOCK_KEY_LENGTH, strlen(linebuf+WEBSOCK_KEY_LENGTH), sec_data);
- base64_encode(sec_data, strlen(sec_data), sec_accept);
- memset(ev->buffer, 0, MAX_BUFLEN);
-
- ev->length = sprintf(ev->buffer,
- "HTTP/1.1 101 Switching Protocols\r\n"
- "Upgrade: websocket\r\n"
- "Connection: Upgrade\r\n"
- "Sec-websocket-Accept: %s\r\n\r\n", sec_accept);
- break;
- }
- }while(index != -1 && (ev->buffer[index] != '\r') || (ev->buffer[index] != '\n'));
- return 0;
- }
-
- void websocket_umask(char *payload, int length, char *mask_key)
- {
- int i = 0;
- for( ; i<length; i++)
- payload[i] ^= mask_key[i%4];
- }
-
- int websocket_transmission(struct qsevent *ev)
- {
- ws_ophdr *ophdr = (ws_ophdr*)ev->buffer;
- printf("ws_recv_data length=%d\n", ophdr->pl_len);
- if(ophdr->pl_len <126)
- {
- char * payload = ev->buffer + sizeof(ws_ophdr) + 4;
- if(ophdr->mask)
- {
- websocket_umask(payload, ophdr->pl_len, ev->buffer+2);
- printf("payload:%s\n", payload);
- }
- memset(ev->buffer, 0, ev->length);
- strcpy(ev->buffer, "00ok");
- }
- return 0;
- }
-
- int websocket_request(struct qsevent *ev)
- {
- if(ev->status_machine == WS_HANDSHAKE)
- {
- websocket_handshake(ev);
- ev->status_machine = WS_TANSMISSION;
- }else if(ev->status_machine == WS_TANSMISSION){
- websocket_transmission(ev);
- }
- return 0;
- }
- void qs_event_set(struct qsevent *ev, int fd, NCALLBACK callback, void *arg)
- {
- ev->events = 0;
- ev->fd = fd;
- ev->arg = arg;
- ev->callback = callback;
- ev->last_active = time(NULL);
- return;
- }
-
- int qs_event_add(int epfd, int events, struct qsevent *ev)
- {
- struct epoll_event epv = {0, {0}};;
- epv.events = ev->events = events;
- epv.data.ptr = ev;
-
- if(ev->status == 1)
- {
- if(epoll_ctl(epfd, EPOLL_CTL_MOD, ev->fd, &epv) < 0)
- {
- perror("EPOLL_CTL_MOD error\n");
- return -1;
- }
- }
- else if(ev->status == 0)
- {
- if(epoll_ctl(epfd, EPOLL_CTL_ADD, ev->fd, &epv) < 0)
- {
- perror("EPOLL_CTL_ADD error\n");
- return -2;
- }
- ev->status = 1;
- }
- return 0;
- }
-
- int qs_event_del(int epfd, struct qsevent *ev)
- {
- struct epoll_event epv = {0, {0}};
- if(ev->status != 1)
- return -1;
- ev->status = 0;
- epv.data.ptr = ev;
- if((epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &epv)))
- {
- perror("EPOLL_CTL_DEL error\n");
- return -1;
- }
- return 0;
- }
-
- int sock(short port)
- {
- int fd = socket(AF_INET, SOCK_STREAM, 0);
- fcntl(fd, F_SETFL, O_NONBLOCK);
-
- struct sockaddr_in ser_addr;
- memset(&ser_addr, 0, sizeof(ser_addr));
- ser_addr.sin_addr.s_addr = htonl(INADDR_ANY);
- ser_addr.sin_family = AF_INET;
- ser_addr.sin_port = htons(port);
-
- bind(fd, (struct sockaddr*)&ser_addr, sizeof(ser_addr));
-
- if(listen(fd, 20) < 0)
- perror("listen error\n");
-
- printf("listener[%d] lstening..\n", fd);
- return fd;
- }
-
- int qsreactor_init(struct qsreactor *reactor)
- {
- if(reactor == NULL)
- return -1;
- memset(reactor, 0, sizeof(struct qsreactor));
- reactor->epfd = epoll_create(1);
- if(reactor->epfd <= 0)
- {
- perror("epoll_create error\n");
- return -1;
- }
- struct qseventblock *block = (struct qseventblock*)malloc(sizeof(struct qseventblock));
- if(block == NULL)
- {
- printf("blockinit malloc error\n");
- close(reactor->epfd);
- return -2;
- }
- memset(block, 0, sizeof(block));
-
- struct qsevent *evs = (struct qsevent*)malloc(MAX_EPOLLSIZE * sizeof(struct qsevent));
- if(evs == NULL)
- {
- printf("evsnit malloc error\n");
- close(reactor->epfd);
- return -3;
- }
- memset(evs, 0, sizeof(evs));
-
- block->next = NULL;
- block->eventsarrry = evs;
-
- reactor->blkcnt = 1;
- reactor->evblk = block;
- return 0;
- }
-
- int qsreactor_alloc(struct qsreactor *reactor)
- {
- if(reactor == NULL)return -1;
- if(reactor->evblk == NULL)return -1;
- struct qseventblock *tailblock = reactor->evblk;
- while(tailblock->next != NULL)
- tailblock = tailblock->next;
- struct qseventblock *newblock = (struct qseventblock*)malloc(sizeof(struct qseventblock));
- if(newblock == NULL)
- {
- printf("newblock alloc error\n");
- return -1;
- }
- memset(newblock, 0, sizeof(newblock));
-
- struct qsevent *neweventarray = (struct qsevent*)malloc(sizeof(struct qsevent) * MAX_EPOLLSIZE);
- if(neweventarray == NULL)
- {
- printf("neweventarray malloc error\n");
- return -1;
- }
- memset(neweventarray, 0, sizeof(neweventarray));
-
- newblock->eventsarrry = neweventarray;
- newblock->next = NULL;
-
- tailblock->next = newblock;
- reactor->blkcnt++;
-
- return 0;
- }
-
- struct qsevent *qsreactor_idx(struct qsreactor *reactor, int sockfd)
- {
- int index = sockfd / MAX_EPOLLSIZE;
- while(index >= reactor->blkcnt)qsreactor_alloc(reactor);
- int i=0;
- struct qseventblock *idxblock = reactor->evblk;
- while(i++<index && idxblock != NULL)
- idxblock = idxblock->next;
-
- return &idxblock->eventsarrry[sockfd%MAX_EPOLLSIZE];
- }
-
- int qsreactor_destory(struct qsreactor *reactor)
- {
- close(reactor->epfd);
- free(reactor->evblk);
- reactor = NULL;
- return 0;
- }
-
- int qsreactor_addlistener(struct qsreactor *reactor, int sockfd, NCALLBACK acceptor)
- {
- if(reactor == NULL)return -1;
- if(reactor->evblk == NULL)return -1;
-
- struct qsevent *event = qsreactor_idx(reactor, sockfd);
- qs_event_set(event, sockfd, acceptor, reactor);
- qs_event_add(reactor->epfd, EPOLLIN, event);
-
- return 0;
- }
-
- int send_cb(int fd, int events, void *arg)
- {
- struct qsreactor *reactor = (struct qsreactor*)arg;
- struct qsevent *ev = qsreactor_idx(reactor, fd);
-
- int ret = send(fd, ev->buffer, ev->length, 0);
- if(ret < 0)
- {
- qs_event_del(reactor->epfd, ev);
- printf("clent[%d] ", fd);
- perror("send error\n");
- close(fd);
- }
- else if(ret > 0)
- {
- printf("send to client[%d]:\n%s\n", fd, ev->buffer);
- qs_event_del(reactor->epfd, ev);
- qs_event_set(ev, fd, recv_cb, reactor);
- qs_event_add(reactor->epfd, EPOLLIN, ev);
- }
- return ret;
- }
-
- int recv_cb(int fd, int events, void *arg)
- {
- struct qsreactor *reactor = (struct qsreactor*)arg;
- struct qsevent *ev = qsreactor_idx(reactor, fd);
-
- int len = recv(fd, ev->buffer, MAX_BUFLEN, 0);
- qs_event_del(reactor->epfd, ev);
- if(len > 0)
- {
- ev->length = len;
- ev->buffer[len] = '\0';
-
- printf("client[%d]:\n%s\n", fd, ev->buffer);
-
- websocket_request(ev);
-
- qs_event_del(reactor->epfd, ev);
- qs_event_set(ev, fd, send_cb, reactor);
- qs_event_add(reactor->epfd, EPOLLOUT, ev);
- }
- else if(len == 0)
- {
- qs_event_del(reactor->epfd, ev);
- close(fd);
- printf("client[%d] close\n", fd);
- }
- else
- {
- qs_event_del(reactor->epfd, ev);
- printf("client[%d]", fd);
- perror("reacv error,\n");
- close(fd);
- }
- return 0;
- }
-
- int accept_cb(int fd, int events, void *arg)
- {
- struct qsreactor *reactor = (struct qsreactor*)arg;
- if(reactor == NULL)return -1;
-
- struct sockaddr_in client_addr;
- socklen_t len = sizeof(client_addr);
-
- int clientfd;
-
-
- if((clientfd = accept(fd, (struct sockaddr*)&client_addr, &len)) == -1)
- {
- if(errno != EAGAIN && errno != EINTR)
- {}
- perror("accept error\n");
- return -1;
- }
-
- int flag = 0;
- if((flag = fcntl(clientfd, F_SETFL, O_NONBLOCK)) < 0)
- {
- printf("fcntl noblock error, %d\n",MAX_BUFLEN);
- return -1;
- }
- struct qsevent *event = qsreactor_idx(reactor, clientfd);
-
- event->status_machine = WS_HANDSHAKE;
-
- qs_event_set(event, clientfd, recv_cb, reactor);
- qs_event_add(reactor->epfd, EPOLLIN, event);
-
- printf("new connect [%s:%d], pos[%d]\n",
- inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), clientfd);
-
- return 0;
- }
-
- int qsreactor_run(struct qsreactor *reactor)
- {
- if(reactor == NULL)
- return -1;
- if(reactor->evblk == NULL)
- return -1;
- if(reactor->epfd < 0)
- return -1;
-
- struct epoll_event events[MAX_EPOLL_EVENTS + 1];
- while(1)
- {
- int nready = epoll_wait(reactor->epfd, events, MAX_EPOLL_EVENTS, 1000);
-
- if(nready < 0)
- {
- printf("epoll_wait error\n");
- continue;
- }
- for(int i=0; i<nready; i++)
- {
- struct qsevent *ev = (struct qsevent*)events[i].data.ptr;
- if((events[i].events & EPOLLIN) && (ev->events & EPOLLIN))
- {
- ev->callback(ev->fd, events[i].events, ev->arg);
- }
- if((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT))
- {
- ev->callback(ev->fd, events[i].events, ev->arg);
- }
- }
- }
- }
-
- int main(int argc, char **argv)
- {
- unsigned short port = atoi(argv[1]);
-
- int sockfd = sock(port);
-
-
- struct qsreactor *reactor = (struct qsreactor*)malloc(sizeof(struct qsreactor));
- qsreactor_init(reactor);
-
- qsreactor_addlistener(reactor, sockfd, accept_cb);
- qsreactor_run(reactor);
-
- qsreactor_destory(reactor);
- close(sockfd);
- }

6.C1000K reactor模型,epoll实现,连接并回发一段数据,测试正常
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <sys/socket.h>
- #include <sys/epoll.h>
- #include <arpa/inet.h>
-
- #include <fcntl.h>
- #include <unistd.h>
- #include <errno.h>
- #include <time.h>
-
- #include <sys/stat.h>
- #include <sys/sendfile.h>
-
- #define MAX_BUFLEN 4096
- #define MAX_EPOLLSIZE 1024
- #define MAX_EPOLL_EVENTS 1024
-
- typedef int (*NCALLBACK)(int, int, void*);
-
- struct qsevent{
- int fd;
- int events;
- int status;
- void *arg;
- long last_active;
-
- int (*callback)(int fd, int event, void *arg);
- unsigned char buffer[MAX_BUFLEN];
- int length;
- };
-
- struct qseventblock{
- struct qsevent *eventsarrry;
- struct qseventblock *next;
- };
-
- struct qsreactor{
- int epfd;
- int blkcnt;
- struct qseventblock *evblk;
- };
-
- int recv_cb(int fd, int events, void *arg);
- int send_cb(int fd, int events, void *arg);
- struct qsevent *qsreactor_idx(struct qsreactor *reactor, int sockfd);
-
- void qs_event_set(struct qsevent *ev, int fd, NCALLBACK callback, void *arg)
- {
- ev->events = 0;
- ev->fd = fd;
- ev->arg = arg;
- ev->callback = callback;
- ev->last_active = time(NULL);
- return;
- }
-
- int qs_event_add(int epfd, int events, struct qsevent *ev)
- {
- struct epoll_event epv = {0, {0}};;
- epv.events = ev->events = events;
- epv.data.ptr = ev;
-
- if(ev->status == 1)
- {
- if(epoll_ctl(epfd, EPOLL_CTL_MOD, ev->fd, &epv) < 0)
- {
- perror("EPOLL_CTL_MOD error\n");
- return -1;
- }
- }
- else if(ev->status == 0)
- {
- if(epoll_ctl(epfd, EPOLL_CTL_ADD, ev->fd, &epv) < 0)
- {
- perror("EPOLL_CTL_ADD error\n");
- return -2;
- }
- ev->status = 1;
- }
- return 0;
- }
-
- int qs_event_del(int epfd, struct qsevent *ev)
- {
- struct epoll_event epv = {0, {0}};
- if(ev->status != 1)
- return -1;
- ev->status = 0;
- epv.data.ptr = ev;
- if((epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &epv)))
- {
- perror("EPOLL_CTL_DEL error\n");
- return -1;
- }
- return 0;
- }
-
- int sock(short port)
- {
- int fd = socket(AF_INET, SOCK_STREAM, 0);
- fcntl(fd, F_SETFL, O_NONBLOCK);
-
- struct sockaddr_in ser_addr;
- memset(&ser_addr, 0, sizeof(ser_addr));
- ser_addr.sin_addr.s_addr = htonl(INADDR_ANY);
- ser_addr.sin_family = AF_INET;
- ser_addr.sin_port = htons(port);
-
- bind(fd, (struct sockaddr*)&ser_addr, sizeof(ser_addr));
-
- if(listen(fd, 20) < 0)
- perror("listen error\n");
-
- printf("listener[%d] lstening..\n", fd);
- return fd;
- }
-
- int qsreactor_init(struct qsreactor *reactor)
- {
- if(reactor == NULL)
- return -1;
- memset(reactor, 0, sizeof(struct qsreactor));
- reactor->epfd = epoll_create(1);
- if(reactor->epfd <= 0)
- {
- perror("epoll_create error\n");
- return -1;
- }
- struct qseventblock *block = (struct qseventblock*)malloc(sizeof(struct qseventblock));
- if(block == NULL)
- {
- printf("blockinit malloc error\n");
- close(reactor->epfd);
- return -2;
- }
- memset(block, 0, sizeof(block));
-
- struct qsevent *evs = (struct qsevent*)malloc(MAX_EPOLLSIZE * sizeof(struct qsevent));
- if(evs == NULL)
- {
- printf("evsnit malloc error\n");
- close(reactor->epfd);
- return -3;
- }
- memset(evs, 0, sizeof(evs));
-
- block->next = NULL;
- block->eventsarrry = evs;
-
- reactor->blkcnt = 1;
- reactor->evblk = block;
- return 0;
- }
-
- int qsreactor_alloc(struct qsreactor *reactor)
- {
- if(reactor == NULL)return -1;
- if(reactor->evblk == NULL)return -1;
- struct qseventblock *tailblock = reactor->evblk;
- while(tailblock->next != NULL)
- tailblock = tailblock->next;
- struct qseventblock *newblock = (struct qseventblock*)malloc(sizeof(struct qseventblock));
- if(newblock == NULL)
- {
- printf("newblock alloc error\n");
- return -1;
- }
- memset(newblock, 0, sizeof(newblock));
-
- struct qsevent *neweventarray = (struct qsevent*)malloc(sizeof(struct qsevent) * MAX_EPOLLSIZE);
- if(neweventarray == NULL)
- {
- printf("neweventarray malloc error\n");
- return -1;
- }
- memset(neweventarray, 0, sizeof(neweventarray));
-
- newblock->eventsarrry = neweventarray;
- newblock->next = NULL;
-
- tailblock->next = newblock;
- reactor->blkcnt++;
-
- return 0;
- }
-
- struct qsevent *qsreactor_idx(struct qsreactor *reactor, int sockfd)
- {
- int index = sockfd / MAX_EPOLLSIZE;
- while(index >= reactor->blkcnt)qsreactor_alloc(reactor);
- int i=0;
- struct qseventblock *idxblock = reactor->evblk;
- while(i++<index && idxblock != NULL)
- idxblock = idxblock->next;
-
- return &idxblock->eventsarrry[sockfd%MAX_EPOLLSIZE];
- }
-
- int qsreactor_destory(struct qsreactor *reactor)
- {
- close(reactor->epfd);
- free(reactor->evblk);
- reactor = NULL;
- return 0;
- }
-
- int qsreactor_addlistener(struct qsreactor *reactor, int sockfd, NCALLBACK acceptor)
- {
- if(reactor == NULL)return -1;
- if(reactor->evblk == NULL)return -1;
-
- struct qsevent *event = qsreactor_idx(reactor, sockfd);
- qs_event_set(event, sockfd, acceptor, reactor);
- qs_event_add(reactor->epfd, EPOLLIN, event);
-
- return 0;
- }
-
- int send_cb(int fd, int events, void *arg)
- {
- struct qsreactor *reactor = (struct qsreactor*)arg;
- struct qsevent *ev = qsreactor_idx(reactor, fd);
-
- int ret = send(fd, ev->buffer, ev->length, 0);
- if(ret < 0)
- {
- qs_event_del(reactor->epfd, ev);
- printf("clent[%d] ", fd);
- perror("send error\n");
- close(fd);
- }
- else if(ret > 0)
- {
- printf("send to client[%d]:%s", fd, ev->buffer);
- qs_event_del(reactor->epfd, ev);
- qs_event_set(ev, fd, recv_cb, reactor);
- qs_event_add(reactor->epfd, EPOLLIN, ev);
- }
- return ret;
- }
-
- int recv_cb(int fd, int events, void *arg)
- {
- struct qsreactor *reactor = (struct qsreactor*)arg;
- struct qsevent *ev = qsreactor_idx(reactor, fd);
-
- int len = recv(fd, ev->buffer, MAX_BUFLEN, 0);
- qs_event_del(reactor->epfd, ev);
- if(len > 0)
- {
- ev->length = len;
- ev->buffer[len] = '\0';
-
- printf("client[%d]:%s", fd, ev->buffer);
-
- qs_event_del(reactor->epfd, ev);
- qs_event_set(ev, fd, send_cb, reactor);
- qs_event_add(reactor->epfd, EPOLLOUT, ev);
- }
- else if(len == 0)
- {
- qs_event_del(reactor->epfd, ev);
- close(fd);
- printf("client[%d] close\n", fd);
- }
- else
- {
- qs_event_del(reactor->epfd, ev);
- printf("client[%d]", fd);
- perror("reacv error,\n");
- close(fd);
- }
- return 0;
- }
-
- int accept_cb(int fd, int events, void *arg)
- {
- struct qsreactor *reactor = (struct qsreactor*)arg;
- if(reactor == NULL)return -1;
-
- struct sockaddr_in client_addr;
- socklen_t len = sizeof(client_addr);
-
- int clientfd;
-
-
- if((clientfd = accept(fd, (struct sockaddr*)&client_addr, &len)) == -1)
- {
- if(errno != EAGAIN && errno != EINTR)
- {}
- perror("accept error\n");
- return -1;
- }
-
- int flag = 0;
- if((flag = fcntl(clientfd, F_SETFL, O_NONBLOCK)) < 0)
- {
- printf("fcntl noblock error, %d\n",MAX_BUFLEN);
- return -1;
- }
- struct qsevent *event = qsreactor_idx(reactor, clientfd);
-
- qs_event_set(event, clientfd, recv_cb, reactor);
- qs_event_add(reactor->epfd, EPOLLIN, event);
-
- printf("new connect [%s:%d], pos[%d]\n",
- inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), clientfd);
-
- return 0;
- }
-
- int qsreactor_run(struct qsreactor *reactor)
- {
- if(reactor == NULL)
- return -1;
- if(reactor->evblk == NULL)
- return -1;
- if(reactor->epfd < 0)
- return -1;
-
- struct epoll_event events[MAX_EPOLL_EVENTS + 1];
- while(1)
- {
- int nready = epoll_wait(reactor->epfd, events, MAX_EPOLL_EVENTS, 1000);
-
- if(nready < 0)
- {
- printf("epoll_wait error\n");
- continue;
- }
- for(int i=0; i<nready; i++)
- {
- struct qsevent *ev = (struct qsevent*)events[i].data.ptr;
- if((events[i].events & EPOLLIN) && (ev->events & EPOLLIN))
- {
- ev->callback(ev->fd, events[i].events, ev->arg);
- }
- if((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT))
- {
- ev->callback(ev->fd, events[i].events, ev->arg);
- }
- }
- }
- }
-
- int main(int argc, char **argv)
- {
- unsigned short port = atoi(argv[1]);
-
- int sockfd = sock(port);
-
-
- struct qsreactor *reactor = (struct qsreactor*)malloc(sizeof(struct qsreactor));
- qsreactor_init(reactor);
-
- qsreactor_addlistener(reactor, sockfd, accept_cb);
- qsreactor_run(reactor);
-
- qsreactor_destory(reactor);
- close(sockfd);
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。