当前位置:   article > 正文

基于Java NIO Selector的多路复用IO模型(同步非阻塞)及IO不可中断等待状态线程的改写_结合selector实现非阻塞服务器,通过selector实现工0多路复用模型、

结合selector实现非阻塞服务器,通过selector实现工0多路复用模型、

上篇基于Java Socket实现同步非阻塞通信中展示了非阻塞的聊天示例,ServerSocket#accept接收连接后,会创建一个不断轮训是否有读写数据的线程。不断轮训是很消耗CPU资源的,本篇基于Java NIO Selector的多路复用IO模型,将解决这一问题。
方法:Selector可监听Channel的OP_ACCEPT、OP_CONNECT、OP_READ、OP_WRITE状态,然后分发给Handler处理器。关联了Socketd的SocketChannel可以使用configureBlocking方法将自己设置为非阻塞状态。
示例代码使用Selector来监听SocketChannel中是否有数据可读、ServerSocketChannel是否接收的新的连接请求。该检测是阻塞的,当有事件可处理时才通知处理器去处理。因为是聊天示例,所以需要对每个网络连接关联一个阻塞的发消息线程,平常挂起,有输入时发送消息,但是对每个连接都不断地轮训Socket中是否有数据了。

多路复用IO模型代码
Server.java: IO模型的主要部分。如果Channel在向Selector注册时附加了Buffer,当buffer中数据没有处理完的话,下次select()还会选中该SocketChannel。如果事情处理完了,可以取消注册,即使Selector中没有注册Channel,select()方法还是会阻塞的。
Client.java : 客户端是否阻塞都行,为了练习NIO,我将客户端写成了使用Selector、非阻塞的形式。
ConsoleThread.java 发送消息的线程类。

IO不可中断等待状态线程的改写

结束通信时需要关闭SocketChannel和发消息线程。如果在发消息线程中使用了BufferedReader#readLine()方法,当该方法阻塞时,是不被外部控制的,即使发消息线程调用interrupt方法,该线程也不会被中断结束。
在这里插入图片描述
因此,需要将读取操作改为挂起可中断的IO读形式:先用不可阻塞的BufferedReader#ready()方法判断数据是否准备好,若无则使用Thread.sleep(1000)方法将线程挂起1秒,否则读取数据。详细写法见ConsoleThread.java代码片段。
在这里插入图片描述

附I/O复用模型图
在这里插入图片描述

package syncnonblocking;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class Server {
    private static final int BUF_SIZE = 256;
    public static void main(String[] args) {
        try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
             Selector selector = Selector.open();){
            serverSocketChannel.configureBlocking(false); // non blocking
            serverSocketChannel.socket().bind(new InetSocketAddress(13579));
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            while(selector.select() > 0) {  // blocking method. when a client request for accept, selector.selector() > 1 is true
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while(iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    switch (key.readyOps()) {
                        case SelectionKey.OP_ACCEPT:  // accept from connection
                            ServerSocketChannel clientChannel = (ServerSocketChannel) key.channel();
                            SocketChannel socketChannel = clientChannel.accept();
                            socketChannel.configureBlocking(false);
                            // 如果注册时添加了ByteBuffer,只要buf不空,OP_READ会被一直调用
                            ConsoleThread thread = new ConsoleThread(socketChannel);
                            thread.start();
                            socketChannel.register(selector, SelectionKey.OP_READ, thread);
                            break;
                        case SelectionKey.OP_READ:  // read from a channel
                            ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE);
                            SocketChannel channel = (SocketChannel) key.channel();
                            channel.read(buffer);
                            buffer.flip();
                            byte[] tmp = new byte[buffer.limit() - buffer.position()];
                            buffer.get(tmp);
                            // buffer.clear();
                            String msg = new String(tmp);
                            if (msg.equals("exit")) {
                                channel.write(ByteBuffer.wrap("exit".getBytes()));
                                ((ConsoleThread)key.attachment()).interrupt();
                                key.cancel();
                            } else {
                                System.out.println(msg);
                            }
                            break;
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
package syncnonblocking;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class Client {
    private static final int BUF_SIZE = 1024 * 2;
    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(BUF_SIZE);
        try (SocketChannel clientChannel = SocketChannel.open();
             Selector selector = Selector.open();) {
            clientChannel.connect(new InetSocketAddress("127.0.0.1", 13579));
            clientChannel.configureBlocking(false);
            ConsoleThread thread = new ConsoleThread(clientChannel);;
            clientChannel.register(selector, SelectionKey.OP_READ, thread);
            thread.start();
            boolean flag = true;
            // 客户端没有必要使用Selector,我是为了练习而使用的。或者P2P通信时可以这样写
            while(flag && selector.select() > 0) { // 不要先select()再flag,因为select会阻塞flag的判断
                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = keys.iterator();
                while(iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    if (key.isReadable()) {
                        SocketChannel readChannel = (SocketChannel) key.channel();
                        buf.clear();
                        readChannel.read(buf);
                        buf.flip();
                        byte[] tmp = new byte[buf.limit()];
                        buf.get(tmp);
                        buf.clear();
                        String msg = new String(tmp);
                        if (msg.equals("exit")) {
                            readChannel.write(ByteBuffer.wrap("exit".getBytes()));
                            ((ConsoleThread)key.attachment()).interrupt();
                            key.cancel();
                            flag = false;
                        } else {
                            System.out.println(msg);
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
package syncnonblocking;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public class ConsoleThread extends Thread {
    private SocketChannel socketChannel;
    public ConsoleThread(SocketChannel socketChannel) {
        this.socketChannel = socketChannel;
    }

    @Override
    public void run() {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));) {
            while (!isInterrupted()) {
                if (reader.ready()) {
                    String msg = reader.readLine();
                    ByteBuffer buf = ByteBuffer.wrap(msg.getBytes());
                    socketChannel.write(buf);
                } else {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        break;
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        // System.out.println("ConsoleThread is over.");
    }
}


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
顺带撸一遍Java NIO包含什么
  • 通道Channel: FileChannel、DatagramChannel、SocketChannel、ServerSocketChannel、AsynchronousFileChannel
  • 缓冲Buffer:ByteBuffer, CharBuffer, ShortBuffer, IntBuffer, FloatBuffer, LongBuffer, DoubleBuffer, MappedByteBUffer。有时需要注意数据的的大小端问题。
  • 选择器Selector:open、select方法、SelectionKey。
  • Pipe管道
  • Paths path: get, normalize
  • Files: exies, createDirectory, copy, move, move, walkFileTree
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/160767
推荐阅读
相关标签
  

闽ICP备14008679号