赞
踩
<!-- Netty all-in-one artifact: supplies the io.netty.* classes imported by the WebSocket server code below. -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.77.Final</version>
</dependency>
package com.message.after;

import com.message.websocket.WebSocketServer;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

/**
 * Startup hook that launches the Netty-based WebSocket server once the Spring
 * application has started.
 *
 * @author kuaiting
 */
@Component
public class AfterExecuteMethods implements CommandLineRunner {

    /** Name of the thread that hosts the blocking WebSocket server loop. */
    private static final String SERVER_THREAD_NAME = "netty-websocket-server";

    /**
     * Runs right after application startup; use it for initialization work and
     * for tasks that must begin as soon as the application is up.
     *
     * <p>{@link WebSocketServer#start()} blocks until the server channel is
     * closed, so it is executed on its own thread. Calling it directly here
     * would keep this method from ever returning.
     *
     * @param args command line arguments passed to the application (unused)
     * @throws Exception never thrown by this implementation; declared by the interface
     */
    @Override
    public void run(String... args) throws Exception {
        Thread serverThread = new Thread(new Runnable() {
            @Override
            public void run() {
                // Start the Netty-backed WebSocket server; blocks for the server's lifetime.
                new WebSocketServer().start();
            }
        }, SERVER_THREAD_NAME);
        // Non-daemon, so the running server keeps the JVM alive just as the
        // original blocking call on the startup thread did.
        serverThread.setDaemon(false);
        serverThread.start();
    }
}
package com.message.websocket; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * @author kuaiting * WebSocket */ public class WebSocketServer { public void start() { // 一个主线程组 NioEventLoopGroup mainGroup = new NioEventLoopGroup(); //一个工作线程组 NioEventLoopGroup subGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(mainGroup, subGroup) //设置队列大小 .option(ChannelOption.SO_BACKLOG, 1024) .channel(NioServerSocketChannel.class) // 两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文 .childOption(ChannelOption.SO_KEEPALIVE, true) //添加自定义初始化处理器 .childHandler(new WsServerInitialzer()); ChannelFuture channelFuture = serverBootstrap.bind(8082).sync(); channelFuture.channel().closeFuture().sync(); }catch (Exception e){ e.printStackTrace(); }finally { //关闭主线程组 mainGroup.shutdownGracefully(); //关闭工作线程组 subGroup.shutdownGracefully(); } } }
package com.message.websocket; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.stream.ChunkedWriteHandler; /** * @author kuaiting */ public class WsServerInitialzer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //websocket基于http协议,所以需要http编解码器 pipeline.addLast(new HttpServerCodec()); //添加对于读写大数据流的支持 pipeline.addLast(new ChunkedWriteHandler()); //对httpMessage进行聚合 pipeline.addLast(new HttpObjectAggregator(1024*64)); // ================= 上述是用于支持http协议的 ============== //websocket 服务器处理的协议,用于给指定的客户端进行连接访问的路由地址 //比如处理一些握手动作(ping,pong) pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); //自定义handler pipeline.addLast(new ChatHandler()); } }
package com.message.websocket; import com.alibaba.fastjson.JSON; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.util.concurrent.GlobalEventExecutor; import lombok.extern.slf4j.Slf4j; /** * @author kuaiting */ @Slf4j public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { private static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("已创建WebSocket链接:{}", ctx.channel().remoteAddress()); } @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { String text = msg.text(); sendAllMessages(ctx,text); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { channels.add(ctx.channel()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { log.info("断开链接的ID", ctx.channel().id().asLongText()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.channel().closeFuture(); } //给每个人发送消息,除发消息人外 private void sendAllMessages(ChannelHandlerContext ctx, String msg) { for (Channel channel : channels) { if (!channel.id().asLongText().equals(ctx.channel().id().asLongText())) { channel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(msg))); } } } //给每个人发送消息,除发消息人外 private void sendMessages(String msg) { for (Channel channel : channels) { channel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(msg))); } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。