赞
踩
所谓ECHO服务器就是客户端发送到服务器端什么内容,服务器端就返回什么内容的一种服务器,者几乎是最简单的网络服务器(当然还有更简单的抛弃服务器)
阅读需要基础:JavaNIO基础
NIO核心组件主要包括Selector
和Channel
,而Buffer
主要用于和Channel
进行数据交互,所以不在此作详细的使用介绍。
public class NioServer {
private Selector selector;
private ServerSocketChannel serverSocketChannel;
private int port;
public NioServer(int port) throws IOException {
// 打开一个ServerSocketChannel
serverSocketChannel = ServerSocketChannel.open();
// 设置为非阻塞模式才能注册到Selector
serverSocketChannel.configureBlocking(false);
// 打开一个选择器
selector = Selector.open();
this.port = port;
}
// 启动服务器的方法
private void startServer() {
try {
serverSocketChannel.bind(new InetSocketAddress(port));
// 注册该通道到选择器,注意兴趣操作是SelectionKey.OP_ACCEPT
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
selectLoop();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
需要了解的是Channel
需要设置为非阻塞模式才能注册到选择器
Channel
调用register()
方法时需要指定兴趣操作,意思就是该选择器会监听这个通道有没有准备好可以执行的操作,兴趣操作有:SelectionKey.OP_ACCEPT
,SelectionKey.OP_READ
,SelectionKey.OP_WRITE
,SelectionKey.OP_CONNECT
,分别对应的是ServerSocketChannel
的accept()
方法可以执行(不需阻塞),SocketChannel
的read()/write()
方法可以执行(不需阻塞),以及SocketChannel
内含的Socket
的connect()
方法可以调用(不需阻塞)。
如果不太了解NIO对应的操作模型,可以去参考我的上一篇博客:IO多路复用和NIO
private void acceptClient(SelectionKey selectionKey) throws IOException {
// 与对端Socket建立连接
SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel != null) {
System.err.println("接收到一个连接,对端IP为:"+socketChannel.socket().getInetAddress());
}
// 将接收到的SocketChannel注册到Selector,注意此时通道要设置为非阻塞模式,且兴趣操作为OP_READ
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
使用和传统ServerSocket
的accept()
方法流程一致,需要注意的是,传统的accept()
调用时会阻塞直到建立一个TCP连接,而使用Selector
选择器可以避免阻塞,确保调用该方法时一定有一个(或多个)Socket
连接已经在等待建立。
可以看到一个java.nio.channels.Selector
可以注册多个通道,Selector
可以监听注册到自身的通道的状态。
private void selectLoop() throws IOException {
while(true) {
// select()方法会阻塞,直到有注册到该选择器的通道有兴趣事件准备完毕
selector.select();
// selectedKeys()方法会获得有兴趣事件发生的通道,注册得到的SelectionKey的集合
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while(iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
// 循环判断其中的key
if (selectionKey.isAcceptable()) {
// 如果key处于可接受状态,就进入接收函数
acceptClient(selectionKey);
}else if(selectionKey.isReadable()) {
// 如果key处于可读状态,就进入读函数
readDate(selectionKey);
}
}
// 每次处理完通道事件以后,要进行一次清空
selectionKeys.clear();
}
}
可以看到,通过调用选择器的select()
会不断的得到将要发生事件通道,只要是注册到该选择器的通道,都会被轮询一次,而我们通过while
循环,可以做到单线程无阻塞I/O。
private void readDate(SelectionKey selectionKey) throws IOException {
// 每一次都先获取之前绑定在这个key上的buffer
ByteBuffer oldBuffer = (ByteBuffer)selectionKey.attachment();
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer newBuffer = ByteBuffer.allocate(64);
int read;
while((read = socketChannel.read(newBuffer))<=0) {
return;
}
newBuffer.flip();
// 读取Buffer,看是否有换行符
String line = readLine(newBuffer);
if (line != null) {
// 如果这次读到了行结束符,就将原来不含有行结束符的buffer合并位一行
String sendData = readLine(mergeBuffer(oldBuffer, newBuffer));
if (readLineContent(sendData).equalsIgnoreCase("exit")) { // 如果这一行的内容是exit就断开连接
socketChannel.close();
return;
}
// 然后直接发送回到客户端
ByteBuffer sendBuffer = ByteBuffer.wrap(sendData.getBytes("utf-8"));
while (sendBuffer.hasRemaining()) {
socketChannel.write(sendBuffer);
}
selectionKey.attach(null);
}else {
// 如果这次没读到行结束付,就将这次读的内容和原来的内容合并,并刷新绑定到key对象上
selectionKey.attach(mergeBuffer(oldBuffer, newBuffer));
}
}
/**
* 读取ByteBuffer直到一行的末尾
* 返回这一行的内容,包括换行符
*
* @param buffer
* @return String 读取到行末的内容,包括换行符 ; null 如果没有换行符
* @throws UnsupportedEncodingException
*/
private String readLine(ByteBuffer buffer) throws UnsupportedEncodingException {
// windows中的换行符表示手段 "\r\n"
// 基于windows的软件发送的换行符是会是CR和LF
char CR = '\r';
char LF = '\n';
boolean crFound = false;
int index = 0;
int len = buffer.limit();
buffer.rewind();
while(index < len) {
byte temp = buffer.get();
if (temp == CR) {
crFound = true;
}
if (crFound && temp == LF) {
// Arrays.copyOf(srcArr,length)方法会返回一个 源数组中的长度到length位 的新数组
return new String(Arrays.copyOf(buffer.array(), index+1),"utf-8");
}
index ++;
}
return null;
}
/**
* 获取一行的内容,不包括换行符
* @param buffer
* @return String 行的内容
* @throws UnsupportedEncodingException
*/
private String readLineContent(String line) throws UnsupportedEncodingException {
return line.substring(0, line.length() - 2);
}
/**
* 对传入的Buffer进行拼接
* @param oldBuffer
* @param newBuffer
* @return ByteBuffer 拼接后的Buffer
*/
public static ByteBuffer mergeBuffer(ByteBuffer oldBuffer,ByteBuffer newBuffer) {
// 如果原来的Buffer是null就直接返回
if (oldBuffer == null) {
return newBuffer;
}
// 如果原来的Buffer的剩余长度可容纳新的buffer则直接拼接
newBuffer.rewind();
if (oldBuffer.remaining() > (newBuffer.limit()-newBuffer.position())) {
return oldBuffer.put(newBuffer);
}
// 如果不是以上两种情况就构建新的Buffer进行拼接
int oldSize = oldBuffer != null?oldBuffer.limit():0;
int newSize = newBuffer != null?newBuffer.limit():0;
ByteBuffer result = ByteBuffer.allocate(oldSize+newSize);
result.put(Arrays.copyOfRange(oldBuffer.array(), 0, oldSize));
result.put(Arrays.copyOfRange(newBuffer.array(), 0, newSize));
return result;
}
这些代码是为了实现ECHO返回而实现的辅助方法,主要是进行Buffer的处理。
使用telnet
进行连接测试,实现了ECHO服务器的功能,而且输入exit
会关闭该连接。
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Set;
public class NioServer {
private Selector selector;
private ServerSocketChannel serverSocketChannel;
private int port;
public NioServer(int port) throws IOException {
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
selector = Selector.open();
this.port = port;
}
private void selectLoop() throws IOException {
while(true) {
// select()方法会阻塞,直到有注册到该选择器的通道有兴趣事件发生
selector.select();
// selectedKeys()方法会获得有兴趣事件发生的通道,注册得到的SelectionKey的集合
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while(iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
// 循环判断其中的key
if (selectionKey.isAcceptable()) {
// 如果key处于可接受状态,就进入接收函数
acceptClient(selectionKey);
}else if(selectionKey.isReadable()) {
// 如果key处于可读状态,就进入读函数
readDate(selectionKey);
}
}
selectionKeys.clear();
}
}
/**
* 接收连接并将建立的通道注册到选择器
*
* @param selectionKey
* @throws IOException
*/
private void acceptClient(SelectionKey selectionKey) throws IOException {
// 与对端Socket建立连接
SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel != null) {
System.err.println("接收到一个连接,对端IP为:"+socketChannel.socket().getInetAddress());
}
// 将接收到的SocketChannel注册到Selector,注意此时通道要设置为非阻塞模式,且兴趣操作为OP_READ
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
private void readDate(SelectionKey selectionKey) throws IOException {
// 每一次都先获取之前绑定在这个key上的buffer
ByteBuffer oldBuffer = (ByteBuffer)selectionKey.attachment();
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer newBuffer = ByteBuffer.allocate(64);
int read;
while((read = socketChannel.read(newBuffer))<=0) {
return;
}
newBuffer.flip();
String line = readLine(newBuffer);
if (line != null) {
// 如果这次读到了行结束符,就将原来不含有行结束符的buffer合并位一行
String sendData = readLine(mergeBuffer(oldBuffer, newBuffer));
if (readLineContent(sendData).equalsIgnoreCase("exit")) { // 如果这一行的内容是exit就断开连接
socketChannel.close();
return;
}
// 然后直接发送回到客户端
ByteBuffer sendBuffer = ByteBuffer.wrap(sendData.getBytes("utf-8"));
while (sendBuffer.hasRemaining()) {
socketChannel.write(sendBuffer);
}
selectionKey.attach(null);
}else {
// 如果这次没读到行结束付,就将这次读的内容和原来的内容合并,并刷新绑定到key对象上
selectionKey.attach(mergeBuffer(oldBuffer, newBuffer));
}
}
/**
* 读取ByteBuffer直到一行的末尾
* 返回这一行的内容,包括换行符
*
* @param buffer
* @return String 读取到行末的内容,包括换行符 ; null 如果没有换行符
* @throws UnsupportedEncodingException
*/
private String readLine(ByteBuffer buffer) throws UnsupportedEncodingException {
// windows中的换行符表示手段 "\r\n"
// 基于windows的软件发送的换行符是会是CR和LF
char CR = '\r';
char LF = '\n';
boolean crFound = false;
int index = 0;
int len = buffer.limit();
buffer.rewind();
while(index < len) {
byte temp = buffer.get();
if (temp == CR) {
crFound = true;
}
if (crFound && temp == LF) {
// Arrays.copyOf(srcArr,length)方法会返回一个 源数组中的长度到length位 的新数组
return new String(Arrays.copyOf(buffer.array(), index+1),"utf-8");
}
index ++;
}
return null;
}
/**
* 获取一行的内容,不包括换行符
* @param buffer
* @return String 行的内容
* @throws UnsupportedEncodingException
*/
private String readLineContent(String line) throws UnsupportedEncodingException {
return line.substring(0, line.length() - 2);
}
/**
* 对传入的Buffer进行拼接
* @param oldBuffer
* @param newBuffer
* @return ByteBuffer 拼接后的Buffer
*/
public static ByteBuffer mergeBuffer(ByteBuffer oldBuffer,ByteBuffer newBuffer) {
// 如果原来的Buffer是null就直接返回
if (oldBuffer == null) {
return newBuffer;
}
// 如果原来的Buffer的剩余长度可容纳新的buffer则直接拼接
newBuffer.rewind();
if (oldBuffer.remaining() > (newBuffer.limit()-newBuffer.position())) {
return oldBuffer.put(newBuffer);
}
// 如果不是以上两种情况就构建新的Buffer进行拼接
int oldSize = oldBuffer != null?oldBuffer.limit():0;
int newSize = newBuffer != null?newBuffer.limit():0;
ByteBuffer result = ByteBuffer.allocate(oldSize+newSize);
result.put(Arrays.copyOfRange(oldBuffer.array(), 0, oldSize));
result.put(Arrays.copyOfRange(newBuffer.array(), 0, newSize));
return result;
}
private void startServer() {
try {
serverSocketChannel.bind(new InetSocketAddress(port));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
selectLoop();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public static void main(String[] args) throws UnsupportedEncodingException {
try {
new NioServer(12345).startServer();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。