当前位置:   article > 正文

一些关于DMA的见解和在JAVA中的简单使用(Linux、Socket、Netty方向)_java dma

java dma

一些关于DMA的见解和在JAVA中的简单使用(Linux、Socket、Netty方向)

DMA

DMA,全称Direct Memory Access,即直接存储器访问。

DMA传输将数据从一个地址空间复制到另一个地址空间,提供在外设和存储器之间或者存储器和存储器之间的高速数据传输。当CPU初始化这个传输动作,传输动作本身是由DMA控制器来实现和完成的。DMA传输方式无需CPU直接控制传输,也没有中断处理方式那样保留现场和恢复现场过程,通过硬件为RAM和IO设备开辟一条直接传输数据的通道,使得CPU的效率大大提高。

传统IO

传统IO
如上图,假设需求是将一个磁盘文件发布到网络上。具体步骤是:

  1. 应用程序调用系统方法read发起读文件操作,同时CPU由用户态转为内核态,
  2. 系统通过DMA控制器将文件拷贝到内核缓冲区,该操作基本不需要CPU参与;
  3. CPU将数据从内核缓冲区拷贝到用户缓冲区,发生一次CPU拷贝,同时CPU由内核态转为用户态;到此为止,系统调用read方法返回;
  4. 程序继续调用write方法,同时CPU由用户态转为内核态,CPU将数据又从用户缓冲区拷贝到socket buffer;
  5. 数据通过DMA被拷贝到协议引擎,如网卡等,同时write方法返回,CPU由内核态转为用户态。

总共需要2次CPU拷贝、2次DMA拷贝,4次上下文切换,其中read和write各占一半。

直接I/O,Linux sendfile模式

sendfile

如图,步骤是:

  1. 程序调用sendfile方法,使用DMA的方式将磁盘数据读取到内核缓冲区;
  2. 直接拷贝到协议栈。

总共需要2次DMA拷贝,1次CPU拷贝,2次上下文切换。但是缺点也是很明显的,由于完全没有经过用户态,sendfile只能简单的传送数据而不能对其进行修改。

还有mmap形式,可以详细看下 Netty之美–零拷贝 ,比较专一的讲了DMA的形式。

Netty中对DMA的使用

以下代码将会使用Javadoc形式进行连接代码出处,建议在阅读是将netty包引入demo工程,建议使用Java14环境下查看java相关代码

<!--netty 的maven包-->
<dependency>
	<groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.60.Final</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

使用 selector.select() 方法阻塞等待事件触发,调用processSelectedKeys()进行事件获取进行相应的动作,通过创建子线程方式执行相应的消息读取、消息处理等相关任务。

等待select通知

/**
 * {@link io.netty.channel.nio.NioEventLoop#select()}
 */
private int select(long deadlineNanos) throws IOException {
    if (deadlineNanos == NONE) {
        return selector.select();
    }
    // Timeout will only be 0 if deadline is within 5 microsecs
    long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
    return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

循环select并处理selectedKeys

/**
 * {@link io.netty.channel.nio.NioEventLoop#run()}
 */
protected void run() {
    int selectCnt = 0;
    for (;;) {
        try {
            ...
                case SelectStrategy.SELECT:
                    long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                    if (curDeadlineNanos == -1L) {
                        curDeadlineNanos = NONE; // nothing on the calendar
                    }
                    nextWakeupNanos.set(curDeadlineNanos);
                    try {
                        if (!hasTasks()) {
                            strategy = select(curDeadlineNanos);
                        }
                    } finally {
                        // This update is just to help block unnecessary selector wakeups
                        // so use of lazySet is ok (no race condition)
                        nextWakeupNanos.lazySet(AWAKE);
                    }
                    // fall through
                default:
                }
            } catch (IOException e) {
                ...
            }
...
            if (ioRatio == 100) {
                try {
                    if (strategy > 0) {
                        processSelectedKeys();
                    }
                } finally {
                    // Ensure we always run tasks.
                    ranTasks = runAllTasks();
                }
            } 
            ...
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

读取时会通过一系列的PipChannel内链路,通过 invokeChannelRead 进行数据读取(不是偷懒不想写,耦合度太高了,相互调用的情况太多,且不同情况路径还会有差异,这里只写Socket读取密切相关的地方)。

看下读取过程:

调用读取逻辑,会将读取的值加入ByteBufAllocator,通过ByteBufAllocator进行链接,做一次 零拷贝 处理

/**
 * {@link io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read()}
 */
@Override
public final void read() {
    final ChannelConfig config = config();
    if (shouldBreakReadReady(config)) {
        clearReadPending();
        return;
    }
    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator();
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);

    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        do {
            byteBuf = allocHandle.allocate(allocator);
            // 此处为重点,会将读取的值加入ByteBufAllocator,做一次 零拷贝 处理
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
            if (allocHandle.lastBytesRead() <= 0) {
                // nothing was read. release the buffer.
                byteBuf.release();
                byteBuf = null;
                close = allocHandle.lastBytesRead() < 0;
                if (close) {
                    // There is nothing left to read as we received an EOF.
                    readPending = false;
                }
                break;
            }

            allocHandle.incMessagesRead(1);
            readPending = false;
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
        } while (allocHandle.continueReading());

        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();

        if (close) {
            closeOnRead(pipeline);
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close, allocHandle);
    } finally {
        // Check if there is a readPending which was not processed yet.
        // This could be for two reasons:
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
        //
        // See https://github.com/netty/netty/issues/2254
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60

将数据读入 ByteBuf (PooledUnsafeDirectByteBuf)

/**
 * {@link io.netty.channel.socket.nio.NioSocketChannel#doReadBytes(ByteBuf)}
 */
@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.attemptedBytesRead(byteBuf.writableBytes());
    return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

读取channel中的数据

/**
 * {@link io.netty.buffer.AbstractByteBuf#writeBytes(java.nio.channels.ScatteringByteChannel, int)}
 */
@Override
public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
    ensureWritable(length);
    int writtenBytes = setBytes(writerIndex, in, length);
    if (writtenBytes > 0) {
        writerIndex += writtenBytes;
    }
    return writtenBytes;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

实际读取,通过channel和ByteBuffer

/**
 * {@link io.netty.buffer.PooledByteBuf#setBytes(int, java.nio.channels.ScatteringByteChannel, int)}
 */
@Override
public final int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
    try {
        return in.read(internalNioBuffer(index, length));
    } catch (ClosedChannelException ignored) {
        return -1;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

ByteBuffer创建,创建出 java.nio.DirectByteBuffer

/**
 * {@link io.netty.buffer.PooledByteBuf#internalNioBuffer()}
 */
protected final ByteBuffer internalNioBuffer() {
    ByteBuffer tmpNioBuf = this.tmpNioBuf;
    if (tmpNioBuf == null) {
        // 创建出 java.nio.DirectByteBuffer
        this.tmpNioBuf = tmpNioBuf = newInternalNioBuffer(memory);
    } else {
        tmpNioBuf.clear();
    }
    return tmpNioBuf;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

读取的关键在于channel读入java.nio.DirectByteBuffer中,下面看下读取的过程:

通过IOUtil.read进行读取

/**
 * socketChannel read
 * {@link sun.nio.ch.SocketChannelImpl#read(ByteBuffer)}
 */
@Override
public int read(ByteBuffer buf) throws IOException {
    Objects.requireNonNull(buf);

    readLock.lock();
    try {
        boolean blocking = isBlocking();
        int n = 0;
        try {
            beginRead(blocking);

            // check if connection has been reset
            if (connectionReset)
                throwConnectionReset();

            // check if input is shutdown
            if (isInputClosed)
                return IOStatus.EOF;
			// 这里读取
            n = IOUtil.read(fd, buf, -1, nd);
            if (blocking) {
                while (IOStatus.okayToRetry(n) && isOpen()) {
                    park(Net.POLLIN);
                    n = IOUtil.read(fd, buf, -1, nd);
                }
            }
        } catch (ConnectionResetException e) {
            connectionReset = true;
            throwConnectionReset();
        } finally {
            endRead(blocking, n > 0);
            if (n <= 0 && isInputClosed)
                return IOStatus.EOF;
        }
        return IOStatus.normalize(n);
    } finally {
        readLock.unlock();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

注意这里的读取,如果不是DirectBuffer需要创建DirectBuffer,再put进dst,这里是用的DirectByteBuffer

/**
 * io 读取
 * {@link sun.nio.ch.IOUtil.read(FileDescriptor, ByteBuffer, long, boolean, int, NativeDispatcher)}
 */
static int read(FileDescriptor fd, ByteBuffer dst, long position,
                boolean directIO, int alignment, NativeDispatcher nd)
    throws IOException
{
    if (dst.isReadOnly())
        throw new IllegalArgumentException("Read-only buffer");
    // 注意这里的读取,如果不是DirectByteBuffer需要再做其他的处理
    if (dst instanceof DirectBuffer)
        return readIntoNativeBuffer(fd, dst, position, directIO, alignment, nd);

    // Substitute a native buffer
    ByteBuffer bb;
    int rem = dst.remaining();
    // 可以看出这里必须要使用DirectBuffer类型进行读取
    if (directIO) {
        Util.checkRemainingBufferSizeAligned(rem, alignment);
        bb = Util.getTemporaryAlignedDirectBuffer(rem, alignment);
    } else {
        bb = Util.getTemporaryDirectBuffer(rem);
    }
    try {
        int n = readIntoNativeBuffer(fd, bb, position, directIO, alignment,nd);
        bb.flip();
        if (n > 0)
            // 读取完成后再进行一次拷贝
            dst.put(bb);
        return n;
    } finally {
        Util.offerFirstTemporaryDirectBuffer(bb);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

一路跟踪代码来到了这里,用native修饰的方法:

/**
 * {@link sun.nio.ch.SocketDispatcher.read0(FileDescriptor, long, int)}
 */
private static native int read0(FileDescriptor fd, long address, int len) throws IOException;
  • 1
  • 2
  • 3
  • 4

到这里为止,可以看出这的关键点在于java.nio.DirectByteBuffer的使用,关于DirectByteBuffer的详细介绍和作用可以看些这篇文章 Java NIO学习笔记三(堆外内存之 DirectByteBuffer 详解) 这里不再赘述,简而言之是开辟一个堆外内存(记个笔记,这个堆外内存通过jdk.internal.ref.Cleaner管理,依然可以被gc回收,所以有些文章说需要自己处理堆外内存,可能是老版本或先入为主了,关键是你没法操作这个堆外内存,也没有提供相应的public方法去处理),这里有个重要的点,无法直接操作这个堆外内存,也就是说如果需要处理数据则需要再拷贝成byteArray才能进行处理。

那Netty是用DirectByteBuffer读取的,如果进行业务处理呢?又是一路的追踪:

  1. 通过io.netty.buffer.ByteBuf#readBytes(byte[])读取;
  2. io.netty.buffer.PooledUnsafeDirectByteBuf#getBytes(int, byte[], int, int)
  3. io.netty.buffer.UnsafeByteBufUtil#getBytes(AbstractByteBuf, long, int, byte[], int, int)
  4. io.netty.util.internal.PlatformDependent#copyMemory(long, byte[], int, long)
  5. io.netty.util.internal.PlatformDependent0#copyMemory(Object, long, Object, long, long)
  6. jdk.internal.misc.Unsafe#copyMemory0(Object, long, Object, long, long)

简单讲就是从堆外内存拷贝到了堆内(UNSAFE真香)。

根据Netty的思路我们简化一下,给出一个实例

这个是selector的处理:

package person.pluto.natcross2.nio;

import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import lombok.AccessLevel;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import person.pluto.natcross2.executor.NatcrossExecutor;
import person.pluto.natcross2.utils.Assert;
import person.pluto.natcross2.utils.CountWaitLatch;

/**
 * <p>
 * nio 容器
 * </p>
 *
 * @author Pluto
 * @since 2021-04-13 09:25:51
 */
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class NioHallows implements Runnable {

	public static final NioHallows INSTANCE = new NioHallows();

	/**
	 * 注册监听动作
	 * <p>
	 * 要注意这里只拿最后的一次注册为准,即 {@code channel} 只能与一个 {@code proccesser} 动作对应
	 *
	 * @param channel
	 * @param ops        依据以下值进行或运算进行最后结果设定,并且 {@code channel} 要支持相应的动作
	 *                   <p>
	 *                   - {@link SelectionKey#OP_ACCEPT}
	 *                   <p>
	 *                   - {@link SelectionKey#OP_CONNECT}
	 *                   <p>
	 *                   - {@link SelectionKey#OP_READ}
	 *                   <p>
	 *                   - {@link SelectionKey#OP_WRITE}
	 * @param proccesser 要执行的动作
	 * @throws IOException
	 * @author Pluto
	 * @since 2021-04-26 15:55:38
	 */
	public static void register(SelectableChannel channel, int ops, INioProcesser proccesser) throws IOException {
		INSTANCE.register0(channel, ops, proccesser);
	}

	/**
	 * 根据 {@link SelectionKey} 恢复监听事件的注册
	 *
	 * @param key 原始的key
	 * @param ops 要与通过 {@link #register(SelectableChannel, int, INioProcesser)}
	 *            注册的事件统一
	 * @throws IOException
	 * @author Pluto
	 * @since 2021-05-07 13:21:49
	 */
	public static boolean reRegisterByKey(SelectionKey key, int ops) {
		return INSTANCE.reRegisterByKey0(key, ops);
	}

	/**
	 * 释放注册
	 *
	 * @param channel
	 * @author Pluto
	 * @since 2021-04-26 16:03:51
	 */
	public static void release(SelectableChannel channel) {
		INSTANCE.release0(channel);
	}

	private volatile Thread myThread = null;

	private volatile boolean alive = false;
	private volatile boolean canceled = false;

	private volatile Selector selector;
	private final Object selectorLock = new Object();

	private final CountWaitLatch countWaitLatch = new CountWaitLatch();

	private final Map<SelectableChannel, ProcesserHolder> channelProcesserMap = new ConcurrentHashMap<>();

	@Setter
	@Getter
	private long selectTimeout = 10L;
	@Setter
	@Getter
	private long wakeupSleepNanos = 1000000L;

	/**
	 * 获取 {@link #selector}
	 * <p>
	 * 若 {@link #selector} 未有值,则会进行初始化:打开selector,并执行 {@link #start()}
	 *
	 * @return
	 * @throws IOException
	 * @author Pluto
	 * @since 2021-04-26 16:04:30
	 */
	public Selector getSelector() throws IOException {
		// 判空、返回逻辑,按第一次取值进行,缺点是不能判断是否已经关闭,但与 this.cancel()
		// 方法中的执行顺序来看,会先被设置为null,再去close,所以可以大概率认为若不为null即为没有关闭
		Selector selector = this.selector;
		if (Objects.isNull(selector)) {
			synchronized (this.selectorLock) {
				// 二次校验
				// 若是主动退出,则不在创建,避免退出时有新任务而被重启,若要重新启用,则需要主动调用 start() 方法来启动
				if (Objects.isNull(this.selector) && !this.canceled) {
					this.selector = Selector.open();
					this.start();
				}
			}
			selector = this.selector;
			if (Objects.isNull(selector)) {
				throw new IOException("NioHallows's selector is closed");
			}
		}

		return selector;
	}

	/**
	 * 获取唤醒后的 {@link #selector}
	 * <p>
	 * 注意,若 {@link #run()} 快于你的任务,还是会被再次阻塞,只是执行了一次 {@link Selector#wakeup()}
	 *
	 * @return
	 * @throws IOException
	 * @author Pluto
	 * @since 2021-04-26 16:07:00
	 */
	public Selector getWakeupSelector() throws IOException {
		return this.getSelector().wakeup();
	}

	/**
	 * 注册监听动作
	 * <p>
	 * 要注意这里只拿最后的一次注册为准,即 {@code channel} 只能与一个 {@code proccesser} 动作对应
	 *
	 * @param channel
	 * @param ops        依据以下值进行或运算进行最后结果设定,并且 {@code channel} 要支持相应的动作
	 *                   <p>
	 *                   - {@link SelectionKey#OP_ACCEPT}
	 *                   <p>
	 *                   - {@link SelectionKey#OP_CONNECT}
	 *                   <p>
	 *                   - {@link SelectionKey#OP_READ}
	 *                   <p>
	 *                   - {@link SelectionKey#OP_WRITE}
	 * @param proccesser 要执行的动作
	 * @throws IOException
	 * @author Pluto
	 * @since 2021-04-26 15:55:38
	 */
	public void register0(SelectableChannel channel, int ops, INioProcesser proccesser) throws IOException {
		Objects.requireNonNull(channel, "channel non null");
		try {
			this.channelProcesserMap.put(channel, ProcesserHolder.of(channel, ops, proccesser));
			channel.configureBlocking(false);

			this.countWaitLatch.countUp();
			// 这里有个坑点,如果在select中,这里会被阻塞
			channel.register(this.getWakeupSelector(), ops);
		} catch (Throwable e) {
			this.channelProcesserMap.remove(channel);
			throw e;
		} finally {
			this.countWaitLatch.countDown();
		}
	}

	/**
	 * 根据 {@link SelectionKey} 恢复监听事件的注册
	 *
	 * @param key 原始的key
	 * @param ops 要与通过 {@link #register0(SelectableChannel, int, INioProcesser)}
	 *            注册的事件统一
	 * @throws IOException
	 * @author Pluto
	 * @since 2021-05-07 13:21:49
	 */
	public boolean reRegisterByKey0(SelectionKey key, int ops) {
		Objects.requireNonNull(key, "key non null");

		Assert.state(key.selector() == this.selector, "this SelectionKey is not belong NioHallows's selector");

		if (!key.isValid()) {
			return false;
		}

		// 通过事件和源码分析,恢复注册是通过updateKeys.addLast进行,虽然没有被阻塞,但是需要进行一次唤醒才可以成功恢复事件监听
		// 因无法获知是否成功注入selector,所以必须要进行一次唤醒操作,并且没有阻塞的问题,所以这里不通过countWaitLatch进行同步
		key.interestOps(ops);

		try {
			this.getWakeupSelector();
		} catch (IOException e) {
			// 出错了交给其他的流程逻辑,这里只进行一次唤醒
		}

		return true;
	}

	/**
	 * 释放注册
	 *
	 * @param channel
	 * @author Pluto
	 * @since 2021-04-26 16:03:51
	 */
	public void release0(SelectableChannel channel) {
		if (Objects.isNull(channel)) {
			return;
		}
		this.channelProcesserMap.remove(channel);

		SelectionKey key = channel.keyFor(this.selector);

		if (Objects.nonNull(key)) {
			key.cancel();
		}
	}

	@Override
	public void run() {
		CountWaitLatch countWaitLatch = this.countWaitLatch;
		Map<SelectableChannel, ProcesserHolder> chanelProcesserMap = this.channelProcesserMap;

		for (; this.alive;) {
			// 给注册事务一个时间,如果等待时间太长(可能需要注入的太多),就跳出再去获取新事件,防止饿死
			try {
				countWaitLatch.await(this.getWakeupSleepNanos(), TimeUnit.NANOSECONDS);
			} catch (InterruptedException e) {
				log.warn("selector wait register timeout");
			}

			Selector selector;
			try {
				selector = getSelector();

				// 采用有期限的监听,以免线程太快,没有来的及注册,就永远阻塞在那里了
				int select = selector.select(this.getSelectTimeout());
				if (select <= 0) {
					continue;
				}
			} catch (IOException e) {
				log.error("NioHallows run exception", e);
				continue;
			}

			Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
			for (; iterator.hasNext();) {
				SelectionKey key = iterator.next();
				iterator.remove();
				key.interestOps(0);

				ProcesserHolder processerHolder = chanelProcesserMap.get(key.channel());
				if (Objects.isNull(processerHolder)) {
					key.cancel();
					continue;
				}

				NatcrossExecutor.executeNioAction(() -> {
					processerHolder.proccess(key);
				});
			}
		}
	}

	/**
	 * 启动nio事件监听
	 *
	 * @author Pluto
	 * @since 2021-04-26 16:33:07
	 */
	public synchronized void start() {

		this.canceled = false;
		this.alive = true;

		if (this.myThread == null || !this.myThread.isAlive()) {
			this.myThread = new Thread(this);
			this.myThread.setName("nio-hallows");
			this.myThread.start();

			log.info("NioHallows is started!");
		}
	}

	/**
	 * 退出nio事件监听
	 *
	 * @author Pluto
	 * @since 2021-04-26 16:33:33
	 */
	public void cancel() {
		// 假设A线程执行到了 this.selector = Selector.open() 但是调用 this.cancel()
		// 方法的B线程抢占cpu成功,并一直到执行完成,此时A线程抢占CPU继续执行,又会进行重启,与关停项目时的关停期望不同。
		//
		// 此处锁定 this.selectorLock 后再去设置 this.canceled,形成与 this.getSelector()
		// 的线程同步,同时避免了被动调用 this.start() 时与 this.cancel() 的同步问题,最终可关闭。
		// 虽与主动调用 this.start() 有不同步的风险,但 this.start() 、 this.cancel()
		// 主动调用的场景有极大对立性,所以不进行过多的关照。
		//
		// 注意:若 this.cancel() 添加了synchronized,存在死锁的可能!!!
		synchronized (this.selectorLock) {
			this.canceled = true;
		}

		log.info("NioHallows cancel");

		this.alive = false;

		Selector selector;
		if ((selector = this.selector) != null) {
			this.selector = null;
			try {
				selector.close();
			} catch (IOException e) {
				// do nothing
			}
		}

		Thread myThread;
		if ((myThread = this.myThread) != null) {
			this.myThread = null;
			myThread.interrupt();
		}
	}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230
  • 231
  • 232
  • 233
  • 234
  • 235
  • 236
  • 237
  • 238
  • 239
  • 240
  • 241
  • 242
  • 243
  • 244
  • 245
  • 246
  • 247
  • 248
  • 249
  • 250
  • 251
  • 252
  • 253
  • 254
  • 255
  • 256
  • 257
  • 258
  • 259
  • 260
  • 261
  • 262
  • 263
  • 264
  • 265
  • 266
  • 267
  • 268
  • 269
  • 270
  • 271
  • 272
  • 273
  • 274
  • 275
  • 276
  • 277
  • 278
  • 279
  • 280
  • 281
  • 282
  • 283
  • 284
  • 285
  • 286
  • 287
  • 288
  • 289
  • 290
  • 291
  • 292
  • 293
  • 294
  • 295
  • 296
  • 297
  • 298
  • 299
  • 300
  • 301
  • 302
  • 303
  • 304
  • 305
  • 306
  • 307
  • 308
  • 309
  • 310
  • 311
  • 312
  • 313
  • 314
  • 315
  • 316
  • 317
  • 318
  • 319
  • 320
  • 321
  • 322
  • 323
  • 324
  • 325
  • 326
  • 327
  • 328
  • 329
  • 330
  • 331
  • 332
  • 333
  • 334
  • 335
  • 336
  • 337
  • 338
  • 339
  • 340
  • 341
  • 342
  • 343
  • 344
  • 345
  • 346
  • 347

执行器内容:

@Data
@AllArgsConstructor(staticName = "of")
public class ProcesserHolder {

	private SelectableChannel channel;

	private int interestOps;

	private INioProcesser processer;

	/**
	 * 执行事件的任务
	 *
	 * @param key
	 * @author Pluto
	 * @since 2021-04-26 16:35:36
	 */
	public void proccess(SelectionKey key) {
		this.processer.proccess(key);

		if (!NioHallows.reRegisterByKey(key, this.interestOps)) {
			NioHallows.release(this.channel);
		}
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

数据读写:

@Slf4j
public class SimplePassway implements Runnable, INioProcesser {
...
	/**
	 * 向输出通道输出数据
	 * <p>
	 * 这里不只是为了DMA而去用DMA,而是这里有奇葩问题
	 * <p>
	 * 如能采用了SocketChannel,而去用outputStream的时候,不管输入输出,都会有奇怪的问题,比如输出会莫名的阻塞住
	 * <p>
	 * 整体就是如果能用nio的方法,但是用了bio形式都会各种什么 NullPointException、IllageSateException 等等错误
	 * <p>
	 * 经过实验,是java8会出现阻塞的情况,java14没有出些这些奇怪的问题,估计是jni进行了变更吧
	 * </p>
	 *
	 * @param byteBuffer
	 * @throws IOException
	 * @author Pluto
	 * @since 2021-04-09 16:37:33
	 */
	private void write(ByteBuffer byteBuffer) throws IOException {
		SocketChannel outputChannel;
		OutputStream outputStream;
		if (Objects.nonNull((outputChannel = this.getOutputChannel()))) {
			// 这里要注意,可能缓存空间不足,而没有完全写出byteBuffer,所以需要循环处理进行全部输出(或其他的方式保证输出完毕)
			while (byteBuffer.hasRemaining()) {
				outputChannel.write(byteBuffer);
			}
		} else {
			outputStream = this.getOutputStream();
			outputStream.write(byteBuffer.array(), byteBuffer.position(), byteBuffer.limit());
			outputStream.flush();
		}
	}

	// ============== nio =================

	@Setter(AccessLevel.NONE)
	@Getter(AccessLevel.NONE)
	private ByteBuffer byteBuffer;

	private ByteBuffer obtainByteBuffer() {
		if (Objects.isNull(byteBuffer)) {
			if (Objects.isNull(this.getOutputChannel())) {
			    // 如果需要处理数据的话,这里直接声明为 java.nio.HeapByteBuffer.HeapByteBuffer 就好了,
			    // java.nio.DirectByteBuffer#get(byte[], int, int)用的也是UNSAFE.copyMemory
				byteBuffer = ByteBuffer.allocate(streamCacheSize);
			} else {
				// 输入输出可以使用channel,此处则使用DirectByteBuffer,这时候才真正体现出了DMA
				byteBuffer = ByteBuffer.allocateDirect(streamCacheSize);
			}
		}
		return byteBuffer;
	}

	@Override
	public void proccess(SelectionKey key) {
		if (alive && key.isValid()) {
			ByteBuffer buffer = this.obtainByteBuffer();

			SocketChannel inputChannel = (SocketChannel) key.channel();
			try {
				int len = -1;
				do {
					buffer.clear();

					len = inputChannel.read(buffer);

					if (len > 0) {
						buffer.flip();
						if (buffer.hasRemaining()) {
							this.write(buffer);
						}
					}

				} while (len > 0);

				// 如果不是负数,则还没有断开连接,返回继续等待
				if (len >= 0) {
					return;
				}
			} catch (IOException e) {
				//
			}
		}

		log.debug("one InputToOutputThread closed");

		this.cancell();
	}
...
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92

java中channel真的比stream快吗?

在需要处理数据的情况下,java中channel还是需要一次拷贝的过程;然后我们看下InputStream.read的实现,还是通过一路追踪过去,找到 java.net.SocketInputStream#socketRead0(FileDescriptor, byte[], int, int, int) 为最终实现方法;这里并没有再次拷贝的过程,直接写入了目标byteArray,我们知道一般这种jni中的都会相对较快一些,难道stream真的比channel要省一步拷贝过程吗?

带着疑问我们看下jni(使用的jdk_u8的源码,jdk看14,jni看8真有点儿意思

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/516510?site
推荐阅读
相关标签