赞
踩
它是一个处理I/O操作的多线程事件循环。Netty为不同类型的传输提供了各种EventLoopGroup实现。在本例中,我们正在实现一个服务器端应用程序,因此将使用两个NioEventLoopGroup。第一个,通常被称为“老板”,接受传入的连接。第二个,通常称为“worker”,在老板接受连接后处理已接受的连接的流量,并将已接受的连接注册到work线程中。使用多少线程以及它们如何映射到创建的通道取决于EventLoopGroup实现,甚至可以通过构造函数进行配置。
它是一个设置服务器的帮助类。您可以直接使用Channel设置服务器。但是,请注意,这是一个乏味的过程,大多数情况下您不需要这样做。
这里,我们指定使用NioServerSocketChannel类,该类用于实例化一个新的Channel来接受传入的连接。
它是一个特殊的处理程序,用于帮助用户配置新的Channel。您很可能希望通过添加一些处理程序(如DiscardServerHandler)来配置新Channel的ChannelPipeline,以实现您的网络应用程序。随着应用程序变得复杂,您可能会向管道添加更多的处理程序,并最终将这个匿名类提取到顶级类中。
您还可以设置特定于Channel实现的参数。我们正在编写一个TCP/IP服务器,因此允许我们设置套接字选项,如tcpNoDelay和keepAlive。
option()用于接收传入连接的NioServerSocketChannel。
childOption()用于父ServerChannel接受的通道,在本例中为NioSocketChannel。
package com.zs.netty.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * Discards any incoming data. */ public class DiscardServer { private int port; public DiscardServer(int port) { this.port = port; } public void run() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); // (2) b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // (3) .childHandler(new ChannelInitializer<SocketChannel>() { // (4) @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new DiscardServerHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) // (5) .childOption(ChannelOption.SO_KEEPALIVE, true); // (6) // Bind and start to accept incoming connections. ChannelFuture f = b.bind(port).sync(); // (7) // Wait until the server socket is closed. // In this example, this does not happen, but you can do that to gracefully // shut down your server. f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; if (args.length > 0) { port = Integer.parseInt(args[0]); } new DiscardServer(port).run(); } }
package com.zs.netty.server; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.ReferenceCountUtil; /** * Handles a server-side channel. */ public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1) // @Override // public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2) // // Discard the received data silently. // ((ByteBuf) msg).release(); // (3) // // // } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf in = (ByteBuf) msg; try { while (in.isReadable()) { // (1) System.out.print((char) in.readByte()); System.out.flush(); } } finally { ReferenceCountUtil.release(msg); // (2) } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4) // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } }
插曲:mac 安装telnet
在使用brew install 安装 时,由于使用的是:https://homebrew.bintray.com/ 源,所以安装时报502 网关错误,修改源后又报404 错误。所以又重新安装了homebrew
执行命令后 /bin/zsh -c “$(curl -fsSL https://gitee.com/cunkai/HomebrewCN/raw/master/Homebrew.sh)”
再次执行 brew install telnet 就好了
在终端输入 telnet
如下:
$ telnet
telnet> telnet localhost 8080
你好,netty
此时在服务端的控制台上就会接收到客户端的信息。
此对象提供了各种操作,使您能够触发各种I/O事件和操作。在这里,我们调用write(Object)来逐字写入接收到的消息。请注意,我们没有像在DISCARD示例中那样释放接收到的消息。这是因为Netty在将它写出来时为您释放它。
write(对象)不会将消息写入网络。它在内部进行缓冲,然后通过ctx.flush()将其刷新到网络上。或者,为了简洁,你也可以调用ctx.writeAndFlush(msg)。
如果再次运行telnet命令,您将看到服务器返回您发送给它的内容。
echo服务器的完整源代码位于该发行版的io.net .example.echo包中。
下面来修改channelRead 方法
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg); // (1)
ctx.flush(); // (2)
}
本节中使用的协议为TIME协议。它与前面的示例不同的是,它发送一个包含32位整数的消息,而不接收任何请求,并在消息发送后关闭连接。在本例中,您将学习如何构造和发送消息,以及在完成时关闭连接。
因为我们将忽略任何接收到的数据,而是在建立连接后立即发送消息,所以这次不能使用channelRead()方法。相反,我们应该重写channelActive()方法。下面是实现:
package com.zs.netty.server; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class TimeServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(final ChannelHandlerContext ctx) { // (1) final ByteBuf time = ctx.alloc().buffer(4); // (2) time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L)); final ChannelFuture f = ctx.writeAndFlush(time); // (3) f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { assert f == future; ctx.close(); } }); // (4) } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
以下内容翻译自官网:
如前所述,channelActive()方法将在连接建立并准备生成通信流时被调用。让我们在这个方法中写一个32位整数来表示当前时间。
要发送一个新消息,我们需要分配一个新的缓冲区来包含该消息。我们将写入一个32位整数,因此我们需要一个容量至少为4字节的ByteBuf。通过ChannelHandlerContext.alloc()获取当前的ByteBufAllocator并分配一个新的缓冲区。
像往常一样,我们编写构造好的消息。
但是等等,翻转在哪里?我们以前不是在用NIO发送消息之前调用java.nio.ByteBuffer.flip()吗?ByteBuf没有这样的方法,因为它有两个指针;一个用于读操作,另一个用于写操作。当向ByteBuf写入内容时,写入器索引会增加,而读取器索引不会改变。reader索引和writer索引分别表示消息开始和结束的位置。
相比之下,NIO缓冲区没有提供一种不调用flip方法就能确定消息内容开始和结束位置的清晰方法。当您忘记翻转缓冲区时,您将遇到麻烦,因为没有任何数据或不正确的数据将被发送。这样的错误不会在Netty中发生,因为不同的操作类型有不同的指针。当你习惯了它,你会发现它会让你的生活变得更容易——一种没有失控的生活!
另一点需要注意的是,ChannelHandlerContext.write()(和writeAndFlush())方法返回一个ChannelFuture。ChannelFuture表示还没有发生的I/O操作。这意味着,任何请求的操作可能还没有执行,因为在Netty中所有操作都是异步的。例如,下面的代码可能会在消息发送之前关闭连接:
Channel ch = ...;
ch.writeAndFlush(message);
ch.close();
因此,您需要在ChannelFuture完成后调用close()方法,该方法由write()方法返回,并且当写操作完成时,它会通知它的侦听器。请注意,close()也可能不会立即关闭连接,它会返回一个ChannelFuture。
那么当写请求完成时,我们如何得到通知呢?这就像向返回的ChannelFuture添加一个ChannelFutureListener一样简单。在这里,我们创建了一个新的匿名ChannelFutureListener,它在操作完成时关闭Channel。
与DISCARD和ECHO服务器不同,我们需要TIME协议的客户端,因为人类无法将32位二进制数据转换为日历上的日期。在本节中,我们将讨论如何确保服务器正常工作,并学习如何使用Netty编写客户端。
Netty中服务器和客户端之间最大也是唯一的区别是使用了不同的Bootstrap和Channel实现。请看下面的代码:
package io.netty.example.time; public class TimeClient { public static void main(String[] args) throws Exception { String host = args[0]; int port = Integer.parseInt(args[1]); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); // (1) b.group(workerGroup); // (2) b.channel(NioSocketChannel.class); // (3) b.option(ChannelOption.SO_KEEPALIVE, true); // (4) b.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeClientHandler()); } }); // Start the client. ChannelFuture f = b.connect(host, port).sync(); // (5) // Wait until the connection is closed. f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); } } }
Bootstrap类似于ServerBootstrap,除了它是用于非服务器通道,如客户端或无连接通道。
如果只指定一个EventLoopGroup,它将被用作老板组和工人组。但是,boss worker并不用于客户端。
取代NioServerSocketChannel, NioSocketChannel被用于创建客户端通道。
注意,这里不像使用ServerBootstrap那样使用childOption(),因为客户端SocketChannel没有父类。
我们应该调用connect()方法,而不是bind()方法。
正如您所看到的,它与服务器端代码并没有真正的不同。那么ChannelHandler实现呢?它应该从服务器接收一个32位整数,将其转换为人类可读的格式,打印转换后的时间,并关闭连接:
package io.netty.example.time; import java.util.Date; public class TimeClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf m = (ByteBuf) msg; // (1) try { long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L; System.out.println(new Date(currentTimeMillis)); ctx.close(); } finally { m.release(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
在TCP/IP中,Netty将从对等端发送的数据读入ByteBuf。
它看起来非常简单,与服务器端示例没有任何不同。但是,这个处理程序有时会拒绝引发IndexOutOfBoundsException异常。我们将在下一节讨论为什么会发生这种情况。
按照官方的描述,我创建了一个服务端 启动服务端 ,再启动客户端,在客户端控制台上 会收到 当前时间
Socket Buffer的一个小警告
在基于流的传输(如TCP/IP)中,接收的数据存储在套接字接收缓冲区中。不幸的是,基于流的传输的缓冲区不是数据包队列,而是字节队列。这意味着,即使您将两条消息作为两个独立的包发送,操作系统也不会将它们视为两条消息,而只是一组字节。因此,不能保证您所读到的内容与远程同伴所写的内容完全一致。例如,我们假设一个操作系统的TCP/IP堆栈收到了三个数据包:
由于基于流的协议的这种一般属性,在你的应用程序中有很大的机会以以下碎片形式读取它们:
三个包分裂并合并成四个缓冲区
因此,无论接收部分是服务器端还是客户端,都应该将接收到的数据整理成一个或多个应用程序逻辑容易理解的有意义的帧。在上面的例子中,接收到的数据应该如下所示:
现在让我们回到TIME客户机示例。我们这里也有同样的问题。32位整数是非常小的数据量,它不太可能经常被碎片化。然而,问题是它可以碎片化,随着流量的增加,碎片化的可能性也会增加。
最简单的解决方案是创建一个内部累积缓冲区,并等待直到所有4个字节都被接收到内部缓冲区。以下是修复问题的修改后的TimeClientHandler实现 新建TimeClientHandler1:
package com.zs.netty.client; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.Date; public class TimeClientHandler1 extends ChannelInboundHandlerAdapter { private ByteBuf buf; @Override public void handlerAdded(ChannelHandlerContext ctx) { buf = ctx.alloc().buffer(4); // (1) } @Override public void handlerRemoved(ChannelHandlerContext ctx) { buf.release(); // (1) buf = null; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf m = (ByteBuf) msg; buf.writeBytes(m); // (2) m.release(); if (buf.readableBytes() >= 4) { // (3) long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L; System.out.println(new Date(currentTimeMillis)); ctx.close(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
ChannelHandler有两个生命周期侦听器方法:handlerAdded()和handlerRemoved()。只要不阻塞很长时间,就可以执行任意(反)初始化任务。
首先,所有接收到的数据都要累积成buf。
然后,处理程序必须检查buf是否有足够的数据(在本例中为4字节),然后继续执行实际的业务逻辑。否则,当更多的数据到达时,Netty将再次调用channelRead()方法,最终所有4个字节将被累积。
尽管第一个解决方案已经解决了TIME客户机的问题,但是修改后的处理程序看起来并没有那么干净。想象一个更复杂的协议,它由多个字段组成,比如可变长度字段。您的ChannelInboundHandler实现将很快变得不可维护。
您可能已经注意到,可以向ChannelPipeline添加多个ChannelHandler,因此,可以将一个单一的ChannelHandler拆分为多个模块化的ChannelHandler,以降低应用程序的复杂性。例如,你可以将TimeClientHandler分成两个处理程序:
处理碎片问题的TimeDecoder,以及
TimeClientHandler最初的简单版本。
幸运的是,Netty提供了一个可扩展的类,可以帮助你编写第一个开箱即用的类:
package com.zs.netty.client; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; public class TimeDecoder extends ByteToMessageDecoder { // (1) @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2) if (in.readableBytes() < 4) { return; // (3) } out.add(in.readBytes(4)); // (4) } }
ByteToMessageDecoder是ChannelInboundHandler的一个实现,它可以很容易地处理碎片问题。
每当接收到新数据时,ByteToMessageDecoder会调用decode()方法,并使用内部维护的累积缓冲区。
当累积缓冲区中没有足够的数据时,Decode()可以决定不添加任何内容。当接收到更多数据时,ByteToMessageDecoder将再次调用decode()。
如果decode()向out添加了一个对象,这意味着解码器成功解码了消息。ByteToMessageDecoder将丢弃累积缓冲区的读取部分。请记住,您不需要解码多个消息。ByteToMessageDecoder将继续调用decode()方法,直到它不添加任何内容为止。
现在我们有另一个处理程序要插入到ChannelPipeline中,我们应该在timclient中修改ChannelInitializer实现:
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeDecoder(),new TimeClientHandler1());
}
TimeDecoder也可继承自ReplayingDecoder
public class TimeDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(
ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
out.add(in.readBytes(4));
}
}
此外,Netty提供了开箱即用的解码器,使您能够非常容易地实现大多数协议,并帮助您避免最终以一个单一的不可维护的处理程序实现告终。
到目前为止,我们回顾的所有示例都使用ByteBuf作为协议消息的主要数据结构。在本节中,我们将改进TIME协议客户机和服务器示例,使用POJO而不是ByteBuf。
在ChannelHandlers中使用POJO的优势是显而易见的;通过将从ByteBuf中提取信息的代码从处理程序中分离出来,处理程序将变得更易于维护和可重用。在TIME客户机和服务器的示例中,我们只读取一个32位整数,并且直接使用ByteBuf不是一个大问题。但是,您会发现在实现真实协议时,有必要进行分离。
首先,让我们定义一个名为UnixTime的新类型。
package io.netty.example.time; import java.util.Date; public class UnixTime { private final long value; public UnixTime() { this(System.currentTimeMillis() / 1000L + 2208988800L); } public UnixTime(long value) { this.value = value; } public long value() { return value; } @Override public String toString() { return new Date((value() - 2208988800L) * 1000L).toString(); } }
客户端修改
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
if (in.readableBytes() < 4) {
return;
}
out.add(new UnixTime(in.readUnsignedInt()));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
UnixTime m = (UnixTime) msg;
System.out.println(m);
ctx.close();
}
服务端也可以使用
@Override
public void channelActive(ChannelHandlerContext ctx) {
ChannelFuture f = ctx.writeAndFlush(new UnixTime());
f.addListener(ChannelFutureListener.CLOSE);
}
现在,唯一缺少的部分是一个编码器,它是ChannelOutboundHandler的实现,它将UnixTime转换回ByteBuf。它比编写解码器简单得多,因为在编码消息时不需要处理数据包碎片和汇编。
package io.netty.example.time;
public class TimeEncoder extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
UnixTime m = (UnixTime) msg;
ByteBuf encoded = ctx.alloc().buffer(4);
encoded.writeInt((int)m.value());
ctx.write(encoded, promise); // (1)
}
}
在这一行中有很多重要的东西。
首先,我们按原样传递原始的ChannelPromise,以便在实际将编码数据写入网络时,Netty将其标记为成功或失败。
其次,我们没有调用ctx.flush()。有一个单独的处理程序方法void flush(ChannelHandlerContext ctx),其目的是覆盖flush()操作。
为了进一步简化,你可以使用MessageToByteEncoder:
public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
@Override
protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
out.writeInt((int)msg.value());
}
}
剩下的最后一个任务是在TimeServerHandler之前将一个TimeEncoder插入到服务器端的ChannelPipeline中,这是一个简单的练习。
server 端修改如下:添加encode
关闭一个Netty应用程序通常就像关闭你通过shutdownelegant()创建的所有EventLoopGroups一样简单。它返回一个Future,当EventLoopGroup已经完全终止并且属于该组的所有通道已经关闭时通知您。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。