当前位置:   article > 正文

JavaNIO--2.实现ECHO服务器_用 nio技术替代一客户一线程技术改写 echo 项目的客户机/服务器设计。

用 nio技术替代一客户一线程技术改写 echo 项目的客户机/服务器设计。

JavaNIO技术实现ECHO服务器

所谓ECHO服务器就是客户端发送到服务器端什么内容,服务器端就返回什么内容的一种服务器,者几乎是最简单的网络服务器(当然还有更简单的抛弃服务器)

阅读需要基础:JavaNIO基础

1.NIO核心组件的使用

NIO核心组件主要包括SelectorChannel,而Buffer主要用于和Channel进行数据交互,所以不在此作详细的使用介绍。

1.1初始化NIO组件

public class NioServer {

    private Selector selector;
    private ServerSocketChannel serverSocketChannel;
    private int port;

    public NioServer(int port) throws IOException {
        // 打开一个ServerSocketChannel
        serverSocketChannel = ServerSocketChannel.open();
        // 设置为非阻塞模式才能注册到Selector
        serverSocketChannel.configureBlocking(false);
        // 打开一个选择器
        selector = Selector.open();
        this.port = port;   
    }

    // 启动服务器的方法
    private void startServer() {
        try {
            serverSocketChannel.bind(new InetSocketAddress(port));
            // 注册该通道到选择器,注意兴趣操作是SelectionKey.OP_ACCEPT
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            selectLoop();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

这里写图片描述

  1. 需要了解的是Channel需要设置为非阻塞模式才能注册到选择器

  2. Channel调用register()方法时需要指定兴趣操作,意思就是该选择器会监听这个通道有没有准备好可以执行的操作,兴趣操作有:SelectionKey.OP_ACCEPTSelectionKey.OP_READSelectionKey.OP_WRITESelectionKey.OP_CONNECT,分别对应的是ServerSocketChannelaccept()方法可以执行(不需阻塞),SocketChannelread()/write()方法可以执行(不需阻塞),以及SocketChannel内含的Socketconnect()方法可以调用(不需阻塞)。

如果不太了解NIO对应的操作模型,可以去参考我的上一篇博客:IO多路复用和NIO

1.2Accept组件

    private void acceptClient(SelectionKey selectionKey) throws IOException {
        // 与对端Socket建立连接
        SocketChannel socketChannel = serverSocketChannel.accept();

        if (socketChannel != null) {
            System.err.println("接收到一个连接,对端IP为:"+socketChannel.socket().getInetAddress());
        }
        // 将接收到的SocketChannel注册到Selector,注意此时通道要设置为非阻塞模式,且兴趣操作为OP_READ
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

使用和传统ServerSocketaccept()方法流程一致,需要注意的是,传统的accept()调用时会阻塞直到建立一个TCP连接,而使用Selector选择器可以避免阻塞,确保调用该方法时一定有一个(或多个)Socket连接已经在等待建立。

1.3SelectLoop(核心组件)

可以看到一个java.nio.channels.Selector可以注册多个通道Selector可以监听注册到自身的通道的状态。

    private void selectLoop() throws IOException {
        while(true) {
            // select()方法会阻塞,直到有注册到该选择器的通道有兴趣事件准备完毕
            selector.select();
            // selectedKeys()方法会获得有兴趣事件发生的通道,注册得到的SelectionKey的集合
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while(iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                // 循环判断其中的key
                if (selectionKey.isAcceptable()) {
                    // 如果key处于可接受状态,就进入接收函数
                    acceptClient(selectionKey);
                }else if(selectionKey.isReadable()) {
                    // 如果key处于可读状态,就进入读函数
                    readDate(selectionKey);
                }
            }
            // 每次处理完通道事件以后,要进行一次清空
            selectionKeys.clear();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

可以看到,通过调用选择器的select()会不断的得到将要发生事件通道,只要是注册到该选择器的通道,都会被轮询一次,而我们通过while循环,可以做到单线程无阻塞I/O。

2.NIO通道读写(Buffer)

2.1读取通道内容

    private void readDate(SelectionKey selectionKey) throws IOException {

        // 每一次都先获取之前绑定在这个key上的buffer
        ByteBuffer oldBuffer = (ByteBuffer)selectionKey.attachment();

        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        ByteBuffer newBuffer = ByteBuffer.allocate(64);

        int read;
        while((read = socketChannel.read(newBuffer))<=0) {
            return;
        }

        newBuffer.flip();
        // 读取Buffer,看是否有换行符
        String line = readLine(newBuffer);
        if (line != null) {

            // 如果这次读到了行结束符,就将原来不含有行结束符的buffer合并位一行
            String sendData = readLine(mergeBuffer(oldBuffer, newBuffer));
            if (readLineContent(sendData).equalsIgnoreCase("exit")) { // 如果这一行的内容是exit就断开连接
                socketChannel.close();
                return;
            }
            // 然后直接发送回到客户端
            ByteBuffer sendBuffer = ByteBuffer.wrap(sendData.getBytes("utf-8"));
            while (sendBuffer.hasRemaining()) {
                socketChannel.write(sendBuffer);
            }
            selectionKey.attach(null);
        }else {
            // 如果这次没读到行结束付,就将这次读的内容和原来的内容合并,并刷新绑定到key对象上
            selectionKey.attach(mergeBuffer(oldBuffer, newBuffer));
        }

    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

这里写图片描述

2.2Buffer处理辅助方法

/**
     * 读取ByteBuffer直到一行的末尾
     * 返回这一行的内容,包括换行符
     * 
     * @param buffer
     * @return String 读取到行末的内容,包括换行符 ; null 如果没有换行符
     * @throws UnsupportedEncodingException
     */
    private String readLine(ByteBuffer buffer) throws UnsupportedEncodingException {
        // windows中的换行符表示手段 "\r\n"
        // 基于windows的软件发送的换行符是会是CR和LF
        char CR = '\r';
        char LF = '\n';

        boolean crFound = false;
        int index = 0;
        int len = buffer.limit();
        buffer.rewind();
        while(index < len) {
            byte temp = buffer.get();
            if (temp == CR) {
                crFound = true;
            }
            if (crFound && temp == LF) {
                // Arrays.copyOf(srcArr,length)方法会返回一个 源数组中的长度到length位 的新数组
                return new String(Arrays.copyOf(buffer.array(), index+1),"utf-8");
            }
            index ++;
        }
        return null;
    }

    /**
     * 获取一行的内容,不包括换行符
     * @param buffer
     * @return String 行的内容
     * @throws UnsupportedEncodingException
     */
    private String readLineContent(String line) throws UnsupportedEncodingException {
        return line.substring(0, line.length() - 2);
    }

    /**
     * 对传入的Buffer进行拼接
     * @param oldBuffer
     * @param newBuffer
     * @return ByteBuffer 拼接后的Buffer
     */
    public static ByteBuffer mergeBuffer(ByteBuffer oldBuffer,ByteBuffer newBuffer) {
        // 如果原来的Buffer是null就直接返回
        if (oldBuffer == null) {
            return newBuffer;
        }
        // 如果原来的Buffer的剩余长度可容纳新的buffer则直接拼接
        newBuffer.rewind();
        if (oldBuffer.remaining() > (newBuffer.limit()-newBuffer.position())) {
            return oldBuffer.put(newBuffer);
        }

        // 如果不是以上两种情况就构建新的Buffer进行拼接
        int oldSize = oldBuffer != null?oldBuffer.limit():0;
        int newSize = newBuffer != null?newBuffer.limit():0;
        ByteBuffer result = ByteBuffer.allocate(oldSize+newSize);

        result.put(Arrays.copyOfRange(oldBuffer.array(), 0, oldSize));
        result.put(Arrays.copyOfRange(newBuffer.array(), 0, newSize));

        return result;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69

这些代码是为了实现ECHO返回而实现的辅助方法,主要是进行Buffer的处理。

3.测试结果

这里写代码片
这里写图片描述
这里写图片描述

使用telnet进行连接测试,实现了ECHO服务器的功能,而且输入exit会关闭该连接。

4.完整代码

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Set;

public class NioServer {

    private Selector selector;
    private ServerSocketChannel serverSocketChannel;
    private int port;

    public NioServer(int port) throws IOException {
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        selector = Selector.open();
        this.port = port;   
    }

    private void selectLoop() throws IOException {
        while(true) {
            // select()方法会阻塞,直到有注册到该选择器的通道有兴趣事件发生
            selector.select();
            // selectedKeys()方法会获得有兴趣事件发生的通道,注册得到的SelectionKey的集合
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while(iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                // 循环判断其中的key
                if (selectionKey.isAcceptable()) {
                    // 如果key处于可接受状态,就进入接收函数
                    acceptClient(selectionKey);
                }else if(selectionKey.isReadable()) {
                    // 如果key处于可读状态,就进入读函数
                    readDate(selectionKey);
                }
            }
            selectionKeys.clear();
        }
    }

    /**
     * 接收连接并将建立的通道注册到选择器
     * 
     * @param selectionKey
     * @throws IOException
     */
    private void acceptClient(SelectionKey selectionKey) throws IOException {
        // 与对端Socket建立连接
        SocketChannel socketChannel = serverSocketChannel.accept();

        if (socketChannel != null) {
            System.err.println("接收到一个连接,对端IP为:"+socketChannel.socket().getInetAddress());
        }
        // 将接收到的SocketChannel注册到Selector,注意此时通道要设置为非阻塞模式,且兴趣操作为OP_READ
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);
    }

    private void readDate(SelectionKey selectionKey) throws IOException {

        // 每一次都先获取之前绑定在这个key上的buffer
        ByteBuffer oldBuffer = (ByteBuffer)selectionKey.attachment();

        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        ByteBuffer newBuffer = ByteBuffer.allocate(64);

        int read;
        while((read = socketChannel.read(newBuffer))<=0) {
            return;
        }

        newBuffer.flip();
        String line = readLine(newBuffer);
        if (line != null) {

            // 如果这次读到了行结束符,就将原来不含有行结束符的buffer合并位一行
            String sendData = readLine(mergeBuffer(oldBuffer, newBuffer));
            if (readLineContent(sendData).equalsIgnoreCase("exit")) { // 如果这一行的内容是exit就断开连接
                socketChannel.close();
                return;
            }
            // 然后直接发送回到客户端
            ByteBuffer sendBuffer = ByteBuffer.wrap(sendData.getBytes("utf-8"));
            while (sendBuffer.hasRemaining()) {
                socketChannel.write(sendBuffer);
            }
            selectionKey.attach(null);
        }else {
            // 如果这次没读到行结束付,就将这次读的内容和原来的内容合并,并刷新绑定到key对象上
            selectionKey.attach(mergeBuffer(oldBuffer, newBuffer));
        }

    }

    /**
     * 读取ByteBuffer直到一行的末尾
     * 返回这一行的内容,包括换行符
     * 
     * @param buffer
     * @return String 读取到行末的内容,包括换行符 ; null 如果没有换行符
     * @throws UnsupportedEncodingException
     */
    private String readLine(ByteBuffer buffer) throws UnsupportedEncodingException {
        // windows中的换行符表示手段 "\r\n"
        // 基于windows的软件发送的换行符是会是CR和LF
        char CR = '\r';
        char LF = '\n';

        boolean crFound = false;
        int index = 0;
        int len = buffer.limit();
        buffer.rewind();
        while(index < len) {
            byte temp = buffer.get();
            if (temp == CR) {
                crFound = true;
            }
            if (crFound && temp == LF) {
                // Arrays.copyOf(srcArr,length)方法会返回一个 源数组中的长度到length位 的新数组
                return new String(Arrays.copyOf(buffer.array(), index+1),"utf-8");
            }
            index ++;
        }
        return null;
    }

    /**
     * 获取一行的内容,不包括换行符
     * @param buffer
     * @return String 行的内容
     * @throws UnsupportedEncodingException
     */
    private String readLineContent(String line) throws UnsupportedEncodingException {
        return line.substring(0, line.length() - 2);
    }

    /**
     * 对传入的Buffer进行拼接
     * @param oldBuffer
     * @param newBuffer
     * @return ByteBuffer 拼接后的Buffer
     */
    public static ByteBuffer mergeBuffer(ByteBuffer oldBuffer,ByteBuffer newBuffer) {
        // 如果原来的Buffer是null就直接返回
        if (oldBuffer == null) {
            return newBuffer;
        }
        // 如果原来的Buffer的剩余长度可容纳新的buffer则直接拼接
        newBuffer.rewind();
        if (oldBuffer.remaining() > (newBuffer.limit()-newBuffer.position())) {
            return oldBuffer.put(newBuffer);
        }

        // 如果不是以上两种情况就构建新的Buffer进行拼接
        int oldSize = oldBuffer != null?oldBuffer.limit():0;
        int newSize = newBuffer != null?newBuffer.limit():0;
        ByteBuffer result = ByteBuffer.allocate(oldSize+newSize);

        result.put(Arrays.copyOfRange(oldBuffer.array(), 0, oldSize));
        result.put(Arrays.copyOfRange(newBuffer.array(), 0, newSize));

        return result;
    }

    private void startServer() {
        try {
            serverSocketChannel.bind(new InetSocketAddress(port));
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            selectLoop();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }


    public static void main(String[] args) throws UnsupportedEncodingException {
        try {
            new NioServer(12345).startServer();

        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/160765
推荐阅读
相关标签
  

闽ICP备14008679号