赞
踩
上篇基于Java Socket实现同步非阻塞通信中展示了非阻塞的聊天示例,ServerSocket#accept接收连接后,会创建一个不断轮训是否有读写数据的线程。不断轮训是很消耗CPU资源的,本篇基于Java NIO Selector的多路复用IO模型,将解决这一问题。
方法:Selector可监听Channel的OP_ACCEPT、OP_CONNECT、OP_READ、OP_WRITE状态,然后分发给Handler处理器。关联了Socketd的SocketChannel可以使用configureBlocking方法将自己设置为非阻塞状态。
示例代码使用Selector来监听SocketChannel中是否有数据可读、ServerSocketChannel是否接收的新的连接请求。该检测是阻塞的,当有事件可处理时才通知处理器去处理。因为是聊天示例,所以需要对每个网络连接关联一个阻塞的发消息线程,平常挂起,有输入时发送消息,但是对每个连接都不断地轮训Socket中是否有数据了。
多路复用IO模型代码
Server.java
: IO模型的主要部分。如果Channel在向Selector注册时附加了Buffer,当buffer中数据没有处理完的话,下次select()还会选中该SocketChannel。如果事情处理完了,可以取消注册,即使Selector中没有注册Channel,select()方法还是会阻塞的。
Client.java
: 客户端是否阻塞都行,为了练习NIO,我将客户端写成了使用Selector、非阻塞的形式。
ConsoleThread.java
发送消息的线程类。
IO不可中断等待状态线程的改写
结束通信时需要关闭SocketChannel和发消息线程。如果在发消息线程中使用了
BufferedReader#readLine()
方法,当该方法阻塞时,是不被外部控制的,即使发消息线程调用interrupt方法,该线程也不会被中断结束。
因此,需要将读取操作改为挂起可中断的IO读形式:先用不可阻塞的BufferedReader#ready()方法判断数据是否准备好,若无则使用Thread.sleep(1000)方法将线程挂起1秒,否则读取数据。详细写法见ConsoleThread.java代码片段。
附I/O复用模型图
package syncnonblocking; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; public class Server { private static final int BUF_SIZE = 256; public static void main(String[] args) { try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); Selector selector = Selector.open();){ serverSocketChannel.configureBlocking(false); // non blocking serverSocketChannel.socket().bind(new InetSocketAddress(13579)); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while(selector.select() > 0) { // blocking method. when a client request for accept, selector.selector() > 1 is true Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while(iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); switch (key.readyOps()) { case SelectionKey.OP_ACCEPT: // accept from connection ServerSocketChannel clientChannel = (ServerSocketChannel) key.channel(); SocketChannel socketChannel = clientChannel.accept(); socketChannel.configureBlocking(false); // 如果注册时添加了ByteBuffer,只要buf不空,OP_READ会被一直调用 ConsoleThread thread = new ConsoleThread(socketChannel); thread.start(); socketChannel.register(selector, SelectionKey.OP_READ, thread); break; case SelectionKey.OP_READ: // read from a channel ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE); SocketChannel channel = (SocketChannel) key.channel(); channel.read(buffer); buffer.flip(); byte[] tmp = new byte[buffer.limit() - buffer.position()]; buffer.get(tmp); // buffer.clear(); String msg = new String(tmp); if (msg.equals("exit")) { channel.write(ByteBuffer.wrap("exit".getBytes())); ((ConsoleThread)key.attachment()).interrupt(); key.cancel(); } else { System.out.println(msg); } break; } } } } catch (IOException e) { e.printStackTrace(); } } }
package syncnonblocking; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; public class Client { private static final int BUF_SIZE = 1024 * 2; public static void main(String[] args) { ByteBuffer buf = ByteBuffer.allocate(BUF_SIZE); try (SocketChannel clientChannel = SocketChannel.open(); Selector selector = Selector.open();) { clientChannel.connect(new InetSocketAddress("127.0.0.1", 13579)); clientChannel.configureBlocking(false); ConsoleThread thread = new ConsoleThread(clientChannel);; clientChannel.register(selector, SelectionKey.OP_READ, thread); thread.start(); boolean flag = true; // 客户端没有必要使用Selector,我是为了练习而使用的。或者P2P通信时可以这样写 while(flag && selector.select() > 0) { // 不要先select()再flag,因为select会阻塞flag的判断 Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> iterator = keys.iterator(); while(iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if (key.isReadable()) { SocketChannel readChannel = (SocketChannel) key.channel(); buf.clear(); readChannel.read(buf); buf.flip(); byte[] tmp = new byte[buf.limit()]; buf.get(tmp); buf.clear(); String msg = new String(tmp); if (msg.equals("exit")) { readChannel.write(ByteBuffer.wrap("exit".getBytes())); ((ConsoleThread)key.attachment()).interrupt(); key.cancel(); flag = false; } else { System.out.println(msg); } } } } } catch (IOException e) { e.printStackTrace(); } } }
package syncnonblocking; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; public class ConsoleThread extends Thread { private SocketChannel socketChannel; public ConsoleThread(SocketChannel socketChannel) { this.socketChannel = socketChannel; } @Override public void run() { try (BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));) { while (!isInterrupted()) { if (reader.ready()) { String msg = reader.readLine(); ByteBuffer buf = ByteBuffer.wrap(msg.getBytes()); socketChannel.write(buf); } else { try { Thread.sleep(1000); } catch (InterruptedException e) { break; } } } } catch (IOException e) { e.printStackTrace(); } // System.out.println("ConsoleThread is over."); } }
顺带撸一遍Java NIO包含什么
- 通道Channel: FileChannel、DatagramChannel、SocketChannel、ServerSocketChannel、AsynchronousFileChannel
- 缓冲Buffer:ByteBuffer, CharBuffer, ShortBuffer, IntBuffer, FloatBuffer, LongBuffer, DoubleBuffer, MappedByteBUffer。有时需要注意数据的的大小端问题。
- 选择器Selector:open、select方法、SelectionKey。
- Pipe管道
- Paths path: get, normalize
- Files: exies, createDirectory, copy, move, move, walkFileTree
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。