赞
踩
Netty是一个异步事件驱动的网络应用框架,可快速开发可维护的高性能协议服务器和客户端。基于NIO实现的高性能网络IO框架,极大简化基于常用网络协议的编程(TCP、UDP等)。
tcpServer.port=8899
package com.zkzp.eage.netty.server; import com.zkzp.eage.netty.handle.HeatBeatHandler; import com.zkzp.eage.netty.handle.NettyServerHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.Future; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import java.util.concurrent.TimeUnit; @Slf4j @Component public class NettyTcpServer { @Value("${tcpServer.port}") private int tcpServerPort; @Autowired private NettyServerHandler nettyServerHandler; private static boolean isStart = false; EventLoopGroup bossGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); public boolean serverStart(){ try{ if(isStart) return true; ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup); bootstrap.option(ChannelOption.SO_BACKLOG, 2048).option(ChannelOption.SO_REUSEADDR, true).childOption(ChannelOption.SO_KEEPALIVE, true); bootstrap.channel(NioServerSocketChannel.class); ChannelInitializer<SocketChannel> channelChannelInitializer = new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new IdleStateHandler(40, 0, 0, TimeUnit.MINUTES)); pipeline.addLast(new HeatBeatHandler()); pipeline.addLast(new LineBasedFrameDecoder(1024)); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(nettyServerHandler); } }; bootstrap.childHandler(channelChannelInitializer); ChannelFuture channelFuture = bootstrap.bind(tcpServerPort).sync(); log.info("Netty Tcp Server start success on port:{}", tcpServerPort); isStart = true; return true; }catch(Exception e){ e.printStackTrace(); return false; } } public synchronized boolean serverStop() { try { if (!isStart) return true; Future<?> future = this.workerGroup.shutdownGracefully().await(); if (!future.isSuccess()) { log.error("<---- netty workerGroup cannot be stopped", future.cause()); return false; } future = this.bossGroup.shutdownGracefully().await(); if (!future.isSuccess()) { log.error("<---- netty bossGroup cannot be stopped", future.cause()); return false; } log.info("关闭Netty Tcp 服务端成功"); isStart = false; return true; } catch (Exception e) { e.printStackTrace(); return false; } } }
package com.zkzp.eage.netty.handle; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; public class HeatBeatHandler extends ChannelInboundHandlerAdapter { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (!(evt instanceof IdleStateEvent)) { super.userEventTriggered(ctx, evt); return; } if (((IdleStateEvent) evt).state() == IdleState.READER_IDLE) { ctx.disconnect(); } } }
package com.zkzp.eage.netty.handle; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import java.net.InetSocketAddress; import java.util.concurrent.ConcurrentHashMap; @Slf4j @Component @ChannelHandler.Sharable public class NettyServerHandler extends ChannelInboundHandlerAdapter { private static final ConcurrentHashMap<String, ChannelHandlerContext> CHANNEL_MAP = new ConcurrentHashMap<>(); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { //在这里做业务处理 System.out.println("----------------"+msg.toString()); } @Override public void channelActive(ChannelHandlerContext ctx) { InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); String channelId = ctx.channel().id().asLongText(); if (!CHANNEL_MAP.containsKey(channelId)) { CHANNEL_MAP.put(channelId, ctx); log.info("新的连接加入了:[{}]", channelId); } } @Override public void channelInactive(ChannelHandlerContext ctx) { String channelId = ctx.channel().id().asLongText(); if (CHANNEL_MAP.containsKey(channelId)) { //删除连接 CHANNEL_MAP.remove(channelId); log.info("连接已断开:[{}]", channelId); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); Channel channel = ctx.channel(); if (channel.isActive()) { ctx.close(); } } }
2022-09-28 16:49:21.491 INFO 8516 --- [ main] c.zkzp.eage.netty.server.NettyTcpServer : Netty Tcp Server start success on port:8899
2022-09-28 16:49:27.460 INFO 8516 --- [ntLoopGroup-3-1] c.z.e.netty.handle.NettyServerHandler : 新的连接加入了:[e00af6fffeb19725-00002144-00000001-28bb7b3a64721f9d-58f4af16]
----------------hello
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。