赞
踩
<input name="url" value="{{url}}" bindinput ="urlInput"/>
<button size='mini' type="warn">断开连接</button>
<button size='mini' type="primary" bindtap="connectSocket">开启连接</button>
<textarea placeholder="输入发送内容" bindinput ="msgInput"></textarea>
<button size='mini' type="primary" bindtap="sendMsg">发送</button>
<view wx:for="{{msgs}}">{{index}}: {{item}}</view>
界面效果:
数据部分包含三个字段:
url:字符串类型,代表WebSocket服务器地址;
msgs:数组类型,用于存储发送和接收的消息;
msg:字符串类型,用于保存发送的内容;
另外还定义了三个函数:
connectSocket:提供了连接WebSocket服务的功能;
msgInput:获取用户输入的内容;
sendMsg:实现了向远程WebSocket发送消息的功能;
Page({ data: { url: 'ws://localhost:8888/ws', msgs: [], msg: '', } // 连接WebSocket服务 connectSocket() { let _this = this; // 连接websocket服务 let task = wx.connectSocket({ url: _this.data.url }); // 监听websocket消息,并将接收到的消息添加到消息数组msgs中 task.onMessage(function(res) { _this.setData({ msgs: [..._this.data.msgs, "接收到消息 -> " + res.data] }); }); // 保存websocket实例 _this.setData({ socketTask: task, msgs: [..._this.data.msgs,"连接成功!"] }); }, // 获取输入内容,并临时保存在msg中 msgInput(e) { this.setData({ msg: e.detail.value }); }, // 发送消息 sendMsg() { // 1.获取输入内容 let msg = this.data.msg; // 2.发送消息到WebSocket服务端 this.data.socketTask.send({ data: msg }); } })
在从HTTP或HTTPS协议切换到WebSocket时,将会使用一种称为升级握手的机制。因此我们的WebSocket服务端程序将始终以HTTP作为开始,然后再执行升级。其约定为:如果被请求的URL以/ws
结尾,那么我们将会把该协议升级为WebSocket;否则,服务器将使用基本的HTTP。当连接升级完毕后,所有数据都将会使用WebSocket进行传输(如下图)。
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>io.netty</groupId> <artifactId>NettyWebSocket</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.48.Final</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> </project>
1)定义一个专门处理Http协议的处理器,当浏览器第一次连接时候会读取首页的html文件,并将html文件内容返回给浏览器展示。
package io.netty.websocket; import io.netty.channel.*; import io.netty.handler.codec.http.*; import io.netty.handler.ssl.SslHandler; import io.netty.handler.stream.ChunkedNioFile; import java.io.File; import java.io.RandomAccessFile; import java.net.URISyntaxException; import java.net.URL; // 处理Http协议的Handler,该Handler只会在第一次客户端连接时候有用。 public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> { private final String wsUri; private static final File INDEX; static { URL location = HttpRequestHandler.class.getProtectionDomain() .getCodeSource().getLocation(); try { String path = location.toURI() + "index.html"; path = !path.contains("file:") ? path : path.substring(5); INDEX = new File(path); } catch (URISyntaxException e) { throw new IllegalStateException("Unable to locate index.html", e); } } public HttpRequestHandler(String wsUri) { this.wsUri = wsUri; } @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception { // 如果被请求的 URL 以/ws 结尾,那么我们将会把该协议升级为 WebSocket。 if (wsUri.equalsIgnoreCase(request.getUri())) { // 将请求传递给下一个ChannelHandler,即WebSocketServerProtocolHandler处理 // request.retain()会增加引用计数器,以防止资源被释放 ctx.fireChannelRead(request.retain()); return; } handleHttpRequest(ctx, request); } /** * 该方法读取首页html文件内容,然后将内容返回给客户端展示 * @param ctx * @param request * @throws Exception */ private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception { // HTTP1.1协议允许客户端先判定服务器是否愿意接受客户端发来的消息主体,以减少由于服务器拒绝请求所带来的额外资源开销 if (HttpHeaders.is100ContinueExpected(request)) { FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE); ctx.writeAndFlush(response); } // 从resources目录读取index.html文件 RandomAccessFile file = new RandomAccessFile(INDEX, "r"); // 准备响应头信息 HttpResponse response = new DefaultHttpResponse(request.getProtocolVersion(), HttpResponseStatus.OK); response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/html; charset=UTF-8"); boolean keepAlive = HttpHeaders.isKeepAlive(request); if (keepAlive) { response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, file.length()); response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE); } ctx.write(response); // 输出html文件内容 ctx.write(new ChunkedNioFile(file.getChannel())); // 最后发送一个LastHttpContent来标记响应的结束 ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); // 如果不是长链接,则在写操作完成后关闭Channel if (!keepAlive) { future.addListener(ChannelFutureListener.CLOSE); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
2)定义一个处理器,负责处理所有委托管理的WebSocket帧类型以及升级握手本身。如果握手成功,则所需的ChannelHandler将会被添加到ChannelPipeline中,而那些不需要的ChannelHandler会被移除掉。
WebSocket升级完成后,WebSocketServerProtocolHandler会把HttpRequestDecoder替换为WebSocketFrameDecoder,把HttpResponseEncoder替换为WebSocketFrameEncoder。为了性能最大化,WebSocketServerProtocolHandler会移除任何不再被WebSocket连接所需要的ChannelHandler,其中包括 HttpObjectAggregator 和 HttpRequestHandler。
实现代码:
package io.netty.websocket; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; // 处理WebSocket协议的Handler public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { private final ChannelGroup channelGroup; public TextWebSocketFrameHandler(ChannelGroup channelGroup) { this.channelGroup = channelGroup; } // 用户事件监听,每次客户端连接时候自动触发 @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { String content = "Client " + ctx.channel().remoteAddress().toString().substring(1) + " joined"; System.out.println(content); // 如果是握手完成事件,则从Pipeline中删除HttpRequestHandler,并将当前channel添加到ChannelGroup中 if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) { // 从Pipeline中删除HttpRequestHandler ctx.pipeline().remove(HttpRequestHandler.class); // 通知所有已连接的WebSocket客户端,新的客户端已经连接上了 TextWebSocketFrame msg = new TextWebSocketFrame(content); channelGroup.writeAndFlush(msg); // 将WebSocket Channel添加到ChannelGroup中,以便可以它接收所有消息 channelGroup.add(ctx.channel()); } else { super.userEventTriggered(ctx, evt); } } // 每次客户端发送消息时执行 @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame msg) throws Exception { System.out.println("读取到的消息:" + msg.retain()); // 将读取到的消息写到ChannelGroup中所有已经连接的客户端 channelGroup.writeAndFlush(msg.retain()); } }
上面userEventTriggered
方法监听用户事件。当有客户端连接时候,会自动执行该方法。而channelRead0
方法负责读取客户端发送过来的消息,然后通过channelGroup
将消息输出到所有已连接的客户端。
定义一个ChannelInitializer的子类,其主要目的是在某个 Channel 注册到 EventLoop 后,对这个 Channel 执行一些初始化操作。
package io.netty.websocket; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.group.ChannelGroup; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.stream.ChunkedWriteHandler; public class ChatServerInitializer extends ChannelInitializer<Channel> { private final ChannelGroup channelGroup; public ChatServerInitializer(ChannelGroup channelGroup) { this.channelGroup = channelGroup; } @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); // 安装编解码器,以实现对HttpRequest、 HttpContent、LastHttp-Content与字节之间的编解码 pipeline.addLast(new HttpServerCodec()); // 专门处理写文件的Handler pipeline.addLast(new ChunkedWriteHandler()); // Http聚合器,可以让pipeline中下一个Channel收到完整的HTTP信息 pipeline.addLast(new HttpObjectAggregator(64 * 1024)); // 处理Http协议的ChannelHandler,只会在客户端第一次连接时候有用 pipeline.addLast(new HttpRequestHandler("/ws")); // 升级Websocket后,使用该 ChannelHandler 处理Websocket请求 pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); // 安装专门处理 Websocket TextWebSocketFrame 帧的处理器 pipeline.addLast(new TextWebSocketFrameHandler(channelGroup)); } }
package io.netty.websocket; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.util.concurrent.ImmediateEventExecutor; import java.net.InetSocketAddress; public class ChatServer { public void start() { ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChatServerInitializer(channelGroup)); ChannelFuture future = bootstrap.bind(new InetSocketAddress(8888)).syncUninterruptibly(); System.out.println("Starting ChatServer on port 8888 ..."); future.channel().closeFuture().syncUninterruptibly(); } finally { channelGroup.close(); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { new ChatServer().start(); } }
该html文件提供网页版的WebSocket客户端页面。在src/main/resources
目录下新建一个html文件。
<!DOCTYPE html> <html> <head> <meta charset="UTF-8"> <title>WebSocket Chat</title> </head> <body> <form οnsubmit="return false;"> <h3>WebSocket 聊天室:</h3> <textarea id="responseText" style="width: 500px; height: 300px;"></textarea><br/> <input type="text" name="message" style="width: 300px" value="Hello Netty"/> <input type="button" value="发送消息" onclick="send(this.form.message.value)"/> <input type="button" value="清空聊天记录" onclick="clearScreen()"/> </form> <script type="text/javascript"> var socket; if (!window.WebSocket) { window.WebSocket = window.MozWebSocket; } if (window.WebSocket) { socket = new WebSocket("ws://localhost:8888/ws"); // 注意:使用tls协议通信时候,协议名为wss // socket = new WebSocket("wss://localhost:8443/ws"); socket.onopen = function(event) { var ta = document.getElementById('responseText'); ta.value = "连接开启!"; }; socket.onclose = function(event) { var ta = document.getElementById('responseText'); ta.value = ta.value + '\n' + "连接被关闭!"; }; socket.onmessage = function(event) { var ta = document.getElementById('responseText'); ta.value = ta.value + '\n' + event.data; }; } else { alert("你的浏览器不支持 WebSocket!"); } function send(message) { if (!window.WebSocket) { return; } if (socket.readyState == WebSocket.OPEN) { socket.send(message); } else { alert("连接没有开启."); } } function clearScreen() { document.getElementById('responseText').value = ""; } </script> </body> </html>
界面效果:
最终效果:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。