当前位置:   article > 正文

一篇文章彻底搞懂websocket协议的原理与应用(二)_两个网页 new websocket 是同一个吗

两个网页 new websocket 是同一个吗

 注:本文转自知乎一篇文章彻底搞懂websocket协议的原理与应用(二) - 知乎

六、WebSocket语言支持

  • 所有主流浏览器都支持RFC6455。但是具体的WebSocket版本有区别。
  • php jetty netty ruby Kaazing nginx python Tomcat Django erlang
  • WebSocket浏览器支持
  • WebSocket浏览器支持
  • netty .net等语言均可以用来实现支持WebSocket的服务器。
  • websocket api在浏览器端的广泛实现似乎只是一个时间问题了, 值得注意的是服务器端没有标准的api, 各个实现都有自己的一套api, 并且tcp也没有类似的提案, 所以使用websocket开发服务器端有一定的风险.可能会被锁定在某个平台上或者将来被迫升级。

WebSocket是HTML5出的东西(协议),也就是说HTTP协议没有变化,或者说没关系,但HTTP是不支持持久连接的(长连接,循环连接的不算)。

  • 首先HTTP有1.1和1.0之说,也就是所谓的keep-alive,把多个HTTP请求合并为一个,但是Websocket其实是一个新协议,跟HTTP协议基本没有关系,只是为了兼容现有浏览器的握手规范而已,也就是说它是HTTP协议上的一种补充可以通过这样一张图理解:

  • 有交集,但是并不是全部。另外Html5是指的一系列新的API,或- 者说新规范,新技术。Http协议本身只有1.0和1.1,而且跟Html本身没有直接关系。
  • 通俗来说,你可以用HTTP 协议 传输非Html 数据 ,就是这样=。=
  • 再简单来说, 层级不一样 。

七、WebSocket通信

1.连接握手

连接握手分为两个步骤:请求和应答。WebSocket利用了HTTP协议来建立连接,使用的是HTTP的协议升级机制。

1.请求

一个标准的HTTP请求,格式如下:

请求头的具体格式定义参见Request-Line格式

请求header中的字段解析:

协议升级机制

Origin

所有浏览器将会发送一个 Origin请求头。 你可以将这个请求头用于安全方面(检查是否是同一个域,白名单/ 黑名单等),如果你不喜欢这个请求发起源,你可以发送一个403 Forbidden。需要注意的是非浏览器只能发送一个模拟的 Origin。大多数应用会拒绝不含这个请求头的请求。

Sec-WebSocket-Key

由客户端随机生成的,提供基本的防护,防止恶意或者无意的连接。

2.应答

返回字段解析:

Connection

可以参见 rfc7230 6.1 和 rfc7230 6.7。

Sec-WebSocket-Accept

它通过客户端发送的Sec-WebSocket-Key 计算出来。计算方法:

将 Sec-WebSocket-Key 跟 258EAFA5-E914-47DA-95CA-C5AB0DC85B11 拼接;

通过 SHA1 计算出摘要,并转成 base64 字符串。

Sec-WebSocket-Key / Sec-WebSocket-Accept 的主要作用还是为了避免一些网络通信过程中,一些非期待的数据包,”乱入“进来,导致一些错误的响应,并不能用于实现登录认证和数据安全,这些功能还需要应用层自己实现。

2.数据传输(双工)

WebSocket 以 frame 为单位传输数据, frame 是客户端和服务端数据传输的最小单元。当一条消息过长时, 通信方可以将该消息拆分成多个 frame 发送, 接收方收到以后重新拼接、解码从而还原出完整的消息, 在 WebSocket 中, frame 有多种类型, frame 的类型由 frame 头部的 Opcode 字段指示, WebSocket frame 的结构如下所示:

该结构的字段语义如下:

FIN, 长度为 1 比特, 该标志位用于指示当前的 frame 是消息的最后一个分段, 因为 WebSocket 支持将长消息切分为若干个 frame 发送, 切分以后, 除了最后一个 frame, 前面的 frame 的 FIN 字段都为 0, 最后一个 frame 的 FIN 字段为 1, 当然, 若消息没有分段, 那么一个 frame 便包含了完成的消息, 此时其 FIN 字段值为 1。

RSV 1 ~ 3, 这三个字段为保留字段, 只有在 WebSocket 扩展时用, 若不启用扩展, 则该三个字段应置为 1, 若接收方收到 RSV 1 ~ 3 不全为 0 的 frame, 并且双方没有协商使用 WebSocket 协议扩展, 则接收方应立即终止 WebSocket 连接。

Opcode, 长度为 4 比特, 该字段将指示 frame 的类型, RFC 6455 定义的 Opcode 共有如下几种:

  • 0x0, 代表当前是一个 continuation frame,既被切分的长消息的每个分片frame
  • 0x1, 代表当前是一个 text frame
  • 0x2, 代表当前是一个 binary frame
  • 0x3 ~ 7, 目前保留, 以后将用作更多的非控制类 frame
  • 0x8, 代表当前是一个 connection close, 用于关闭 WebSocket 连接
  • 0x9, 代表当前是一个 ping frame
  • 0xA, 代表当前是一个 pong frame
  • 0xB ~ F, 目前保留, 以后将用作更多的控制类 frame
  1. Mask, 长度为 1 比特, 该字段是一个标志位, 用于指示 frame 的数据 (Payload) 是否使用掩码掩盖, RFC 6455 规定当且仅当由客户端向服务端发送的 frame, 需要使用掩码覆盖, 掩码覆盖主要为了解决代理缓存污染攻击 (更多细节见 RFC 6455 Section 10.3)。
  2. Payload Len, 以字节为单位指示 frame Payload 的长度, 该字段的长度可变, 可能为 7 比特, 也可能为 7 + 16 比特, 也可能为 7 + 64 比特. 具体来说, 当 Payload 的实际长度在 [0, 125] 时, 则 Payload Len 字段的长度为 7 比特, 它的值直接代表了 Payload 的实际长度; 当 Payload 的实际长度为 126 时, 则 Payload Len 后跟随的 16 位将被解释为 16-bit 的无符号整数, 该整数的值指示 Payload 的实际长度; 当 Payload 的实际长度为 127 时, 其后的 64 比特将被解释为 64-bit 的无符号整数, 该整数的值指示 Payload 的实际长度。
  3. Masking-key, 该字段为可选字段, 当 Mask 标志位为 1 时, 代表这是一个掩码覆盖的 frame, 此时 Masking-key 字段存在, 其长度为 32 位, RFC 6455 规定所有由客户端发往服务端的 frame 都必须使用掩码覆盖, 即对于所有由客户端发往服务端的 frame, 该字段都必须存在, 该字段的值是由客户端使用熵值足够大的随机数发生器生成, 关于掩码覆盖, 将下面讨论, 若 Mask 标识位 0, 则 frame 中将设置该字段 (注意是不设置该字段, 而不仅仅是不给该字段赋值)。
  4. Payload, 该字段的长度是任意的, 该字段即为 frame 的数据部分, 若通信双方协商使用了 WebSocket 扩展, 则该扩展数据 (Extension data) 也将存放在此处, 扩展数据 + 应用数据, 它们的长度和便为 Payload Len 字段指示的值。

以下是一个客户端和服务端相互传递文本消息的示例

其中模拟了长消息被切分为多个帧(continuation frame)的例子。

3.关闭请求

关闭相对简单,由客户端或服务端发送关闭帧,即可完成关闭。

相关视频推荐

大厂面试技巧及websocket协议使用场景和实现方案

即时通讯网页版本的实现方案-websocket的原理

全网最详细epoll讲解,6种epoll的设计,让你吊打面试官

学习地址:c/c++ linux服务器开发/后台架构师

需要C/C++ Linux服务器架构师学习资料加qun812855908获取(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享

八、WebSocket协议进一步理解

WebSocket是一种在单个TCP连接上进行全双工通信的协议。WebSocket通信协议于2011年被IETF定为标准RFC 6455,并由RFC7936补充规范。WebSocket API也被W3C定为标准。

1.WebSocket是什么

WebSocket 协议在2008年诞生,2011年成为国际标准。主流浏览器都已经支持。

WebSocket 是一种全新的协议。它将 TCP 的 Socket(套接字)应用在了网页上,从而使通信双方建立起一个保持在活动状态连接通道,并且属于全双工通信。

WebSocket 协议在2008年诞生,2011年成为国际标准。主流浏览器都已经支持。WebSocket 协议借用 HTTP协议 的 101 switch protocol 来达到协议转换,从HTTP协议切换WebSocket通信协议。它的最大特点就是,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息,是真正的双向平等对话。

2.WebSocket出现之前的实时技术

轮询:最早的一种实现实时 Web 应用的方案。客户端以一定的时间间隔向服务端发出请求,以频繁请求的方式来保持客户端和服务器端的通信。

长轮询:长轮询也采用轮询的方式,不过采取的是阻塞模型,客户端发起连接后,如果没消息,就一直不返回Response给客户端。直到有消息才返回,返回完之后,客户端再次建立连接,周而复始。

其他方式:如xhr-streaming、隐藏iframe、ActiveX控件、SSE。

轮询技术非真正实时技术。使用 Ajax 方式模拟实时效果,每次客户端和服务器端交互,都是一次 HTTP 的请求和应答过程,且每次的 HTTP 请求和应答都带有完整 HTTP 头信息,增加传输的数据量。需构建两个http连接。客户端和服务器端编程实现比较复杂,为模拟真实的实时效果,需构造两个 HTTP 连接来模拟客户端和服务器的双向通信,一个连接用来处理客户端到服务器端的数据传输,一个连接用来处理服务器端到客户端的数据传输,增加了编程实现的复杂度、服务器端的负载,制约了应用系统的扩展性。

3.WebSocket应用场景

BS架构下的即时通讯、游戏等应用需要客户端与服务端间的双向通信,而HTTP的请求/响应模型并不适合这种场景。会存在一定的问题:

  • 服务器端被迫提供两类接口,一类提供给客户端轮询新消息,一类提供给客户端推送消息给服务器端。
  • HTTP协议有较多的额外开销,每次发送消息都会有一个HTTP header信息,而且如果不用Keep-Alive每次还都要握手。
  • 客户端的脚本比如JS可能还需要跟踪整个过程,发送一个消息后,我可能需要跟踪这个消息的返回。

Websocket出现使得浏览器提供socket的支持成为可能,从而在浏览器和服务器之间建立一条基于tcp的双向连接通道,web开发人员可以很方便的利用websocket构建实时web应用。WebSocket适用于以下场景:

  • 在线聊天场景:例如qq聊天、淘宝与客服聊天、在线客服等等。这种场景都是需要实时的接收服务器推送的内容。
  • 协同办公:例如腾讯在线文档,腾讯的在线文档是支持多人编辑的,在excel中,一旦有人修改就要立即同步给所有人。
  • 直播弹幕:例如虎牙、斗鱼等各大直播平台,在直播时都是有弹幕的,遇到一些精彩片段时,往往会有弹幕刷屏。在这种情况下使用WebSocket会有一个更好的用户体验。
  • 位置共享:例如微信里位置共享,这种场景需要用户实时的共享自己的位置给服务器,服务器收到位置信息后,要实时的推送给其它共享者的,实时性要求较高;百度地图导航系统,在自己位置到达某个地方之后,语音会立即播报前面道路情况,比如上高架、下地道、拐弯、直行、学校慢行等等。这种场景实时性特别高,汽车速度很快,延迟1秒钟,可能就错过了最佳提醒时机。
  • 其他通过定义WebSocket子协议的扩展支持:例如sip、mqtt、xmpp、stomp等。

4.WebSocket协议栈

WebSocket是基于TCP的应用层协议。需要特别注意的是:虽然WebSocket协议在建立连接时会使用HTTP协议,但这并意味着WebSocket协议是基于HTTP协议实现的。

5.WebSocket与HTTP的区别

  • 通信方式不同。WebSocket是双向通信模式,客户端与服务器之间只有在握手阶段是使用HTTP协议的“请求-响应”模式交互,而一旦连接建立之后的通信则使用双向模式交互,不论是客户端还是服务端都可以随时将数据发送给对方;而HTTP协议则至始至终都采用“请求-响应”模式进行通信。也正因为如此,HTTP协议的通信效率没有WebSocket高。
  • 协议格式不同。HTTP协议的一个数据包就是一条完整的消息;而WebSocket客户端与服务端通信的最小单位是帧,由1个或多个帧组成一条完整的消息。即:发送端将消息切割成多个帧,并发送给服务端;服务端接收消息帧,并将关联的帧重新组装成完整的消息。

6.WebSocket握手过程

客户端到服务端:

  • GET ws://localhost…… HTTP/1.1 :打开阶段握手,使用http1.1协议。
  • Upgrade:websocket,表示请求为特殊http请求,请求的目的是要将客户端和服务端的通信协议从http升级为websocket。
  • Sec-websocket-key:Base64 encode 的值,是浏览器随机生成的。客户端向服务端提供的握手信息。

服务端到客户端:

  • 101状态码:表示切换协议。服务器根据客户端的请求切换到Websocket协议。
  • Sec-websocket-accept: 将请求头中的Set-websocket-key添加字符串并做SHA-1加密后做Base64编码,告知客户端服务器能够发起websocket连接。

客户端发起连接的约定:

  • 如果请求为wss,则在TCP建立后,进行TLS连接建立。
  • 请求的方式必须为GET,HTTP版本至少为HTTP1.1。
  • 请求头中必须有Host。
  • 请求头中必须有Upgrade,取值必须为websocket。
  • 请求头中必须有Connection,取值必须为Upgrade。
  • 请求头中必须有Sec-WebSocket-Key,取值为16字节随机数的Base64编码。
  • 请求头中必须有Sec-WebSocket-Version,取值为13。
  • 请求头中可选Sec-WebSocket-Protocol,取值为客户端期望的一个或多个子协议(多个以逗号分割)。
  • 请求头中可选Sec-WebSocket-Extensitons,取值为子协议支持的扩展集(一般是压缩方式)。
  • 可以包含cookie、Authorization等HTTP规范内合法的请求头。

客户端检查服务端的响应:

  • 服务端返回状态码为101代表升级成功,否则判定连接失败。
  • 响应头中缺少Upgrade或取值不是websocket,判定连接失败。
  • 响应头中缺少Connection或取值不是Upgrade,判定连接失败。
  • 响应头中缺少Sec-WebSocket-Accept或取值非法(其值为请求头中的Set-websocket-key添加字符串并做SHA-1加密后做Base64编码),判定连接失败。
  • 响应头中有Sec-WebSocket-Extensions,但取值不是请求头中的子集,判定连接失败。
  • 响应头中有Sec-WebSocket-Protocol,但取值不是请求头中的子集,判定连接失败。

服务端处理客户端连接:

  • 服务端根据请求中的Sec-WebSocket-Protocol 字段,选择一个子协议返回,如果不返回,表示不同意请求的任何子协议。如果请求中未携带,也不返回。
  • 如果建立连接成功,返回状态码为101。
  • 响应头Connection设置为Upgrade。
  • 响应头Upgrade设置为websocket。
  • Sec-WebSocket-Accpet根据请求头Set-websocket-key计算得到,计算方式为:Set-websocket-key的值添加字符串: 258EAFA5-E914-47DA-95CA-C5AB0DC85B11并做SHA-1加密后得到16进制表示的字符串,将每两位当作一个字节进行分隔,得到字节数组,对字节数组做Base64编码。

7.WebSocket帧格式

WebSocket通信流程如下:

Websocket帧格式如下:

第一部分:

FIN:1位,用于描述消息是否结束,如果为1则该消息为消息尾部,如果为零则还有后续数据包。

RSV1,RSV2,RSV3:各1位,用于扩展定义,如果没有扩展约定的情况则必须为0。

OPCODE:4位,用于表示消息接收类型,如果接收到未知的opcode,接收端必须关闭连接。

OPCODE说明:

  • 0x0表示附加数据帧,当前数据帧为分片的数据帧。
  • 0x1表示文本数据帧,采用UTF-8编码。
  • 0x2表示二进制数据帧。
  • 0x3-7暂时无定义,为以后的非控制帧保留。
  • 0x8表示连接关闭。
  • 0x9表示ping。
  • 0xA表示pong。
  • 0xB-F暂时无定义,为以后的控制帧保留。

第二部分:

  • MASK:1位,用于标识PayloadData是否经过掩码处理。服务端发送给客户端的数据帧不能使用掩码,客户端发送给服务端的数据帧必须使用掩码。如果一个帧的数据使用了掩码,那么在Maksing-key部分必须是一个32个bit位的掩码,用来给服务端解码数据。
  • Payload len:数据的长度:默认位7个bit位。如果数据的长度小于125个字节(注意:是字节)则用默认的7个bit来标示数据的长度。如果数据的长度为126个字节,则用后面相邻的2个字节来保存一个16bit位的无符号整数作为数据的长度。如果数据的长度大于126个字节,则用后面相邻的8个字节来保存一个64bit位的无符号整数作为数据的长度。
  • payload len本来是只能用7bit来表达的,也就是最多一个frame的payload只能有127个字节,为了表示更大的长度,给出的解决方案是添加扩展payload len字段。当payload实际长度超过126(包括),但在2^16-1长度内,则将payload len置为126,payload的实际长度由长为16bit的extended payload length来表达。当payload实际长度超过216(包括),但在264-1长度内,则将payload置为127,payload的实际长度由长为64bit的extended payload length来表达。

第三部分:

数据掩码:如果MASK设置位0,则该部分可以省略,如果MASK设置位1,则Masking-key是一个32位的掩码。用来解码客户端发送给服务端的数据帧。

第四部分:

数据:该部分,也是最后一部分,是帧真正要发送的数据,可以是任意长度。

8.WebSocket分片传输

控制帧可能插在一个Message的多个分片之间,但一个Message的分片不能交替传输(除非有扩展特别定义)。

控制帧不可分片。

分片需要按照分送方提交顺序传递给接收方,但由于IP路由特性,实际并不能保证顺序到达。

控制帧包括:

  • Close:用于关闭连接,可以携带数据,表示关闭原因。
  • Ping:可以携带数据。
  • Pong:用于Keep-alive,返回最近一次Ping中的数据,可以只发送Pong帧,做单向心跳。

连接关闭时状态码说明:

9.WebSocket相关扩展

Stomp:

STOMP是基于帧的协议,它的前身是TTMP协议(一个简单的基于文本的协议),专为消息中间件设计。是属于消息队列的一种协议, 和AMQP, JMS平级。它的简单性恰巧可以用于定义websocket的消息体格式. STOMP协议很多MQ都已支持, 比如RabbitMq, ActiveMq。生产者(发送消息)、消息代理、消费者(订阅然后收到消息)。

2. SockJs:

SockJS是一个浏览器JavaScript库,它提供了一个类似于网络的对象。SockJS提供了一个连贯的、跨浏览器的Javascript API,它在浏览器和web服务器之间创建了一个低延迟、全双工、跨域通信通道。

SockJS的一大好处在于提供了浏览器兼容性。优先使用原生WebSocket,如果在不支持websocket的浏览器中,会自动降为轮询的方式。 除此之外,spring也对socketJS提供了支持。

3. Socket.io

http://Socket.io实际上是WebSocket的父集,http://Socket.io封装了WebSocket和轮询等方法,会根据情况选择方法来进行通讯。

http://Sockei.io最早由Node.js实现,Node.js提供了高效的服务端运行环境,但由于Browser对HTML5的支持不一,为了兼容所有浏览器,提供实时的用户体验,并为开发者提供客户端与服务端一致的编程体验,于是http://Socket.io诞生了。Java模仿Node.js实现了Java版的http://Netty-socket.io库。

http://Socket.io将WebSocket和Polling机制以及其它的实时通信方式封装成通用的接口,并在服务端实现了这些实时机制相应代码,包括:AJAX Long Polling、Adobe Flash Socket、AJAX multipart streaming、Forever Iframem、JSONP Polling。

九、WebSocket 能解决什么问题

工程师应该是以解决问题为主的,如果不会解决问题,只会伸手,必然不会长远,有思考,才会有突破,才能高效的处理事情,所以 websocket 到底解决了什么问题呢?它存在的价值是什么?

这还是得从HTTP说起,大家应该都很熟悉这门协议,我们简单说一下它的特点:

•三次握手、四次挥手 的方式建立连接和关闭连接

•支持长连接和短连接两种连接方式

•有同源策略的限制(端口,协议,域名)

•单次 请求-响应 机制,只支持单向通信

其中最鸡肋的就是最后一个特点,单向通信,什么意思呐? 就是说只能由一方发起请求(客户端),另一方响应请求(服务端),而且每一次的请求都是一个单独的事件,请求之间还无法具有关联性,也就是说我上个请求和下个请求完全是隔离的,无法具有连续性。

也许你觉得这样的说法比较难懂,我们来举一个栗子:

每个人都打过电话吧,电话打通后可以一直聊天是不是觉得很舒服啊,这是一种全双工的通信方式,双方都可以主动传递信息。彼此的聊天也具有连续性。我们简单把这种方式理解为 websocket 协议支持的方式。

如果打电话变成了 HTTP 那种方式呢? 那就不叫打电话了,而是联通爸爸的智能语音助手了,我们知道客户端和服务端本身的身份并不是固定的,只要你可以发起通信,就可以充当客户端,能响应请求,就可以当做服务端,但是在HTTP的世界里一般来说,客户端(大多数情况下是浏览器)和服务器一般是固定的,我们打电话 去查话费,会询问要人工服务还是智能助手,如果选了助手,你只要问她问题,她就会找对应的答案来回答你(响应你),一般都是简单的业务,你不问她也不会跟你闲聊,主动才有故事啊!

但是实际上有很多的业务是需要双方都有主动性的,半双工的模式肯定是不够用的,例如聊天室,跟机器人聊天没意思啊,又例如主动推送,我无聊的时候手都不想点屏幕,你能不能主动一点给我推一些好玩的信息过来。

只要做过前后端分离的同学应该都被跨域的问题折磨过。浏览器的这种同源策略,会导致 不同端口/不同域名/不同协议 的请求会有限制,当然这问题前后端都能处理,然而 websocket 就没有这种要求,他支持任何域名或者端口的访问(协议固定了只能是 ws/wss) ,所以它让人用的更加舒服

所以,上面 HTTP 存在的这些问题,websocket 都能解决!!!

十、WebSocket工作原理

主动是 websocket 的一大特点,像之前如果客户端想知道服务端对某个事件的处理进度,就只能通过轮训( Poll )的方式去询问,十分的耗费资源,会存在十分多的无效请求。下面我简单说推送技术的三种模型区别:

  • •pull (主动获取) 即客户端主动发起请求,获取消息
  • •poll (周期性主动获取) 即周期性的主动发起请求,获取消息
  • •push (主动推送) 服务端主动推送消息给客户端

pull 和 poll 的唯一区别只在于周期性,但是很明显周期性的去询问,对业务来说清晰度很高,这也是为什么很多小公司都是基于轮训的方式去处理业务,因为简单嘛,能力不够机器来撑。这也是很多公司都会面临的问题,如果业务达到了瓶颈,使劲的堆机器,如果用新技术或者更高级的作法,开发成本和维护成本也会变高,还不如简单一点去增加机器配置。

如果两个人需要通话,首先需要建立一个连接,而且必须是一个长链接,大家都不希望讲几句话就得重新打吧,根据上面说的,websocket 会复用之前 HTTP 建立好的长链接,然后再进行升级,所以他和轮训的区别大致如下所示:

图片省去了建立连接的过程,我们可以发现,基于轮训的方式,必须由客户端手动去请求,才会有响应,而基于 websocket 协议,不再需要你主动约妹子了,妹子也可以主动去约你,这才是公平的世界。

为了更好的阐述这个连接的原理,可以使用swoole 自带的 创建websocket 的功能进行测试,服务端代码如下,如果连接不上,可以看看是不是检查一下端口开放情况(iptables/filewall)和网络的连通性,代码如下:

  1. //创建websocket服务器对象,监听0.0.0.0:9501端口
  2. $ws = new Swoole\WebSocket\Server("0.0.0.0", 9501);
  3. //监听WebSocket连接打开事件
  4. $ws->on('open', function ($ws, $request) {
  5. var_dump($request->fd, $request->get, $request->server); //request 对象包含请求的相关信息
  6. //$ws->push($request->fd, "hello, welcome\n");
  7. });
  8. //监听WebSocket消息事件
  9. $ws->on('message', function ($ws, $frame) { // frame 是存储信息的变量,也就是传输帧
  10. echo "Message: {$frame->data}\n";
  11. $ws->push($frame->fd, "server: {$frame->data}");
  12. });
  13. //监听WebSocket连接关闭事件
  14. $ws->on('close', function ($ws, $fd) { // fd 是客户端的标志
  15. echo "client-{$fd} is closed\n";
  16. });
  17. $ws->start(); // 启动这个进程

我们可以发现,相比于 HTTP 的头部,websocket 的数据结构十分的简单小巧,没有像 HTTP 协议一样老是带着笨重的头部,这一设计让websocket的报文可以在体量上更具优势,所以传输效率来说更高 。

当然,我们传输的文本也不能在大街上裸跑啊,既然 HTTP 都有衣服穿了(HTTPS),websocket(ws) 当然也有 (wss)。

在以前的文章我们也简单聊过 HTTPS 是个什么东西,大家不了解可以去翻一下之前的文章,总的来说就是使用了非对称加密算法进行了对称加密密钥的传输,后续采用对称加密解密的方式进行数据安全处理。

如果你的业务需要支撑双全工的通信,那么 websocket 便是一个很不错的选择。网上大多数关于 websocket 的文章,大多是基于前端学习者的角度,他们使用 Chrome 的console 的调试实验,本篇文章更多是基于后端开发者的角度。希望对你有所帮助。

十一、进一步解析什么是WebSocket协议

一、websocket特点

1.websocket优点

  • 保持连接状态:websocket需要先创建连接,使其成为有状态的协议。
  • 更好支持二进制:定义了二进制帧,增加安全性。
  • 支持扩展:定义了扩展,可以自己实现部分部分自定义。
  • 压缩效果好:可以沿用上下文的内容,有更好的压缩效果。

2.websocket缺点

  • 开发要求高: 前端后端都增加了一定的难度。
  • 推送消息相对复杂。
  • HTTP协议已经很成熟,现今websocket则太新了一点。

二、websocket协议通信过程

协议有两个部分:handshake(握手)和 data transfer(数据传输)。

1.handshake

1.客户端

客户端握手报文是在HTTP的基础上发送一次HTTP协议升级请求。

  1. GET /chat HTTP/1.1
  2. Host: server.example.com
  3. Upgrade: websocket
  4. Connection: Upgrade
  5. Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
  6. Origin: http://example.com
  7. Sec-WebSocket-Protocol: chat, superchat
  8. Sec-WebSocket-Version: 13

Sec-WebSocket-Key 是由浏览器随机生成的,提供基本的防护,防止恶意或者无意的连接。

Sec-WebSocket-Version 表示 WebSocket 的版本,最初 WebSocket 协议太多,不同厂商都有自己的协议版本,不过现在已经定下来了。如果服务端不支持该版本,需要返回一个 Sec-WebSocket-Versionheader,里面包含服务端支持的版本号。

2.服务端

服务端响应握手也是在HTTP协议基础上回应一个Switching Protocols。

  1. HTTP/1.1 101 Switching Protocols
  2. Upgrade: websocket
  3. Connection: Upgrade
  4. Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
  5. Sec-WebSocket-Protocol: chat

Linux下对应实现代码,注释在代码中。

  1. int websocket_handshake(struct qsevent *ev)
  2. {
  3. char linebuf[128];
  4. int index = 0;
  5. char sec_data[128] = {0};
  6. char sec_accept[32] = {0};
  7. do
  8. {
  9. memset(linebuf, 0, sizeof(linebuf));//清空以暂存一行报文
  10. index = readline(ev->buffer, index, linebuf);//获取一行报文
  11. if(strstr(linebuf, "Sec-WebSocket-Key"))//如果一行报文里面包括了Sec-WebSocket-Key
  12. {
  13. strcat(linebuf, GUID);//和GUID连接起来
  14. SHA1(linebuf+WEBSOCK_KEY_LENGTH, strlen(linebuf+WEBSOCK_KEY_LENGTH), sec_data);//SHA1
  15. base64_encode(sec_data, strlen(sec_data), sec_accept);//base64编码
  16. memset(ev->buffer, 0, MAX_BUFLEN);//清空服务端数据缓冲区
  17. ev->length = sprintf(ev->buffer,//组装握手响应报文到数据缓冲区,下一步有进行下发
  18. "HTTP/1.1 101 Switching Protocols\r\n"
  19. "Upgrade: websocket\r\n"
  20. "Connection: Upgrade\r\n"
  21. "Sec-websocket-Accept: %s\r\n\r\n", sec_accept);
  22. break;
  23. }
  24. }while(index != -1 && (ev->buffer[index] != '\r') || (ev->buffer[index] != '\n'));//遇到空行之前
  25. return 0;
  26. }

3.data transfer

先看数据包格式。

FIN:指示这是消息中的最后一个片段。第一个片段也可能是最后的片段。

RSV1, RSV2, RSV3:一般情况下全为 0。当客户端、服务端协商采用 WebSocket 扩展时,这三个标志位可以非0,且值的含义由扩展进行定义。如果出现非零的值,且并没有采用 WebSocket 扩展,连接出错。

opcode:操作代码。

  1. %x0:表示一个延续帧。当 Opcode 为 0 时,表示本次数据传输采用了数据分片,当前收到的数据帧为其中一个数据分片;
  2. %x1:表示这是一个文本帧(frame);
  3. %x2:表示这是一个二进制帧(frame);
  4. %x3-7:保留的操作代码,用于后续定义的非控制帧;
  5. %x8:表示连接断开;
  6. %x9:表示这是一个 ping 操作;
  7. %xA:表示这是一个 pong 操作;
  8. %xB-F:保留的操作代码,用于后续定义的控制帧。
  • mask:是否需要掩码。
  • Payload length: 7bit or 7 + 16bit or 7 + 64bit
  1. 表示数据载荷的长度
  2. x 为 0~126:数据的长度为 x 字节;
  3. x 为 126:后续 2 个字节代表一个 16 位的无符号整数,该无符号整数的值为数据的长度;
  4. x 为 127:后续 8 个字节代表一个 64 位的无符号整数(最高位为 0),该无符号整数的值为数据的长度。

payload data:消息体。
下面是服务端的代码实现:

  1. #define GUID "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
  2. enum
  3. {
  4. WS_HANDSHAKE = 0, //握手
  5. WS_TANSMISSION = 1, //通信
  6. WS_END = 2, //end
  7. };
  8. typedef struct _ws_ophdr{
  9. unsigned char opcode:4,
  10. rsv3:1,
  11. rsv2:1,
  12. rsv1:1,
  13. fin:1;
  14. unsigned char pl_len:7,
  15. mask:1;
  16. }ws_ophdr;//协议前两个字节
  17. typedef struct _ws_head_126{
  18. unsigned short payload_lenght;
  19. char mask_key[4];
  20. }ws_head_126;//协议mask和消息体长度
  21. /*解码*/
  22. void websocket_umask(char *payload, int length, char *mask_key)
  23. {
  24. int i = 0;
  25. for( ; i<length; i++)
  26. payload[i] ^= mask_key[i%4];//异或
  27. }
  28. int websocket_transmission(struct qsevent *ev)
  29. {
  30. ws_ophdr *ophdr = (ws_ophdr*)ev->buffer;//协议前两个自己
  31. printf("ws_recv_data length=%d\n", ophdr->pl_len);
  32. if(ophdr->pl_len <126)//如果消息体长度小于126
  33. {
  34. char * payload = ev->buffer + sizeof(ws_ophdr) + 4;//获取消息地址
  35. if(ophdr->mask)//如果消息是掩码
  36. {
  37. websocket_umask(payload, ophdr->pl_len, ev->buffer+2);//解码,异或
  38. printf("payload:%s\n", payload);
  39. }
  40. printf("payload : %s\n", payload);//消息回显
  41. }
  42. else if (hdr->pl_len == 126) {
  43. ws_head_126 *hdr126 = ev->buffer + sizeof(ws_ophdr);
  44. } else {
  45. ws_head_127 *hdr127 = ev->buffer + sizeof(ws_ophdr);
  46. }
  47. return 0;
  48. }
  49. int websocket_request(struct qsevent *ev)
  50. {
  51. if(ev->status_machine == WS_HANDSHAKE)
  52. {
  53. websocket_handshake(ev);//握手
  54. ev->status_machine = WS_TANSMISSION;//设置标志位
  55. }else if(ev->status_machine == WS_TANSMISSION){
  56. websocket_transmission(ev);//通信
  57. }
  58. return 0;
  59. }

代码是基于reactor百万并发服务器框架实现的。

5.epoll反应堆模型下实现http协议

1.客户端结构体

  1. struct qsevent{
  2. int fd; //clientfd
  3. int events; //事件:读、写或异常
  4. int status; //是否位于epfd红黑监听树上
  5. void *arg; //参数
  6. long last_active; //上次数据收发的事件
  7. int (*callback)(int fd, int event, void *arg); //回调函数,单回调,后面修改成多回调
  8. unsigned char buffer[MAX_BUFLEN]; //数据缓冲区
  9. int length; //数据长度
  10. /*http param*/
  11. int method; //http协议请求头部
  12. char resource[MAX_BUFLEN]; //请求的资源
  13. int ret_code; //响应状态码
  14. };

2.int http_response(struct qsevent *ev)

当客户端发送tcp连接时,服务端的listenfd会触发输入事件会调用ev->callback即accept_cb回调函数响应连接并获得clientfd,连接之后,http数据报文发送上来,服务端的clientfd触发输入事件会调用ev->callback即recv_cb回调函数进行数据接收,并解析http报文。

  1. int http_request(struct qsevent *ev)
  2. {
  3. char linebuf[1024] = {0};//用于从buffer中获取每一行的请求报文
  4. int idx = readline(ev->buffer, 0, linebuf);//读取第一行请求方法,readline函数,后面介绍
  5. if(strstr(linebuf, "GET"))//strstr判断是否存在GET请求方法
  6. {
  7. ev->method = HTTP_METHOD_GET;//GET方法表示客户端需要获取资源
  8. int i = 0;
  9. while(linebuf[sizeof("GET ") + i] != ' ')i++;//跳过空格
  10. linebuf[sizeof("GET ") + i] = '\0';
  11. sprintf(ev->resource, "./%s/%s", HTTP_METHOD_ROOT, linebuf+sizeof("GET "));//将资源的名字以文件路径形式存储在ev->resource中
  12. printf("resource:%s\n", ev->resource);//回显
  13. }
  14. else if(strstr(linebuf, "POST"))//POST的请求方法,暂时没写,方法差不多
  15. {}
  16. return 0;
  17. }

一篇文章彻底搞懂websocket协议的原理与应用(二) - 知乎 http_response(struct qsevent *ev)

服务器对客户端的响应报文数据进行http封装储存在buffer中,事件触发时在send_cb回调函数发送给客户端。详细解释请看代码注释。

  1. int http_response(struct qsevent *ev)
  2. {
  3. if(ev == NULL)return -1;
  4. memset(ev->buffer, 0, MAX_BUFLEN);//清空缓冲区准备储存报文
  5. printf("resource:%s\n", ev->resource);//resource:客户端请求的资源文件,通过http_reques函数获取
  6. int filefd = open(ev->resource, O_RDONLY);//只读方式打开获得文件句柄
  7. if(filefd == -1)//获取失败则发送404 NOT FOUND
  8. {
  9. ev->ret_code = 404;//404状态码
  10. ev->length = sprintf(ev->buffer,//将下面数据传入ev->buffer
  11. /***状态行***/
  12. /*版本号 状态码 状态码描述 */
  13. "HTTP/1.1 404 NOT FOUND\r\n"
  14. /***消息报头***/
  15. /*获取当前时间*/
  16. "date: Thu, 11 Nov 2021 12:28:52 GMT\r\n"
  17. /*响应正文类型; 编码方式*/
  18. "Content-Type: text/html;charset=ISO-8859-1\r\n"
  19. /*响应正文长度 空行*/
  20. "Content-Length: 85\r\n\r\n"
  21. /***响应正文***/
  22. "<html><head><title>404 Not Found</title></head><body><H1>404</H1></body></html>\r\n\r\n");
  23. }
  24. else
  25. {
  26. struct stat stat_buf; //文件信息
  27. fstat(filefd, &stat_buf); //fstat通过文件句柄获取文件信息
  28. if(S_ISDIR(stat_buf.st_mode)) //如果文件是一个目录
  29. {
  30. printf(ev->buffer, //同上,将404放入buffer中
  31. "HTTP/1.1 404 Not Found\r\n"
  32. "Date: Thu, 11 Nov 2021 12:28:52 GMT\r\n"
  33. "Content-Type: text/html;charset=ISO-8859-1\r\n"
  34. "Content-Length: 85\r\n\r\n"
  35. "<html><head><title>404 Not Found</title></head><body><H1>404</H1></body></html>\r\n\r\n" );
  36. }
  37. else if (S_ISREG(stat_buf.st_mode)) //如果文件是存在
  38. {
  39. ev->ret_code = 200; //200状态码
  40. ev->length = sprintf(ev->buffer, //length记录长度,buffer储存响应报文
  41. "HTTP/1.1 200 OK\r\n"
  42. "Date: Thu, 11 Nov 2021 12:28:52 GMT\r\n"
  43. "Content-Type: text/html;charset=ISO-8859-1\r\n"
  44. "Content-Length: %ld\r\n\r\n",
  45. stat_buf.st_size );//文件长度储存在stat_buf.st_size中
  46. }
  47. return ev->length;//返回报文长度
  48. }
  49. }

4.总代码

  1. #include <stdio.h>
  2. #include <stdlib.h>
  3. #include <string.h>
  4. #include <sys/socket.h>
  5. #include <sys/epoll.h>
  6. #include <arpa/inet.h>
  7. #include <fcntl.h>
  8. #include <unistd.h>
  9. #include <errno.h>
  10. #include <time.h>
  11. #include <sys/stat.h>
  12. #include <sys/sendfile.h>
  13. #define HTTP_METHOD_ROOT "html"
  14. #define MAX_BUFLEN 4096
  15. #define MAX_EPOLLSIZE 1024
  16. #define MAX_EPOLL_EVENTS 1024
  17. #define HTTP_METHOD_GET 0
  18. #define HTTP_METHOD_POST 1
  19. typedef int (*NCALLBACK)(int, int, void*);
  20. struct qsevent{
  21. int fd;
  22. int events;
  23. int status;
  24. void *arg;
  25. long last_active;
  26. int (*callback)(int fd, int event, void *arg);
  27. unsigned char buffer[MAX_BUFLEN];
  28. int length;
  29. /*http param*/
  30. int method;
  31. char resource[MAX_BUFLEN];
  32. int ret_code;
  33. };
  34. struct qseventblock{
  35. struct qsevent *eventsarrry;
  36. struct qseventblock *next;
  37. };
  38. struct qsreactor{
  39. int epfd;
  40. int blkcnt;
  41. struct qseventblock *evblk;
  42. };
  43. int recv_cb(int fd, int events, void *arg);
  44. int send_cb(int fd, int events, void *arg);
  45. struct qsevent *qsreactor_idx(struct qsreactor *reactor, int sockfd);
  46. int readline(char *allbuf, int idx, char *linebuf)
  47. {
  48. int len = strlen(allbuf);
  49. for( ; idx<len; idx++ )
  50. {
  51. if(allbuf[idx] == '\r' && allbuf[idx+1] == '\n')
  52. return idx+2;
  53. else
  54. *(linebuf++) = allbuf[idx];
  55. }
  56. return -1;
  57. }
  58. void qs_event_set(struct qsevent *ev, int fd, NCALLBACK callback, void *arg)
  59. {
  60. ev->events = 0;
  61. ev->fd = fd;
  62. ev->arg = arg;
  63. ev->callback = callback;
  64. ev->last_active = time(NULL);
  65. return;
  66. }
  67. int qs_event_add(int epfd, int events, struct qsevent *ev)
  68. {
  69. struct epoll_event epv = {0, {0}};;
  70. epv.events = ev->events = events;
  71. epv.data.ptr = ev;
  72. if(ev->status == 1)
  73. {
  74. if(epoll_ctl(epfd, EPOLL_CTL_MOD, ev->fd, &epv) < 0)
  75. {
  76. perror("EPOLL_CTL_MOD error\n");
  77. return -1;
  78. }
  79. }
  80. else if(ev->status == 0)
  81. {
  82. if(epoll_ctl(epfd, EPOLL_CTL_ADD, ev->fd, &epv) < 0)
  83. {
  84. perror("EPOLL_CTL_ADD error\n");
  85. return -2;
  86. }
  87. ev->status = 1;
  88. }
  89. return 0;
  90. }
  91. int qs_event_del(int epfd, struct qsevent *ev)
  92. {
  93. struct epoll_event epv = {0, {0}};
  94. if(ev->status != 1)
  95. return -1;
  96. ev->status = 0;
  97. epv.data.ptr = ev;
  98. if((epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &epv)))
  99. {
  100. perror("EPOLL_CTL_DEL error\n");
  101. return -1;
  102. }
  103. return 0;
  104. }
  105. int sock(short port)
  106. {
  107. int fd = socket(AF_INET, SOCK_STREAM, 0);
  108. fcntl(fd, F_SETFL, O_NONBLOCK);
  109. struct sockaddr_in ser_addr;
  110. memset(&ser_addr, 0, sizeof(ser_addr));
  111. ser_addr.sin_addr.s_addr = htonl(INADDR_ANY);
  112. ser_addr.sin_family = AF_INET;
  113. ser_addr.sin_port = htons(port);
  114. bind(fd, (struct sockaddr*)&ser_addr, sizeof(ser_addr));
  115. if(listen(fd, 20) < 0)
  116. perror("listen error\n");
  117. printf("listener[%d] lstening..\n", fd);
  118. return fd;
  119. }
  120. int http_request(struct qsevent *ev)
  121. {
  122. char linebuf[1024] = {0};
  123. int idx = readline(ev->buffer, 0, linebuf);
  124. if(strstr(linebuf, "GET"))
  125. {
  126. ev->method = HTTP_METHOD_GET;
  127. int i = 0;
  128. while(linebuf[sizeof("GET ") + i] != ' ')i++;
  129. linebuf[sizeof("GET ") + i] = '\0';
  130. sprintf(ev->resource, "./%s/%s", HTTP_METHOD_ROOT, linebuf+sizeof("GET "));
  131. printf("resource:%s\n", ev->resource);
  132. }
  133. else if(strstr(linebuf, "POST"))
  134. {}
  135. return 0;
  136. }
  137. int http_response(struct qsevent *ev)
  138. {
  139. if(ev == NULL)return -1;
  140. memset(ev->buffer, 0, MAX_BUFLEN);
  141. printf("resource:%s\n", ev->resource);
  142. int filefd = open(ev->resource, O_RDONLY);
  143. if(filefd == -1)
  144. {
  145. ev->ret_code = 404;
  146. ev->length = sprintf(ev->buffer,
  147. "HTTP/1.1 404 NOT FOUND\r\n"
  148. "date: Thu, 11 Nov 2021 12:28:52 GMT\r\n"
  149. "Content-Type: text/html;charset=ISO-8859-1\r\n"
  150. "Content-Length: 85\r\n\r\n"
  151. "<html><head><title>404 Not Found</title></head><body><H1>404</H1></body></html>\r\n\r\n");
  152. }
  153. else
  154. {
  155. struct stat stat_buf;
  156. fstat(filefd, &stat_buf);
  157. if(S_ISDIR(stat_buf.st_mode))
  158. {
  159. printf(ev->buffer,
  160. "HTTP/1.1 404 Not Found\r\n"
  161. "Date: Thu, 11 Nov 2021 12:28:52 GMT\r\n"
  162. "Content-Type: text/html;charset=ISO-8859-1\r\n"
  163. "Content-Length: 85\r\n\r\n"
  164. "<html><head><title>404 Not Found</title></head><body><H1>404</H1></body></html>\r\n\r\n" );
  165. }
  166. else if (S_ISREG(stat_buf.st_mode))
  167. {
  168. ev->ret_code = 200;
  169. ev->length = sprintf(ev->buffer,
  170. "HTTP/1.1 200 OK\r\n"
  171. "Date: Thu, 11 Nov 2021 12:28:52 GMT\r\n"
  172. "Content-Type: text/html;charset=ISO-8859-1\r\n"
  173. "Content-Length: %ld\r\n\r\n",
  174. stat_buf.st_size );
  175. }
  176. return ev->length;
  177. }
  178. }
  179. int qsreactor_init(struct qsreactor *reactor)
  180. {
  181. if(reactor == NULL)
  182. return -1;
  183. memset(reactor, 0, sizeof(struct qsreactor));
  184. reactor->epfd = epoll_create(1);
  185. if(reactor->epfd <= 0)
  186. {
  187. perror("epoll_create error\n");
  188. return -1;
  189. }
  190. struct qseventblock *block = (struct qseventblock*)malloc(sizeof(struct qseventblock));
  191. if(block == NULL)
  192. {
  193. printf("blockinit malloc error\n");
  194. close(reactor->epfd);
  195. return -2;
  196. }
  197. memset(block, 0, sizeof(block));
  198. struct qsevent *evs = (struct qsevent*)malloc(MAX_EPOLLSIZE * sizeof(struct qsevent));
  199. if(evs == NULL)
  200. {
  201. printf("evsnit malloc error\n");
  202. close(reactor->epfd);
  203. return -3;
  204. }
  205. memset(evs, 0, sizeof(evs));
  206. block->next = NULL;
  207. block->eventsarrry = evs;
  208. reactor->blkcnt = 1;
  209. reactor->evblk = block;
  210. return 0;
  211. }
  212. int qsreactor_alloc(struct qsreactor *reactor)
  213. {
  214. if(reactor == NULL)return -1;
  215. if(reactor->evblk == NULL)return -1;
  216. struct qseventblock *tailblock = reactor->evblk;
  217. while(tailblock->next != NULL)
  218. tailblock = tailblock->next;
  219. struct qseventblock *newblock = (struct qseventblock*)malloc(sizeof(struct qseventblock));
  220. if(newblock == NULL)
  221. {
  222. printf("newblock alloc error\n");
  223. return -1;
  224. }
  225. memset(newblock, 0, sizeof(newblock));
  226. struct qsevent *neweventarray = (struct qsevent*)malloc(sizeof(struct qsevent) * MAX_EPOLLSIZE);
  227. if(neweventarray == NULL)
  228. {
  229. printf("neweventarray malloc error\n");
  230. return -1;
  231. }
  232. memset(neweventarray, 0, sizeof(neweventarray));
  233. newblock->eventsarrry = neweventarray;
  234. newblock->next = NULL;
  235. tailblock->next = newblock;
  236. reactor->blkcnt++;
  237. return 0;
  238. }
  239. struct qsevent *qsreactor_idx(struct qsreactor *reactor, int sockfd)
  240. {
  241. int index = sockfd / MAX_EPOLLSIZE;
  242. while(index >= reactor->blkcnt)qsreactor_alloc(reactor);
  243. int i=0;
  244. struct qseventblock *idxblock = reactor->evblk;
  245. while(i++<index && idxblock != NULL)
  246. idxblock = idxblock->next;
  247. return &idxblock->eventsarrry[sockfd%MAX_EPOLLSIZE];
  248. }
  249. int qsreactor_destory(struct qsreactor *reactor)
  250. {
  251. close(reactor->epfd);
  252. free(reactor->evblk);
  253. reactor = NULL;
  254. return 0;
  255. }
  256. int qsreactor_addlistener(struct qsreactor *reactor, int sockfd, NCALLBACK acceptor)
  257. {
  258. if(reactor == NULL)return -1;
  259. if(reactor->evblk == NULL)return -1;
  260. struct qsevent *event = qsreactor_idx(reactor, sockfd);
  261. qs_event_set(event, sockfd, acceptor, reactor);
  262. qs_event_add(reactor->epfd, EPOLLIN, event);
  263. return 0;
  264. }
  265. int send_cb(int fd, int events, void *arg)
  266. {
  267. struct qsreactor *reactor = (struct qsreactor*)arg;
  268. struct qsevent *ev = qsreactor_idx(reactor, fd);
  269. http_response(ev);
  270. int ret = send(fd, ev->buffer, ev->length, 0);
  271. if(ret < 0)
  272. {
  273. qs_event_del(reactor->epfd, ev);
  274. printf("clent[%d] ", fd);
  275. perror("send error\n");
  276. close(fd);
  277. }
  278. else if(ret > 0)
  279. {
  280. if(ev->ret_code == 200)
  281. {
  282. int filefd = open(ev->resource, O_RDONLY);
  283. struct stat stat_buf;
  284. fstat(filefd, &stat_buf);
  285. sendfile(fd, filefd, NULL, stat_buf.st_size);
  286. close(filefd);
  287. }
  288. printf("send to client[%d]:%s", fd, ev->buffer);
  289. qs_event_del(reactor->epfd, ev);
  290. qs_event_set(ev, fd, recv_cb, reactor);
  291. qs_event_add(reactor->epfd, EPOLLIN, ev);
  292. }
  293. return ret;
  294. }
  295. int recv_cb(int fd, int events, void *arg)
  296. {
  297. struct qsreactor *reactor = (struct qsreactor*)arg;
  298. struct qsevent *ev = qsreactor_idx(reactor, fd);
  299. int len = recv(fd, ev->buffer, MAX_BUFLEN, 0);
  300. qs_event_del(reactor->epfd, ev);
  301. if(len > 0)
  302. {
  303. ev->length = len;
  304. ev->buffer[len] = '\0';
  305. printf("client[%d]:%s", fd, ev->buffer);
  306. http_request(ev);
  307. qs_event_del(reactor->epfd, ev);
  308. qs_event_set(ev, fd, send_cb, reactor);
  309. qs_event_add(reactor->epfd, EPOLLOUT, ev);
  310. }
  311. else if(len == 0)
  312. {
  313. qs_event_del(reactor->epfd, ev);
  314. close(fd);
  315. printf("client[%d] close\n", fd);
  316. }
  317. else
  318. {
  319. qs_event_del(reactor->epfd, ev);
  320. printf("client[%d]", fd);
  321. perror("reacv error,\n");
  322. close(fd);
  323. }
  324. return 0;
  325. }
  326. int accept_cb(int fd, int events, void *arg)
  327. {
  328. struct qsreactor *reactor = (struct qsreactor*)arg;
  329. if(reactor == NULL)return -1;
  330. struct sockaddr_in client_addr;
  331. socklen_t len = sizeof(client_addr);
  332. int clientfd;
  333. if((clientfd = accept(fd, (struct sockaddr*)&client_addr, &len)) == -1)
  334. {
  335. if(errno != EAGAIN && errno != EINTR)
  336. {}
  337. perror("accept error\n");
  338. return -1;
  339. }
  340. int flag = 0;
  341. if((flag = fcntl(clientfd, F_SETFL, O_NONBLOCK)) < 0)
  342. {
  343. printf("fcntl noblock error, %d\n",MAX_BUFLEN);
  344. return -1;
  345. }
  346. struct qsevent *event = qsreactor_idx(reactor, clientfd);
  347. qs_event_set(event, clientfd, recv_cb, reactor);
  348. qs_event_add(reactor->epfd, EPOLLIN, event);
  349. printf("new connect [%s:%d], pos[%d]\n",
  350. inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), clientfd);
  351. return 0;
  352. }
  353. int qsreactor_run(struct qsreactor *reactor)
  354. {
  355. if(reactor == NULL)
  356. return -1;
  357. if(reactor->evblk == NULL)
  358. return -1;
  359. if(reactor->epfd < 0)
  360. return -1;
  361. struct epoll_event events[MAX_EPOLL_EVENTS + 1];
  362. while(1)
  363. {
  364. int nready = epoll_wait(reactor->epfd, events, MAX_EPOLL_EVENTS, 1000);
  365. if(nready < 0)
  366. {
  367. printf("epoll_wait error\n");
  368. continue;
  369. }
  370. for(int i=0; i<nready; i++)
  371. {
  372. struct qsevent *ev = (struct qsevent*)events[i].data.ptr;
  373. if((events[i].events & EPOLLIN) && (ev->events & EPOLLIN))
  374. {
  375. ev->callback(ev->fd, events[i].events, ev->arg);
  376. }
  377. if((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT))
  378. {
  379. ev->callback(ev->fd, events[i].events, ev->arg);
  380. }
  381. }
  382. }
  383. }
  384. int main(int argc, char **argv)
  385. {
  386. unsigned short port = atoi(argv[1]);
  387. int sockfd = sock(port);
  388. struct qsreactor *reactor = (struct qsreactor*)malloc(sizeof(struct qsreactor));
  389. qsreactor_init(reactor);
  390. qsreactor_addlistener(reactor, sockfd, accept_cb);
  391. qsreactor_run(reactor);
  392. qsreactor_destory(reactor);
  393. close(sockfd);
  394. }

5.epoll反应堆模型下实现websocket协议

  1. #include <stdio.h>
  2. #include <stdlib.h>
  3. #include <string.h>
  4. #include <sys/socket.h>
  5. #include <sys/epoll.h>
  6. #include <arpa/inet.h>
  7. #include <fcntl.h>
  8. #include <unistd.h>
  9. #include <errno.h>
  10. #include <time.h>
  11. #include <sys/stat.h>
  12. #include <sys/sendfile.h>
  13. #include <openssl/sha.h>
  14. #include <openssl/pem.h>
  15. #include <openssl/bio.h>
  16. #include <openssl/evp.h>
  17. #define MAX_BUFLEN 4096
  18. #define MAX_EPOLLSIZE 1024
  19. #define MAX_EPOLL_EVENTS 1024
  20. #define GUID "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
  21. enum
  22. {
  23. WS_HANDSHAKE = 0,
  24. WS_TANSMISSION = 1,
  25. WS_END = 2,
  26. };
  27. typedef struct _ws_ophdr{
  28. unsigned char opcode:4,
  29. rsv3:1,
  30. rsv2:1,
  31. rsv1:1,
  32. fin:1;
  33. unsigned char pl_len:7,
  34. mask:1;
  35. }ws_ophdr;
  36. typedef struct _ws_head_126{
  37. unsigned short payload_lenght;
  38. char mask_key[4];
  39. }ws_head_126;
  40. typedef struct _ws_head_127{
  41. long long payload_lenght;
  42. char mask_key[4];
  43. }ws_head_127;
  44. typedef int (*NCALLBACK)(int, int, void*);
  45. struct qsevent{
  46. int fd;
  47. int events;
  48. int status;
  49. void *arg;
  50. long last_active;
  51. int (*callback)(int fd, int event, void *arg);
  52. unsigned char buffer[MAX_BUFLEN];
  53. int length;
  54. /*websocket param*/
  55. int status_machine;
  56. };
  57. struct qseventblock{
  58. struct qsevent *eventsarrry;
  59. struct qseventblock *next;
  60. };
  61. struct qsreactor{
  62. int epfd;
  63. int blkcnt;
  64. struct qseventblock *evblk;
  65. };
  66. int recv_cb(int fd, int events, void *arg);
  67. int send_cb(int fd, int events, void *arg);
  68. struct qsevent *qsreactor_idx(struct qsreactor *reactor, int sockfd);
  69. int readline(char *allbuf, int idx, char *linebuf)
  70. {
  71. int len = strlen(allbuf);
  72. for(;idx < len;idx ++) {
  73. if (allbuf[idx] == '\r' && allbuf[idx+1] == '\n') {
  74. return idx+2;
  75. } else {
  76. *(linebuf++) = allbuf[idx];
  77. }
  78. }
  79. return -1;
  80. }
  81. int base64_encode(char *in_str, int in_len, char *out_str)
  82. {
  83. BIO *b64, *bio;
  84. BUF_MEM *bptr = NULL;
  85. size_t size = 0;
  86. if (in_str == NULL || out_str == NULL)
  87. return -1;
  88. b64 = BIO_new(BIO_f_base64());
  89. bio = BIO_new(BIO_s_mem());
  90. bio = BIO_push(b64, bio);
  91. BIO_write(bio, in_str, in_len);
  92. BIO_flush(bio);
  93. BIO_get_mem_ptr(bio, &bptr);
  94. memcpy(out_str, bptr->data, bptr->length);
  95. out_str[bptr->length-1] = '\0';
  96. size = bptr->length;
  97. BIO_free_all(bio);
  98. return size;
  99. }
  100. #define WEBSOCK_KEY_LENGTH 19
  101. int websocket_handshake(struct qsevent *ev)
  102. {
  103. char linebuf[128];
  104. int index = 0;
  105. char sec_data[128] = {0};
  106. char sec_accept[32] = {0};
  107. do
  108. {
  109. memset(linebuf, 0, sizeof(linebuf));
  110. index = readline(ev->buffer, index, linebuf);
  111. if(strstr(linebuf, "Sec-WebSocket-Key"))
  112. {
  113. strcat(linebuf, GUID);
  114. SHA1(linebuf+WEBSOCK_KEY_LENGTH, strlen(linebuf+WEBSOCK_KEY_LENGTH), sec_data);
  115. base64_encode(sec_data, strlen(sec_data), sec_accept);
  116. memset(ev->buffer, 0, MAX_BUFLEN);
  117. ev->length = sprintf(ev->buffer,
  118. "HTTP/1.1 101 Switching Protocols\r\n"
  119. "Upgrade: websocket\r\n"
  120. "Connection: Upgrade\r\n"
  121. "Sec-websocket-Accept: %s\r\n\r\n", sec_accept);
  122. break;
  123. }
  124. }while(index != -1 && (ev->buffer[index] != '\r') || (ev->buffer[index] != '\n'));
  125. return 0;
  126. }
  127. void websocket_umask(char *payload, int length, char *mask_key)
  128. {
  129. int i = 0;
  130. for( ; i<length; i++)
  131. payload[i] ^= mask_key[i%4];
  132. }
  133. int websocket_transmission(struct qsevent *ev)
  134. {
  135. ws_ophdr *ophdr = (ws_ophdr*)ev->buffer;
  136. printf("ws_recv_data length=%d\n", ophdr->pl_len);
  137. if(ophdr->pl_len <126)
  138. {
  139. char * payload = ev->buffer + sizeof(ws_ophdr) + 4;
  140. if(ophdr->mask)
  141. {
  142. websocket_umask(payload, ophdr->pl_len, ev->buffer+2);
  143. printf("payload:%s\n", payload);
  144. }
  145. memset(ev->buffer, 0, ev->length);
  146. strcpy(ev->buffer, "00ok");
  147. }
  148. return 0;
  149. }
  150. int websocket_request(struct qsevent *ev)
  151. {
  152. if(ev->status_machine == WS_HANDSHAKE)
  153. {
  154. websocket_handshake(ev);
  155. ev->status_machine = WS_TANSMISSION;
  156. }else if(ev->status_machine == WS_TANSMISSION){
  157. websocket_transmission(ev);
  158. }
  159. return 0;
  160. }
  161. void qs_event_set(struct qsevent *ev, int fd, NCALLBACK callback, void *arg)
  162. {
  163. ev->events = 0;
  164. ev->fd = fd;
  165. ev->arg = arg;
  166. ev->callback = callback;
  167. ev->last_active = time(NULL);
  168. return;
  169. }
  170. int qs_event_add(int epfd, int events, struct qsevent *ev)
  171. {
  172. struct epoll_event epv = {0, {0}};;
  173. epv.events = ev->events = events;
  174. epv.data.ptr = ev;
  175. if(ev->status == 1)
  176. {
  177. if(epoll_ctl(epfd, EPOLL_CTL_MOD, ev->fd, &epv) < 0)
  178. {
  179. perror("EPOLL_CTL_MOD error\n");
  180. return -1;
  181. }
  182. }
  183. else if(ev->status == 0)
  184. {
  185. if(epoll_ctl(epfd, EPOLL_CTL_ADD, ev->fd, &epv) < 0)
  186. {
  187. perror("EPOLL_CTL_ADD error\n");
  188. return -2;
  189. }
  190. ev->status = 1;
  191. }
  192. return 0;
  193. }
  194. int qs_event_del(int epfd, struct qsevent *ev)
  195. {
  196. struct epoll_event epv = {0, {0}};
  197. if(ev->status != 1)
  198. return -1;
  199. ev->status = 0;
  200. epv.data.ptr = ev;
  201. if((epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &epv)))
  202. {
  203. perror("EPOLL_CTL_DEL error\n");
  204. return -1;
  205. }
  206. return 0;
  207. }
  208. int sock(short port)
  209. {
  210. int fd = socket(AF_INET, SOCK_STREAM, 0);
  211. fcntl(fd, F_SETFL, O_NONBLOCK);
  212. struct sockaddr_in ser_addr;
  213. memset(&ser_addr, 0, sizeof(ser_addr));
  214. ser_addr.sin_addr.s_addr = htonl(INADDR_ANY);
  215. ser_addr.sin_family = AF_INET;
  216. ser_addr.sin_port = htons(port);
  217. bind(fd, (struct sockaddr*)&ser_addr, sizeof(ser_addr));
  218. if(listen(fd, 20) < 0)
  219. perror("listen error\n");
  220. printf("listener[%d] lstening..\n", fd);
  221. return fd;
  222. }
  223. int qsreactor_init(struct qsreactor *reactor)
  224. {
  225. if(reactor == NULL)
  226. return -1;
  227. memset(reactor, 0, sizeof(struct qsreactor));
  228. reactor->epfd = epoll_create(1);
  229. if(reactor->epfd <= 0)
  230. {
  231. perror("epoll_create error\n");
  232. return -1;
  233. }
  234. struct qseventblock *block = (struct qseventblock*)malloc(sizeof(struct qseventblock));
  235. if(block == NULL)
  236. {
  237. printf("blockinit malloc error\n");
  238. close(reactor->epfd);
  239. return -2;
  240. }
  241. memset(block, 0, sizeof(block));
  242. struct qsevent *evs = (struct qsevent*)malloc(MAX_EPOLLSIZE * sizeof(struct qsevent));
  243. if(evs == NULL)
  244. {
  245. printf("evsnit malloc error\n");
  246. close(reactor->epfd);
  247. return -3;
  248. }
  249. memset(evs, 0, sizeof(evs));
  250. block->next = NULL;
  251. block->eventsarrry = evs;
  252. reactor->blkcnt = 1;
  253. reactor->evblk = block;
  254. return 0;
  255. }
  256. int qsreactor_alloc(struct qsreactor *reactor)
  257. {
  258. if(reactor == NULL)return -1;
  259. if(reactor->evblk == NULL)return -1;
  260. struct qseventblock *tailblock = reactor->evblk;
  261. while(tailblock->next != NULL)
  262. tailblock = tailblock->next;
  263. struct qseventblock *newblock = (struct qseventblock*)malloc(sizeof(struct qseventblock));
  264. if(newblock == NULL)
  265. {
  266. printf("newblock alloc error\n");
  267. return -1;
  268. }
  269. memset(newblock, 0, sizeof(newblock));
  270. struct qsevent *neweventarray = (struct qsevent*)malloc(sizeof(struct qsevent) * MAX_EPOLLSIZE);
  271. if(neweventarray == NULL)
  272. {
  273. printf("neweventarray malloc error\n");
  274. return -1;
  275. }
  276. memset(neweventarray, 0, sizeof(neweventarray));
  277. newblock->eventsarrry = neweventarray;
  278. newblock->next = NULL;
  279. tailblock->next = newblock;
  280. reactor->blkcnt++;
  281. return 0;
  282. }
  283. struct qsevent *qsreactor_idx(struct qsreactor *reactor, int sockfd)
  284. {
  285. int index = sockfd / MAX_EPOLLSIZE;
  286. while(index >= reactor->blkcnt)qsreactor_alloc(reactor);
  287. int i=0;
  288. struct qseventblock *idxblock = reactor->evblk;
  289. while(i++<index && idxblock != NULL)
  290. idxblock = idxblock->next;
  291. return &idxblock->eventsarrry[sockfd%MAX_EPOLLSIZE];
  292. }
  293. int qsreactor_destory(struct qsreactor *reactor)
  294. {
  295. close(reactor->epfd);
  296. free(reactor->evblk);
  297. reactor = NULL;
  298. return 0;
  299. }
  300. int qsreactor_addlistener(struct qsreactor *reactor, int sockfd, NCALLBACK acceptor)
  301. {
  302. if(reactor == NULL)return -1;
  303. if(reactor->evblk == NULL)return -1;
  304. struct qsevent *event = qsreactor_idx(reactor, sockfd);
  305. qs_event_set(event, sockfd, acceptor, reactor);
  306. qs_event_add(reactor->epfd, EPOLLIN, event);
  307. return 0;
  308. }
  309. int send_cb(int fd, int events, void *arg)
  310. {
  311. struct qsreactor *reactor = (struct qsreactor*)arg;
  312. struct qsevent *ev = qsreactor_idx(reactor, fd);
  313. int ret = send(fd, ev->buffer, ev->length, 0);
  314. if(ret < 0)
  315. {
  316. qs_event_del(reactor->epfd, ev);
  317. printf("clent[%d] ", fd);
  318. perror("send error\n");
  319. close(fd);
  320. }
  321. else if(ret > 0)
  322. {
  323. printf("send to client[%d]:\n%s\n", fd, ev->buffer);
  324. qs_event_del(reactor->epfd, ev);
  325. qs_event_set(ev, fd, recv_cb, reactor);
  326. qs_event_add(reactor->epfd, EPOLLIN, ev);
  327. }
  328. return ret;
  329. }
  330. int recv_cb(int fd, int events, void *arg)
  331. {
  332. struct qsreactor *reactor = (struct qsreactor*)arg;
  333. struct qsevent *ev = qsreactor_idx(reactor, fd);
  334. int len = recv(fd, ev->buffer, MAX_BUFLEN, 0);
  335. qs_event_del(reactor->epfd, ev);
  336. if(len > 0)
  337. {
  338. ev->length = len;
  339. ev->buffer[len] = '\0';
  340. printf("client[%d]:\n%s\n", fd, ev->buffer);
  341. websocket_request(ev);
  342. qs_event_del(reactor->epfd, ev);
  343. qs_event_set(ev, fd, send_cb, reactor);
  344. qs_event_add(reactor->epfd, EPOLLOUT, ev);
  345. }
  346. else if(len == 0)
  347. {
  348. qs_event_del(reactor->epfd, ev);
  349. close(fd);
  350. printf("client[%d] close\n", fd);
  351. }
  352. else
  353. {
  354. qs_event_del(reactor->epfd, ev);
  355. printf("client[%d]", fd);
  356. perror("reacv error,\n");
  357. close(fd);
  358. }
  359. return 0;
  360. }
  361. int accept_cb(int fd, int events, void *arg)
  362. {
  363. struct qsreactor *reactor = (struct qsreactor*)arg;
  364. if(reactor == NULL)return -1;
  365. struct sockaddr_in client_addr;
  366. socklen_t len = sizeof(client_addr);
  367. int clientfd;
  368. if((clientfd = accept(fd, (struct sockaddr*)&client_addr, &len)) == -1)
  369. {
  370. if(errno != EAGAIN && errno != EINTR)
  371. {}
  372. perror("accept error\n");
  373. return -1;
  374. }
  375. int flag = 0;
  376. if((flag = fcntl(clientfd, F_SETFL, O_NONBLOCK)) < 0)
  377. {
  378. printf("fcntl noblock error, %d\n",MAX_BUFLEN);
  379. return -1;
  380. }
  381. struct qsevent *event = qsreactor_idx(reactor, clientfd);
  382. event->status_machine = WS_HANDSHAKE;
  383. qs_event_set(event, clientfd, recv_cb, reactor);
  384. qs_event_add(reactor->epfd, EPOLLIN, event);
  385. printf("new connect [%s:%d], pos[%d]\n",
  386. inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), clientfd);
  387. return 0;
  388. }
  389. int qsreactor_run(struct qsreactor *reactor)
  390. {
  391. if(reactor == NULL)
  392. return -1;
  393. if(reactor->evblk == NULL)
  394. return -1;
  395. if(reactor->epfd < 0)
  396. return -1;
  397. struct epoll_event events[MAX_EPOLL_EVENTS + 1];
  398. while(1)
  399. {
  400. int nready = epoll_wait(reactor->epfd, events, MAX_EPOLL_EVENTS, 1000);
  401. if(nready < 0)
  402. {
  403. printf("epoll_wait error\n");
  404. continue;
  405. }
  406. for(int i=0; i<nready; i++)
  407. {
  408. struct qsevent *ev = (struct qsevent*)events[i].data.ptr;
  409. if((events[i].events & EPOLLIN) && (ev->events & EPOLLIN))
  410. {
  411. ev->callback(ev->fd, events[i].events, ev->arg);
  412. }
  413. if((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT))
  414. {
  415. ev->callback(ev->fd, events[i].events, ev->arg);
  416. }
  417. }
  418. }
  419. }
  420. int main(int argc, char **argv)
  421. {
  422. unsigned short port = atoi(argv[1]);
  423. int sockfd = sock(port);
  424. struct qsreactor *reactor = (struct qsreactor*)malloc(sizeof(struct qsreactor));
  425. qsreactor_init(reactor);
  426. qsreactor_addlistener(reactor, sockfd, accept_cb);
  427. qsreactor_run(reactor);
  428. qsreactor_destory(reactor);
  429. close(sockfd);
  430. }

6.C1000K reactor模型,epoll实现,连接并回发一段数据,测试正常

  1. #include <stdio.h>
  2. #include <stdlib.h>
  3. #include <string.h>
  4. #include <sys/socket.h>
  5. #include <sys/epoll.h>
  6. #include <arpa/inet.h>
  7. #include <fcntl.h>
  8. #include <unistd.h>
  9. #include <errno.h>
  10. #include <time.h>
  11. #include <sys/stat.h>
  12. #include <sys/sendfile.h>
  13. #define MAX_BUFLEN 4096
  14. #define MAX_EPOLLSIZE 1024
  15. #define MAX_EPOLL_EVENTS 1024
  16. typedef int (*NCALLBACK)(int, int, void*);
  17. struct qsevent{
  18. int fd;
  19. int events;
  20. int status;
  21. void *arg;
  22. long last_active;
  23. int (*callback)(int fd, int event, void *arg);
  24. unsigned char buffer[MAX_BUFLEN];
  25. int length;
  26. };
  27. struct qseventblock{
  28. struct qsevent *eventsarrry;
  29. struct qseventblock *next;
  30. };
  31. struct qsreactor{
  32. int epfd;
  33. int blkcnt;
  34. struct qseventblock *evblk;
  35. };
  36. int recv_cb(int fd, int events, void *arg);
  37. int send_cb(int fd, int events, void *arg);
  38. struct qsevent *qsreactor_idx(struct qsreactor *reactor, int sockfd);
  39. void qs_event_set(struct qsevent *ev, int fd, NCALLBACK callback, void *arg)
  40. {
  41. ev->events = 0;
  42. ev->fd = fd;
  43. ev->arg = arg;
  44. ev->callback = callback;
  45. ev->last_active = time(NULL);
  46. return;
  47. }
  48. int qs_event_add(int epfd, int events, struct qsevent *ev)
  49. {
  50. struct epoll_event epv = {0, {0}};;
  51. epv.events = ev->events = events;
  52. epv.data.ptr = ev;
  53. if(ev->status == 1)
  54. {
  55. if(epoll_ctl(epfd, EPOLL_CTL_MOD, ev->fd, &epv) < 0)
  56. {
  57. perror("EPOLL_CTL_MOD error\n");
  58. return -1;
  59. }
  60. }
  61. else if(ev->status == 0)
  62. {
  63. if(epoll_ctl(epfd, EPOLL_CTL_ADD, ev->fd, &epv) < 0)
  64. {
  65. perror("EPOLL_CTL_ADD error\n");
  66. return -2;
  67. }
  68. ev->status = 1;
  69. }
  70. return 0;
  71. }
  72. int qs_event_del(int epfd, struct qsevent *ev)
  73. {
  74. struct epoll_event epv = {0, {0}};
  75. if(ev->status != 1)
  76. return -1;
  77. ev->status = 0;
  78. epv.data.ptr = ev;
  79. if((epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &epv)))
  80. {
  81. perror("EPOLL_CTL_DEL error\n");
  82. return -1;
  83. }
  84. return 0;
  85. }
  86. int sock(short port)
  87. {
  88. int fd = socket(AF_INET, SOCK_STREAM, 0);
  89. fcntl(fd, F_SETFL, O_NONBLOCK);
  90. struct sockaddr_in ser_addr;
  91. memset(&ser_addr, 0, sizeof(ser_addr));
  92. ser_addr.sin_addr.s_addr = htonl(INADDR_ANY);
  93. ser_addr.sin_family = AF_INET;
  94. ser_addr.sin_port = htons(port);
  95. bind(fd, (struct sockaddr*)&ser_addr, sizeof(ser_addr));
  96. if(listen(fd, 20) < 0)
  97. perror("listen error\n");
  98. printf("listener[%d] lstening..\n", fd);
  99. return fd;
  100. }
  101. int qsreactor_init(struct qsreactor *reactor)
  102. {
  103. if(reactor == NULL)
  104. return -1;
  105. memset(reactor, 0, sizeof(struct qsreactor));
  106. reactor->epfd = epoll_create(1);
  107. if(reactor->epfd <= 0)
  108. {
  109. perror("epoll_create error\n");
  110. return -1;
  111. }
  112. struct qseventblock *block = (struct qseventblock*)malloc(sizeof(struct qseventblock));
  113. if(block == NULL)
  114. {
  115. printf("blockinit malloc error\n");
  116. close(reactor->epfd);
  117. return -2;
  118. }
  119. memset(block, 0, sizeof(block));
  120. struct qsevent *evs = (struct qsevent*)malloc(MAX_EPOLLSIZE * sizeof(struct qsevent));
  121. if(evs == NULL)
  122. {
  123. printf("evsnit malloc error\n");
  124. close(reactor->epfd);
  125. return -3;
  126. }
  127. memset(evs, 0, sizeof(evs));
  128. block->next = NULL;
  129. block->eventsarrry = evs;
  130. reactor->blkcnt = 1;
  131. reactor->evblk = block;
  132. return 0;
  133. }
  134. int qsreactor_alloc(struct qsreactor *reactor)
  135. {
  136. if(reactor == NULL)return -1;
  137. if(reactor->evblk == NULL)return -1;
  138. struct qseventblock *tailblock = reactor->evblk;
  139. while(tailblock->next != NULL)
  140. tailblock = tailblock->next;
  141. struct qseventblock *newblock = (struct qseventblock*)malloc(sizeof(struct qseventblock));
  142. if(newblock == NULL)
  143. {
  144. printf("newblock alloc error\n");
  145. return -1;
  146. }
  147. memset(newblock, 0, sizeof(newblock));
  148. struct qsevent *neweventarray = (struct qsevent*)malloc(sizeof(struct qsevent) * MAX_EPOLLSIZE);
  149. if(neweventarray == NULL)
  150. {
  151. printf("neweventarray malloc error\n");
  152. return -1;
  153. }
  154. memset(neweventarray, 0, sizeof(neweventarray));
  155. newblock->eventsarrry = neweventarray;
  156. newblock->next = NULL;
  157. tailblock->next = newblock;
  158. reactor->blkcnt++;
  159. return 0;
  160. }
  161. struct qsevent *qsreactor_idx(struct qsreactor *reactor, int sockfd)
  162. {
  163. int index = sockfd / MAX_EPOLLSIZE;
  164. while(index >= reactor->blkcnt)qsreactor_alloc(reactor);
  165. int i=0;
  166. struct qseventblock *idxblock = reactor->evblk;
  167. while(i++<index && idxblock != NULL)
  168. idxblock = idxblock->next;
  169. return &idxblock->eventsarrry[sockfd%MAX_EPOLLSIZE];
  170. }
  171. int qsreactor_destory(struct qsreactor *reactor)
  172. {
  173. close(reactor->epfd);
  174. free(reactor->evblk);
  175. reactor = NULL;
  176. return 0;
  177. }
  178. int qsreactor_addlistener(struct qsreactor *reactor, int sockfd, NCALLBACK acceptor)
  179. {
  180. if(reactor == NULL)return -1;
  181. if(reactor->evblk == NULL)return -1;
  182. struct qsevent *event = qsreactor_idx(reactor, sockfd);
  183. qs_event_set(event, sockfd, acceptor, reactor);
  184. qs_event_add(reactor->epfd, EPOLLIN, event);
  185. return 0;
  186. }
  187. int send_cb(int fd, int events, void *arg)
  188. {
  189. struct qsreactor *reactor = (struct qsreactor*)arg;
  190. struct qsevent *ev = qsreactor_idx(reactor, fd);
  191. int ret = send(fd, ev->buffer, ev->length, 0);
  192. if(ret < 0)
  193. {
  194. qs_event_del(reactor->epfd, ev);
  195. printf("clent[%d] ", fd);
  196. perror("send error\n");
  197. close(fd);
  198. }
  199. else if(ret > 0)
  200. {
  201. printf("send to client[%d]:%s", fd, ev->buffer);
  202. qs_event_del(reactor->epfd, ev);
  203. qs_event_set(ev, fd, recv_cb, reactor);
  204. qs_event_add(reactor->epfd, EPOLLIN, ev);
  205. }
  206. return ret;
  207. }
  208. int recv_cb(int fd, int events, void *arg)
  209. {
  210. struct qsreactor *reactor = (struct qsreactor*)arg;
  211. struct qsevent *ev = qsreactor_idx(reactor, fd);
  212. int len = recv(fd, ev->buffer, MAX_BUFLEN, 0);
  213. qs_event_del(reactor->epfd, ev);
  214. if(len > 0)
  215. {
  216. ev->length = len;
  217. ev->buffer[len] = '\0';
  218. printf("client[%d]:%s", fd, ev->buffer);
  219. qs_event_del(reactor->epfd, ev);
  220. qs_event_set(ev, fd, send_cb, reactor);
  221. qs_event_add(reactor->epfd, EPOLLOUT, ev);
  222. }
  223. else if(len == 0)
  224. {
  225. qs_event_del(reactor->epfd, ev);
  226. close(fd);
  227. printf("client[%d] close\n", fd);
  228. }
  229. else
  230. {
  231. qs_event_del(reactor->epfd, ev);
  232. printf("client[%d]", fd);
  233. perror("reacv error,\n");
  234. close(fd);
  235. }
  236. return 0;
  237. }
  238. int accept_cb(int fd, int events, void *arg)
  239. {
  240. struct qsreactor *reactor = (struct qsreactor*)arg;
  241. if(reactor == NULL)return -1;
  242. struct sockaddr_in client_addr;
  243. socklen_t len = sizeof(client_addr);
  244. int clientfd;
  245. if((clientfd = accept(fd, (struct sockaddr*)&client_addr, &len)) == -1)
  246. {
  247. if(errno != EAGAIN && errno != EINTR)
  248. {}
  249. perror("accept error\n");
  250. return -1;
  251. }
  252. int flag = 0;
  253. if((flag = fcntl(clientfd, F_SETFL, O_NONBLOCK)) < 0)
  254. {
  255. printf("fcntl noblock error, %d\n",MAX_BUFLEN);
  256. return -1;
  257. }
  258. struct qsevent *event = qsreactor_idx(reactor, clientfd);
  259. qs_event_set(event, clientfd, recv_cb, reactor);
  260. qs_event_add(reactor->epfd, EPOLLIN, event);
  261. printf("new connect [%s:%d], pos[%d]\n",
  262. inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), clientfd);
  263. return 0;
  264. }
  265. int qsreactor_run(struct qsreactor *reactor)
  266. {
  267. if(reactor == NULL)
  268. return -1;
  269. if(reactor->evblk == NULL)
  270. return -1;
  271. if(reactor->epfd < 0)
  272. return -1;
  273. struct epoll_event events[MAX_EPOLL_EVENTS + 1];
  274. while(1)
  275. {
  276. int nready = epoll_wait(reactor->epfd, events, MAX_EPOLL_EVENTS, 1000);
  277. if(nready < 0)
  278. {
  279. printf("epoll_wait error\n");
  280. continue;
  281. }
  282. for(int i=0; i<nready; i++)
  283. {
  284. struct qsevent *ev = (struct qsevent*)events[i].data.ptr;
  285. if((events[i].events & EPOLLIN) && (ev->events & EPOLLIN))
  286. {
  287. ev->callback(ev->fd, events[i].events, ev->arg);
  288. }
  289. if((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT))
  290. {
  291. ev->callback(ev->fd, events[i].events, ev->arg);
  292. }
  293. }
  294. }
  295. }
  296. int main(int argc, char **argv)
  297. {
  298. unsigned short port = atoi(argv[1]);
  299. int sockfd = sock(port);
  300. struct qsreactor *reactor = (struct qsreactor*)malloc(sizeof(struct qsreactor));
  301. qsreactor_init(reactor);
  302. qsreactor_addlistener(reactor, sockfd, accept_cb);
  303. qsreactor_run(reactor);
  304. qsreactor_destory(reactor);
  305. close(sockfd);
  306. }
声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号