赞
踩
第四篇:数据传输的简介
· Transports(传输)
· NIO(non-blocking IO,New IO), OIO(Old IO,blocking IO), Local(本地), Embedded(嵌入式)
· Use-case(用例)
· APIs(接口)
网络应用程序一个很重要的工作是传输数据。传输数据的过程不一样取决是使用哪种交通工具,但是传输的方式是一样的:都是以字节码传输。Java开发网络程序传输数据的过程和方式是被抽象了的,我们不需要关注底层接口,只需要使用Java API或其他网络框架如Netty就能达到传输数据的目的。发送数据和接收数据都是字节码。Nothing more,nothing less。
如果你曾经使用Java提供的网络接口工作过,你可能已经遇到过想从阻塞传输切换到非阻塞传输的情况,这种切换是比较困难的,因为阻塞IO和非阻塞IO使用的API有很大的差异;Netty提供了上层的传输实现接口使得这种情况变得简单。我们可以让所写的代码尽可能通用,而不会依赖一些实现相关的APIs。当我们想切换传输方式的时候不需要花很大的精力和时间来重构代码。
本章将介绍统一的API以及如何使用它们,会拿Netty的API和Java的API做比较来告诉你为什么Netty可以更容易的使用。本章也提供了一些优质的用例代码,以便最佳使用Netty。使用Netty不需要其他的网络框架或网络编程经验,若有则只是对理解netty有帮助,但不是必要的。下面让我们来看看真是世界里的传输工作。
4.1 案例研究:切换传输方式
为了让你想象如何运输,我会从一个简单的应用程序开始,这个应用程序什么都不做,只是接受客户端连接并发送“Hi!”字符串消息到客户端,发送完了就断开连接。我不会详细讲解这个过程的实现,它只是一个例子。
4.1.1 使用Java的I/O和NIO
我们将不用Netty实现这个例子,下面代码是使用阻塞IO实现的例子:
[java] view plaincopy
1. package netty.in.action;
2.
3. import java.io.IOException;
4. import java.io.OutputStream;
5. import java.net.ServerSocket;
6. import java.net.Socket;
7. import java.nio.charset.Charset;
8.
9. /**
10. * Blocking networking without Netty
11. * @author c.k
12. *
13. */
14. public class PlainOioServer {
15.
16. public void server(int port) throws Exception {
17. //bind server to port
18. final ServerSocket socket = new ServerSocket(port);
19. try {
20. while(true){
21. //accept connection
22. final Socket clientSocket = socket.accept();
23. System.out.println("Accepted connection from " + clientSocket);
24. //create new thread to handle connection
25. new Thread(new Runnable() {
26. @Override
27. public void run() {
28. OutputStream out;
29. try{
30. out = clientSocket.getOutputStream();
31. //write message to connected client
32. out.write("Hi!\r\n".getBytes(Charset.forName("UTF-8")));
33. out.flush();
34. //close connection once message written and flushed
35. clientSocket.close();
36. }catch(IOException e){
37. try {
38. clientSocket.close();
39. } catch (IOException e1) {
40. e1.printStackTrace();
41. }
42. }
43. }
44. }).start();//start thread to begin handling
45. }
46. }catch(Exception e){
47. e.printStackTrace();
48. socket.close();
49. }
50. }
51.
52. }
上面的方式很简洁,但是这种阻塞模式在大连接数的情况就会有很严重的问题,如客户端连接超时,服务器响应严重延迟。为了解决这种情况,我们可以使用异步网络处理所有的并发连接,但问题在于NIO和OIO的API是完全不同的,所以一个用OIO开发的网络应用程序想要使用NIO重构代码几乎是重新开发。
下面代码是使用Java NIO实现的例子:
[java] view plaincopy
1. package netty.in.action;
2.
3. import java.net.InetSocketAddress;
4. import java.net.ServerSocket;
5. import java.nio.ByteBuffer;
6. import java.nio.channels.SelectionKey;
7. import java.nio.channels.Selector;
8. import java.nio.channels.ServerSocketChannel;
9. import java.nio.channels.SocketChannel;
10. import java.util.Iterator;
11. /**
12. * Asynchronous networking without Netty
13. * @author c.k
14. *
15. */
16. public class PlainNioServer {
17.
18. public void server(int port) throws Exception {
19. System.out.println("Listening for connections on port " + port);
20. //open Selector that handles channels
21. Selector selector = Selector.open();
22. //open ServerSocketChannel
23. ServerSocketChannel serverChannel = ServerSocketChannel.open();
24. //get ServerSocket
25. ServerSocket serverSocket = serverChannel.socket();
26. //bind server to port
27. serverSocket.bind(new InetSocketAddress(port));
28. //set to non-blocking
29. serverChannel.configureBlocking(false);
30. //register ServerSocket to selector and specify that it is interested in new accepted clients
31. serverChannel.register(selector, SelectionKey.OP_ACCEPT);
32. final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());
33. while (true) {
34. //Wait for new events that are ready for process. This will block until something happens
35. int n = selector.select();
36. if (n > 0) {
37. //Obtain all SelectionKey instances that received events
38. Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
39. while (iter.hasNext()) {
40. SelectionKey key = iter.next();
41. iter.remove();
42. try {
43. //Check if event was because new client ready to get accepted
44. if (key.isAcceptable()) {
45. ServerSocketChannel server = (ServerSocketChannel) key.channel();
46. SocketChannel client = server.accept();
47. System.out.println("Accepted connection from " + client);
48. client.configureBlocking(false);
49. //Accept client and register it to selector
50. client.register(selector, SelectionKey.OP_WRITE, msg.duplicate());
51. }
52. //Check if event was because socket is ready to write data
53. if (key.isWritable()) {
54. SocketChannel client = (SocketChannel) key.channel();
55. ByteBuffer buff = (ByteBuffer) key.attachment();
56. //write data to connected client
57. while (buff.hasRemaining()) {
58. if (client.write(buff) == 0) {
59. break;
60. }
61. }
62. client.close();//close client
63. }
64. } catch (Exception e) {
65. key.cancel();
66. key.channel().close();
67. }
68. }
69. }
70. }
71. }
72.
73. }
如你所见,即使它们实现的功能是一样,但是代码完全不同。下面我们将用Netty来实现相同的功能。
4.1.2 Netty中使用I/O和NIO
下面代码是使用Netty作为网络框架编写的一个阻塞IO例子:
[java] view plaincopy
1. package netty.in.action;
2.
3. import java.net.InetSocketAddress;
4.
5. import io.netty.bootstrap.ServerBootstrap;
6. import io.netty.buffer.ByteBuf;
7. import io.netty.buffer.Unpooled;
8. import io.netty.channel.Channel;
9. import io.netty.channel.ChannelFuture;
10. import io.netty.channel.ChannelFutureListener;
11. import io.netty.channel.ChannelHandlerContext;
12. import io.netty.channel.ChannelInboundHandlerAdapter;
13. import io.netty.channel.ChannelInitializer;
14. import io.netty.channel.EventLoopGroup;
15. import io.netty.channel.nio.NioEventLoopGroup;
16. import io.netty.channel.socket.oio.OioServerSocketChannel;
17. import io.netty.util.CharsetUtil;
18.
19. public class NettyOioServer {
20.
21. public void server(int port) throws Exception {
22. final ByteBuf buf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi!\r\n", CharsetUtil.UTF_8));
23. //事件循环组
24. EventLoopGroup group = new NioEventLoopGroup();
25. try {
26. //用来引导服务器配置
27. ServerBootstrap b = new ServerBootstrap();
28. //使用OIO阻塞模式
29. b.group(group).channel(OioServerSocketChannel.class).localAddress(new InetSocketAddress(port))
30. //指定ChannelInitializer初始化handlers
31. .childHandler(new ChannelInitializer<Channel>() {
32. @Override
33. protected void initChannel(Channel ch) throws Exception {
34. //添加一个“入站”handler到ChannelPipeline
35. ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
36. @Override
37. public void channelActive(ChannelHandlerContext ctx) throws Exception {
38. //连接后,写消息到客户端,写完后便关闭连接
39. ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);
40. }
41. });
42. }
43. });
44. //绑定服务器接受连接
45. ChannelFuture f = b.bind().sync();
46. f.channel().closeFuture().sync();
47. } catch (Exception e) {
48. //释放所有资源
49. group.shutdownGracefully();
50. }
51. }
52.
53. }
上面代码实现功能一样,但结构清晰明了,这只是Netty的优势之一。
4.1.3 Netty中实现异步支持
下面代码是使用Netty实现异步,可以看出使用Netty由OIO切换到NIO是非常的方便。
[java] view plaincopy
1. package netty.in.action;
2.
3. import io.netty.bootstrap.ServerBootstrap;
4. import io.netty.buffer.ByteBuf;
5. import io.netty.buffer.Unpooled;
6. import io.netty.channel.ChannelFuture;
7. import io.netty.channel.ChannelFutureListener;
8. import io.netty.channel.ChannelHandlerContext;
9. import io.netty.channel.ChannelInboundHandlerAdapter;
10. import io.netty.channel.ChannelInitializer;
11. import io.netty.channel.EventLoopGroup;
12. import io.netty.channel.nio.NioEventLoopGroup;
13. import io.netty.channel.socket.SocketChannel;
14. import io.netty.channel.socket.nio.NioServerSocketChannel;
15. import io.netty.util.CharsetUtil;
16.
17. import java.net.InetSocketAddress;
18.
19. public class NettyNioServer {
20.
21. public void server(int port) throws Exception {
22. final ByteBuf buf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi!\r\n", CharsetUtil.UTF_8));
23. // 事件循环组
24. EventLoopGroup group = new NioEventLoopGroup();
25. try {
26. // 用来引导服务器配置
27. ServerBootstrap b = new ServerBootstrap();
28. // 使用NIO异步模式
29. b.group(group).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port))
30. // 指定ChannelInitializer初始化handlers
31. .childHandler(new ChannelInitializer<SocketChannel>() {
32. @Override
33. protected void initChannel(SocketChannel ch) throws Exception {
34. // 添加一个“入站”handler到ChannelPipeline
35. ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
36. @Override
37. public void channelActive(ChannelHandlerContext ctx) throws Exception {
38. // 连接后,写消息到客户端,写完后便关闭连接
39. ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);
40. }
41. });
42. }
43. });
44. // 绑定服务器接受连接
45. ChannelFuture f = b.bind().sync();
46. f.channel().closeFuture().sync();
47. } catch (Exception e) {
48. // 释放所有资源
49. group.shutdownGracefully();
50. }
51. }
52. }
因为Netty使用相同的API来实现每个传输,它并不关心你使用什么来实现。Netty通过操作Channel接口和ChannelPipeline、ChannelHandler来实现传输。
4.2 Transport API
传输API的核心是Channel接口,它用于所有出站的操作。Channel接口的类层次结构如下
如上图所示,每个Channel都会分配一个ChannelPipeline和ChannelConfig。ChannelConfig负责设置并存储配置,并允许在运行期间更新它们。传输一般有特定的配置设置,只作用于传输,没有其他的实现。ChannelPipeline容纳了使用的ChannelHandler实例,这些ChannelHandler将处理通道传递的“入站”和“出站”数据。ChannelHandler的实现允许你改变数据状态和传输数据,本书有章节详细讲解ChannelHandler,ChannelHandler是Netty的重点概念。
现在我们可以使用ChannelHandler做下面一些事情:
· 传输数据时,将数据从一种格式转换到另一种格式
· 异常通知
· Channel变为有效或无效时获得通知
· Channel被注册或从EventLoop中注销时获得通知
· 通知用户特定事件
这些ChannelHandler实例添加到ChannelPipeline中,在ChannelPipeline中按顺序逐个执行。它类似于一个链条,有使用过Servlet的读者可能会更容易理解。
ChannelPipeline实现了拦截过滤器模式,这意味着我们连接不同的ChannelHandler来拦截并处理经过ChannelPipeline的数据或事件。可以把ChannelPipeline想象成UNIX管道,它允许不同的命令链(ChannelHandler相当于命令)。你还可以在运行时根据需要添加ChannelHandler实例到ChannelPipeline或从ChannelPipeline中删除,这能帮助我们构建高度灵活的Netty程序。此外,访问指定的ChannelPipeline和ChannelConfig,你能在Channel自身上进行操作。Channel提供了很多方法,如下列表:
· eventLoop(),返回分配给Channel的EventLoop
· pipeline(),返回分配给Channel的ChannelPipeline
· isActive(),返回Channel是否激活,已激活说明与远程连接对等
· localAddress(),返回已绑定的本地SocketAddress
· remoteAddress(),返回已绑定的远程SocketAddress
· write(),写数据到远程客户端,数据通过ChannelPipeline传输过去
后面会越来越熟悉这些方法,现在只需要记住我们的操作都是在相同的接口上运行,Netty的高灵活性让你可以以不同的传输实现进行重构。
写数据到远程已连接客户端可以调用Channel.write()方法,如下代码:
[java] view plaincopy
1. Channel channel = ...
2. //Create ByteBuf that holds data to write
3. ByteBuf buf = Unpooled.copiedBuffer("your data", CharsetUtil.UTF_8);
4. //Write data
5. ChannelFuture cf = channel.write(buf);
6. //Add ChannelFutureListener to get notified after write completes
7. cf.addListener(new ChannelFutureListener() {
8. @Override
9. public void operationComplete(ChannelFuture future) {
10. //Write operation completes without error
11. if (future.isSuccess()) {
12. System.out.println(.Write successful.);
13. } else {
14. //Write operation completed but because of error
15. System.err.println(.Write error.);
16. future.cause().printStacktrace();
17. }
18. }
19. });
Channel是线程安全(thread-safe)的,它可以被多个不同的线程安全的操作,在多线程环境下,所有的方法都是安全的。正因为Channel是安全的,我们存储对Channel的引用,并在学习的时候使用它写入数据到远程已连接的客户端,使用多线程也是如此。下面的代码是一个简单的多线程例子:
[java] view plaincopy
1. final Channel channel = ...
2. //Create ByteBuf that holds data to write
3. final ByteBuf buf = Unpooled.copiedBuffer("your data",CharsetUtil.UTF_8);
4. //Create Runnable which writes data to channel
5. Runnable writer = new Runnable() {
6. @Override
7. public void run() {
8. channel.write(buf.duplicate());
9. }
10. };
11. //Obtain reference to the Executor which uses threads to execute tasks
12. Executor executor = Executors.newChachedThreadPool();
13. // write in one thread
14. //Hand over write task to executor for execution in thread
15. executor.execute(writer);
16. // write in another thread
17. //Hand over another write task to executor for execution in thread
18. executor.execute(writer);
此外,这种方法保证了写入的消息以相同的顺序通过写入它们的方法。想了解所有方法的使用可以参考Netty API文档。
4.3 Netty包含的传输实现
Netty自带了一些传输协议的实现,虽然没有支持所有的传输协议,但是其自带的已足够我们来使用。Netty应用程序的传输协议依赖于底层协议,本节我们将学习Netty中的传输协议。
Netty中的传输方式有如下几种:
· NIO,io.netty.channel.socket.nio,基于java.nio.channels的工具包,使用选择器作为基础的方法。
· OIO,io.netty.channel.socket.oio,基于java.net的工具包,使用阻塞流。
· Local,io.netty.channel.local,用来在虚拟机之间本地通信。
· Embedded,io.netty.channel.embedded,嵌入传输,它允许在没有真正网络的运输中使用ChannelHandler,可以非常有用的来测试ChannelHandler的实现。
4.3.1 NIO - Nonblocking I/O
NIO传输是目前最常用的方式,它通过使用选择器提供了完全异步的方式操作所有的I/O,NIO从Java 1.4才被提供。NIO中,我们可以注册一个通道或获得某个通道的改变的状态,通道状态有下面几种改变:
· 一个新的Channel被接受并已准备好
· Channel连接完成
· Channel中有数据并已准备好读取
· Channel发送数据出去
处理完改变的状态后需重新设置他们的状态,用一个线程来检查是否有已准备好的Channel,如果有则执行相关事件。在这里可能只同时一个注册的事件而忽略其他的。选择器所支持的操作在SelectionKey中定义,具体如下:
· OP_ACCEPT,有新连接时得到通知
· OP_CONNECT,连接完成后得到通知
· OP_READ,准备好读取数据时得到通知
· OP_WRITE,写入数据到通道时得到通知
Netty中的NIO传输就是基于这样的模型来接收和发送数据,通过封装将自己的接口提供给用户使用,这完全隐藏了内部实现。如前面所说,Netty隐藏内部的实现细节,将抽象出来的API暴露出来供使用,下面是处理流程图:
NIO在处理过程也会有一定的延迟,若连接数不大的话,延迟一般在毫秒级,但是其吞吐量依然比OIO模式的要高。Netty中的NIO传输是“zero-file-copy”,也就是零文件复制,这种机制可以让程序速度更快,更高效的从文件系统中传输内容,零复制就是我们的应用程序不会将发送的数据先复制到JVM堆栈在进行处理,而是直接从内核空间操作。接下来我们将讨论OIO传输,它是阻塞的。
4.3.2 OIO - Old blocking I/O
OIO就是java中提供的Socket接口,java最开始只提供了阻塞的Socket,阻塞会导致程序性能低。下面是OIO的处理流程图,若想详细了解,可以参阅其他相关资料。
4.3.3 Local - In VM transport
Netty包含了本地传输,这个传输实现使用相同的API用于虚拟机之间的通信,传输是完全异步的。每个Channel使用唯一的SocketAddress,客户端通过使用SocketAddress进行连接,在服务器会被注册为长期运行,一旦通道关闭,它会自动注销,客户端无法再使用它。
连接到本地传输服务器的行为与其他的传输实现几乎是相同的,需要注意的一个重点是只能在本地的服务器和客户端上使用它们。Local未绑定任何Socket,值提供JVM进程之间的通信。
4.3.4 Embedded transport
Netty还包括嵌入传输,与之前讲述的其他传输实现比较,它是不是一个真的传输呢?若不是一个真的传输,我们用它可以做什么呢?Embedded transport允许更容易的使用不同的ChannelHandler之间的交互,这也更容易嵌入到其他的ChannelHandler实例并像一个辅助类一样使用它们。它一般用来测试特定的ChannelHandler实现,也可以在ChannelHandler中重新使用一些ChannelHandler来进行扩展,为了实现这样的目的,它自带了一个具体的Channel实现,即:EmbeddedChannel。
4.4 每种传输方式在什么时候使用?
不多加赘述,看下面列表:
· OIO,在低连接数、需要低延迟时、阻塞时使用
· NIO,在高连接数时使用
· Local,在同一个JVM内通信时使用
· Embedded,测试ChannelHandler时使用
5.buffer数据缓冲
· ByteBuf
· ByteBufHolder
· ByteBufAllocator
· 使用这些接口分配缓冲和执行操作
每当你需要传输数据时,它必须包含一个缓冲区。Java NIO API自带的缓冲区类是相当有限的,没有经过优化,使用JDK的ByteBuffer操作更复杂。缓冲区是一个重要的组建,它是API的一部分。Netty提供了一个强大的缓冲区实现用于表示一个字节序列,并帮助你操作原始字节或自定义的POJO。Netty的ByteBuf相当于JDK的ByteBuffer,ByteBuf的作用是在Netty中通过Channel传输数据。它被重新设计以解决JDK的ByteBuffer中的一些问题,从而使开发人员开发网络应用程序显得更有效率。本章将讲述Netty中的缓冲区,并了解它为什么比JDK自带的缓冲区实现更优秀,还会深入了解在Netty中使用ByteBuf访问数据以及如何使用它。
5.1 Buffer API
Netty的缓冲API有两个接口:
· ByteBuf
· ByteBufHolder
Netty使用reference-counting(引用计数)的时候知道安全释放Buf和其他资源,虽然知道Netty有效的使用引用计数,这都是自动完成的。这允许Netty使用池和其他技巧来加快速度和保持内存利用率在正常水平,你不需要做任何事情来实现这一点,但是在开发Netty应用程序时,你应该处理数据尽快释放池资源。
Netty缓冲API提供了几个优势:
· 可以自定义缓冲类型
· 通过一个内置的复合缓冲类型实现零拷贝
· 扩展性好,比如StringBuffer
· 不需要调用flip()来切换读/写模式
· 读取和写入索引分开
· 方法链
· 引用计数
· Pooling(池)
5.2 ByteBuf - 字节数据容器
当需要与远程进行交互时,需要以字节码发送/接收数据。由于各种原因,一个高效、方便、易用的数据接口是必须的,而Netty的ByteBuf满足这些需求,ByteBuf是一个很好的经过优化的数据容器,我们可以将字节数据有效的添加到ByteBuf中或从ByteBuf中获取数据。ByteBuf有2部分:一个用于读,一个用于写。我们可以按顺序的读取数据,并且可以跳到开始重新读一遍。所有的数据操作,我们只需要做的是调整读取数据索引和再次开始读操作。
5.2.1 ByteBuf如何在工作?
写入数据到ByteBuf后,写入索引是增加的字节数量。开始读字节后,读取索引增加。你可以读取字节,直到写入索引和读取索引处理相同的位置,次数若继续读取,则会抛出IndexOutOfBoundsException。调用ByteBuf的任何方法开始读/写都会单独维护读索引和写索引。ByteBuf的默认最大容量限制是Integer.MAX_VALUE,写入时若超出这个值将会导致一个异常。
ByteBuf类似于一个字节数组,最大的区别是读和写的索引可以用来控制对缓冲区数据的访问。下图显示了一个容量为16的ByteBuf:
5.2.2 不同类型的ByteBuf
使用Netty时会遇到3种不同类型的ByteBuf
Heap Buffer(堆缓冲区)
最常用的类型是ByteBuf将数据存储在JVM的堆空间,这是通过将数据存储在数组的实现。堆缓冲区可以快速分配,当不使用时也可以快速释放。它还提供了直接访问数组的方法,通过ByteBuf.array()来获取byte[]数据。
访问非堆缓冲区ByteBuf的数组会导致UnsupportedOperationException,可以使用ByteBuf.hasArray()来检查是否支持访问数组。
Direct Buffer(直接缓冲区)
直接缓冲区,在堆之外直接分配内存。直接缓冲区不会占用堆空间容量,使用时应该考虑到应用程序要使用的最大内存容量以及如何限制它。直接缓冲区在使用Socket传递数据时性能很好,因为若使用间接缓冲区,JVM会先将数据复制到直接缓冲区再进行传递;但是直接缓冲区的缺点是在分配内存空间和释放内存时比堆缓冲区更复杂,而Netty使用内存池来解决这样的问题,这也是Netty使用内存池的原因之一。直接缓冲区不支持数组访问数据,但是我们可以间接的访问数据数组,如下面代码:
[java] view plaincopy
1. ByteBuf directBuf = Unpooled.directBuffer(16);
2. if(!directBuf.hasArray()){
3. int len = directBuf.readableBytes();
4. byte[] arr = new byte[len];
5. directBuf.getBytes(0, arr);
6. }
访问直接缓冲区的数据数组需要更多的编码和更复杂的操作,建议若需要在数组访问数据使用堆缓冲区会更好。
Composite Buffer(复合缓冲区)
复合缓冲区,我们可以创建多个不同的ByteBuf,然后提供一个这些ByteBuf组合的视图。复合缓冲区就像一个列表,我们可以动态的添加和删除其中的ByteBuf,JDK的ByteBuffer没有这样的功能。Netty提供了CompositeByteBuf类来处理复合缓冲区,CompositeByteBuf只是一个视图,CompositeByteBuf.hasArray()总是返回false,因为它可能包含一些直接或间接的不同类型的ByteBuf。
例如,一条消息由header和body两部分组成,将header和body组装成一条消息发送出去,可能body相同,只是header不同,使用CompositeByteBuf就不用每次都重新分配一个新的缓冲区。下图显示CompositeByteBuf组成header和body:
若使用JDK的ByteBuffer就不能这样简单的实现,只能创建一个数组或创建一个新的ByteBuffer,再将内容复制到新的ByteBuffer中。下面是使用CompositeByteBuf的例子:
[java] view plaincopy
1. CompositeByteBuf compBuf = Unpooled.compositeBuffer();
2. ByteBuf heapBuf = Unpooled.buffer(8);
3. ByteBuf directBuf = Unpooled.directBuffer(16);
4. //添加ByteBuf到CompositeByteBuf
5. compBuf.addComponents(heapBuf,directBuf);
6. //删除第一个ByteBuf
7. compBuf.removeComponent(0);
8. Iterator<ByteBuf> iter = compBuf.iterator();
9. while(iter.hasNext()){
10. System.out.println(iter.next().toString());
11. }
12. //使用数组访问数据
13. if(!compBuf.hasArray()){
14. int len = compBuf.readableBytes();
15. byte[] arr = new byte[len];
16. compBuf.getBytes(0, arr);
17. }
CompositeByteBuf是ByteBuf的子类,我们可以像操作BytBuf一样操作CompositeByteBuf。并且Netty优化套接字读写的操作是尽可能的使用CompositeByteBuf来做的,使用CompositeByteBuf不会操作内存泄露问题。
5.3 ByteBuf的字节操作
ByteBuf提供了许多操作,允许修改其中的数据内容或只是读取数据。ByteBuf和JDK的ByteBuffer很像,但是ByteBuf提供了更好的性能。
5.3.1 随机访问索引
ByteBuf使用zero-based-indexing(从0开始的索引),第一个字节的索引是0,最后一个字节的索引是ByteBuf的capacity - 1,下面代码是遍历ByteBuf的所有字节:
[java] view plaincopy
1. //create a ByteBuf of capacity is 16
2. ByteBuf buf = Unpooled.buffer(16);
3. //write data to buf
4. for(int i=0;i<16;i++){
5. buf.writeByte(i+1);
6. }
7. //read data from buf
8. for(int i=0;i<buf.capacity();i++){
9. System.out.println(buf.getByte(i));
10. }
注意通过索引访问时不会推进读索引和写索引,我们可以通过ByteBuf的readerIndex()或writerIndex()来分别推进读索引或写索引。
5.3.2 顺序访问索引
ByteBuf提供两个指针变量支付读和写操作,读操作是使用readerIndex(),写操作时使用writerIndex()。这和JDK的ByteBuffer不同,ByteBuffer只有一个方法来设置索引,所以需要使用flip()方法来切换读和写模式。
ByteBuf一定符合:0 <= readerIndex <= writerIndex <= capacity。
5.3.3 Discardable bytes废弃字节
我们可以调用ByteBuf.discardReadBytes()来回收已经读取过的字节,discardReadBytes()将丢弃从索引0到readerIndex之间的字节。调用discardReadBytes()方法后会变成如下图:
ByteBuf.discardReadBytes()可以用来清空ByteBuf中已读取的数据,从而使ByteBuf有多余的空间容纳新的数据,但是discardReadBytes()可能会涉及内存复制,因为它需要移动ByteBuf中可读的字节到开始位置,这样的操作会影响性能,一般在需要马上释放内存的时候使用收益会比较大。
5.3.4 可读字节(实际内容)
任何读操作会增加readerIndex,如果读取操作的参数也是一个ByteBuf而没有指定目的索引,指定的目的缓冲区的writerIndex会一起增加,没有足够的内容时会抛出IndexOutOfBoundException。新分配、包装、复制的缓冲区的readerIndex的默认值都是0。下面代码显示了获取所有可读数据:
[java] view plaincopy
1. ByteBuf buf = Unpooled.buffer(16);
2. while(buf.isReadable()){
3. System.out.println(buf.readByte());
4. }
(代码于原书中有出入,原书可能是基于Netty4之前的版本讲解的,此处基于Netty4)
5.3.5 可写字节Writable bytes
任何写的操作会增加writerIndex。若写操作的参数也是一个ByteBuf并且没有指定数据源索引,那么指定缓冲区的readerIndex也会一起增加。若没有足够的可写字节会抛出IndexOutOfBoundException。新分配的缓冲区writerIndex的默认值是0。下面代码显示了随机一个int数字来填充缓冲区,直到缓冲区空间耗尽:
[java] view plaincopy
1. Random random = new Random();
2. ByteBuf buf = Unpooled.buffer(16);
3. while(buf.writableBytes() >= 4){
4. buf.writeInt(random.nextInt());
5. }
5.3.6 清除缓冲区索引Clearing the buffer indexs
调用ByteBuf.clear()可以设置readerIndex和writerIndex为0,clear()不会清除缓冲区的内容,只是将两个索引值设置为0。请注意ByteBuf.clear()与JDK的ByteBuffer.clear()的语义不同。
下图显示了ByteBuf调用clear()之前:
下图显示了调用clear()之后:
和discardReadBytes()相比,clear()是便宜的,因为clear()不会复制任何内存。
5.3.7 搜索操作Search operations
各种indexOf()方法帮助你定位一个值的索引是否符合,我们可以用ByteBufProcessor复杂动态顺序搜索实现简单的静态单字节搜索。如果你想解码可变长度的数据,如null结尾的字符串,你会发现bytesBefore(byte value)方法有用。例如我们写一个集成的flash sockets的应用程序,这个应用程序使用NULL结束的内容,使用bytesBefore(byte value)方法可以很容易的检查数据中的空字节。没有ByteBufProcessor的话,我们需要自己做这些事情,使用ByteBufProcessor效率更好。
5.3.8 标准和重置Mark and reset
每个ByteBuf有两个标注索引,一个存储readerIndex,一个存储writerIndex。你可以通过调用一个重置方法重新定位两个索引之一,它类似于InputStream的标注和重置方法,没有读限制。我们可以通过调用readerIndex(int readerIndex)和writerIndex(int writerIndex)移动读索引和写索引到指定位置,调用这两个方法设置指定索引位置时可能抛出IndexOutOfBoundException。
调用duplicate()、slice()、slice(int index, int length)、order(ByteOrder endianness)会创建一个现有缓冲区的视图。衍生的缓冲区有独立的readerIndex、writerIndex和标注索引。如果需要现有缓冲区的全新副本,可以使用copy()或copy(int index, int length)获得。看下面代码:
[java] view plaincopy
1. // get a Charset of UTF-8
2. Charset utf8 = Charset.forName("UTF-8");
3. // get a ByteBuf
4. ByteBuf buf = Unpooled.copiedBuffer("“Netty in Action rocks!“", utf8);
5. // slice
6. ByteBuf sliced = buf.slice(0, 14);
7. // copy
8. ByteBuf copy = buf.copy(0, 14);
9. // print "“Netty in Action rocks!“"
10. System.out.println(buf.toString(utf8));
11. // print "“Netty in Act"
12. System.out.println(sliced.toString(utf8));
13. // print "“Netty in Act"
14. System.out.println(copy.toString(utf8));
有两种主要类型的读写操作:
· get/set操作以索引为基础,在给定的索引设置或获取字节
· 从当前索引开始读写,递增当前的写索引或读索引
ByteBuf的各种读写方法或其他一些检查方法可以看ByteBuf的源码,这里不赘述了。
ByteBufHolder是一个辅助类,是一个接口,其实现类是DefaultByteBufHolder,还有一些实现了ByteBufHolder接口的其他接口类。ByteBufHolder的作用就是帮助更方便的访问ByteBuf中的数据,当缓冲区没用了后,可以使用这个辅助类释放资源。ByteBufHolder很简单,提供的可供访问的方法也很少。如果你想实现一个“消息对象”有效负载存储在ByteBuf,使用ByteBufHolder是一个好主意。
尽管Netty提供的各种缓冲区实现类已经很容易使用,但Netty依然提供了一些使用的工具类,使得创建和使用各种缓冲区更加方便。下面会介绍一些Netty中的缓冲区工具类。
Netty支持各种ByteBuf的池实现,来使Netty提供一种称为ByteBufAllocator成为可能。ByteBufAllocator负责分配ByteBuf实例,ByteBufAllocator提供了各种分配不同ByteBuf的方法,如需要一个堆缓冲区可以使用ByteBufAllocator.heapBuffer(),需要一个直接缓冲区可以使用ByteBufAllocator.directBuffer(),需要一个复合缓冲区可以使用ByteBufAllocator.compositeBuffer()。其他方法的使用可以看ByteBufAllocator源码及注释。
获取ByteBufAllocator对象很容易,可以从Channel的alloc()获取,也可以从ChannelHandlerContext的alloc()获取。看下面代码:
[java] view plaincopy
1. ServerBootstrap b = new ServerBootstrap();
2. b.group(group).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port))
3. .childHandler(new ChannelInitializer<SocketChannel>() {
4. @Override
5. protected void initChannel(SocketChannel ch) throws Exception {
6. // get ByteBufAllocator instance by Channel.alloc()
7. ByteBufAllocator alloc0 = ch.alloc();
8. ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
9. @Override
10. public void channelActive(ChannelHandlerContext ctx) throws Exception {
11. //get ByteBufAllocator instance by ChannelHandlerContext.alloc()
12. ByteBufAllocator alloc1 = ctx.alloc();
13. ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);
14. }
15. });
16. }
17. });
Netty有两种不同的ByteBufAllocator实现,一个实现ByteBuf实例池将分配和回收成本以及内存使用降到最低;另一种实现是每次使用都创建一个新的ByteBuf实例。Netty默认使用PooledByteBufAllocator,我们可以通过ChannelConfig或通过引导设置一个不同的实现来改变。更多细节在后面讲述。
Unpooled也是用来创建缓冲区的工具类,Unpooled的使用也很容易。Unpooled提供了很多方法,详细方法及使用可以看API文档或Netty源码。看下面代码:
[java] view plaincopy
1. //创建复合缓冲区
2. CompositeByteBuf compBuf = Unpooled.compositeBuffer();
3. //创建堆缓冲区
4. ByteBuf heapBuf = Unpooled.buffer(8);
5. //创建直接缓冲区
6. ByteBuf directBuf = Unpooled.directBuffer(16);
ByteBufUtil提供了一些静态的方法,在操作ByteBuf时非常有用。ByteBufUtil提供了Unpooled之外的一些方法,也许最有价值的是hexDump(ByteBuf buffer)方法,这个方法返回指定ByteBuf中可读字节的十六进制字符串,可以用于调试程序时打印ByteBuf的内容,十六进制字符串相比字节而言对用户更友好。
本章主要学习Netty提供的缓冲区类ByteBuf的创建和简单实用以及一些操作ByteBuf的工具类。
6.处理器
· ChannelPipeline
· ChannelHandlerContext
· ChannelHandler
· Inbound vs outbound(入站和出站)
接受连接或创建他们只是你的应用程序的一部分,虽然这些任何很重要,但是一个网络应用程序旺旺是更复杂的,需要更多的代码编写,如处理传入和传出的数据。Netty提供了一个强大的处理这些事情的功能,允许用户自定义ChannelHandler的实现来处理数据。使得ChannelHandler更强大的是可以连接每个ChannelHandler来实现任务,这有助于代码的整洁和重用。但是处理数据只是ChannelHandler所做的事情之一,也可以压制I/O操作,例如写请求。所有这些都可以动态实现。
6.1 ChannelPipeline
ChannelPipeline是ChannelHandler实例的列表,用于处理或截获通道的接收和发送数据。ChannelPipeline提供了一种高级的截取过滤器模式,让用户可以在ChannelPipeline中完全控制一个事件及如何处理ChannelHandler与ChannelPipeline的交互。
对于每个新的通道,会创建一个新的ChannelPipeline并附加至通道。一旦连接,Channel和ChannelPipeline之间的耦合是永久性的。Channel不能附加其他的ChannelPipeline或从ChannelPipeline分离。
下图描述了ChannelHandler在ChannelPipeline中的I/O处理,一个I/O操作可以由一个ChannelInboundHandler或ChannelOutboundHandler进行处理,并通过调用ChannelInboundHandler处理入站IO或通过ChannelOutboundHandler处理出站IO。
如上图所示,ChannelPipeline是ChannelHandler的一个列表;如果一个入站I/O事件被触发,这个事件会从第一个开始依次通过ChannelPipeline中的ChannelHandler;若是一个入站I/O事件,则会从最后一个开始依次通过ChannelPipeline中的ChannelHandler。ChannelHandler可以处理事件并检查类型,如果某个ChannelHandler不能处理则会跳过,并将事件传递到下一个ChannelHandler。ChannelPipeline可以动态添加、删除、替换其中的ChannelHandler,这样的机制可以提高灵活性。
修改ChannelPipeline的方法:
· addFirst(...),添加ChannelHandler在ChannelPipeline的第一个位置
· addBefore(...),在ChannelPipeline中指定的ChannelHandler名称之前添加ChannelHandler
· addAfter(...),在ChannelPipeline中指定的ChannelHandler名称之后添加ChannelHandler
· addLast(ChannelHandler...),在ChannelPipeline的末尾添加ChannelHandler
· remove(...),删除ChannelPipeline中指定的ChannelHandler
· replace(...),替换ChannelPipeline中指定的ChannelHandler
[java] view plaincopy
1. ChannelPipeline pipeline = ch.pipeline();
2. FirstHandler firstHandler = new FirstHandler();
3. pipeline.addLast("handler1", firstHandler);
4. pipeline.addFirst("handler2", new SecondHandler());
5. pipeline.addLast("handler3", new ThirdHandler());
6. pipeline.remove("“handler3“");
7. pipeline.remove(firstHandler);
8. pipeline.replace("handler2", "handler4", new FourthHandler());
被添加到ChannelPipeline的ChannelHandler将通过IO-Thread处理事件,这意味了必须不能有其他的IO-Thread阻塞来影响IO的整体处理;有时候可能需要阻塞,例如JDBC。因此,Netty允许通过一个EventExecutorGroup到每一个ChannelPipeline.add*方法,自定义的事件会被包含在EventExecutorGroup中的EventExecutor来处理,默认的实现是DefaultEventExecutorGroup。
ChannelPipeline除了一些修改的方法,还有很多其他的方法,具体是方法及使用可以看API文档或源码。
6.2 ChannelHandlerContext
每个ChannelHandler被添加到ChannelPipeline后,都会创建一个ChannelHandlerContext并与之创建的ChannelHandler关联绑定。ChannelHandlerContext允许ChannelHandler与其他的ChannelHandler实现进行交互,这是相同ChannelPipeline的一部分。ChannelHandlerContext不会改变添加到其中的ChannelHandler,因此它是安全的。
6.2.1 通知下一个ChannelHandler
在相同的ChannelPipeline中通过调用ChannelInboundHandler和ChannelOutboundHandler中各个方法中的一个方法来通知最近的handler,通知开始的地方取决你如何设置。下图显示了ChannelHandlerContext、ChannelHandler、ChannelPipeline的关系:
如果你想有一些事件流全部通过ChannelPipeline,有两个不同的方法可以做到:
· 调用Channel的方法
· 调用ChannelPipeline的方法
这两个方法都可以让事件流全部通过ChannelPipeline。无论从头部还是尾部开始,因为它主要依赖于事件的性质。如果是一个“入站”事件,它开始于头部;若是一个“出站”事件,则开始于尾部。
下面的代码显示了一个写事件如何通过ChannelPipeline从尾部开始:
[java] view plaincopy
1. @Override
2. protected void initChannel(SocketChannel ch) throws Exception {
3. ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
4. @Override
5. public void channelActive(ChannelHandlerContext ctx) throws Exception {
6. //Event via Channel
7. Channel channel = ctx.channel();
8. channel.write(Unpooled.copiedBuffer("netty in action", CharsetUtil.UTF_8));
9. //Event via ChannelPipeline
10. ChannelPipeline pipeline = ctx.pipeline();
11. pipeline.write(Unpooled.copiedBuffer("netty in action", CharsetUtil.UTF_8));
12. }
13. });
14. }
下图表示通过Channel或ChannelPipeline的通知:
可能你想从ChannelPipeline的指定位置开始,不想流经整个ChannelPipeline,如下情况:
· 为了节省开销,不感兴趣的ChannelHandler不让通过
· 排除一些ChannelHandler
在这种情况下,你可以使用ChannelHandlerContext的ChannelHandler通知起点。它使用ChannelHandlerContext执行下一个ChannelHandler。下面代码显示了直接使用ChannelHandlerContext操作:
[java] view plaincopy
1. // Get reference of ChannelHandlerContext
2. ChannelHandlerContext ctx = ..;
3. // Write buffer via ChannelHandlerContext
4. ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));
该消息流经ChannelPipeline到下一个ChannelHandler,在这种情况下使用ChannelHandlerContext开始下一个ChannelHandler。下图显示了事件流:
如上图显示的,从指定的ChannelHandlerContext开始,跳过前面所有的ChannelHandler,使用ChannelHandlerContext操作是常见的模式,最常用的是从ChannelHanlder调用操作,也可以在外部使用ChannelHandlerContext,因为这是线程安全的。
6.2.2 修改ChannelPipeline
调用ChannelHandlerContext的pipeline()方法能访问ChannelPipeline,能在运行时动态的增加、删除、替换ChannelPipeline中的ChannelHandler。可以保持ChannelHandlerContext供以后使用,如外部Handler方法触发一个事件,甚至从一个不同的线程。
下面代码显示了保存ChannelHandlerContext供之后使用或其他线程使用:
[java] view plaincopy
1. public class WriteHandler extends ChannelHandlerAdapter {
2. private ChannelHandlerContext ctx;
3.
4. @Override
5. public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
6. this.ctx = ctx;
7. }
8.
9. public void send(String msg){
10. ctx.write(msg);
11. }
12. }
请注意,ChannelHandler实例如果带有@Sharable注解则可以被添加到多个ChannelPipeline。也就是说单个ChannelHandler实例可以有多个ChannelHandlerContext,因此可以调用不同ChannelHandlerContext获取同一个ChannelHandler。如果添加不带@Sharable注解的ChannelHandler实例到多个ChannelPipeline则会抛出异常;使用@Sharable注解后的ChannelHandler必须在不同的线程和不同的通道上安全使用。怎么是不安全的使用?看下面代码:
[java] view plaincopy
1. @Sharable
2. public class NotSharableHandler extends ChannelInboundHandlerAdapter {
3.
4. private int count;
5.
6. @Override
7. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
8. count++;
9. System.out.println("channelRead(...) called the " + count + " time“");
10. ctx.fireChannelRead(msg);
11. }
12.
13. }
上面是一个带@Sharable注解的Handler,它被多个线程使用时,里面count是不安全的,会导致count值错误。
为什么要共享ChannelHandler?使用@Sharable注解共享一个ChannelHandler在一些需求中还是有很好的作用的,如使用一个ChannelHandler来统计连接数或来处理一些全局数据等等。
6.3 状态模型
Netty有一个简单但强大的状态模型,并完美映射到ChannelInboundHandler的各个方法。下面是Channel生命周期四个不同的状态:
· channelUnregistered
· channelRegistered
· channelActive
· channelInactive
Channel的状态在其生命周期中变化,因为状态变化需要触发,下图显示了Channel状态变化:
还可以看到额外的状态变化,因为用户允许从EventLoop中注销Channel暂停事件执行,然后再重新注册。在这种情况下,你会看到多个channelRegistered和channelUnregistered状态的变化,而永远只有一个channelActive和channelInactive的状态,因为一个通道在其生命周期内只能连接一次,之后就会被回收;重新连接,则是创建一个新的通道。
下图显示了从EventLoop中注销Channel后再重新注册的状态变化:
6.4 ChannelHandler和其子类
Netty中有3个实现了ChannelHandler接口的类,其中2个是接口,一个是抽象类。如下图:
6.4.1 ChannelHandler中的方法
Netty定义了良好的类型层次结构来表示不同的处理程序类型,所有的类型的父类是ChannelHandler。ChannelHandler提供了在其生命周期内添加或从ChannelPipeline中删除的方法。
· handlerAdded,ChannelHandler添加到实际上下文中准备处理事件
· handlerRemoved,将ChannelHandler从实际上下文中删除,不再处理事件
· exceptionCaught,处理抛出的异常
上面三个方法都需要传递ChannelHandlerContext参数,每个ChannelHandler被添加到ChannelPipeline时会自动创建ChannelHandlerContext。ChannelHandlerContext允许在本地通道安全的存储和检索值。Netty还提供了一个实现了ChannelHandler的抽象类:ChannelHandlerAdapter。ChannelHandlerAdapter实现了父类的所有方法,基本上就是传递事件到ChannelPipeline中的下一个ChannelHandler直到结束。
6.4.2 ChannelInboundHandler
ChannelInboundHandler提供了一些方法再接收数据或Channel状态改变时被调用。下面是ChannelInboundHandler的一些方法:
· channelRegistered,ChannelHandlerContext的Channel被注册到EventLoop;
· channelUnregistered,ChannelHandlerContext的Channel从EventLoop中注销
· channelActive,ChannelHandlerContext的Channel已激活
· channelInactive,ChannelHanderContxt的Channel结束生命周期
· channelRead,从当前Channel的对端读取消息
· channelReadComplete,消息读取完成后执行
· userEventTriggered,一个用户事件被处罚
· channelWritabilityChanged,改变通道的可写状态,可以使用Channel.isWritable()检查
· exceptionCaught,重写父类ChannelHandler的方法,处理异常
Netty提供了一个实现了ChannelInboundHandler接口并继承ChannelHandlerAdapter的类:ChannelInboundHandlerAdapter。ChannelInboundHandlerAdapter实现了ChannelInboundHandler的所有方法,作用就是处理消息并将消息转发到ChannelPipeline中的下一个ChannelHandler。ChannelInboundHandlerAdapter的channelRead方法处理完消息后不会自动释放消息,若想自动释放收到的消息,可以使用SimpleChannelInboundHandler<I>。
看下面代码:
[java] view plaincopy
1. /**
2. * 实现ChannelInboundHandlerAdapter的Handler,不会自动释放接收的消息对象
3. * @author c.k
4. *
5. */
6. public class DiscardHandler extends ChannelInboundHandlerAdapter {
7. @Override
8. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
9. //手动释放消息
10. ReferenceCountUtil.release(msg);
11. }
12. }
[java] view plaincopy
1. /**
2. * 继承SimpleChannelInboundHandler,会自动释放消息对象
3. * @author c.k
4. *
5. */
6. public class SimpleDiscardHandler extends SimpleChannelInboundHandler<Object> {
7. @Override
8. protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
9. //不需要手动释放
10. }
11. }
如果需要其他状态改变的通知,可以重写Handler的其他方法。通常自定义消息类型来解码字节,可以实现ChannelInboundHandler或ChannelInboundHandlerAdapter。有一个更好的解决方法,使用编解码器的框架可以很容的实现。使用ChannelInboundHandler、ChannelInboundHandlerAdapter、SimpleChannelInboundhandler这三个中的一个来处理接收消息,使用哪一个取决于需求;大多数时候使用SimpleChannelInboundHandler处理消息,使用ChannelInboundHandlerAdapter处理其他的“入站”事件或状态改变。
ChannelInitializer用来初始化ChannelHandler,将自定义的各种ChannelHandler添加到ChannelPipeline中。
6.4.3 ChannelOutboundHandler
ChannelOutboundHandler用来处理“出站”的数据消息。ChannelOutboundHandler提供了下面一些方法:
· bind,Channel绑定本地地址
· connect,Channel连接操作
· disconnect,Channel断开连接
· close,关闭Channel
· deregister,注销Channel
· read,读取消息,实际是截获ChannelHandlerContext.read()
· write,写操作,实际是通过ChannelPipeline写消息,Channel.flush()属性到实际通道
· flush,刷新消息到通道
ChannelOutboundHandler是ChannelHandler的子类,实现了ChannelHandler的所有方法。所有最重要的方法采取ChannelPromise,因此一旦请求停止从ChannelPipeline转发参数则必须得到通知。Netty提供了ChannelOutboundHandler的实现:ChannelOutboundHandlerAdapter。ChannelOutboundHandlerAdapter实现了父类的所有方法,并且可以根据需要重写感兴趣的方法。所有这些方法的实现,在默认情况下,都是通过调用ChannelHandlerContext的方法将事件转发到ChannelPipeline中下一个ChannelHandler。
看下面的代码:
[java] view plaincopy
1. public class DiscardOutboundHandler extends ChannelOutboundHandlerAdapter {
2. @Override
3. public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
4. ReferenceCountUtil.release(msg);
5. promise.setSuccess();
6. }
7. }
重要的是要记得释放致远并直通ChannelPromise,若ChannelPromise没有被通知可能会导致其中一个ChannelFutureListener不被通知去处理一个消息。
如果消息被消费并且没有被传递到ChannelPipeline中的下一个ChannelOutboundHandler,那么就需要调用ReferenceCountUtil.release(message)来释放消息资源。一旦消息被传递到实际的通道,它会自动写入消息或在通道关闭是释放。
7.编码器
· Codec,编解码器
· Decoder,解码器
· Encoder,编码器
Netty提供了编解码器框架,使得编写自定义的编解码器很容易,并且也很容易重用和封装。本章讨论Netty的编解码器框架以及使用。
7.1 编解码器Codec
编写一个网络应用程序需要实现某种编解码器,编解码器的作用就是讲原始字节数据与自定义的消息对象进行互转。网络中都是以字节码的数据形式来传输数据的,服务器编码数据后发送到客户端,客户端需要对数据进行解码,因为编解码器由两部分组成:
· Decoder(解码器)
· Encoder(编码器)
解码器负责将消息从字节或其他序列形式转成指定的消息对象,编码器则相反;解码器负责处理“入站”数据,编码器负责处理“出站”数据。编码器和解码器的结构很简单,消息被编码后解码后会自动通过ReferenceCountUtil.release(message)释放,如果不想释放消息可以使用ReferenceCountUtil.retain(message),这将会使引用数量增加而没有消息发布,大多数时候不需要这么做。
7.2 解码器
Netty提供了丰富的解码器抽象基类,我们可以很容易的实现这些基类来自定义解码器。下面是解码器的一个类型:
· 解码字节到消息
· 解码消息到消息
· 解码消息到字节
本章将概述不同的抽象基类,来帮助了解解码器的实现。深入了解Netty提供的解码器之前先了解解码器的作用是什么?解码器负责解码“入站”数据从一种格式到另一种格式,解码器处理入站数据是抽象ChannelInboundHandler的实现。实践中使用解码器很简单,就是将入站数据转换格式后传递到ChannelPipeline中的下一个ChannelInboundHandler进行处理;这样的处理时很灵活的,我们可以将解码器放在ChannelPipeline中,重用逻辑。
7.2.1 ByteToMessageDecoder
通常你需要将消息从字节解码成消息或者从字节解码成其他的序列化字节。这是一个常见的任务,Netty提供了抽象基类,我们可以使用它们来实现。Netty中提供的ByteToMessageDecoder可以将字节消息解码成POJO对象,下面列出了ByteToMessageDecoder两个主要方法:
· decode(ChannelHandlerContext, ByteBuf, List<Object>),这个方法是唯一的一个需要自己实现的抽象方法,作用是将ByteBuf数据解码成其他形式的数据。
· decodeLast(ChannelHandlerContext, ByteBuf, List<Object>),实际上调用的是decode(...)。
例如服务器从某个客户端接收到一个整数值的字节码,服务器将数据读入ByteBuf并经过ChannelPipeline中的每个ChannelInboundHandler进行处理,看下图:
上图显示了从“入站”ByteBuf读取bytes后由ToIntegerDecoder进行解码,然后向解码后的消息传递到ChannelPipeline中的下一个ChannelInboundHandler。看下面ToIntegerDecoder的实现代码:
[java] view plaincopy
1. /**
2. * Integer解码器,ByteToMessageDecoder实现
3. * @author c.k
4. *
5. */
6. public class ToIntegerDecoder extends ByteToMessageDecoder {
7.
8. @Override
9. protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
10. if(in.readableBytes() >= 4){
11. out.add(in.readInt());
12. }
13. }
14. }
从上面的代码可能会发现,我们需要检查ByteBuf读之前是否有足够的字节,若没有这个检查岂不更好?是的,Netty提供了这样的处理允许byte-to-message解码,在下一节讲解。除了ByteToMessageDecoder之外,Netty还提供了许多其他的解码接口。
7.2.2 ReplayingDecoder
ReplayingDecoder是byte-to-message解码的一种特殊的抽象基类,读取缓冲区的数据之前需要检查缓冲区是否有足够的字节,使用ReplayingDecoder就无需自己检查;若ByteBuf中有足够的字节,则会正常读取;若没有足够的字节则会停止解码。也正因为这样的包装使得ReplayingDecoder带有一定的局限性。
· 不是所有的操作都被ByteBuf支持,如果调用一个不支持的操作会抛出DecoderException。
· ByteBuf.readableBytes()大部分时间不会返回期望值
如果你能忍受上面列出的限制,相比ByteToMessageDecoder,你可能更喜欢ReplayingDecoder。在满足需求的情况下推荐使用ByteToMessageDecoder,因为它的处理比较简单,没有ReplayingDecoder实现的那么复杂。ReplayingDecoder继承与ByteToMessageDecoder,所以他们提供的接口是相同的。下面代码是ReplayingDecoder的实现:
[java] view plaincopy
1. /**
2. * Integer解码器,ReplayingDecoder实现
3. * @author c.k
4. *
5. */
6. public class ToIntegerReplayingDecoder extends ReplayingDecoder<Void> {
7.
8. @Override
9. protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
10. out.add(in.readInt());
11. }
12. }
当从接收的数据ByteBuf读取integer,若没有足够的字节可读,decode(...)会停止解码,若有足够的字节可读,则会读取数据添加到List列表中。使用ReplayingDecoder或ByteToMessageDecoder是个人喜好的问题,Netty提供了这两种实现,选择哪一个都可以。
上面讲了byte-to-message的解码实现方式,那message-to-message该如何实现呢?Netty提供了MessageToMessageDecoder抽象类。
7.2.3 MessageToMessageDecoder
将消息对象转成消息对象可是使用MessageToMessageDecoder,它是一个抽象类,需要我们自己实现其decode(...)。message-to-message同上面讲的byte-to-message的处理机制一样,看下图:
看下面的实现代码:
[java] view plaincopy
1. /**
2. * 将接收的Integer消息转成String类型,MessageToMessageDecoder实现
3. * @author c.k
4. *
5. */
6. public class IntegerToStringDecoder extends MessageToMessageDecoder<Integer> {
7.
8. @Override
9. protected void decode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {
10. out.add(String.valueOf(msg));
11. }
12. }
7.2.4 解码器总结
解码器是用来处理入站数据,Netty提供了很多解码器的实现,可以根据需求详细了解。那我们发送数据需要将数据编码,Netty中也提供了编码器的支持。下一节将讲解如何实现编码器。
7.3 编码器
Netty提供了一些基类,我们可以很简单的编码器。同样的,编码器有下面两种类型:
· 消息对象编码成消息对象
· 消息对象编码成字节码
相对解码器,编码器少了一个byte-to-byte的类型,因为出站数据这样做没有意义。编码器的作用就是将处理好的数据转成字节码以便在网络中传输。对照上面列出的两种编码器类型,Netty也分别提供了两个抽象类:MessageToByteEncoder和MessageToMessageEncoder。下面是类关系图:
7.3.1 MessageToByteEncoder
MessageToByteEncoder是抽象类,我们自定义一个继承MessageToByteEncoder的编码器只需要实现其提供的encode(...)方法。其工作流程如下图:
实现代码如下:
[java] view plaincopy
1. /**
2. * 编码器,将Integer值编码成byte[],MessageToByteEncoder实现
3. * @author c.k
4. *
5. */
6. public class IntegerToByteEncoder extends MessageToByteEncoder<Integer> {
7. @Override
8. protected void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception {
9. out.writeInt(msg);
10. }
11. }
7.3.2 MessageToMessageEncoder
需要将消息编码成其他的消息时可以使用Netty提供的MessageToMessageEncoder抽象类来实现。例如将Integer编码成String,其工作流程如下图:
代码实现如下:
[java] view plaincopy
1. /**
2. * 编码器,将Integer编码成String,MessageToMessageEncoder实现
3. * @author c.k
4. *
5. */
6. public class IntegerToStringEncoder extends MessageToMessageEncoder<Integer> {
7.
8. @Override
9. protected void encode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {
10. out.add(String.valueOf(msg));
11. }
12. }
7.4 编解码器
实际编码中,一般会将编码和解码操作封装太一个类中,解码处理“入站”数据,编码处理“出站”数据。知道了编码和解码器,对于下面的情况不会感觉惊讶:
· byte-to-message编码和解码
· message-to-message编码和解码
如果确定需要在ChannelPipeline中使用编码器和解码器,需要更好的使用一个抽象的编解码器。同样,使用编解码器的时候,不可能只删除解码器或编码器而离开ChannelPipeline导致某种不一致的状态。使用编解码器将强制性的要么都在ChannelPipeline,要么都不在ChannelPipeline。
考虑到这一点,我们在下面几节将更深入的分析Netty提供的编解码抽象类。
7.4.1 byte-to-byte编解码器
Netty4较之前的版本,其结构有很大的变化,在Netty4中实现byte-to-byte提供了2个类:ByteArrayEncoder和ByteArrayDecoder。这两个类用来处理字节到字节的编码和解码。下面是这两个类的源码,一看就知道是如何处理的:
[java] view plaincopy
1. public class ByteArrayDecoder extends MessageToMessageDecoder<ByteBuf> {
2. @Override
3. protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
4. // copy the ByteBuf content to a byte array
5. byte[] array = new byte[msg.readableBytes()];
6. msg.getBytes(0, array);
7.
8. out.add(array);
9. }
10. }
[java] view plaincopy
1. @Sharable
2. public class ByteArrayEncoder extends MessageToMessageEncoder<byte[]> {
3. @Override
4. protected void encode(ChannelHandlerContext ctx, byte[] msg, List<Object> out) throws Exception {
5. out.add(Unpooled.wrappedBuffer(msg));
6. }
7. }
7.4.2 ByteToMessageCodec
ByteToMessageCodec用来处理byte-to-message和message-to-byte。如果想要解码字节消息成POJO或编码POJO消息成字节,对于这种情况,ByteToMessageCodec<I>是一个不错的选择。ByteToMessageCodec是一种组合,其等同于ByteToMessageDecoder和MessageToByteEncoder的组合。MessageToByteEncoder是个抽象类,其中有2个方法需要我们自己实现:
· encode(ChannelHandlerContext, I, ByteBuf),编码
· decode(ChannelHandlerContext, ByteBuf, List<Object>),解码
7.4.3 MessageToMessageCodec
MessageToMessageCodec用于message-to-message的编码和解码,可以看成是MessageToMessageDecoder和MessageToMessageEncoder的组合体。MessageToMessageCodec是抽象类,其中有2个方法需要我们自己实现:
· encode(ChannelHandlerContext, OUTBOUND_IN, List<Object>)
· decode(ChannelHandlerContext, INBOUND_IN, List<Object>)
但是,这种编解码器能有用吗?
有许多用例,最常见的就是需要将消息从一个API转到另一个API。这种情况下需要自定义API或旧的API使用另一种消息类型。下面的代码显示了在WebSocket框架APIs之间转换消息:
[java] view plaincopy
1. package netty.in.action;
2.
3. import java.util.List;
4.
5. import io.netty.buffer.ByteBuf;
6. import io.netty.channel.ChannelHandlerContext;
7. import io.netty.channel.ChannelHandler.Sharable;
8. import io.netty.handler.codec.MessageToMessageCodec;
9. import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
10. import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
11. import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame;
12. import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
13. import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
14. import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
15. import io.netty.handler.codec.http.websocketx.WebSocketFrame;
16.
17. @Sharable
18. public class WebSocketConvertHandler extends
19. MessageToMessageCodec<WebSocketFrame, WebSocketConvertHandler.MyWebSocketFrame> {
20.
21. public static final WebSocketConvertHandler INSTANCE = new WebSocketConvertHandler();
22.
23. @Override
24. protected void encode(ChannelHandlerContext ctx, MyWebSocketFrame msg, List<Object> out) throws Exception {
25. switch (msg.getType()) {
26. case BINARY:
27. out.add(new BinaryWebSocketFrame(msg.getData()));
28. break;
29. case CLOSE:
30. out.add(new CloseWebSocketFrame(true, 0, msg.getData()));
31. break;
32. case PING:
33. out.add(new PingWebSocketFrame(msg.getData()));
34. break;
35. case PONG:
36. out.add(new PongWebSocketFrame(msg.getData()));
37. break;
38. case TEXT:
39. out.add(new TextWebSocketFrame(msg.getData()));
40. break;
41. case CONTINUATION:
42. out.add(new ContinuationWebSocketFrame(msg.getData()));
43. break;
44. default:
45. throw new IllegalStateException("Unsupported websocket msg " + msg);
46. }
47. }
48.
49. @Override
50. protected void decode(ChannelHandlerContext ctx, WebSocketFrame msg, List<Object> out) throws Exception {
51. if (msg instanceof BinaryWebSocketFrame) {
52. out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.BINARY, msg.content().copy()));
53. return;
54. }
55. if (msg instanceof CloseWebSocketFrame) {
56. out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.CLOSE, msg.content().copy()));
57. return;
58. }
59. if (msg instanceof PingWebSocketFrame) {
60. out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PING, msg.content().copy()));
61. return;
62. }
63. if (msg instanceof PongWebSocketFrame) {
64. out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PONG, msg.content().copy()));
65. return;
66. }
67. if (msg instanceof TextWebSocketFrame) {
68. out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.TEXT, msg.content().copy()));
69. return;
70. }
71. if (msg instanceof ContinuationWebSocketFrame) {
72. out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.CONTINUATION, msg.content().copy()));
73. return;
74. }
75. throw new IllegalStateException("Unsupported websocket msg " + msg);
76. }
77.
78. public static final class MyWebSocketFrame {
79. public enum FrameType {
80. BINARY, CLOSE, PING, PONG, TEXT, CONTINUATION
81. }
82.
83. private final FrameType type;
84. private final ByteBuf data;
85.
86. public MyWebSocketFrame(FrameType type, ByteBuf data) {
87. this.type = type;
88. this.data = data;
89. }
90.
91. public FrameType getType() {
92. return type;
93. }
94.
95. public ByteBuf getData() {
96. return data;
97. }
98.
99. }
100. }
7.5 其他编解码方式
使用编解码器来充当编码器和解码器的组合失去了单独使用编码器或解码器的灵活性,编解码器是要么都有要么都没有。你可能想知道是否有解决这个僵化问题的方式,还可以让编码器和解码器在ChannelPipeline中作为一个逻辑单元。幸运的是,Netty提供了一种解决方案,使用CombinedChannelDuplexHandler。虽然这个类不是编解码器API的一部分,但是它经常被用来简历一个编解码器。
7.5.1 CombinedChannelDuplexHandler
如何使用CombinedChannelDuplexHandler来结合解码器和编码器呢?下面我们从两个简单的例子看了解。
[java] view plaincopy
1. /**
2. * 解码器,将byte转成char
3. * @author c.k
4. *
5. */
6. public class ByteToCharDecoder extends ByteToMessageDecoder {
7.
8. @Override
9. protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
10. while(in.readableBytes() >= 2){
11. out.add(Character.valueOf(in.readChar()));
12. }
13. }
14.
15. }
[java] view plaincopy
1. /**
2. * 编码器,将char转成byte
3. * @author Administrator
4. *
5. */
6. public class CharToByteEncoder extends MessageToByteEncoder<Character> {
7.
8. @Override
9. protected void encode(ChannelHandlerContext ctx, Character msg, ByteBuf out) throws Exception {
10. out.writeChar(msg);
11. }
12. }
[java] view plaincopy
1. /**
2. * 继承CombinedChannelDuplexHandler,用于绑定解码器和编码器
3. * @author c.k
4. *
5. */
6. public class CharCodec extends CombinedChannelDuplexHandler<ByteToCharDecoder, CharToByteEncoder> {
7. public CharCodec(){
8. super(new ByteToCharDecoder(), new CharToByteEncoder());
9. }
10. }
从上面代码可以看出,使用CombinedChannelDuplexHandler绑定解码器和编码器很容易实现,比使用*Codec更灵活。
Netty还提供了其他的协议支持,放在io.netty.handler.codec包下,如:
· Google的protobuf,在io.netty.handler.codec.protobuf包下
· Google的SPDY协议
· RTSP(Real Time Streaming Protocol,实时流传输协议),在io.netty.handler.codec.rtsp包下
· SCTP(Stream Control Transmission Protocol,流控制传输协议),在io.netty.handler.codec.sctp包下
· ......
8.ChannelHandler和Codec
· 使用SSL/TLS创建安全的Netty程序
· 使用Netty创建HTTP/HTTPS程序
· 处理空闲连接和超时
· 解码分隔符和基于长度的协议
· 写大数据
· 序列化数据
上一章讲解了如何创建自己的编解码器,我们现在可以用上一章的知识来编写自己的编解码器。不过Netty提供了一些标准的ChannelHandler和Codec。Netty提供了很多协议的支持,所以我们不必自己发明轮子。Netty提供的这些实现可以解决我们的大部分需求。本章讲解Netty中使用SSL/TLS编写安全的应用程序,编写HTTP协议服务器,以及使用如WebSocket或Google的SPDY协议来使HTTP服务获得更好的性能;这些都是很常见的应用,本章还会介绍数据压缩,在数据量比较大的时候,压缩数据是很有必要的。
8.1 使用SSL/TLS创建安全的Netty程序
通信数据在网络上传输一般是不安全的,因为传输的数据可以发送纯文本或二进制的数据,很容易被破解。我们很有必要对网络上的数据进行加密。SSL和TLS是众所周知的标准和分层的协议,它们可以确保数据时私有的。例如,使用HTTPS或SMTPS都使用了SSL/TLS对数据进行了加密。
对于SSL/TLS,Java中提供了抽象的SslContext和SslEngine。实际上,SslContext可以用来获取SslEngine来进行加密和解密。使用指定的加密技术是高度可配置的,但是这不在本章范围。Netty扩展了Java的SslEngine,添加了一些新功能,使其更适合基于Netty的应用程序。Netty提供的这个扩展是SslHandler,是SslEngine的包装类,用来对网络数据进行加密和解密。
下图显示SslHandler实现的数据流:
上图显示了如何使用ChannelInitializer将SslHandler添加到ChannelPipeline,看下面代码:
[java] view plaincopy
1. public class SslChannelInitializer extends ChannelInitializer<Channel> {
2.
3. private final SSLContext context;
4. private final boolean client;
5. private final boolean startTls;
6.
7. public SslChannelInitializer(SSLContext context, boolean client, boolean startTls) {
8. this.context = context;
9. this.client = client;
10. this.startTls = startTls;
11. }
12.
13. @Override
14. protected void initChannel(Channel ch) throws Exception {
15. SSLEngine engine = context.createSSLEngine();
16. engine.setUseClientMode(client);
17. ch.pipeline().addFirst("ssl", new SslHandler(engine, startTls));
18. }
19. }
需要注意一点,SslHandler必须要添加到ChannelPipeline的第一个位置,可能有一些例外,但是最好这样来做。回想一下之前讲解的ChannelHandler,ChannelPipeline就像是一个在处理“入站”数据时先进先出,在处理“出站”数据时后进先出的队列。最先添加的SslHandler会啊在其他Handler处理逻辑数据之前对数据进行加密,从而确保Netty服务端的所有的Handler的变化都是安全的。
SslHandler提供了一些有用的方法,可以用来修改其行为或得到通知,一旦SSL/TLS完成握手(在握手过程中的两个对等通道互相验证对方,然后选择一个加密密码),SSL/TLS是自动执行的。看下面方法列表:
· setHandshakeTimeout(long handshakeTimeout, TimeUnit unit),设置握手超时时间,ChannelFuture将得到通知
· setHandshakeTimeoutMillis(long handshakeTimeoutMillis),设置握手超时时间,ChannelFuture将得到通知
· getHandshakeTimeoutMillis(),获取握手超时时间值
· setCloseNotifyTimeout(long closeNotifyTimeout, TimeUnit unit),设置关闭通知超时时间,若超时,ChannelFuture会关闭失败
· setHandshakeTimeoutMillis(long handshakeTimeoutMillis),设置关闭通知超时时间,若超时,ChannelFuture会关闭失败
· getCloseNotifyTimeoutMillis(),获取关闭通知超时时间
· handshakeFuture(),返回完成握手后的ChannelFuture
· close(),发送关闭通知请求关闭和销毁
8.2 使用Netty创建HTTP/HTTPS程序
HTTP/HTTPS是最常用的协议之一,可以通过HTTP/HTTPS访问网站,或者是提供对外公开的接口服务等等。Netty附带了使用HTTP/HTTPS的handlers,而不需要我们自己来编写编解码器。
8.2.1 Netty的HTTP编码器,解码器和编解码器
HTTP是请求-响应模式,客户端发送一个http请求,服务就响应此请求。Netty提供了简单的编码解码HTTP协议消息的Handler。下图显示了http请求和响应:
如上面两个图所示,一个HTTP请求/响应消息可能包含不止一个,但最终都会有LastHttpContent消息。FullHttpRequest和FullHttpResponse是Netty提供的两个接口,分别用来完成http请求和响应。所有的HTTP消息类型都实现了HttpObject接口。下面是类关系图:
Netty提供了HTTP请求和响应的编码器和解码器,看下面列表:
· HttpRequestEncoder,将HttpRequest或HttpContent编码成ByteBuf
· HttpRequestDecoder,将ByteBuf解码成HttpRequest和HttpContent
· HttpResponseEncoder,将HttpResponse或HttpContent编码成ByteBuf
· HttpResponseDecoder,将ByteBuf解码成HttpResponse和HttpContent
看下面代码:
[java] view plaincopy
1. public class HttpDecoderEncoderInitializer extends ChannelInitializer<Channel> {
2.
3. private final boolean client;
4.
5. public HttpDecoderEncoderInitializer(boolean client) {
6. this.client = client;
7. }
8.
9. @Override
10. protected void initChannel(Channel ch) throws Exception {
11. ChannelPipeline pipeline = ch.pipeline();
12. if (client) {
13. pipeline.addLast("decoder", new HttpResponseDecoder());
14. pipeline.addLast("", new HttpRequestEncoder());
15. } else {
16. pipeline.addLast("decoder", new HttpRequestDecoder());
17. pipeline.addLast("encoder", new HttpResponseEncoder());
18. }
19. }
20. }
如果你需要在ChannelPipeline中有一个解码器和编码器,还分别有一个在客户端和服务器简单的编解码器:HttpClientCodec和HttpServerCodec。
在ChannelPipelien中有解码器和编码器(或编解码器)后就可以操作不同的HttpObject消息了;但是HTTP请求和响应可以有很多消息数据,你需要处理不同的部分,可能也需要聚合这些消息数据,这是很麻烦的。为了解决这个问题,Netty提供了一个聚合器,它将消息部分合并到FullHttpRequest和FullHttpResponse,因此不需要担心接收碎片消息数据。
8.2.2 HTTP消息聚合
处理HTTP时可能接收HTTP消息片段,Netty需要缓冲直到接收完整个消息。要完成的处理HTTP消息,并且内存开销也不会很大,Netty为此提供了HttpObjectAggregator。通过HttpObjectAggregator,Netty可以聚合HTTP消息,使用FullHttpResponse和FullHttpRequest到ChannelPipeline中的下一个ChannelHandler,这就消除了断裂消息,保证了消息的完整。下面代码显示了如何聚合:
[java] view plaincopy
1. /**
2. * 添加聚合http消息的Handler
3. *
4. * @author c.k
5. *
6. */
7. public class HttpAggregatorInitializer extends ChannelInitializer<Channel> {
8.
9. private final boolean client;
10.
11. public HttpAggregatorInitializer(boolean client) {
12. this.client = client;
13. }
14.
15. @Override
16. protected void initChannel(Channel ch) throws Exception {
17. ChannelPipeline pipeline = ch.pipeline();
18. if (client) {
19. pipeline.addLast("codec", new HttpClientCodec());
20. } else {
21. pipeline.addLast("codec", new HttpServerCodec());
22. }
23. pipeline.addLast("aggegator", new HttpObjectAggregator(512 * 1024));
24. }
25.
26. }
如上面代码,很容使用Netty自动聚合消息。但是请注意,为了防止Dos攻击服务器,需要合理的限制消息的大小。应设置多大取决于实际的需求,当然也得有足够的内存可用。
8.2.3 HTTP压缩
使用HTTP时建议压缩数据以减少传输流量,压缩数据会增加CPU负载,现在的硬件设施都很强大,大多数时候压缩数据时一个好主意。Netty支持“gzip”和“deflate”,为此提供了两个ChannelHandler实现分别用于压缩和解压。看下面代码:
[java] view plaincopy
1. @Override
2. protected void initChannel(Channel ch) throws Exception {
3. ChannelPipeline pipeline = ch.pipeline();
4. if (client) {
5. pipeline.addLast("codec", new HttpClientCodec());
6. //添加解压缩Handler
7. pipeline.addLast("decompressor", new HttpContentDecompressor());
8. } else {
9. pipeline.addLast("codec", new HttpServerCodec());
10. //添加解压缩Handler
11. pipeline.addLast("decompressor", new HttpContentDecompressor());
12. }
13. pipeline.addLast("aggegator", new HttpObjectAggregator(512 * 1024));
14. }
8.2.4 使用HTTPS
网络中传输的重要数据需要加密来保护,使用Netty提供的SslHandler可以很容易实现,看下面代码:
[java] view plaincopy
1. /**
2. * 使用SSL对HTTP消息加密
3. *
4. * @author c.k
5. *
6. */
7. public class HttpsCodecInitializer extends ChannelInitializer<Channel> {
8.
9. private final SSLContext context;
10. private final boolean client;
11.
12. public HttpsCodecInitializer(SSLContext context, boolean client) {
13. this.context = context;
14. this.client = client;
15. }
16.
17. @Override
18. protected void initChannel(Channel ch) throws Exception {
19. SSLEngine engine = context.createSSLEngine();
20. engine.setUseClientMode(client);
21. ChannelPipeline pipeline = ch.pipeline();
22. pipeline.addFirst("ssl", new SslHandler(engine));
23. if (client) {
24. pipeline.addLast("codec", new HttpClientCodec());
25. } else {
26. pipeline.addLast("codec", new HttpServerCodec());
27. }
28. }
29.
30. }
8.2.5 WebSocket
HTTP是不错的协议,但是如果需要实时发布信息怎么做?有个做法就是客户端一直轮询请求服务器,这种方式虽然可以达到目的,但是其缺点很多,也不是优秀的解决方案,为了解决这个问题,便出现了WebSocket。
WebSocket允许数据双向传输,而不需要请求-响应模式。早期的WebSocket只能发送文本数据,然后现在不仅可以发送文本数据,也可以发送二进制数据,这使得可以使用WebSocket构建你想要的程序。下图是WebSocket的通信示例图:
在应用程序中添加WebSocket支持很容易,Netty附带了WebSocket的支持,通过ChannelHandler来实现。使用WebSocket有不同的消息类型需要处理。下面列表列出了Netty中WebSocket类型:
· BinaryWebSocketFrame,包含二进制数据
· TextWebSocketFrame,包含文本数据
· ContinuationWebSocketFrame,包含二进制数据或文本数据,BinaryWebSocketFrame和TextWebSocketFrame的结合体
· CloseWebSocketFrame,WebSocketFrame代表一个关闭请求,包含关闭状态码和短语
· PingWebSocketFrame,WebSocketFrame要求PongWebSocketFrame发送数据
· PongWebSocketFrame,WebSocketFrame要求PingWebSocketFrame响应
为了简化,我们只看看如何使用WebSocket服务器。客户端使用可以看Netty自带的WebSocket例子。
Netty提供了许多方法来使用WebSocket,但最简单常用的方法是使用WebSocketServerProtocolHandler。看下面代码:
[java] view plaincopy
1. /**
2. * WebSocket Server,若想使用SSL加密,将SslHandler加载ChannelPipeline的最前面即可
3. * @author c.k
4. *
5. */
6. public class WebSocketServerInitializer extends ChannelInitializer<Channel> {
7.
8. @Override
9. protected void initChannel(Channel ch) throws Exception {
10. ch.pipeline().addLast(new HttpServerCodec(),
11. new HttpObjectAggregator(65536),
12. new WebSocketServerProtocolHandler("/websocket"),
13. new TextFrameHandler(),
14. new BinaryFrameHandler(),
15. new ContinuationFrameHandler());
16. }
17.
18. public static final class TextFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
19. @Override
20. protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
21. // handler text frame
22. }
23. }
24.
25. public static final class BinaryFrameHandler extends SimpleChannelInboundHandler<BinaryWebSocketFrame>{
26. @Override
27. protected void channelRead0(ChannelHandlerContext ctx, BinaryWebSocketFrame msg) throws Exception {
28. //handler binary frame
29. }
30. }
31.
32. public static final class ContinuationFrameHandler extends SimpleChannelInboundHandler<ContinuationWebSocketFrame>{
33. @Override
34. protected void channelRead0(ChannelHandlerContext ctx, ContinuationWebSocketFrame msg) throws Exception {
35. //handler continuation frame
36. }
37. }
38. }
8.2.6 SPDY
SPDY(读作“SPeeDY”)是Google开发的基于TCP的应用层协议,用以最小化网络延迟,提升网络速度,优化用户的网络使用体验。SPDY并不是一种用于替代HTTP的协议,而是对HTTP协议的增强。新协议的功能包括数据流的多路复用、请求优先级以及HTTP报头压缩。谷歌表示,引入SPDY协议后,在实验室测试中页面加载速度比原先快64%。
SPDY的定位:
· 将页面加载时间减少50%。
· 最大限度地减少部署的复杂性。SPDY使用TCP作为传输层,因此无需改变现有的网络设施。
· 避免网站开发者改动内容。支持SPDY唯一需要变化的是客户端代理和Web服务器应用程序。
SPDY实现技术:
· 单个TCP连接支持并发的HTTP请求。
· 压缩报头和去掉不必要的头部来减少当前HTTP使用的带宽。
· 定义一个容易实现,在服务器端高效率的协议。通过减少边缘情况、定义易解析的消息格式来减少HTTP的复杂性。
· 强制使用SSL,让SSL协议在现存的网络设施下有更好的安全性和兼容性。
· 允许服务器在需要时发起对客户端的连接并推送数据。
SPDY具体的细节知识及使用可以查阅相关资料,这里不作赘述了。
8.3 处理空闲连接和超时
处理空闲连接和超时是网络应用程序的核心部分。当发送一条消息后,可以检测连接是否还处于活跃状态,若很长时间没用了就可以断开连接。Netty提供了很好的解决方案,有三种不同的ChannelHandler处理闲置和超时连接:
· IdleStateHandler,当一个通道没有进行读写或运行了一段时间后出发IdleStateEvent
· ReadTimeoutHandler,在指定时间内没有接收到任何数据将抛出ReadTimeoutException
· WriteTimeoutHandler,在指定时间内有写入数据将抛出WriteTimeoutException
最常用的是IdleStateHandler,下面代码显示了如何使用IdleStateHandler,如果60秒内没有接收数据或发送数据,操作将失败,连接将关闭:
[java] view plaincopy
1. public class IdleStateHandlerInitializer extends ChannelInitializer<Channel> {
2.
3. @Override
4. protected void initChannel(Channel ch) throws Exception {
5. ChannelPipeline pipeline = ch.pipeline();
6. pipeline.addLast(new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS));
7. pipeline.addLast(new HeartbeatHandler());
8. }
9.
10. public static final class HeartbeatHandler extends ChannelInboundHandlerAdapter {
11. private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer(
12. "HEARTBEAT", CharsetUtil.UTF_8));
13.
14. @Override
15. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
16. if (evt instanceof IdleStateEvent) {
17. ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
18. } else {
19. super.userEventTriggered(ctx, evt);
20. }
21. }
22. }
23. }
8.4 解码分隔符和基于长度的协议
使用Netty时会遇到需要解码以分隔符和长度为基础的协议,本节讲解Netty如何解码这些协议。
8.4.1 分隔符协议
经常需要处理分隔符协议或创建基于它们的协议,例如SMTP、POP3、IMAP、Telnet等等;Netty附带的handlers可以很容易的提取一些序列分隔:
· DelimiterBasedFrameDecoder,解码器,接收ByteBuf由一个或多个分隔符拆分,如NUL或换行符
· LineBasedFrameDecoder,解码器,接收ByteBuf以分割线结束,如"\n"和"\r\n"
下图显示了使用"\r\n"分隔符的处理:
下面代码显示使用LineBasedFrameDecoder提取"\r\n"分隔帧:
[java] view plaincopy
1. /**
2. * 处理换行分隔符消息
3. * @author c.k
4. *
5. */
6. public class LineBasedHandlerInitializer extends ChannelInitializer<Channel> {
7.
8. @Override
9. protected void initChannel(Channel ch) throws Exception {
10. ch.pipeline().addLast(new LineBasedFrameDecoder(65 * 1204), new FrameHandler());
11. }
12.
13. public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> {
14. @Override
15. protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
16. // do something with the frame
17. }
18. }
19. }
如果框架的东西除了换行符还有别的分隔符,可以使用DelimiterBasedFrameDecoder,只需要将分隔符传递到构造方法中。如果想实现自己的以分隔符为基础的协议,这些解码器是有用的。例如,现在有个协议,它只处理命令,这些命令由名称和参数形成,名称和参数由一个空格分隔,实现这个需求的代码如下:
[java] view plaincopy
1. /**
2. * 自定义以分隔符为基础的协议
3. * @author c.k
4. *
5. */
6. public class CmdHandlerInitializer extends ChannelInitializer<Channel> {
7.
8. @Override
9. protected void initChannel(Channel ch) throws Exception {
10. ch.pipeline().addLast(new CmdDecoder(65 * 1024), new CmdHandler());
11. }
12.
13. public static final class Cmd {
14. private final ByteBuf name;
15. private final ByteBuf args;
16.
17. public Cmd(ByteBuf name, ByteBuf args) {
18. this.name = name;
19. this.args = args;
20. }
21.
22. public ByteBuf getName() {
23. return name;
24. }
25.
26. public ByteBuf getArgs() {
27. return args;
28. }
29. }
30.
31. public static final class CmdDecoder extends LineBasedFrameDecoder {
32.
33. public CmdDecoder(int maxLength) {
34. super(maxLength);
35. }
36.
37. @Override
38. protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
39. ByteBuf frame = (ByteBuf) super.decode(ctx, buffer);
40. if (frame == null) {
41. return null;
42. }
43. int index = frame.indexOf(frame.readerIndex(), frame.writerIndex(), (byte) ' ');
44. return new Cmd(frame.slice(frame.readerIndex(), index), frame.slice(index + 1, frame.writerIndex()));
45. }
46. }
47.
48. public static final class CmdHandler extends SimpleChannelInboundHandler<Cmd> {
49. @Override
50. protected void channelRead0(ChannelHandlerContext ctx, Cmd msg) throws Exception {
51. // do something with the command
52. }
53. }
54.
55. }
8.4.2 长度为基础的协议
一般经常会碰到以长度为基础的协议,对于这种情况Netty有两个不同的解码器可以帮助我们来解码:
· FixedLengthFrameDecoder
· LengthFieldBasedFrameDecoder
下图显示了FixedLengthFrameDecoder的处理流程:
如上图所示,FixedLengthFrameDecoder提取固定长度,例子中的是8字节。大部分时候帧的大小被编码在头部,这种情况可以使用LengthFieldBasedFrameDecoder,它会读取头部长度并提取帧的长度。下图显示了它是如何工作的:
如果长度字段是提取框架的一部分,可以在LengthFieldBasedFrameDecoder的构造方法中配置,还可以指定提供的长度。FixedLengthFrameDecoder很容易使用,我们重点讲解LengthFieldBasedFrameDecoder。下面代码显示如何使用LengthFieldBasedFrameDecoder提取8字节长度:
[java] view plaincopy
1. public class LengthBasedInitializer extends ChannelInitializer<Channel> {
2.
3. @Override
4. protected void initChannel(Channel ch) throws Exception {
5. ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65*1024, 0, 8))
6. .addLast(new FrameHandler());
7. }
8.
9. public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf>{
10. @Override
11. protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
12. //do something with the frame
13. }
14. }
15. }
8.5 写大数据
写大量的数据的一个有效的方法是使用异步框架,如果内存和网络都处于饱满负荷状态,你需要停止写,否则会报OutOfMemoryError。Netty提供了写文件内容时zero-memory-copy机制,这种方法再将文件内容写到网络堆栈空间时可以获得最大的性能。使用零拷贝写文件的内容时通过DefaultFileRegion、ChannelHandlerContext、ChannelPipeline,看下面代码:
[java] view plaincopy
1. @Override
2. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
3. File file = new File("test.txt");
4. FileInputStream fis = new FileInputStream(file);
5. FileRegion region = new DefaultFileRegion(fis.getChannel(), 0, file.length());
6. Channel channel = ctx.channel();
7. channel.writeAndFlush(region).addListener(new ChannelFutureListener() {
8.
9. @Override
10. public void operationComplete(ChannelFuture future) throws Exception {
11. if(!future.isSuccess()){
12. Throwable cause = future.cause();
13. // do something
14. }
15. }
16. });
17. }
如果只想发送文件中指定的数据块应该怎么做呢?Netty提供了ChunkedWriteHandler,允许通过处理ChunkedInput来写大的数据块。下面是ChunkedInput的一些实现类:
· ChunkedFile
· ChunkedNioFile
· ChunkedStream
· ChunkedNioStream
看下面代码:
[java] view plaincopy
1. public class ChunkedWriteHandlerInitializer extends ChannelInitializer<Channel> {
2. private final File file;
3.
4. public ChunkedWriteHandlerInitializer(File file) {
5. this.file = file;
6. }
7.
8. @Override
9. protected void initChannel(Channel ch) throws Exception {
10. ch.pipeline().addLast(new ChunkedWriteHandler())
11. .addLast(new WriteStreamHandler());
12. }
13.
14. public final class WriteStreamHandler extends ChannelInboundHandlerAdapter {
15. @Override
16. public void channelActive(ChannelHandlerContext ctx) throws Exception {
17. super.channelActive(ctx);
18. ctx.writeAndFlush(new ChunkedStream(new FileInputStream(file)));
19. }
20. }
21. }
8.6 序列化数据
开发网络程序过程中,很多时候需要传输结构化对象数据POJO,Java中提供了ObjectInputStream和ObjectOutputStream及其他的一些对象序列化接口。Netty中提供基于JDK序列化接口的序列化接口。
8.6.1 普通的JDK序列化
如果你使用ObjectInputStream和ObjectOutputStream,并且需要保持兼容性,不想有外部依赖,那么JDK的序列化是首选。Netty提供了下面的一些接口,这些接口放在io.netty.handler.codec.serialization包下面:
· CompatibleObjectEncoder
· CompactObjectInputStream
· CompactObjectOutputStream
· ObjectEncoder
· ObjectDecoder
· ObjectEncoderOutputStream
· ObjectDecoderInputStream
8.6.2 通过JBoss编组序列化
如果你想使用外部依赖的接口,JBoss编组是个好方法。JBoss Marshalling序列化的速度是JDK的3倍,并且序列化的结构更紧凑,从而使序列化后的数据更小。Netty附带了JBoss编组序列化的实现,这些实现接口放在io.netty.handler.codec.marshalling包下面:
· CompatibleMarshallingEncoder
· CompatibleMarshallingDecoder
· MarshallingEncoder
· MarshallingDecoder
看下面代码:
[java] view plaincopy
1. /**
2. * 使用JBoss Marshalling
3. * @author c.k
4. *
5. */
6. public class MarshallingInitializer extends ChannelInitializer<Channel> {
7. private final MarshallerProvider marshallerProvider;
8. private final UnmarshallerProvider unmarshallerProvider;
9.
10. public MarshallingInitializer(MarshallerProvider marshallerProvider, UnmarshallerProvider unmarshallerProvider) {
11. this.marshallerProvider = marshallerProvider;
12. this.unmarshallerProvider = unmarshallerProvider;
13. }
14.
15. @Override
16. protected void initChannel(Channel ch) throws Exception {
17. ch.pipeline().addLast(new MarshallingDecoder(unmarshallerProvider))
18. .addLast(new MarshallingEncoder(marshallerProvider))
19. .addLast(new ObjectHandler());
20. }
21.
22. public final class ObjectHandler extends SimpleChannelInboundHandler<Serializable> {
23. @Override
24. protected void channelRead0(ChannelHandlerContext ctx, Serializable msg) throws Exception {
25. // do something
26. }
27. }
28. }
8.6.3 使用ProtoBuf序列化
最有一个序列化方案是Netty附带的ProtoBuf。protobuf是Google开源的一种编码和解码技术,它的作用是使序列化数据更高效。并且谷歌提供了protobuf的不同语言的实现,所以protobuf在跨平台项目中是非常好的选择。Netty附带的protobuf放在io.netty.handler.codec.protobuf包下面:
· ProtobufDecoder
· ProtobufEncoder
· ProtobufVarint32FrameDecoder
· ProtobufVarint32LengthFieldPrepender
看下面代码:
[java] view plaincopy
1. /**
2. * 使用protobuf序列化数据,进行编码解码
3. * 注意:使用protobuf需要protobuf-java-2.5.0.jar
4. * @author Administrator
5. *
6. */
7. public class ProtoBufInitializer extends ChannelInitializer<Channel> {
8.
9. private final MessageLite lite;
10.
11. public ProtoBufInitializer(MessageLite lite) {
12. this.lite = lite;
13. }
14.
15. @Override
16. protected void initChannel(Channel ch) throws Exception {
17. ch.pipeline().addLast(new ProtobufVarint32FrameDecoder())
18. .addLast(new ProtobufEncoder())
19. .addLast(new ProtobufDecoder(lite))
20. .addLast(new ObjectHandler());
21. }
22.
23. public final class ObjectHandler extends SimpleChannelInboundHandler<Serializable> {
24. @Override
25. protected void channelRead0(ChannelHandlerContext ctx, Serializable msg) throws Exception {
26. // do something
27. }
28. }
29. }
9.引导Netty的程序应用程序
· 引导客户端和服务器
· 从Channel引导客户端
· 添加多个ChannelHandler
· 使用通道选项和属性
上一章学习了编写自己的ChannelHandler和编解码器并将它们添加到Channel的ChannelPipeline中。本章将讲解如何将它们结合在一起使用。
Netty提供了简单统一的方法来引导服务器和客户端。引导是配置Netty服务器和客户端程序的一个过程,Bootstrap允许这些应用程序很容易的重复使用。Netty程序的客户端和服务器都可以使用Bootstrap,其目的是简化编码过程,Bootstrap还提供了一个机制就是让一些组件(channels,pipeline,handlers等等)都可以在后台工作。本章将具体结合以下部分一起使用开发Netty程序:
· EventLoopGroup
· Channel
· 设置ChannelOption
· Channel被注册后将调用ChannelHandler
· 添加指定的属性到Channel
· 设置本地和远程地址
· 绑定、连接(取决于类型)
知道如何使用各个Bootstrap后就可以使用它们配置服务器和客户端了。本章还将学习在什么会后可以共享一个Bootstrap以及为什么这样做,结合我们之前学习的知识点来编写Netty程序。
Netty包含了2个不同类型的引导,第一个是使用服务器的ServerBootstrap,用来接受客户端连接以及为已接受的连接创建子通道;第二个是用于客户端的Bootstrap,不接受新的连接,并且是在父通道类完成一些操作。
还有一种情况是处理DatagramChannel实例,这些用于UDP协议,是无连接的。换句话说,由于UDP的性质,所以当处理UDP数据时没有必要每个连接通道与TCP连接一样。因为通道不需要连接后才能发送数据,UDP是无连接协议。一个通道可以处理所有的数据而不需要依赖子通道。
下图是引导的类关系图:
我们在前面讨论了许多用于客户端和服务器的知识,为了对客户端和服务器之间的关系提供了一个共同点,Netty使用AbstractBootstrap类。通过一个共同的父类,在本章中讨论的客户端和服务器的引导程序能够重复使用通用功能,而无需复制代码或逻辑。通常情况下,多个通道使用相同或非常类似的设置时有必要的。而不是为每一个通道创建一个新的引导,Netty使得AbstractBootstrap可复制。也就是说克隆一个已配置的引导,其返回的是一个可重用而无需配置的引导。Netty的克隆操作只能浅拷贝引导的EventLoopGroup,也就是说EventLoopGroup在所有的克隆的通道中是共享的。这是一个好事情,克隆的通道一般是短暂的,例如一个通道创建一个HTTP请求。
本章主要讲解Bootstrap和ServerBootstrap,首先我们来看看ServerBootstrap。
当需要引导客户端或一些无连接协议时,需要使用Bootstrap类。
创建Bootstrap实例使用new关键字,下面是Bootstrap的方法:
· group(...),设置EventLoopGroup,EventLoopGroup用来处理所有通道的IO事件
· channel(...),设置通道类型
· channelFactory(...),使用ChannelFactory来设置通道类型
· localAddress(...),设置本地地址,也可以通过bind(...)或connect(...)
· option(ChannelOption<T>, T),设置通道选项,若使用null,则删除上一个设置的ChannelOption
· attr(AttributeKey<T>, T),设置属性到Channel,若值为null,则指定键的属性被删除
· handler(ChannelHandler),设置ChannelHandler用于处理请求事件
· clone(),深度复制Bootstrap,Bootstrap的配置相同
· remoteAddress(...),设置连接地址
· connect(...),连接远程通道
· bind(...),创建一个新的Channel并绑定
引导负责客户端通道连接或断开连接,因此它将在调用bind(...)或connect(...)后创建通道。下图显示了如何工作:
下面代码显示了引导客户端使用NIO TCP传输:
[html] view plaincopy
1. package netty.in.action;
2.
3. import io.netty.bootstrap.Bootstrap;
4. import io.netty.buffer.ByteBuf;
5. import io.netty.channel.ChannelFuture;
6. import io.netty.channel.ChannelFutureListener;
7. import io.netty.channel.ChannelHandlerContext;
8. import io.netty.channel.EventLoopGroup;
9. import io.netty.channel.SimpleChannelInboundHandler;
10. import io.netty.channel.nio.NioEventLoopGroup;
11. import io.netty.channel.socket.nio.NioSocketChannel;
12.
13. /**
14. * 引导配置客户端
15. *
16. * @author c.k
17. *
18. */
19. public class BootstrapingClient {
20.
21. public static void main(String[] args) throws Exception {
22. EventLoopGroup group = new NioEventLoopGroup();
23. Bootstrap b = new Bootstrap();
24. b.group(group).channel(NioSocketChannel.class).handler(new SimpleChannelInboundHandler<ByteBuf>() {
25. @Override
26. protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
27. System.out.println("Received data");
28. msg.clear();
29. }
30. });
31. ChannelFuture f = b.connect("127.0.0.1", 2048);
32. f.addListener(new ChannelFutureListener() {
33. @Override
34. public void operationComplete(ChannelFuture future) throws Exception {
35. if (future.isSuccess()) {
36. System.out.println("connection finished");
37. } else {
38. System.out.println("connection failed");
39. future.cause().printStackTrace();
40. }
41. }
42. });
43. }
44. }
Channel的实现和EventLoop的处理过程在EventLoopGroup中必须兼容,哪些Channel是和EventLoopGroup是兼容的可以查看API文档。经验显示,相兼容的实现一般在同一个包下面,例如使用NioEventLoop,NioEventLoopGroup和NioServerSocketChannel在一起。请注意,这些都是前缀“Nio”,然后不会用这些代替另一个实现和另一个前缀,如“Oio”,也就是说OioEventLoopGroup和NioServerSocketChannel是不相容的。
Channel和EventLoopGroup的EventLoop必须相容,例如NioEventLoop、NioEventLoopGroup、NioServerSocketChannel是相容的,但是OioEventLoopGroup和NioServerSocketChannel是不相容的。从类名可以看出前缀是“Nio”的只能和“Nio”的一起使用,“Oio”前缀的只能和Oio*一起使用,将不相容的一起使用会导致错误异常,如OioSocketChannel和NioEventLoopGroup一起使用时会抛出异常:Exception in thread "main" java.lang.IllegalStateException: incompatible event loop type。
先看看ServerBootstrap提供了哪些方法
· group(...),设置EventLoopGroup事件循环组
· channel(...),设置通道类型
· channelFactory(...),使用ChannelFactory来设置通道类型
· localAddress(...),设置本地地址,也可以通过bind(...)或connect(...)
· option(ChannelOption<T>, T),设置通道选项,若使用null,则删除上一个设置的ChannelOption
· childOption(ChannelOption<T>, T),设置子通道选项
· attr(AttributeKey<T>, T),设置属性到Channel,若值为null,则指定键的属性被删除
· childAttr(AttributeKey<T>, T),设置子通道属性
· handler(ChannelHandler),设置ChannelHandler用于处理请求事件
· childHandler(ChannelHandler),设置子ChannelHandler
· clone(),深度复制ServerBootstrap,且配置相同
· bind(...),创建一个新的Channel并绑定
下图显示ServerBootstrap管理子通道:
child*方法是在子Channel上操作,通过ServerChannel来管理。
下面代码显示使用ServerBootstrap引导配置服务器:
[java] view plaincopy
1. package netty.in.action;
2.
3. import io.netty.bootstrap.ServerBootstrap;
4. import io.netty.buffer.ByteBuf;
5. import io.netty.channel.ChannelFuture;
6. import io.netty.channel.ChannelFutureListener;
7. import io.netty.channel.ChannelHandlerContext;
8. import io.netty.channel.EventLoopGroup;
9. import io.netty.channel.SimpleChannelInboundHandler;
10. import io.netty.channel.nio.NioEventLoopGroup;
11. import io.netty.channel.socket.nio.NioServerSocketChannel;
12.
13. /**
14. * 引导服务器配置
15. * @author c.k
16. *
17. */
18. public class BootstrapingServer {
19.
20. public static void main(String[] args) throws Exception {
21. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
22. EventLoopGroup workerGroup = new NioEventLoopGroup();
23. ServerBootstrap b = new ServerBootstrap();
24. b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
25. .childHandler(new SimpleChannelInboundHandler<ByteBuf>() {
26. @Override
27. protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
28. System.out.println("Received data");
29. msg.clear();
30. }
31. });
32. ChannelFuture f = b.bind(2048);
33. f.addListener(new ChannelFutureListener() {
34. @Override
35. public void operationComplete(ChannelFuture future) throws Exception {
36. if (future.isSuccess()) {
37. System.out.println("Server bound");
38. } else {
39. System.err.println("bound fail");
40. future.cause().printStackTrace();
41. }
42. }
43. });
44. }
45. }
有时候需要从另一个Channel引导客户端,例如写一个代理或需要从其他系统检索数据。从其他系统获取数据时比较常见的,有很多Netty应用程序必须要和企业现有的系统集成,如Netty程序与内部系统进行身份验证,查询数据库等。
当然,你可以创建一个新的引导,这样做没有什么不妥,只是效率不高,因为要为新创建的客户端通道使用另一个EventLoop,如果需要在已接受的通道和客户端通道之间交换数据则需要切换上下文线程。Netty对这方面进行了优化,可以讲已接受的通道通过eventLoop(...)传递到EventLoop,从而使客户端通道在相同的EventLoop里运行。这消除了额外的上下文切换工作,因为EventLoop继承于EventLoopGroup。除了消除上下文切换,还可以在不需要创建多个线程的情况下使用引导。
为什么要共享EventLoop呢?一个EventLoop由一个线程执行,共享EventLoop可以确定所有的Channel都分配给同一线程的EventLoop,这样就避免了不同线程之间切换上下文,从而减少资源开销。
下图显示相同的EventLoop管理两个Channel:
看下面代码:
[java] view plaincopy
1. package netty.in.action;
2.
3. import java.net.InetSocketAddress;
4.
5. import io.netty.bootstrap.Bootstrap;
6. import io.netty.bootstrap.ServerBootstrap;
7. import io.netty.buffer.ByteBuf;
8. import io.netty.channel.ChannelFuture;
9. import io.netty.channel.ChannelFutureListener;
10. import io.netty.channel.ChannelHandlerContext;
11. import io.netty.channel.EventLoopGroup;
12. import io.netty.channel.SimpleChannelInboundHandler;
13. import io.netty.channel.nio.NioEventLoopGroup;
14. import io.netty.channel.socket.nio.NioServerSocketChannel;
15. import io.netty.channel.socket.nio.NioSocketChannel;
16.
17. /**
18. * 从Channel引导客户端
19. *
20. * @author c.k
21. *
22. */
23. public class BootstrapingFromChannel {
24.
25. public static void main(String[] args) throws Exception {
26. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
27. EventLoopGroup workerGroup = new NioEventLoopGroup();
28. ServerBootstrap b = new ServerBootstrap();
29. b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
30. .childHandler(new SimpleChannelInboundHandler<ByteBuf>() {
31. ChannelFuture connectFuture;
32.
33. @Override
34. public void channelActive(ChannelHandlerContext ctx) throws Exception {
35. Bootstrap b = new Bootstrap();
36. b.channel(NioSocketChannel.class).handler(
37. new SimpleChannelInboundHandler<ByteBuf>() {
38. @Override
39. protected void channelRead0(ChannelHandlerContext ctx,
40. ByteBuf msg) throws Exception {
41. System.out.println("Received data");
42. msg.clear();
43. }
44. });
45. b.group(ctx.channel().eventLoop());
46. connectFuture = b.connect(new InetSocketAddress("127.0.0.1", 2048));
47. }
48.
49. @Override
50. protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg)
51. throws Exception {
52. if (connectFuture.isDone()) {
53. // do something with the data
54. }
55. }
56. });
57. ChannelFuture f = b.bind(2048);
58. f.addListener(new ChannelFutureListener() {
59. @Override
60. public void operationComplete(ChannelFuture future) throws Exception {
61. if (future.isSuccess()) {
62. System.out.println("Server bound");
63. } else {
64. System.err.println("bound fail");
65. future.cause().printStackTrace();
66. }
67. }
68. });
69. }
70. }
在所有的例子代码中,我们在引导过程中通过handler(...)或childHandler(...)都只添加了一个ChannelHandler实例,对于简单的程序可能足够,但是对于复杂的程序则无法满足需求。例如,某个程序必须支持多个协议,如HTTP、WebSocket。若在一个ChannelHandler中处理这些协议将导致一个庞大而复杂的ChannelHandler。Netty通过添加多个ChannelHandler,从而使每个ChannelHandler分工明确,结构清晰。
Netty的一个优势是可以在ChannelPipeline中堆叠很多ChannelHandler并且可以最大程度的重用代码。如何添加多个ChannelHandler呢?Netty提供ChannelInitializer抽象类用来初始化ChannelPipeline中的ChannelHandler。ChannelInitializer是一个特殊的ChannelHandler,通道被注册到EventLoop后就会调用ChannelInitializer,并允许将ChannelHandler添加到CHannelPipeline;完成初始化通道后,这个特殊的ChannelHandler初始化器会从ChannelPipeline中自动删除。
听起来很复杂,其实很简单,看下面代码:
[java] view plaincopy
1. package netty.in.action;
2.
3. import io.netty.bootstrap.ServerBootstrap;
4. import io.netty.channel.Channel;
5. import io.netty.channel.ChannelFuture;
6. import io.netty.channel.ChannelInitializer;
7. import io.netty.channel.EventLoopGroup;
8. import io.netty.channel.nio.NioEventLoopGroup;
9. import io.netty.channel.socket.nio.NioServerSocketChannel;
10. import io.netty.handler.codec.http.HttpClientCodec;
11. import io.netty.handler.codec.http.HttpObjectAggregator;
12.
13. /**
14. * 使用ChannelInitializer初始化ChannelHandler
15. * @author c.k
16. *
17. */
18. public class InitChannelExample {
19.
20. public static void main(String[] args) throws Exception {
21. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
22. EventLoopGroup workerGroup = new NioEventLoopGroup();
23. ServerBootstrap b = new ServerBootstrap();
24. b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
25. .childHandler(new ChannelInitializerImpl());
26. ChannelFuture f = b.bind(2048).sync();
27. f.channel().closeFuture().sync();
28. }
29.
30. static final class ChannelInitializerImpl extends ChannelInitializer<Channel>{
31. @Override
32. protected void initChannel(Channel ch) throws Exception {
33. ch.pipeline().addLast(new HttpClientCodec())
34. .addLast(new HttpObjectAggregator(Integer.MAX_VALUE));
35. }
36. }
37.
38. }
比较麻烦的是创建通道后不得不手动配置每个通道,为了避免这种情况,Netty提供了ChannelOption来帮助引导配置。这些选项会自动应用到引导创建的所有通道,可用的各种选项可以配置底层连接的详细信息,如通道“keep-alive(保持活跃)”或“timeout(超时)”的特性。
Netty应用程序通常会与组织或公司其他的软件进行集成,在某些情况下,Netty的组件如通道、传递和Netty正常生命周期外使用;在这样的情况下并不是所有的一般属性和数据时可用的。这只是一个例子,但在这样的情况下,Netty提供了通道属性(channel attributes)。
属性可以将数据和通道以一个安全的方式关联,这些属性只是作用于客户端和服务器的通道。例如,例如客户端请求web服务器应用程序,为了跟踪通道属于哪个用户,应用程序可以存储用的ID作为通道的一个属性。任何对象或数据都可以使用属性被关联到一个通道。
使用ChannelOption和属性可以让事情变得很简单,例如Netty WebSocket服务器根据用户自动路由消息,通过使用属性,应用程序能在通道存储用户ID以确定消息应该发送到哪里。应用程序可以通过使用一个通道选项进一步自动化,给定时间内没有收到消息将自动断开连接。看下面代码:
[java] view plaincopy
1. public static void main(String[] args) {
2. //创建属性键对象
3. final AttributeKey<Integer> id = AttributeKey.valueOf("ID");
4. //客户端引导对象
5. Bootstrap b = new Bootstrap();
6. //设置EventLoop,设置通道类型
7. b.group(new NioEventLoopGroup()).channel(NioSocketChannel.class)
8. //设置ChannelHandler
9. .handler(new SimpleChannelInboundHandler<ByteBuf>() {
10. @Override
11. protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg)
12. throws Exception {
13. System.out.println("Reveived data");
14. msg.clear();
15. }
16.
17. @Override
18. public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
19. //通道注册后执行,获取属性值
20. Integer idValue = ctx.channel().attr(id).get();
21. System.out.println(idValue);
22. //do something with the idValue
23. }
24. });
25. //设置通道选项,在通道注册后或被创建后设置
26. b.option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
27. //设置通道属性
28. b.attr(id, 123456);
29. ChannelFuture f = b.connect("www.manning.com",80);
30. f.syncUninterruptibly();
31. }
前面都是引导基于TCP的SocketChannel,引导也可以用于无连接的传输协议如UDP,Netty提供了DatagramChannel,唯一的区别是不会connecte(...),只能bind(...)。看下面代码:
[java] view plaincopy
1. public static void main(String[] args) {
2. Bootstrap b = new Bootstrap();
3. b.group(new OioEventLoopGroup()).channel(OioDatagramChannel.class)
4. .handler(new SimpleChannelInboundHandler<DatagramPacket>() {
5. @Override
6. protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg)
7. throws Exception {
8. // do something with the packet
9. }
10. });
11. ChannelFuture f = b.bind(new InetSocketAddress(0));
12. f.addListener(new ChannelFutureListener() {
13. @Override
14. public void operationComplete(ChannelFuture future) throws Exception {
15. if (future.isSuccess()) {
16. System.out.println("Channel bound");
17. } else {
18. System.err.println("Bound attempt failed");
19. future.cause().printStackTrace();
20. }
21. }
22. });
23. }
Netty有默认的配置设置,多数情况下,我们不需要改变这些配置,但是在需要时,我们可以细粒度的控制如何工作及处理数据。
In this chapter you learned how to bootstrap your Netty-based server and client implementation. You learned how you can specify configuration options that affect the and how you can use attributes to attach information to a channel and use it later. You also learned how to bootstrap connectionless protocol-based applications and how they are different from connection-based ones. The next chapters will focus on Netty in Action by using it to implement real-world applications. This will help you extract all interesting pieces for reuse in your next application. At this point you should be able to start coding!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。