赞
踩
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.6</version>
        <relativePath/>
    </parent>
    <groupId>com.xhom</groupId>
    <artifactId>io-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>io-demo</name>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.86.Final</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.7.6</version>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.86.Final</version>
</dependency>
netty:
  server:
    port: 8090
    bossThreads: 1
package com.xhom.iodemo.server; import com.xhom.iodemo.init.SocketChannelInitializer; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import lombok.extern.slf4j.Slf4j; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.Objects; /** * @author visy.wang * @description: 服务端 * @date 2022/12/14 15:04 */ @Slf4j public class SocketServer { private Integer port; private Integer boosThreads = 1; private Integer workerThreads; public SocketServer() throws InterruptedException{ init(); } public SocketServer(Integer port, Integer boosThreads) throws InterruptedException{ this.port = port; this.boosThreads = boosThreads; this.init(); } public SocketServer(Integer port, Integer boosThreads, Integer workerThreads) throws InterruptedException{ this.port = port; this.boosThreads = boosThreads; this.workerThreads = workerThreads; this.init(); } private void init() throws InterruptedException { //1.创建一个服务端的引导类 ServerBootstrap bootstrap = new ServerBootstrap(); //2.创建反应器事件轮询组 //boss轮询组(负责处理父通道:连接/接收监听(如NioServerSocketChannel)) EventLoopGroup bossGroup = new NioEventLoopGroup(boosThreads); //worker轮询组(负责处理子通道:读/写监听(如NioSocketChannel)) EventLoopGroup workerGroup; if(Objects.nonNull(this.workerThreads) && this.workerThreads > 0){ workerGroup= new NioEventLoopGroup(this.workerThreads); }else{ //线程数默认为cpu核心数的2倍 workerGroup = new NioEventLoopGroup(); } //3.设置父子轮询组 bootstrap.group(bossGroup, workerGroup); //4.设置传输通道类型,Netty不仅支持Java NIO,也支持阻塞式的OIO bootstrap.channel(NioServerSocketChannel.class); //5.设置监听端口 bootstrap.localAddress(new InetSocketAddress(this.port)); //6.设置通道参数 bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); 
bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); //7.装配子通道的Pipeline流水线 bootstrap.childHandler(new SocketChannelInitializer()); //8.开始绑定端口,并通过调用sync()同步方法阻塞直到绑定成功 ChannelFuture channelFuture = bootstrap.bind().sync(); log.info("服务端启动成功,监听端口:{}", this.port); //9.自我阻塞,直到监听通道关闭 ChannelFuture closeFuture = channelFuture.channel().closeFuture(); closeFuture.sync(); log.info("服务端已停止运行"); //10.释放所有资源,包括创建的反应器线程 workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); log.info("已释放服务端占用资源"); } }
package com.xhom.iodemo.init; import com.xhom.iodemo.handler.SocketInboundHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.bytes.ByteArrayDecoder; import io.netty.handler.codec.bytes.ByteArrayEncoder; /** * @author visy.wang * @description: Socket通道初始化器 * @date 2022/12/14 14:37 */ public class SocketChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); //添加对byte数组的编解码,是由netty提供的 pipeline.addLast(new ByteArrayDecoder()); //入站 pipeline.addLast(new ByteArrayEncoder()); //出站 //添加自己的入站处理器 pipeline.addLast(new SocketInboundHandler()); } }
package com.xhom.iodemo.handler; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; import lombok.extern.slf4j.Slf4j; /** * @author visy.wang * @description: 自定义Socket入站处理器 * @date 2022/12/14 14:42 */ @Slf4j public class SocketInboundHandler extends ChannelInboundHandlerAdapter { //用于保存客户端的通道组 private static final ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); /** * 读取客户端发送来的消息 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //由于我们配置的是字节数组编解码器(ByteArrayDecoder),所以这里取到的msg是byte数组 byte[] data = (byte[])msg; log.info("收到消息:{}", new String(data)); //给其他人转发消息 for (Channel client : clients) { if(!client.equals(ctx.channel())){ client.writeAndFlush(data); } } //无需在流水线继续传递 //super.channelRead(ctx, msg); } /** * 接入客户端 */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { log.info("新客户端连接:{}", ctx.channel().id().asShortText()); clients.add(ctx.channel()); } /** * 断开客户端 */ @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { log.info("客户端连接断开:{}", ctx.channel().id().asShortText()); clients.remove(ctx.channel()); } /** * 异常处理 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.info("客户端异常:{}", cause.getMessage(), cause); ctx.channel().close(); clients.remove(ctx.channel()); } }
package com.xhom.iodemo.starter; import com.xhom.iodemo.server.SocketServer; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; /** * @author visy.wang * @description: 服务启动器 * @date 2022/12/14 15:23 */ @Slf4j @Component public class ServerStarter implements ApplicationRunner { @Value("${netty.server.port}") private Integer port; @Value("${netty.server.bossThreads}") private Integer bossThreads; @Value("${netty.server.workerThreads:-1}") private Integer workerThreads; @Override public void run(ApplicationArguments args) throws Exception { try{ new SocketServer(port, bossThreads, workerThreads); }catch (Exception e){ log.info("服务端启动异常:{}", e.getMessage(), e); } } }
2022-12-14 18:04:59.624 INFO 12548 --- [ restartedMain] com.xhom.iodemo.server.SocketServer : 服务端启动成功,监听端口:8090
客户端代码此处从略。读者可以自行编写,使用 NIO 或传统的 Socket 编程均可。
2022-12-14 18:06:06.178 INFO 12548 --- [ntLoopGroup-3-1] c.x.iodemo.handler.SocketInboundHandler : 新客户端连接:e178d48b
2022-12-14 18:06:11.350 INFO 12548 --- [ntLoopGroup-3-1] c.x.iodemo.handler.SocketInboundHandler : 收到消息:张三: 你好
2022-12-14 18:08:19.646 INFO 12548 --- [ntLoopGroup-3-1] c.x.iodemo.handler.SocketInboundHandler : 客户端异常:远程主机强迫关闭了一个现有的连接。
2022-12-14 18:08:19.647 INFO 12548 --- [ntLoopGroup-3-1] c.x.iodemo.handler.SocketInboundHandler : 客户端连接断开:e178d48b
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。