赞
踩
网络IO是一个很庞大的体系,不是简单的学会了NIO就可以。所以本章会将会从网络IO的历史开始谈起,一步一步从代码到操作系统内核的推导。本章干货较多,我相信看完你一定会有很大收货。
本章知识点关键字:
在开始网络IO之前,先普及一下Socket的知识点,因为BIO、NIO操作的对象就是Socket。
Socket即套接字,就是对网络中不同主机上的应用进程之间进行双向通信的端点的抽象。这段话是百度百科上面的标准解释,看完是不是一脸懵逼?我先在这里给出相对好理解的解释:Socket就是客户端与服务端经历TCP三次握手后建立的连接,在软件工程学中也被称为四元组。是不是又一脸懵逼了,什么是四元组?其实很好理解既然服务端与客户端需要通过三次握手建立连接,那服务端IP和端口以及客户端IP和端口是必不可少的,这就是四元组,是不是有点感觉了?下面对Socket相关的知识点,通过代码以及操作系统内核数据的演示,来深入的学习Socket。
先来看一组代码。代码中只有一个主函数,并在其中开启8385的端口监听并接收新的客户端连接,在得到连接后开启一个新的线程去处理数据的读取,这也就是标准的BIO 服务端代码,不过需要主要一个点服务端在开启端口监听后,在服务端开始接收新的连接之前,插入了一段阻塞代码,主要是为了演示Socket。
package com.dxg.socket; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.net.ServerSocket; import java.net.Socket; import java.util.Objects; public class SocketBIO { /** * 服务器监听的端口 */ public static final int PORT = 8385; public static void main(String[] args) throws Exception { ServerSocket serverSocket = new ServerSocket(PORT); System.out.println("Server Start " + PORT); // 阻塞 验证socket是内核级别的 System.in.read(); while (true) { // 阻塞1 System.out.println("Prepare Accept Client"); Socket socket = serverSocket.accept(); System.out.println("接入新连接:" + socket.getPort()); // 抛出一个新的线程去处理socket的读写 new Thread(() -> { InputStream inputStream = null; BufferedReader reader = null; try { inputStream = socket.getInputStream(); reader = new BufferedReader(new InputStreamReader(inputStream)); while (true) { // 阻塞2 String data = reader.readLine(); if (Objects.nonNull(data)) { System.out.println("Server rev:" + data); } else { socket.close(); System.out.println("Client Close"); return; } } } catch (IOException e) { e.printStackTrace(); } finally { if (Objects.nonNull(reader)) { try { reader.close(); } catch (IOException e) { e.printStackTrace(); } } if (Objects.nonNull(inputStream)) { try { inputStream.close(); } catch (IOException e) { e.printStackTrace(); } } } }).start(); } } }
将代码在linux上执行,先来看运行后控制台的打印:
此时代码阻塞至System.in.read()
,也就是此时服务端仅仅开启了端口监听,但还没有开始去接收新连接。
此时查看linux的网络控制台以及当前Java进程内的fd文件:
如上两图,都可以看到此时端口已经处于监听状态,等待新连接的接入。其实在想想,我们再程序中创建了一个ServerSocekt并绑定一个端口,相当与在kernel中创建了一个fd5。
此时我们程序中并没有开启新连接的接收,那么此时客户端连接进来,能否被接收呢?
现在继续保持服务端的阻塞不变,开启新的客户端接入。首先通过命令nc 127.0.0.1 8385
连接到本地服务端。
继续查看linux网络控制台以及Java进程内的fd变化情况。
Java进程内的fd无任何变化但在linux的网络监听控制台出现了两个关于8385条目。这两个条目是服务端与客户端通过三次握手各自建立的Socket连接。
现在既然连接也已经建立了,Java进程也没有接收,也就是说Socket是内核级别。那么现在客户端发送的数据能否被接收呢?
继续保持Java进程阻塞,通过在客户端向服务端发送数据,继续观察,如下图:
客户端向服务端发送了10个字节的数据,在服务端的接收队列中也是可以收到客户端发送的数据。
注意!Send-Q、Recv-Q 的大小是有限制的,取决于网卡缓冲区的大小。
也是就说服务端客户端双方在建立连接之后缓冲区资源已经被开辟。
现在开始尝试让代码继续往下跑,开始接收连接,继续观察,看看会有什么新的变化?
首先看看服务端代码控制台:
客户端接收到了新的连接,并将开始存在与缓存区的数据进行了消费。(多打印的数据是正确的,为了验证接收缓存区的大小又多向服务端发送了几次数据)。
继续观察Java进程内的fd文件:
在程序开始接收新的连接之后明显此处多出了一个fd6,也就是我们内核中之前未被分配的Socket条目。程序中的Socket就是对fd6的抽象。fd上维护着一个索引,我们读取的数据就是通过当前socket fd去进行控制。
脑袋逐渐清晰,那么再看看linux内核中的变化。
内核中之前未被分配的socket也被分配至Java进程,未被消费的数据也被Java进程消费。
根据上面的实验已经可以得到结论。
服务端客户端通过三次握手建立连接,双方内核开辟资源得到Socket,为对方服务。
也就是说Socket是内核级别的,在程序开始接收连接得到的Socket是Java抽象出来的一个概念,其底层映射的是内核中的fd,后续数据的R/W都是通过访问此fd。
为了加深印象,再来看一组更清晰的连接示意图:
如图:两个客户端都去连接Server端,三次握手,建立资源。其都满足四元组的概念。
上面我们了解到了Socket在内核中的连接的过程,那么我们Java程序中是通过调用Kernel的函数去实现的端口的监听,连接的获取以及数据的读取呢?
在执行以上代码时,加入了Strace追踪系统调用,来看看结果吧:
当Java程序中开启了8385的监听后,内核的调用步骤如下:
到这里,现在上对于Socket自身的概念以及与Java和Kernel的关系,想毕已经很清楚了,那下面我们开始进入BIO的学习。
BIO俗称同步阻塞式IO。那在学习下面的知识之前我们得先搞清楚几个概念:
本篇主要围绕IO过程,至于IO完成后程序的处理这里不关注,以下几个概念也是针对IO
同步:数据的读写需要程序从kernel缓存区复制到程序内部的缓存区;
异步:数据的读取由内核直接复制到程序的内部的缓冲区;
阻塞:没有数据主线程阻塞,直至等待到有效返回;
非阻塞:不管有没有数据,都会直接返。有数据返回fd,无数据返回-1;
接下来看下,在演示scoket时写的代码,其导包都是来自java.io.*
这就是经典的BIO处理方式。下面对以上代码做个详细的解析,来看代码:
图中:
1.阻塞1: 服务端在接收客户端连接时调用了一个accept()
其实也是调用了kernel的accept()
调用,由于kernel的accept方法时阻塞的,所以此时服务端就阻塞到此处,直至有效返回;
2. 阻塞2:服务端接收到了客户端的连接,并去读取客户端上的数据,程序上调用了readLine()
其实也是调用的kernel的read()
,此方法也是阻塞的;
3. 既然服务端在接收到连接,后数据的读取是阻塞的,如果一直阻塞,新加入一个连接怎么办?所以在处理数据的读取时,创建一个新的线程去处理数据,这样就不影响主线接收连接了。
下面看下BIO的简易示意图:
在开始互联网开始的时候流量小这样确实可以满足需求,但是随着互联网的发展,用户数的暴增,就出现了C10K乃至现在的C10M的问题,一万用户就需要创建一万个线程,这个对于内存的消耗,以及线程切换所带来的损耗是不可估计的。至此找到了BIO的弊端,那么接下来看看NIO是怎么解决这个问题的?
NIO即同步非阻塞式IO,Java中称为 new IO,linux中称为NONBLOCK。NIO其实是通过一对多的方式去解决,BIO无限增长的线程数的,即一个线程可以负责多个客户端的读写。本文中代码就是一个线程负责数据客户端的接收,将接收到的连接存起来,在固定的线程内处理数据的读取。下面看代码:
package com.dxg.socket; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.concurrent.TimeUnit; public class SocketNIO { public static final int PORT = 8385; public static final List<SocketChannel> CLIENTS = new ArrayList<>(); public static void main(String[] args) throws Exception { ServerSocketChannel serverSocket = ServerSocketChannel.open(); // 设置服务端为非阻塞 serverSocket.configureBlocking(false); serverSocket.bind(new InetSocketAddress(PORT)); System.out.println("Server Start "+ PORT +" Listen..."); while (true) { // 由于服务端设置连接为非阻塞,即就是在ACCEPT时,没有接收到新的链接会直接返回。测试机器,频繁调触发系统调用,我怕我机器顶不住 TimeUnit.MILLISECONDS.sleep(500); SocketChannel client = serverSocket.accept(); if (Objects.nonNull(client)) { // 获取到客户端链接,设置其数据的读取为非阻塞 client.configureBlocking(false); CLIENTS.add(client); System.out.println("new client:" + client.getRemoteAddress()); } // 无论客户端是否发送数据,每次都需要遍历全量的客户端连接,造成不必要的资源浪费。 CLIENTS.parallelStream().forEach(chient -> { try { // 分配堆外内存 ByteBuffer tmpBuffer = ByteBuffer.allocateDirect(1024); int num = chient.read(tmpBuffer); // 不阻塞,一定会返回 if (0 < num) { // 有数据 tmpBuffer.flip(); byte[] datas = new byte[tmpBuffer.limit()]; tmpBuffer.get(datas); String str = new String(datas); System.out.println(str); } else if (0 == num) { // 接收到空数据 } else { // 小于0 客户端断开 tmpBuffer.clear(); System.out.println("client close:" + client.getRemoteAddress()); client.close(); } } catch (IOException e) { e.printStackTrace(); } }); } } }
先不看代码的具体内容,先看看运行效果,为了更好方便大家看到非阻塞的效果,在代码中加入了两行打印:
好咧,接下来使用命令javac SocketNIO.java && strace -ff -o /home/sysio/trace/socketNio java SocketNIO
把代码跑起来看看效果:
这个时候还没有客户端连接,最明显的就是可以看到服务端调用了accept()
程序并没有阻塞,而是继续往下走。
接下来使用命令nc 127.0.0.1 8385
连接一个新的客户端进来,继续查看服务端控制台的打印:
服务端调用read()
也没有阻塞,数据都是正常的打印。
发送点数据看看服务端能否正常接收:
接收也是正常的,我在尝试多开了几个客户端,其效果都是一样的,服务端都是正常接收的。
其实只要这两个点不阻塞了,所有的事情一个线程都可以完成,由此看到NIO确实是解决了BIO的弊端。
看了上面的演示,对于BIO和NIO的区别肯定是看出来了,代码可能还是有些困惑,那么先来屡屡代码:
首先导包就可以看到不同,NIO的代码导入了java.io.*
的包:
接下来,在成员变量处于BIO多了一个保存客户端连接的容器,因为所有的操作都是非阻塞的,我就可以把所有的连接存起来,使用一个或者多个线程去处理。
Main方法主体:
socket.configureBlocking(false)
设置socekt非阻塞,这样调用accept()
就不会阻塞;Socket.configureBlocking(false)
也设置为非阻塞,这样调用read()
就不会阻塞;NIO的流程也清楚了,下面来探究在程序内部的非阻塞是如何实现?
其实在服务端但凡涉及到端口的绑定监听到数据的读取都需要经过如BIO的一系列步骤:socke>bind>listen>。
在NIO这里唯一不同的就是,在创建socket时候,新增了一个参数NONBLOCK
将服务端设置为非阻塞。通过此设置就可以在调用accept()
和read()
时,不需要等待有效返回,直接返回,以达到不阻塞。
如图NIO通过在使用有限个线程完成了对连接的处理,完全解决了BIO线程数无限增长的问题。但是同时也带来了新的问题。如图中我们看到NIO是有两个无限循环的系统调用,其实无限循环以及系统调用本身并没有问题,但是如果说每次无限循环都是无用的遍历,那这个其实就是严重的浪费资源。比如并没有客户端的连接或者已连接的客户端并没有发送数据至服务端,但是服务端依然还是依然去全量遍历,如果一个两个无所谓,那如果是C10K呢?一万个连接中只有一个发送数据,我就要遍历一万次,其复杂度是一个O(N),再说系统调用是非常消耗资源的,所以就有了多路复用器,一起来看看吧。
上面也发现了NIO的弊端,每次都需要拿到fd去内核中询问是否存在数据,如果存在多个fd就要询问多次,其多路复用器就是将所有的文件描述符一次性发送至内核由内核去进行查看这些fd是否可以读取,这样就是减少了系统调用的次数。简单的说多路复用器的核心就是将NIO由用户态的遍历,切换至内核态,应用程序只需要拿到有数据的fd去进行遍历获取数据。多路复用器的实现方式有三种:select/poll/epoll
。其中select
是基于posix接口规范的,也就是说在所有的操作系统中都会存在,但是pool/epoll
是只有linux中存在,所以说在使用过程中需要注意。还有一点切记!多路复用器依然是同步模型!!!
先说说select和poll,这两个系统调用本质上是一样的,只不过select每次调用每次只允许传输1024个fd,而poll没有限制,所以现在基本都是使用的poll。下面主要来详细说明下poll和epoll的运作流程。
这里是一个应用程序服务端开启服务,等待接收连接的示意图。如图:
scoket()
得到了fd3并将端口绑定并监听;poll(fd3)
将fd3传入,内核查看在fd3中是否存在数据,存在就返回,不存在根据传入的参数决定;
poll()
调用监控到fd3存在数据,会将fd3返回给服务端,由服务端去读取fd3上的数据;其实在这里的poll()
是可以传入多个fd文件的,如果服务端已经接收到N个客户端,那么这个poll(fds)
传入的就是这N个客户端的fd。
在这里再想想,其实poll()
调用也是存在自己的弊端。
带着疑问我们一起来探究一下epoll是不是解决了现存的问题?
先来看下epoll的运行流程图:
scoket()
得到了fd3并将端口绑定并监听;epoll_create()
在内核中创建了红黑树,用来保存服务端传入的fd,这样服务端只需要将fd传输一次;epoll_ctl(fd3)
将需要监听的fd传入红黑树;epoll_waite()
等待kernel中的一块链表内存区域中的数据,这个区域是用来存放红黑树中可以读写的fd,是否返回也是与传入的时间参数有关;epoll_wait()
返回fd3至服务端;从此流程明显可以看到,epoll通过以空间换时间的思想提高了代码执行的效率并通过延伸中断回调将poll的遍历优化至事件通知的方式,提升了执行效率,并减少了系统资源的使用。
现在关于多路复用器的理论基本上是比较清晰,知道理论不实战等同与纸上谈兵,下面开始进入多路复用器的实战阶段。
直接上代码
package com.dxg.socket; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; /** * NIO至多路复用器单线程版本 */ public class SocketNIOSingleMultiplexer { public static final int PORT = 8385; public static Selector selector = null; public static void main(String[] args) throws Exception { ServerSocketChannel serverSocket = ServerSocketChannel.open(); serverSocket.configureBlocking(false); serverSocket.bind(new InetSocketAddress(PORT)); selector = Selector.open(); serverSocket.register(selector, SelectionKey.OP_ACCEPT); System.out.println("Server Start " + PORT + " And Start Accept..."); while (true) { Set<SelectionKey> keys = selector.selectedKeys(); int num = selector.select(); if (0 < num) { // 这里只能使用单线程串行化处理。如果为多线程并行化处理,A客户端数据在Thread1还未处理完成, // Thread2,Thread3等线程也会继续处理,验证造成数据包无法解析。 Iterator<SelectionKey> iterator = keys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if (key.isAcceptable()) { acceptHander(key); } else if (key.isReadable()) { readHandler(key); } else if (key.isWritable()) { writeHandle(key); } } } } } /** * 接收连接的处理器 */ public static void acceptHander(SelectionKey key) throws Exception { // 接收连接只能是ServerSocket ServerSocketChannel serverSocket = (ServerSocketChannel) key.channel(); SocketChannel clientSocket = serverSocket.accept(); // 拿到连接,设置其为非阻塞 clientSocket.configureBlocking(false); // 定义当前Socket所使用的buffer ByteBuffer buffer = ByteBuffer.allocateDirect(8192); // 将新接收的文件描述符,注册至多路复用器 clientSocket.register(selector, SelectionKey.OP_READ, buffer); System.out.println("new client:" + clientSocket.getRemoteAddress()); } /** * 数据读取处理器 */ public static void readHandler(SelectionKey key) throws Exception { // 客户端连接 SocketChannel clientSocket = (SocketChannel) key.channel(); // 获取buffer ByteBuffer buffer = (ByteBuffer) key.attachment(); buffer.clear(); while (true) { int num = clientSocket.read(buffer); if (0 < num) { // 将读取到的数据写回给客户端 clientSocket.register(selector, SelectionKey.OP_WRITE, buffer); } else if (0 == num) { // 空数据 break; } else { // 断开连接的处理 System.out.println("client close:" + clientSocket.getRemoteAddress()); key.cancel(); break; } } } public static void writeHandle(SelectionKey key) throws Exception { SocketChannel clientSocket = (SocketChannel) key.channel(); // 注册多路复用器时,将此Socket的buffer也附加上了并将获取到数据写入此buffer,在此处直接取出使用 ByteBuffer buffer = (ByteBuffer) key.attachment(); // 移动buffer前后两个指针,使得数据可以被写出 buffer.flip(); while (buffer.hasRemaining()) { clientSocket.write(buffer); } buffer.clear(); // 写事件,关注的是数据能不能写,也就是内核中的Send-Q是否已满,而不是你要写。所以在写完需要移除掉写事件的监听 key.cancel(); } }
当前代码为多路复用器服务端代码,主要的执行流程如下其中第一列为poll,第二个列为epoll:
> socket() > bind() > listen()
> socket() > bind() > listen()
> JVM 中开辟一个数组
> epoll_create()
> 将fd放如数组中
> epoll_ctl()
> poll(fd)
> epoll_wait()
> 将新的fd继续存放入上面开辟的数组
> 调用epoll_ctl()将新接收的fd放置红黑树
> 注册写事件,读取数据并写出
> 调用epoll_ctl()注册写事件,读取数据并写出
这里有三个注意点:
老规矩执行javac SocketNIOSingleMultiplexer.java && strace -ff -o /home/sysio/trace/out java SocketNIOSingleMultiplexer
让代码跑起来
执行nc 127.0.0.1 8385
模拟客户端连接服务:
服务端打印新客户端的接入,下来断开客户端:
服务端接收到了断开的信息包,并继续监听。下面新开客户端并输入字符看看是否正常返回:
服务端接收到新连接,客户端接收到服务端写会的数据。测试成功。
下面通过追踪情况看看系统调用是不是如上面所说:
创建红黑树fd7
将fd4(服务端socket)放入红黑树,并接收到有数据的有效返回。
将上步骤读取得到的新客户端fd8加入红黑树,并接收到f8上有消息的有效返回。
在fd8上注册写事件。
移除fd8上的写事件,并继续等待有效返回。
其实看看这些都是和上面的代码完全对应起来。这里使用的JDK1.8所以在linux平台默认使用epoll,所以这里也只模拟了epoll的系统调用,感兴趣的可以自己验证下poll的调用。在启动程序增加如下参数及即可指定:
到这里关于网络IO模型相关的基本知识就完结了,下一章节开始进入多路复用器至NETTY模型推导,欢迎关注。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。