赞
踩
之前的socket实现rpc中我们使用了类似下面的代码来从服务端获取客户端的输入:
while(true){
line = br.readLine();
if(line.equals("goodbye")){
break;
}
// handle business
}
这种的处理明显是有些问题的。如果因为某些原因如突然断电等原因导致客户端退出或卡顿,即使没有数据的交互服务器也必须保持这个线程。这样会造成服务器资源的严重浪费。甚至使服务器无法继续提供服务。这种处理方式称为同步-阻塞io.
服务器开启后一直等待客户端连接,一旦有客户端连接后使用主线程与客户端交互,等客户端断开后,再次监听新的客户端连接。在这种情况下服务器一次只能服务一个客户端,必须等客户端完成后才能再次服务一个客户端。效率极低。
服务器开启后一直等待客户端连接,一旦有客户端连接后启动一个新线程来与客户端交互。
首先向内核注册一个监听器(selector),一旦操作系统发现你注册的selector有事件到来,会回调通知应用。接到通知后再注册建立连接的监听。一旦内核与客户端建立了连接,然后会再次接到连接成功的通知。应用程序再次注册一个数据可读的监听,等到客户端有数据到来时应用程序会再次收到通知,此时可以直接读入数据。
这样基于监听的机制使得服务器拥有极高的性能。
nio并不是快,它的流程很复杂。在发送文件的场景下,传统io的效率要远高于nio。
nio的优势不在于数据传输的速度,它的优势体现在能增加服务端的并发量和响应速度。
传统io中accept过程是一个阻塞过程,在实际网络过程中可能阻塞很久。然后为一个客户端启动一个Thread,read和write过程中的阻塞也会很严重。
传统io还有一个问题,系统容纳的Thread是有限的,如果大量客户端连接,Thread也会很多最终降低服务器性能。
nio如何解决这个问题? – 通过上面所述的监听机制。
nio的代码写起来比较繁琐,这里直接给出来了。关键的地方有注释。
MultiplexerTimeServer.java
package com.mrbcy.bigdata.basic.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; public class MultiplexerTimeServer implements Runnable { private Selector selector; private ServerSocketChannel servChannel; private volatile boolean stop; public MultiplexerTimeServer(int port){ try { selector = Selector.open(); servChannel = ServerSocketChannel.open(); servChannel.configureBlocking(false); servChannel.socket().bind(new InetSocketAddress(port),1024); servChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("The time server is start in port: " + port); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); System.exit(1); } } public void stop(){ this.stop = true; } @Override public void run() { while (!stop) { try { selector.select(1000); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectionKeys.iterator(); SelectionKey key = null; while(it.hasNext()){ key = it.next(); it.remove(); try{ handleInput(key); }catch (Exception e) { // TODO: handle exception if(key != null){ key.cancel(); if(key.channel() != null){ key.channel().close(); } } } } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if(selector != null){ try { selector.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } private void handleInput(SelectionKey key) throws IOException{ if(key.isValid()){ // 处理新接入的请求消息 if (key.isAcceptable()){ // Accept the new connection ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); // Add new connection to selector sc.register(selector, SelectionKey.OP_READ); } if(key.isReadable()) { // read data SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer); if(readBytes > 0){ readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body = new String(bytes, "UTF-8"); System.out.println("The time server receive order : " + body); String currentTime = "QUERY TIME ORDER" .equalsIgnoreCase(body) ? new java.util.Date( System.currentTimeMillis()).toString() : "BAD ORDER"; doWrite(sc, currentTime); }else if (readBytes < 0) { // 对端链路关闭 key.cancel(); sc.close(); } } } } private void doWrite(SocketChannel channel, String response) throws IOException { if (response != null && response.trim().length() > 0) { byte[] bytes = response.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer); } } }
TimeClientHandle.java
package com.mrbcy.bigdata.basic.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; public class TimeClientHandle implements Runnable { private String host; private int port; private Selector selector; private SocketChannel socketChannel; private volatile boolean stop; public TimeClientHandle(String host, int port) { this.host = host == null ? "127.0.0.1" : host; this.port = port; try { selector = Selector.open(); socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); } catch (IOException e) { e.printStackTrace(); System.exit(1); } } /* * (non-Javadoc) * * @see java.lang.Runnable#run() */ @Override public void run() { try { doConnect(); } catch (IOException e) { e.printStackTrace(); System.exit(1); } while (!stop) { try { selector.select(1000); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectedKeys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = it.next(); it.remove(); try { handleInput(key); } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) key.channel().close(); } } } } catch (Exception e) { e.printStackTrace(); System.exit(1); } } // 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源 if (selector != null) try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } private void doConnect() throws IOException { // 如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答 if (socketChannel.connect(new InetSocketAddress(host, port))) { socketChannel.register(selector, SelectionKey.OP_READ); doWrite(socketChannel); } else socketChannel.register(selector, SelectionKey.OP_CONNECT); } private void doWrite(SocketChannel sc) throws IOException { byte[] req = "QUERY TIME ORDER".getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); sc.write(writeBuffer); if (!writeBuffer.hasRemaining()) System.out.println("Send order 2 server succeed."); } private void handleInput(SelectionKey key) throws IOException { if (key.isValid()) { // 判断是否连接成功 SocketChannel sc = (SocketChannel) key.channel(); if (key.isConnectable()) { if (sc.finishConnect()) { sc.register(selector, SelectionKey.OP_READ); doWrite(sc); } else System.exit(1);// 连接失败,进程退出 } if (key.isReadable()) { ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer); if (readBytes > 0) { readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body = new String(bytes, "UTF-8"); System.out.println("Now is : " + body); this.stop = true; } else if (readBytes < 0) { // 对端链路关闭 key.cancel(); sc.close(); } else ; // 读到0字节,忽略 } } } }
TimeServer.java
package com.mrbcy.bigdata.basic.nio; import java.io.IOException; public class TimeServer { /** * @param args * @throws IOException * @author blackcoder */ public static void main(String[] args) throws IOException { int port = 5566; if (args != null && args.length<0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默认值 } } MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port); new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start(); } }
TimeClient.java
package com.mrbcy.bigdata.basic.nio; public class TimeClient { /** * @param args */ public static void main(String[] args) { int port = 5566; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默认值 } } new Thread(new TimeClientHandle("127.0.0.1", port), "TimeClient-001") .start(); } }
服务器端输出:
The time server is start in port: 5566
The time server receive order : QUERY TIME ORDER
客户端输出:
Send order 2 server succeed.
Now is : Thu Feb 02 01:04:36 CST 2017
那个轮询器看着很别扭。可以到时候再看看AIO
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。