赞
踩
IO、NIO、AIO 内部原理分析
NIO 之 Selector实现原理
NIO 之 Channel实现原理
NIO 之 ByteBuffer实现原理
服务器使用NIO来实现一个echo协议的服务器。
echo协议简单也很有用,可以测试网络连接。
消息的格式为:消息长度(int)+消息内容
通过消息长度来进行socket分包,防止读取出现半包、粘包等问题。
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class NioEchoServer {
public static void server() {
ServerSocketChannel ssc = null;
Selector selector = null;
try {
ssc = ServerSocketChannel.open();
//设置为非阻塞
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(9000), 10);
selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
}
while (true) {
try {
selector.select();
} catch (IOException e) {
e.printStackTrace();
}
try {
Set<SelectionKey> set = selector.selectedKeys();
Iterator<SelectionKey> it = set.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
set.remove(key);
try {
if (key.isAcceptable()) {
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
SocketChannel clientChannel = serverChannel.accept();
clientChannel.configureBlocking(false);
MessageInfo messageInfo = new MessageInfo();
clientChannel.register(selector, SelectionKey.OP_READ, messageInfo);
} else if (key.isReadable()) {
SocketChannel clientChannel = (SocketChannel) key.channel();
MessageInfo messageInfo = (MessageInfo)key.attachment();
if(messageInfo.getLenBuf() == null){
messageInfo.setLenBuf(ByteBuffer.allocate(4));
}
//消息长度
int len = messageInfo.getLen();
//判断消息长度是否读取完成
if(len!=-1){
if(messageInfo.contentBuf == null){
//创建指定消息长度的buf
messageInfo.contentBuf = ByteBuffer.allocate(len);
}
//如果消息未读取完成,继续读取
if(messageInfo.contentBuf.position()<len){
if(clientChannel.read(messageInfo.contentBuf)==-1){
clientChannel.register(selector, SelectionKey.OP_WRITE, messageInfo);
}
}
if(len==0){
System.out.println("空消息");
key.cancel();
key.channel().close();
}else if(messageInfo.contentBuf.position()==len){
messageInfo.contentBuf.flip();
System.out.println(new String(messageInfo.contentBuf.array()));
// clientChannel.register(selector, SelectionKey.OP_WRITE, messageInfo);
key.interestOps(SelectionKey.OP_WRITE);
}
}
//消息长度未读取完成
else{
int i = clientChannel.read(messageInfo.getLenBuf());
if(i==-1){
System.out.println("消息异常:"+messageInfo.getLenBuf());
break;
}
if(messageInfo.getLenBuf().position()==4){
messageInfo.getLenBuf().flip();
messageInfo.setLen(messageInfo.getLenBuf().getInt());
}
}
}else if(key.isWritable()){
SocketChannel clientChannel = (SocketChannel) key.channel();
MessageInfo messageInfo = (MessageInfo)key.attachment();
messageInfo.lenBuf.rewind();
messageInfo.contentBuf.rewind();
ByteBuffer[] bufs = new ByteBuffer[]{messageInfo.lenBuf, messageInfo.contentBuf};
System.out.println(messageInfo.contentBuf.limit()+":"+messageInfo.contentBuf.capacity());
clientChannel.write(bufs);
key.cancel();
key.channel().close();
}
} catch (Exception e) {
key.cancel();
key.channel().close();
}
}
} catch (Exception e) {
e.printStackTrace();
break;
}
}
System.out.println("");
}
public static void main(String[] args) {
server();
}
//消息对象
static class MessageInfo {
private int len=-1;
private ByteBuffer lenBuf;
private ByteBuffer contentBuf;
public int getLen() {
return len;
}
public void setLen(int len) {
this.len = len;
}
public ByteBuffer getLenBuf() {
return lenBuf;
}
public void setLenBuf(ByteBuffer lenBuf) {
this.lenBuf = lenBuf;
}
public ByteBuffer getContentBuf() {
return contentBuf;
}
public void setContentBuf(ByteBuffer contentBuf) {
this.contentBuf = contentBuf;
}
}
}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
public class NioEchoClient {
public static void main(String[] args) {
SocketAddress sa = new InetSocketAddress("127.0.0.1", 9000);
try {
SocketChannel client = SocketChannel.open(sa);
byte[] data = "大家好".getBytes();
ByteBuffer lenBuf = ByteBuffer.wrap(int2byte(data.length));
ByteBuffer textBuf = ByteBuffer.wrap(data);
ByteBuffer[] bufArray = new ByteBuffer[]{lenBuf, textBuf};
client.write(bufArray);
lenBuf.clear();
textBuf.clear();
client.read(bufArray);
System.out.println("echo:"+new String(textBuf.array()));
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* int 转 byte[] 数组
* @param len
* @return
*/
public static byte[] int2byte(int len) {
byte[] b = new byte[4];
b[0] = (byte) (len >> 24);
b[1] = (byte) (len >> 16 & 0XFF);
b[2] = (byte) (len >> 8 & 0XFF);
b[3] = (byte) (len & 0XFF);
return b;
}
}
本人简书blog地址:http://www.jianshu.com/u/1f0067e24ff8
点击这里快速进入简书
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。