赞
踩
点击上方 Java后端,选择 设为星标
优质文章,及时送达作者 | HongJie
链接 | javadoop.com/post/nio-and-aio 本文将介绍 Java NIO 中三大组件 Buffer、Channel、Selector 的使用。 本来要一起介绍非阻塞 IO 和 JDK7 的异步 IO 的,不过因为之前的文章真的太长了,有点影响读者阅读,所以这里将它们放到另一篇文章中进行介绍。allocate(int capacity)
帮助我们快速实例化一个 Buffer。如:
ByteBuffer byteBuf = ByteBuffer.allocate(1024);IntBuffer intBuf = IntBuffer.allocate(1024);LongBuffer longBuf = LongBuffer.allocate(1024);// ...
另外,我们经常使用 wrap 方法来初始化一个 Buffer。
publicstatic ByteBuffer wrap(byte[] array) { ...}
// 填充一个 byte 值publicabstract ByteBuffer put(byte b);// 在指定位置填充一个 int 值publicabstract ByteBuffer put(int index, byte b);// 将一个数组中的值填充进去publicfinal ByteBuffer put(byte[] src) {...}public ByteBuffer put(byte[] src, int offset, int length) {...}
上述这些方法需要自己控制 Buffer 大小,不能超过 capacity,超过会抛 java.nio.BufferOverflowException 异常。
对于 Buffer 来说,另一个常见的操作中就是,我们要将来自 Channel 的数据填充到 Buffer 中,在系统层面上,这个操作我们称为读操作,因为数据是从外部(文件或网络等)读到内存中。
int num = channel.read(buf);
上述方法会返回从 Channel 中读入到 Buffer 的数据大小。
publicfinal Buffer flip() { limit = position; // 将 limit 设置为实际写入的数据数量 position = 0; // 重置 position 为 0 mark = -1; // mark 之后再说return this;}
对应写入操作的一系列 put 方法,读操作提供了一系列的 get 方法:
// 根据 position 来获取数据publicabstractbyteget();// 获取指定位置的数据publicabstractbyteget(int index);// 将 Buffer 中的数据写入到数组中public ByteBuffer get(byte[] dst)
附一个经常使用的方法:
new String(buffer.array()).trim();
当然了,除了将数据从 Buffer 取出来使用,更常见的操作是将我们写入的数据传输到 Channel 中,如通过 FileChannel 将数据写入到文件中,通过 SocketChannel 将数据写入网络发送到远程机器等。对应的,这种操作,我们称之为写操作。
int num = channel.write(buf);
publicfinal Buffer mark() { mark = position;return this;}
那到底什么时候用呢?考虑以下场景,我们在 position 为 5 的时候,先 mark() 一下,然后继续往下读,读到第 10 的时候,我想重新回到 position 为 5 的地方重新来一遍,那只要调一下 reset() 方法,position 就回到 5 了。
publicfinal Buffer reset() {int m = mark;if (m < 0)throw new InvalidMarkException(); position = m;return this;}
publicfinal Buffer rewind() { position = 0; mark = -1;return this;}
clear():有点重置 Buffer 的意思,相当于重新实例化了一样。
通常,我们会先填充 Buffer,然后从 Buffer 读取数据,之后我们再重新往里填充新的数据,我们一般在重新填充之前先调用 clear()。
publicfinal Buffer clear() { position = 0; limit = capacity; mark = -1;return this;}
compact():和 clear() 一样的是,它们都是在准备往 Buffer 填充新的数据之前调用。
前面说的 clear() 方法会重置几个属性,但是我们要看到,clear() 方法并不会将 Buffer 中的数据清空,只不过后续的写入会覆盖掉原来的数据,也就相当于清空了数据了。
而 compact() 方法有点不一样,调用这个方法以后,会先处理还没有读取的数据,也就是 position 到 limit 之间的数据(还没有读过的数据),先将这些数据移到左边,然后在这个基础上再开始写入。很明显,此时 limit 还是等于 capacity,position 指向原来数据的右边。
FileChannel:文件通道,用于文件的读和写
DatagramChannel:用于 UDP 连接的接收和发送
SocketChannel:把它理解为 TCP 连接通道,简单理解就是 TCP 客户端
ServerSocketChannel:TCP 对应的服务端,用于监听某个端口进来的请求
FileInputStream inputStream = new FileInputStream(new File("/data.txt"));FileChannel fileChannel = inputStream.getChannel();
当然了,我们也可以从 RandomAccessFile#getChannel 来得到 FileChannel。
读取文件内容:
ByteBuffer buffer = ByteBuffer.allocate(1024);int num = fileChannel.read(buffer);
前面我们也说了,所有的 Channel 都是和 Buffer 打交道的。
写入文件内容:
ByteBuffer buffer = ByteBuffer.allocate(1024);buffer.put("随机写入一些内容到 Buffer 中".getBytes());// Buffer 切换为读模式buffer.flip();while(buffer.hasRemaining()) {// 将 Buffer 中的内容写入文件 fileChannel.write(buffer);}
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("https://www.javadoop.com", 80));
当然了,上面的这行代码等价于下面的两行:
// 打开一个通道SocketChannel socketChannel = SocketChannel.open();// 发起连接socketChannel.connect(new InetSocketAddress("https://www.javadoop.com", 80));
SocketChannel 的读写和 FileChannel 没什么区别,就是操作缓冲区。
// 读取数据socketChannel.read(buffer);// 写入数据到网络连接中while(buffer.hasRemaining()) { socketChannel.write(buffer);}
不要在这里停留太久,先继续往下走。
// 实例化ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();// 监听 8080 端口serverSocketChannel.socket().bind(new InetSocketAddress(8080));while (true) {// 一旦有一个 TCP 连接进来,就对应创建一个 SocketChannel 进行处理 SocketChannel socketChannel = serverSocketChannel.accept();}
这里我们可以看到 SocketChannel 的第二个实例化方式到这里,我们应该能理解 SocketChannel 了,它不仅仅是 TCP 客户端,它代表的是一个网络通道,可读可写。 ServerSocketChannel 不和 Buffer 打交道了,因为它并不实际处理数据,它一旦接收到请求后,实例化 SocketChannel,之后在这个连接通道上的数据传递它就不管了,因为它需要继续监听端口,等待下一个连接。
科普一下,UDP 是面向无连接的,不需要和对方握手,不需要通知对方,就可以直接将数据包投出去,至于能不能送达,它是不知道的监听端口:
DatagramChannel channel = DatagramChannel.open();channel.socket().bind(new InetSocketAddress(9090));
ByteBuffer buf = ByteBuffer.allocate(48);channel.receive(buf);
发送数据:
String newData = "New String to write to file..." + System.currentTimeMillis();ByteBuffer buf = ByteBuffer.allocate(48);buf.put(newData.getBytes());buf.flip();int bytesSent = channel.send(buf, new InetSocketAddress("jenkov.com", 80));
首先,我们开启一个 Selector。你们爱翻译成选择器也好,多路复用器也好。
Selector selector = Selector.open();
将 Channel 注册到 Selector 上。前面我们说了,Selector 建立在非阻塞模式之上,所以注册到 Selector 的 Channel 必须要支持非阻塞模式,FileChannel 不支持非阻塞,我们这里讨论最常见的 SocketChannel 和 ServerSocketChannel。
// 将通道设置为非阻塞模式,因为默认都是阻塞模式的channel.configureBlocking(false);// 注册SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
register 方法的第二个 int 型参数(使用二进制的标记位)用于表明需要监听哪些感兴趣的事件,共以下四种事件:
我们可以同时监听一个 Channel 中的发生的多个事件,比如我们要监听 ACCEPT 和 READ 事件,那么指定参数为二进制的 00010001 即十进制数值 17 即可。
注册方法返回值是 SelectionKey 实例,它包含了 Channel 和 Selector 信息,也包括了一个叫做 Interest Set 的信息,即我们设置的我们感兴趣的正在监听的事件集合。
SelectionKey.OP_READ
对应 00000001,通道中有数据可以进行读取
SelectionKey.OP_WRITE
对应 00000100,可以往通道中写入数据
SelectionKey.OP_CONNECT
对应 00001000,成功建立 TCP 连接
SelectionKey.OP_ACCEPT
对应 00010000,接受 TCP 连接
调用 select() 方法获取通道信息。用于判断是否有我们感兴趣的事件已经发生了。
Selector 的操作就是以上 3 步,这里来一个简单的示例,大家看一下就好了。之后在介绍非阻塞 IO 的时候,会演示一份可执行的示例代码。Selector selector = Selector.open();channel.configureBlocking(false);SelectionKey key = channel.register(selector, SelectionKey.OP_READ);while(true) {// 判断是否有事件准备好int readyChannels = selector.select();if(readyChannels == 0) continue;// 遍历 Set selectedKeys = selector.selectedKeys(); Iterator keyIterator = selectedKeys.iterator();while(keyIterator.hasNext()) { SelectionKey key = keyIterator.next();if(key.isAcceptable()) {// a connection was accepted by a ServerSocketChannel. } else if (key.isConnectable()) {// a connection was established with a remote server. } else if (key.isReadable()) {// a channel is ready for reading } else if (key.isWritable()) {// a channel is ready for writing } keyIterator.remove(); }}
对于 Selector,我们还需要非常熟悉以下几个方法:
select()
调用此方法,会将上次 select 之后的准备好的 channel 对应的 SelectionKey 复制到 selected set 中。如果没有任何通道准备好,这个方法会阻塞,直到至少有一个通道准备好。
selectNow()
功能和 select 一样,区别在于如果没有准备好的通道,那么此方法会立即返回 0。
select(long timeout)
看了前面两个,这个应该很好理解了,如果没有通道准备好,此方法会等待一会
wakeup()
这个方法是用来唤醒等待在 select() 和 select(timeout) 上的线程的。如果 wakeup() 先被调用,此时没有线程在 select 上阻塞,那么之后的一个 select() 或 select(timeout) 会立即返回,而不会阻塞,当然,它只会作用一次。
学Java,请关注公众号:Java后端
喜欢文章,点个 在看Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。