赞
踩
**
**
Netty支持多种网络通信模型,包括传统的阻塞I/O、非阻塞I/O、多路复用I/O和异步I/O。其中,非阻塞I/O和多路复用I/O是Netty的核心特性。
非阻塞I/O:Netty通过使用Java的NIO(New I/O)库,实现了非阻塞的I/O操作。这意味着当一个操作正在进行时,不会阻塞线程,线程可以继续处理其他任务。这种模型非常适合高并发的网络应用程序,可以提供更高的吞吐量和并发性能。
多路复用I/O:Netty使用了Reactor模式,通过一个线程池处理多个I/O事件,提高了系统的资源利用率。Netty的多路复用I/O模型可以同时处理成千上万个连接,而每个连接只需使用一个线程,极大地减少了线程的创建和上下文切换开销。
Netty使用了事件驱动的编程模型来处理网络操作。它提供了一系列的事件和处理器,开发者可以根据自己的需求自定义处理逻辑。
事件:Netty的核心是事件,它代表了网络操作中的各种状态和动作,如连接建立、数据接收、数据发送等。Netty提供了多种类型的事件,每个事件都有对应的处理器进行处理。
处理器:处理器是用于处理特定类型事件的组件,它包含了业务逻辑的实现。开发者可以通过编写自定义的处理器来实现特定的功能。处理器可以被链接成处理器链,形成一个处理事件的流水线。
通过使用事件和处理器,开发者可以实现复杂的网络应用程序,例如实现自定义的协议、数据解析、安全认证等。
Netty提供了灵活的协议设计和实现能力,使开发者可以轻松构建自己的网络协议。
编解码器:Netty提供了一系列的编解码器,用于将字节数据和Java对象相互转换。这些编解码器可以用于构建自定义协议,实现数据的序列化和反序列化。
自定义协议:通过使用Netty的事件和处理器机制,开发者可以自定义网络协议。可以根据自己的需求定义协议的消息格式、数据传输方式、错误处理等。
Netty提供了许多性能优化的功能和技术,以提高网络应用程序的性能和可扩展性。
零拷贝:Netty使用了零拷贝技术,避免了数据在内存之间的复制操作,减少了CPU和内存的开销。这对于处理大量数据的高性能应用程序尤为重要。
内存管理:Netty提供了高效的内存管理机制,可以有效地管理内存的分配和释放。它使用了内存池和内存复用的技术,减少了内存的分配和回收频率,提高了性能。
高性能传输:Netty支持多种高性能传输协议,如TCP、UDP和Unix域套接字。开发者可以根据自己的需求选择合适的传输协议,以获得最佳的性能。
在本篇博客中,我们将学习如何使用Netty框架实现基于Socket的异步通信。Netty是一个高性能、异步事件驱动的网络应用程序框架,它简化了网络编程的复杂性,并提供了可靠的、高效的数据传输。
在开始之前,确保您已经安装了Java和Maven。然后,按照以下步骤进行操作:
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.78.Final</version>
</dependency>
</dependencies>
在Server.java中,我们将创建一个服务端,并监听指定的端口。当客户端连接到服务器时,我们将打印出连接成功的消息,并向客户端发送一条欢迎消息。
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; public class Server { private int port; public Server(int port) { this.port = port; } public void start() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { // 这里可以流水线式加载各种Handler,以实现预期的功能 ch.pipeline().addLast(new ServerHandler()); } }); ChannelFuture f = b.bind(port).sync(); System.out.println("Server started on port " + port); f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; new Server(port).start(); } }
在上述代码中,我们创建了两个EventLoopGroup:bossGroup和workerGroup。bossGroup负责接受客户端的连接,而workerGroup负责处理客户端的请求。
我们使用ServerBootstrap配置和启动服务器。在childHandler方法中,我们初始化通道的处理器链,并添加了一个名为ServerHandler的处理器。
通过bind方法绑定服务器端口,并通过sync方法等待服务器启动完成。
在Client.java中,我们将创建一个客户端,并连接到指定的服务器。一旦连接成功,我们将向服务器发送一条消息,并打印出服务器返回的响应消息。
import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class Client { private String host; private int port; public Client(String host, int port) { this.host = host; this.port = port; } public void start() throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ClientHandler()); } }); ChannelFuture f = b.connect(host, port).sync(); System.out.println("Connected to server: " + host + ":" + port); // 发送消息给服务器 String message = "Hello, server!"; f.channel().writeAndFlush(message); // 等待直到连接关闭 f.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } } public static void main(String[] args) throws Exception { String host = "localhost"; int port = 8080; new Client(host, port).start(); } }
在上述代码中,我们创建了一个EventLoopGroup,并使用Bootstrap配置和启动客户端。在handler方法中,我们初始化通道的处理器链,并添加了一个名为ClientHandler的处理器。
通过connect方法连接到服务器,并通过sync方法等待连接完成。
在连接建立后,我们向服务器发送一条消息,并使用writeAndFlush方法将其发送给服务器。
最后,我们通过调用closeFuture方法等待连接关闭。
在ServerHandler.java中,我们编写一个处理器,用于处理从客户端接收到的消息并发送响应消息。
import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class ServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 读取客户端发送的消息 ByteBuf buf = (ByteBuf) msg; String message = buf.toString(); System.out.println("Received message from client: " + message); // 发送响应消息给客户端 String response = "Hello, client!"; ByteBuf responseBuf = ctx.alloc().buffer(response.length()); responseBuf.writeBytes(response.getBytes()); ctx.writeAndFlush(responseBuf); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
在上述代码中,我们重写了channelRead方法,用于读取客户端发送的消息。我们将接收到的消息转换为字符串,并打印出来。
然后,我们准备发送响应消息给客户端。首先,我们创建一个ByteBuf来存储响应消息的字节数据。然后,将字符串转换为字节数组,并将其写入ByteBuf中。
最后,我们使用ctx.writeAndFlush方法将响应消息发送给客户端。
在ClientHandler.java中,我们编写一个处理器,用于处理从服务器接收到的响应消息。
import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class ClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 读取服务器发送的响应消息 ByteBuf buf = (ByteBuf) msg; String response = buf.toString(); System.out.println("Received response from server: " + response); // 关闭连接 ctx.close(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
在上述代码中,我们重写了channelRead方法,用于读取服务器发送的响应消息。我们将接收到的消息转换为字符串,并打印出来。
然后,我们调用ctx.close方法关闭与服务器的连接。
现在,我们已经完成了Netty的Socket网络编程示例。在终端中,分别运行Server和Client的main方法,您将看到服务器和客户端之间的通信。
这就是使用Netty实现Socket网络编程的基本过程。通过使用Netty,我们可以轻松地构建高性能的、可靠的网络应用程序。
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; public class Server { private int port; public Server(int port) { this.port = port; } public void start() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { // 这里可以流水线式加载各种Handler,以实现预期的功能 ch.pipeline().{添加后续的Http处理器(见下文)} } }); ChannelFuture f = b.bind(port).sync(); System.out.println("Server started on port " + port); f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; new Server(port).start(); } }
如在这里添加一个对于页面的解决方案,创建一个新的类封装;
import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; import java.io.IOException; import java.io.InputStream; public class PageResolver { //直接单例模式 private static final PageResolver INSTANCE = new PageResolver(); private PageResolver(){} public static PageResolver getInstance(){ return INSTANCE; } //请求路径给进来,接着我们需要将页面拿到,然后转换成响应数据包发回去 public FullHttpResponse resolveResource(String path){ if(path.startsWith("/")) { //判断一下是不是正常的路径请求 path = path.equals("/") ? "index.html" : path.substring(1); //如果是直接请求根路径,那就默认返回index页面,否则就该返回什么路径的文件就返回什么 try(InputStream stream = this.getClass().getClassLoader().getResourceAsStream(path)) { if(stream != null) { //拿到文件输入流之后,才可以返回页面 byte[] bytes = new byte[stream.available()]; stream.read(bytes); return this.packet(HttpResponseStatus.OK, bytes); //数据先读出来,然后交给下面的方法打包 } } catch (IOException e){ e.printStackTrace(); } } //其他情况一律返回404 return this.packet(HttpResponseStatus.NOT_FOUND, "404 Not Found!".getBytes()); } //包装成FullHttpResponse,把状态码和数据写进去 private FullHttpResponse packet(HttpResponseStatus status, byte[] data){ FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status); response.content().writeBytes(data); return response; } }
然后把流水线中加入这个Http的解码器和译码器,以及一个ChannelInboundHandlerAdapter,处理解析到的数据;
ch.pipeline()
.addLast(new HttpRequestDecoder()) //Http请求解码器
.addLast(new HttpObjectAggregator(Integer.MAX_VALUE)) //搞一个聚合器,将内容聚合为一个FullHttpRequest,参数是最大内容长度
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
FullHttpRequest request = (FullHttpRequest) msg;
//请求进来了直接走解析
PageResolver resolver = PageResolver.getInstance();
ctx.channel().writeAndFlush(resolver.resolveResource(request.uri()));
ctx.channel().close();
}
})
.addLast(new HttpResponseEncoder());
可以在添加一个自定义组件例如:
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.EventLoopGroup; import io.netty.handler.codec.http.FullHttpRequest; import java.util.Objects; public class ShutdownHandler extends ChannelInboundHandlerAdapter { public EventLoopGroup bootstrap; public EventLoopGroup workstrap; public ShutdownHandler(EventLoopGroup bootstrap,EventLoopGroup workstrap){ this.bootstrap = bootstrap; this.workstrap = workstrap; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { FullHttpRequest request = (FullHttpRequest) msg; String uri = request.uri(); // 关闭服务 if(Objects.equals(uri, "/shutdown")){ ctx.channel().close(); bootstrap.shutdownGracefully(); workstrap.shutdownGracefully(); }else{ // 继续向下传递消息 ctx.fireChannelRead(msg); } } }
Server的总代码:
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpResponseEncoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import org.example.handler.PageResolver; import org.example.handler.ShutdownHandler; public class Server { private final int port; public Server(int port) { this.port = port; } public void start() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { // 这里可以流水线式加载各种Handler,以实现预期的功能 ch.pipeline() .addLast(new HttpRequestDecoder()) //Http请求解码器 .addLast(new HttpObjectAggregator(Integer.MAX_VALUE)) //搞一个聚合器,将内容聚合为一个FullHttpRequest,参数是最大内容长度 .addLast(new ShutdownHandler(bossGroup, workerGroup)) .addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { FullHttpRequest request = (FullHttpRequest) msg; //请求进来了直接走解析 PageResolver resolver = PageResolver.getInstance(); ctx.channel().writeAndFlush(resolver.resolveResource(request.uri())); ctx.channel().close(); } }) .addLast(new HttpResponseEncoder()); } }); ChannelFuture f = b.bind(port).sync(); System.out.println("Server started on port " + port); f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8081; new Server(port).start(); } }
最后启动服务器,就可以访问到resource中的静态页面资源了。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。