DMA,全称Direct Memory Access,即直接存储器访问。
直接I/O,Linux sendfile模式
还有mmap形式,可以详细看下 Netty之美–零拷贝 ,比较专一的讲了DMA的形式。
使用 selector.select() 方法阻塞等待事件触发,调用processSelectedKeys()进行事件获取进行相应的动作,通过创建子线程方式执行相应的消息读取、消息处理等相关任务。
* {@link io.netty.channel.nio.NioEventLoop#select()}
private int select(long deadlineNanos) throws IOException {
if (deadlineNanos == NONE) {
return selector.select();
// Timeout will only be 0 if deadline is within 5 microsecs
long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
/** * {@link io.netty.channel.nio.NioEventLoop#run()} */ protected void run() { int selectCnt = 0; for (;;) { try { ... case SelectStrategy.SELECT: long curDeadlineNanos = nextScheduledTaskDeadlineNanos(); if (curDeadlineNanos == -1L) { curDeadlineNanos = NONE; // nothing on the calendar } nextWakeupNanos.set(curDeadlineNanos); try { if (!hasTasks()) { strategy = select(curDeadlineNanos); } } finally { // This update is just to help block unnecessary selector wakeups // so use of lazySet is ok (no race condition) nextWakeupNanos.lazySet(AWAKE); } // fall through default: } } catch (IOException e) { ... } ... if (ioRatio == 100) { try { if (strategy > 0) { processSelectedKeys(); } } finally { // Ensure we always run tasks. ranTasks = runAllTasks(); } } ... }
读取时会通过一系列的PipChannel内链路,通过 invokeChannelRead 进行数据读取(不是偷懒不想写,耦合度太高了,相互调用的情况太多,且不同情况路径还会有差异,这里只写Socket读取密切相关的地方)。
调用读取逻辑,会将读取的值加入ByteBufAllocator,通过ByteBufAllocator进行链接,做一次 零拷贝 处理
/** * {@link io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read()} */ @Override public final void read() { final ChannelConfig config = config(); if (shouldBreakReadReady(config)) { clearReadPending(); return; } final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { byteBuf = allocHandle.allocate(allocator); // 此处为重点,会将读取的值加入ByteBufAllocator,做一次 零拷贝 处理 allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { // nothing was read. release the buffer. byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; if (close) { // There is nothing left to read as we received an EOF. readPending = false; } break; } allocHandle.incMessagesRead(1); readPending = false; pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading()); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 if (!readPending && !config.isAutoRead()) { removeReadOp(); } } }
将数据读入 ByteBuf (PooledUnsafeDirectByteBuf)
* {@link io.netty.channel.socket.nio.NioSocketChannel#doReadBytes(ByteBuf)}
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
* {@link io.netty.buffer.AbstractByteBuf#writeBytes(java.nio.channels.ScatteringByteChannel, int)}
public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
int writtenBytes = setBytes(writerIndex, in, length);
if (writtenBytes > 0) {
writerIndex += writtenBytes;
return writtenBytes;
* {@link io.netty.buffer.PooledByteBuf#setBytes(int, java.nio.channels.ScatteringByteChannel, int)}
public final int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
try {
return in.read(internalNioBuffer(index, length));
} catch (ClosedChannelException ignored) {
return -1;
ByteBuffer创建,创建出 java.nio.DirectByteBuffer
* {@link io.netty.buffer.PooledByteBuf#internalNioBuffer()}
protected final ByteBuffer internalNioBuffer() {
ByteBuffer tmpNioBuf = this.tmpNioBuf;
if (tmpNioBuf == null) {
// 创建出 java.nio.DirectByteBuffer
this.tmpNioBuf = tmpNioBuf = newInternalNioBuffer(memory);
} else {
return tmpNioBuf;
/** * socketChannel read * {@link sun.nio.ch.SocketChannelImpl#read(ByteBuffer)} */ @Override public int read(ByteBuffer buf) throws IOException { Objects.requireNonNull(buf); readLock.lock(); try { boolean blocking = isBlocking(); int n = 0; try { beginRead(blocking); // check if connection has been reset if (connectionReset) throwConnectionReset(); // check if input is shutdown if (isInputClosed) return IOStatus.EOF; // 这里读取 n = IOUtil.read(fd, buf, -1, nd); if (blocking) { while (IOStatus.okayToRetry(n) && isOpen()) { park(Net.POLLIN); n = IOUtil.read(fd, buf, -1, nd); } } } catch (ConnectionResetException e) { connectionReset = true; throwConnectionReset(); } finally { endRead(blocking, n > 0); if (n <= 0 && isInputClosed) return IOStatus.EOF; } return IOStatus.normalize(n); } finally { readLock.unlock(); } }
/** * io 读取 * {@link sun.nio.ch.IOUtil.read(FileDescriptor, ByteBuffer, long, boolean, int, NativeDispatcher)} */ static int read(FileDescriptor fd, ByteBuffer dst, long position, boolean directIO, int alignment, NativeDispatcher nd) throws IOException { if (dst.isReadOnly()) throw new IllegalArgumentException("Read-only buffer"); // 注意这里的读取,如果不是DirectByteBuffer需要再做其他的处理 if (dst instanceof DirectBuffer) return readIntoNativeBuffer(fd, dst, position, directIO, alignment, nd); // Substitute a native buffer ByteBuffer bb; int rem = dst.remaining(); // 可以看出这里必须要使用DirectBuffer类型进行读取 if (directIO) { Util.checkRemainingBufferSizeAligned(rem, alignment); bb = Util.getTemporaryAlignedDirectBuffer(rem, alignment); } else { bb = Util.getTemporaryDirectBuffer(rem); } try { int n = readIntoNativeBuffer(fd, bb, position, directIO, alignment,nd); bb.flip(); if (n > 0) // 读取完成后再进行一次拷贝 dst.put(bb); return n; } finally { Util.offerFirstTemporaryDirectBuffer(bb); } }
* {@link sun.nio.ch.SocketDispatcher.read0(FileDescriptor, long, int)}
private static native int read0(FileDescriptor fd, long address, int len) throws IOException;
到这里为止,可以看出这的关键点在于java.nio.DirectByteBuffer的使用,关于DirectByteBuffer的详细介绍和作用可以看些这篇文章 Java NIO学习笔记三(堆外内存之 DirectByteBuffer 详解) 这里不再赘述,简而言之是开辟一个堆外内存(记个笔记,这个堆外内存通过jdk.internal.ref.Cleaner管理,依然可以被gc回收,所以有些文章说需要自己处理堆外内存,可能是老版本或先入为主了,关键是你没法操作这个堆外内存,也没有提供相应的public方法去处理),这里有个重要的点,无法直接操作这个堆外内存,也就是说如果需要处理数据则需要再拷贝成byteArray才能进行处理。
package person.pluto.natcross2.nio; import java.io.IOException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Iterator; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import lombok.AccessLevel; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import person.pluto.natcross2.executor.NatcrossExecutor; import person.pluto.natcross2.utils.Assert; import person.pluto.natcross2.utils.CountWaitLatch; /** * <p> * nio 容器 * </p> * * @author Pluto * @since 2021-04-13 09:25:51 */ @Slf4j @NoArgsConstructor(access = AccessLevel.PRIVATE) public final class NioHallows implements Runnable { public static final NioHallows INSTANCE = new NioHallows(); /** * 注册监听动作 * <p> * 要注意这里只拿最后的一次注册为准,即 {@code channel} 只能与一个 {@code proccesser} 动作对应 * * @param channel * @param ops 依据以下值进行或运算进行最后结果设定,并且 {@code channel} 要支持相应的动作 * <p> * - {@link SelectionKey#OP_ACCEPT} * <p> * - {@link SelectionKey#OP_CONNECT} * <p> * - {@link SelectionKey#OP_READ} * <p> * - {@link SelectionKey#OP_WRITE} * @param proccesser 要执行的动作 * @throws IOException * @author Pluto * @since 2021-04-26 15:55:38 */ public static void register(SelectableChannel channel, int ops, INioProcesser proccesser) throws IOException { INSTANCE.register0(channel, ops, proccesser); } /** * 根据 {@link SelectionKey} 恢复监听事件的注册 * * @param key 原始的key * @param ops 要与通过 {@link #register(SelectableChannel, int, INioProcesser)} * 注册的事件统一 * @throws IOException * @author Pluto * @since 2021-05-07 13:21:49 */ public static boolean reRegisterByKey(SelectionKey key, int ops) { return INSTANCE.reRegisterByKey0(key, ops); } /** * 释放注册 * * @param channel * @author Pluto * @since 2021-04-26 16:03:51 */ public static void release(SelectableChannel channel) { INSTANCE.release0(channel); } private volatile Thread myThread = null; private volatile boolean alive = false; private volatile boolean canceled = false; private volatile Selector selector; private final Object selectorLock = new Object(); private final CountWaitLatch countWaitLatch = new CountWaitLatch(); private final Map<SelectableChannel, ProcesserHolder> channelProcesserMap = new ConcurrentHashMap<>(); @Setter @Getter private long selectTimeout = 10L; @Setter @Getter private long wakeupSleepNanos = 1000000L; /** * 获取 {@link #selector} * <p> * 若 {@link #selector} 未有值,则会进行初始化:打开selector,并执行 {@link #start()} * * @return * @throws IOException * @author Pluto * @since 2021-04-26 16:04:30 */ public Selector getSelector() throws IOException { // 判空、返回逻辑,按第一次取值进行,缺点是不能判断是否已经关闭,但与 this.cancel() // 方法中的执行顺序来看,会先被设置为null,再去close,所以可以大概率认为若不为null即为没有关闭 Selector selector = this.selector; if (Objects.isNull(selector)) { synchronized (this.selectorLock) { // 二次校验 // 若是主动退出,则不在创建,避免退出时有新任务而被重启,若要重新启用,则需要主动调用 start() 方法来启动 if (Objects.isNull(this.selector) && !this.canceled) { this.selector = Selector.open(); this.start(); } } selector = this.selector; if (Objects.isNull(selector)) { throw new IOException("NioHallows's selector is closed"); } } return selector; } /** * 获取唤醒后的 {@link #selector} * <p> * 注意,若 {@link #run()} 快于你的任务,还是会被再次阻塞,只是执行了一次 {@link Selector#wakeup()} * * @return * @throws IOException * @author Pluto * @since 2021-04-26 16:07:00 */ public Selector getWakeupSelector() throws IOException { return this.getSelector().wakeup(); } /** * 注册监听动作 * <p> * 要注意这里只拿最后的一次注册为准,即 {@code channel} 只能与一个 {@code proccesser} 动作对应 * * @param channel * @param ops 依据以下值进行或运算进行最后结果设定,并且 {@code channel} 要支持相应的动作 * <p> * - {@link SelectionKey#OP_ACCEPT} * <p> * - {@link SelectionKey#OP_CONNECT} * <p> * - {@link SelectionKey#OP_READ} * <p> * - {@link SelectionKey#OP_WRITE} * @param proccesser 要执行的动作 * @throws IOException * @author Pluto * @since 2021-04-26 15:55:38 */ public void register0(SelectableChannel channel, int ops, INioProcesser proccesser) throws IOException { Objects.requireNonNull(channel, "channel non null"); try { this.channelProcesserMap.put(channel, ProcesserHolder.of(channel, ops, proccesser)); channel.configureBlocking(false); this.countWaitLatch.countUp(); // 这里有个坑点,如果在select中,这里会被阻塞 channel.register(this.getWakeupSelector(), ops); } catch (Throwable e) { this.channelProcesserMap.remove(channel); throw e; } finally { this.countWaitLatch.countDown(); } } /** * 根据 {@link SelectionKey} 恢复监听事件的注册 * * @param key 原始的key * @param ops 要与通过 {@link #register0(SelectableChannel, int, INioProcesser)} * 注册的事件统一 * @throws IOException * @author Pluto * @since 2021-05-07 13:21:49 */ public boolean reRegisterByKey0(SelectionKey key, int ops) { Objects.requireNonNull(key, "key non null"); Assert.state(key.selector() == this.selector, "this SelectionKey is not belong NioHallows's selector"); if (!key.isValid()) { return false; } // 通过事件和源码分析,恢复注册是通过updateKeys.addLast进行,虽然没有被阻塞,但是需要进行一次唤醒才可以成功恢复事件监听 // 因无法获知是否成功注入selector,所以必须要进行一次唤醒操作,并且没有阻塞的问题,所以这里不通过countWaitLatch进行同步 key.interestOps(ops); try { this.getWakeupSelector(); } catch (IOException e) { // 出错了交给其他的流程逻辑,这里只进行一次唤醒 } return true; } /** * 释放注册 * * @param channel * @author Pluto * @since 2021-04-26 16:03:51 */ public void release0(SelectableChannel channel) { if (Objects.isNull(channel)) { return; } this.channelProcesserMap.remove(channel); SelectionKey key = channel.keyFor(this.selector); if (Objects.nonNull(key)) { key.cancel(); } } @Override public void run() { CountWaitLatch countWaitLatch = this.countWaitLatch; Map<SelectableChannel, ProcesserHolder> chanelProcesserMap = this.channelProcesserMap; for (; this.alive;) { // 给注册事务一个时间,如果等待时间太长(可能需要注入的太多),就跳出再去获取新事件,防止饿死 try { countWaitLatch.await(this.getWakeupSleepNanos(), TimeUnit.NANOSECONDS); } catch (InterruptedException e) { log.warn("selector wait register timeout"); } Selector selector; try { selector = getSelector(); // 采用有期限的监听,以免线程太快,没有来的及注册,就永远阻塞在那里了 int select = selector.select(this.getSelectTimeout()); if (select <= 0) { continue; } } catch (IOException e) { log.error("NioHallows run exception", e); continue; } Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); for (; iterator.hasNext();) { SelectionKey key = iterator.next(); iterator.remove(); key.interestOps(0); ProcesserHolder processerHolder = chanelProcesserMap.get(key.channel()); if (Objects.isNull(processerHolder)) { key.cancel(); continue; } NatcrossExecutor.executeNioAction(() -> { processerHolder.proccess(key); }); } } } /** * 启动nio事件监听 * * @author Pluto * @since 2021-04-26 16:33:07 */ public synchronized void start() { this.canceled = false; this.alive = true; if (this.myThread == null || !this.myThread.isAlive()) { this.myThread = new Thread(this); this.myThread.setName("nio-hallows"); this.myThread.start(); log.info("NioHallows is started!"); } } /** * 退出nio事件监听 * * @author Pluto * @since 2021-04-26 16:33:33 */ public void cancel() { // 假设A线程执行到了 this.selector = Selector.open() 但是调用 this.cancel() // 方法的B线程抢占cpu成功,并一直到执行完成,此时A线程抢占CPU继续执行,又会进行重启,与关停项目时的关停期望不同。 // // 此处锁定 this.selectorLock 后再去设置 this.canceled,形成与 this.getSelector() // 的线程同步,同时避免了被动调用 this.start() 时与 this.cancel() 的同步问题,最终可关闭。 // 虽与主动调用 this.start() 有不同步的风险,但 this.start() 、 this.cancel() // 主动调用的场景有极大对立性,所以不进行过多的关照。 // // 注意:若 this.cancel() 添加了synchronized,存在死锁的可能!!! synchronized (this.selectorLock) { this.canceled = true; } log.info("NioHallows cancel"); this.alive = false; Selector selector; if ((selector = this.selector) != null) { this.selector = null; try { selector.close(); } catch (IOException e) { // do nothing } } Thread myThread; if ((myThread = this.myThread) != null) { this.myThread = null; myThread.interrupt(); } } }
@Data @AllArgsConstructor(staticName = "of") public class ProcesserHolder { private SelectableChannel channel; private int interestOps; private INioProcesser processer; /** * 执行事件的任务 * * @param key * @author Pluto * @since 2021-04-26 16:35:36 */ public void proccess(SelectionKey key) { this.processer.proccess(key); if (!NioHallows.reRegisterByKey(key, this.interestOps)) { NioHallows.release(this.channel); } } }
@Slf4j public class SimplePassway implements Runnable, INioProcesser { ... /** * 向输出通道输出数据 * <p> * 这里不只是为了DMA而去用DMA,而是这里有奇葩问题 * <p> * 如能采用了SocketChannel,而去用outputStream的时候,不管输入输出,都会有奇怪的问题,比如输出会莫名的阻塞住 * <p> * 整体就是如果能用nio的方法,但是用了bio形式都会各种什么 NullPointException、IllageSateException 等等错误 * <p> * 经过实验,是java8会出现阻塞的情况,java14没有出些这些奇怪的问题,估计是jni进行了变更吧 * </p> * * @param byteBuffer * @throws IOException * @author Pluto * @since 2021-04-09 16:37:33 */ private void write(ByteBuffer byteBuffer) throws IOException { SocketChannel outputChannel; OutputStream outputStream; if (Objects.nonNull((outputChannel = this.getOutputChannel()))) { // 这里要注意,可能缓存空间不足,而没有完全写出byteBuffer,所以需要循环处理进行全部输出(或其他的方式保证输出完毕) while (byteBuffer.hasRemaining()) { outputChannel.write(byteBuffer); } } else { outputStream = this.getOutputStream(); outputStream.write(byteBuffer.array(), byteBuffer.position(), byteBuffer.limit()); outputStream.flush(); } } // ============== nio ================= @Setter(AccessLevel.NONE) @Getter(AccessLevel.NONE) private ByteBuffer byteBuffer; private ByteBuffer obtainByteBuffer() { if (Objects.isNull(byteBuffer)) { if (Objects.isNull(this.getOutputChannel())) { // 如果需要处理数据的话,这里直接声明为 java.nio.HeapByteBuffer.HeapByteBuffer 就好了, // java.nio.DirectByteBuffer#get(byte[], int, int)用的也是UNSAFE.copyMemory byteBuffer = ByteBuffer.allocate(streamCacheSize); } else { // 输入输出可以使用channel,此处则使用DirectByteBuffer,这时候才真正体现出了DMA byteBuffer = ByteBuffer.allocateDirect(streamCacheSize); } } return byteBuffer; } @Override public void proccess(SelectionKey key) { if (alive && key.isValid()) { ByteBuffer buffer = this.obtainByteBuffer(); SocketChannel inputChannel = (SocketChannel) key.channel(); try { int len = -1; do { buffer.clear(); len = inputChannel.read(buffer); if (len > 0) { buffer.flip(); if (buffer.hasRemaining()) { this.write(buffer); } } } while (len > 0); // 如果不是负数,则还没有断开连接,返回继续等待 if (len >= 0) { return; } } catch (IOException e) { // } } log.debug("one InputToOutputThread closed"); this.cancell(); } ... }
在需要处理数据的情况下,java中channel还是需要一次拷贝的过程;然后我们看下InputStream.read的实现,还是通过一路追踪过去,找到 java.net.SocketInputStream#socketRead0(FileDescriptor, byte[], int, int, int) 为最终实现方法;这里并没有再次拷贝的过程,直接写入了目标byteArray,我们知道一般这种jni中的都会相对较快一些,难道stream真的比channel要省一步拷贝过程吗?
