赞
踩
老铁们好,我是V,今天我们简单聊聊ES中的索引存储类型
目前ES中主要支持以下几种存储类型
fs
默认文件系统实现。这将根据操作环境选择最佳实施,目前会默认启用hybridfs
simplefs
Simple FS 类型是SimpleFsDirectory
使用随机访问文件的文件系统存储(映射到 Lucene)的直接实现。这种实现的并发性能很差(多个线程会成为瓶颈),并且禁用了堆内存使用的一些优化。基本上使用的较少
niofs
NIO FS 类型使用 NIO 将分片索引存储在文件系统上(映射到 Lucene NIOFSDirectory
)。它允许多个线程同时读取同一个文件。但是不建议在 Windows 上使用,因为在window环境下Java 实现中存在一些错误,并且禁用了堆内存使用的某些优化。
mmapfs
MMap FS 类型通过将文件映射到内存 ( MMapDirectory
mmap) 将分片索引存储在文件系统上(映射到 Lucene)。内存映射占用进程中虚拟内存地址空间的一部分,其大小等于被映射文件的大小。在使用此类之前,请确保您已允许了足够的 虚拟地址空间。
hybridfs
该类型是niofs和mmaps的混合体,它根据读取访问模式为每种类型的文件选择最佳的文件系统类型。目前只有 Lucene 术语词典、规范和文档值文件是内存映射的。所有其他文件均使用 NIOFSDirectory 打开。和mmapfs类似,要确保你已允许了足够的 虚拟地址空间。
修改存储类型首先需要关闭索引,然后修改存储类型后再打开索引
POST /xxx/_close
PUT /xxx/_settings
{
"index.store.type": "niofs"
}
POST /xxx/_open
决定索引使用哪种存储类型的代码如下:
org.elasticsearch.index.store.FsDirectoryFactory#newFSDirectory
protected Directory newFSDirectory(Path location, LockFactory lockFactory, IndexSettings indexSettings) throws IOException { final String storeType = indexSettings.getSettings().get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.FS.getSettingsKey()); IndexModule.Type type; if (IndexModule.Type.FS.match(storeType)) { type = IndexModule.defaultStoreType(IndexModule.NODE_STORE_ALLOW_MMAP.get(indexSettings.getNodeSettings())); } else { type = IndexModule.Type.fromSettingsKey(storeType); } Set<String> preLoadExtensions = new HashSet<>( indexSettings.getValue(IndexModule.INDEX_STORE_PRE_LOAD_SETTING)); switch (type) { case HYBRIDFS: // Use Lucene defaults final FSDirectory primaryDirectory = FSDirectory.open(location, lockFactory); if (primaryDirectory instanceof MMapDirectory) { MMapDirectory mMapDirectory = (MMapDirectory) primaryDirectory; return new HybridDirectory(lockFactory, setPreload(mMapDirectory, lockFactory, preLoadExtensions)); } else { return primaryDirectory; } case MMAPFS: return setPreload(new MMapDirectory(location, lockFactory), lockFactory, preLoadExtensions); case SIMPLEFS: return new SimpleFSDirectory(location, lockFactory); case NIOFS: return new NIOFSDirectory(location, lockFactory); default: throw new AssertionError("unexpected built-in store type [" + type + "]"); } }
当index.store.type为空、或者fs或者hybridfs在linux环境下都会选择hybridfs,即混合使用
niofs和mmapfs
所以上面看似有很多种类型,其实fs和hybridfs都是基于niofs和mmapfs来实现的,而simplefs基本上因为性能问题没有人使用,所以我们着重介绍下niofs和mmapfs实现。
niofs的实现是org.apache.lucene.store.NIOFSDirectory
niofs对外提供的ByteBuffer是HeapByteBuffer
存储类型打开后返回一个IndexInput供其他模块读取数据
# org.apache.lucene.store.NIOFSDirectory#openInput @Override public IndexInput openInput(String name, IOContext context) throws IOException { ensureOpen(); ensureCanRead(name); Path path = getDirectory().resolve(name); FileChannel fc = FileChannel.open(path, StandardOpenOption.READ); boolean success = false; try { final NIOFSIndexInput indexInput = new NIOFSIndexInput("NIOFSIndexInput(path=\"" + path + "\")", fc, context); success = true; return indexInput; } finally { if (success == false) { IOUtils.closeWhileHandlingException(fc); } } }
IndexInput的实现是NIOFSIndexInput
其中主要包含一个FileChannel、是否克隆、开始坐标和结束坐标
static final class NIOFSIndexInput extends BufferedIndexInput {
/**
* The maximum chunk size for reads of 16384 bytes.
*/
private static final int CHUNK_SIZE = 16384;
/** the file channel we will read from */
protected final FileChannel channel;
/** is this instance a clone and hence does not own the file to close it */
boolean isClone = false;
/** start offset: non-zero in the slice case */
protected final long off;
/** end offset (start+length) */
protected final long end;
}
父类BufferedIndexInput中的buffer才是真正对外提供数据的对象
public abstract class BufferedIndexInput extends IndexInput implements RandomAccessInput { private static final ByteBuffer EMPTY_BYTEBUFFER = ByteBuffer.allocate(0); /** Default buffer size set to {@value #BUFFER_SIZE}. */ public static final int BUFFER_SIZE = 1024; /** Minimum buffer size allowed */ public static final int MIN_BUFFER_SIZE = 8; // The normal read buffer size defaults to 1024, but // increasing this during merging seems to yield // performance gains. However we don't want to increase // it too much because there are quite a few // BufferedIndexInputs created during merging. See // LUCENE-888 for details. /** * A buffer size for merges set to {@value #MERGE_BUFFER_SIZE}. */ public static final int MERGE_BUFFER_SIZE = 4096; private int bufferSize = BUFFER_SIZE; private ByteBuffer buffer = EMPTY_BYTEBUFFER; private long bufferStart = 0; // position in file of buffer ... }
其他模块会调用NIOFSIndexInput的父类BufferedIndexInput的readByte readLong readInt等方法获取数据,而这些方法又是调用内部的buffer来获取数据
@Override public final short readShort() throws IOException { if (Short.BYTES <= buffer.remaining()) { return buffer.getShort(); } else { return super.readShort(); } } @Override public final int readInt() throws IOException { if (Integer.BYTES <= buffer.remaining()) { return buffer.getInt(); } else { return super.readInt(); } } @Override public final long readLong() throws IOException { if (Long.BYTES <= buffer.remaining()) { return buffer.getLong(); } else { return super.readLong(); } }
这些方法都会尝试先从buffer中获取数据,如果buffer中没有则调用父类的的方法
例如readInt会尝试先从buffer中获取数据,如果buffer中没有则调用父类的readInt方法
public int readInt() throws IOException {
return ((readByte() & 0xFF) << 24) | ((readByte() & 0xFF) << 16)
| ((readByte() & 0xFF) << 8) | (readByte() & 0xFF);
}
而父类的readInt又会调用readByte,最终buffer中如果没有数据则会出发refill逻辑
@Override
public final byte readByte() throws IOException {
if (buffer.hasRemaining() == false) {
refill();
}
return buffer.get();
}
buffer中的数据是调用refill方法首先创建一个HeapByteBuffer,然后调用readInternal来填充buffer
# org.apache.lucene.store.BufferedIndexInput#refill private void refill() throws IOException { long start = bufferStart + buffer.position(); long end = start + bufferSize; // bufferSize是1024 if (end > length()) // don't read past EOF end = length(); int newLength = (int)(end - start); if (newLength <= 0) throw new EOFException("read past EOF: " + this); if (buffer == EMPTY_BYTEBUFFER) { // 创建一个HeapByteBuffer buffer = ByteBuffer.allocate(bufferSize); // allocate buffer lazily // 检查当前坐标是否越界 seekInternal(bufferStart); } buffer.position(0); buffer.limit(newLength); bufferStart = start; // 填充当前的buffer readInternal(buffer); // Make sure sub classes don't mess up with the buffer. assert buffer.order() == ByteOrder.BIG_ENDIAN : buffer.order(); assert buffer.remaining() == 0 : "should have thrown EOFException"; assert buffer.position() == newLength; // 切换到读模式 buffer.flip(); }
接下来我们来看看readInternal,里面是调用FileChannel的read方法来填充ByteBuffer
@Override protected void readInternal(ByteBuffer b) throws IOException { long pos = getFilePointer() + off; if (pos + b.remaining() > end) { throw new EOFException("read past EOF: " + this); } try { int readLength = b.remaining(); while (readLength > 0) { final int toRead = Math.min(CHUNK_SIZE, readLength); b.limit(b.position() + toRead); assert b.remaining() == toRead; final int i = channel.read(b, pos); if (i < 0) { // be defensive here, even though we checked before hand, something could have changed throw new EOFException("read past EOF: " + this + " buffer: " + b + " chunkLen: " + toRead + " end: " + end); } assert i > 0 : "FileChannel.read with non zero-length bb.remaining() must always read at least one byte (FileChannel is in blocking mode, see spec of ReadableByteChannel)"; pos += i; readLength -= i; } assert readLength == 0; } catch (IOException ioe) { throw new IOException(ioe.getMessage() + ": " + this, ioe); } }
# sun.nio.ch.FileChannelImpl#read(java.nio.ByteBuffer, long) public int read(ByteBuffer dst, long position) throws IOException { if (dst == null) throw new NullPointerException(); if (position < 0) throw new IllegalArgumentException("Negative position"); if (!readable) throw new NonReadableChannelException(); if (direct) Util.checkChannelPositionAligned(position, alignment); ensureOpen(); if (nd.needsPositionLock()) { synchronized (positionLock) { return readInternal(dst, position); } } else { return readInternal(dst, position); // 走这里 } }
# sun.nio.ch.FileChannelImpl#readInternal private int readInternal(ByteBuffer dst, long position) throws IOException { assert !nd.needsPositionLock() || Thread.holdsLock(positionLock); int n = 0; int ti = -1; try { // 标记可能会长时间block beginBlocking(); ti = threads.add(); if (!isOpen()) return -1; do { n = IOUtil.read(fd, dst, position, direct, alignment, nd); } while ((n == IOStatus.INTERRUPTED) && isOpen()); return IOStatus.normalize(n); } finally { threads.remove(ti); // 解除block endBlocking(n > 0); assert IOStatus.check(n); } }
从缓存中获取DirectByteBuffer,如果没有则新建一个DirectByteBuffer
将文件内容读取到DirectByteBuffer
将DirectByteBuffer所有字节写入HeapByteBuffer
# sun.nio.ch.IOUtil#read(java.io.FileDescriptor, java.nio.ByteBuffer, long, boolean, int, sun.nio.ch.NativeDispatcher) static int read(FileDescriptor fd, ByteBuffer dst, long position, boolean directIO, int alignment, NativeDispatcher nd) throws IOException { if (dst.isReadOnly()) throw new IllegalArgumentException("Read-only buffer"); if (dst instanceof DirectBuffer) return readIntoNativeBuffer(fd, dst, position, directIO, alignment, nd); // Substitute a native buffer ByteBuffer bb; int rem = dst.remaining(); if (directIO) { // directIO 是 flase Util.checkRemainingBufferSizeAligned(rem, alignment); bb = Util.getTemporaryAlignedDirectBuffer(rem, alignment); } else { bb = Util.getTemporaryDirectBuffer(rem); // 从缓存中返回java.nio.DirectByteBuffer } try { // 从fd中根据坐标获取数据,directIO为false int n = readIntoNativeBuffer(fd, bb, position, directIO, alignment,nd); bb.flip(); if (n > 0) // 非常朴实无华,循环获取byte放到目标ByteBuffer中 dst.put(bb); // dts是HeapByteBuffer return n; } finally { // 释放ByteBuffer,如果可以尝试缓存ByteBuffer Util.offerFirstTemporaryDirectBuffer(bb); } }
从缓存中获取DirectByteBuffer,如果没有则新建一个DirectByteBuffer
# sun.nio.ch.Util#getTemporaryDirectBuffer public static ByteBuffer getTemporaryDirectBuffer(int size) { // If a buffer of this size is too large for the cache, there // should not be a buffer in the cache that is at least as // large. So we'll just create a new one. Also, we don't have // to remove the buffer from the cache (as this method does // below) given that we won't put the new buffer in the cache. if (isBufferTooLarge(size)) { return ByteBuffer.allocateDirect(size); } BufferCache cache = bufferCache.get(); ByteBuffer buf = cache.get(size); if (buf != null) { return buf; // 可能走这里 } else { // No suitable buffer in the cache so we need to allocate a new // one. To avoid the cache growing then we remove the first // buffer from the cache and free it. if (!cache.isEmpty()) { buf = cache.removeFirst(); free(buf); } return ByteBuffer.allocateDirect(size); // 返回 java.nio.DirectByteBuffer } }
将文件内容读取到DirectByteBuffer
# sun.nio.ch.IOUtil#readIntoNativeBuffer private static int readIntoNativeBuffer(FileDescriptor fd, ByteBuffer bb, long position, boolean directIO, int alignment, NativeDispatcher nd) throws IOException { int pos = bb.position(); int lim = bb.limit(); assert (pos <= lim); int rem = (pos <= lim ? lim - pos : 0); if (directIO) { // directIO是false Util.checkBufferPositionAligned(bb, pos, alignment); Util.checkRemainingBufferSizeAligned(rem, alignment); } if (rem == 0) return 0; int n = 0; if (position != -1) { // 走的这里 最终调用linux的pread64 n = nd.pread(fd, ((DirectBuffer)bb).address() + pos, rem, position); } else { n = nd.read(fd, ((DirectBuffer)bb).address() + pos, rem); } if (n > 0) // 更新bb的position bb.position(pos + n); return n; }
调用native方法pread0
# sun.nio.ch.FileDispatcherImpl#pread
int pread(FileDescriptor fd, long address, int len, long position)
throws IOException
{
return pread0(fd, address, len, position);
}
static native int pread0(FileDescriptor fd, long address, int len,
long position) throws IOException;
FileDispatcherImpl.c.read0
https://github.com/AdoptOpenJDK/openjdk-jdk11/blob/19fb8f93c59dfd791f62d41f332db9e306bc1422/src/java.base/unix/native/libnio/ch/FileDispatcherImpl.c#L89
JNIEXPORT jint JNICALL
Java_sun_nio_ch_FileDispatcherImpl_pread0(JNIEnv *env, jclass clazz, jobject fdo,
jlong address, jint len, jlong offset)
{
jint fd = fdval(env, fdo);
void *buf = (void *)jlong_to_ptr(address);
return convertReturnVal(env, pread64(fd, buf, len, offset), JNI_TRUE);
}
非常朴实无华,循环获取byte放到目标ByteBuffer中
# java.nio.ByteBuffer#put(java.nio.ByteBuffer)
public ByteBuffer put(ByteBuffer src) {
if (src == this)
throw createSameBufferException();
if (isReadOnly())
throw new ReadOnlyBufferException();
int n = src.remaining();
if (n > remaining())
throw new BufferOverflowException();
for (int i = 0; i < n; i++)
put(src.get());
return this;
}
清空DirectByteBuffer,并尝试将清空后的DirectByteBuffer缓存起来
# sun.nio.ch.Util#offerFirstTemporaryDirectBuffer
static void offerFirstTemporaryDirectBuffer(ByteBuffer buf) {
// If the buffer is too large for the cache we don't have to
// check the cache. We'll just free it.
if (isBufferTooLarge(buf)) {
free(buf);
return;
}
assert buf != null;
BufferCache cache = bufferCache.get();
if (!cache.offerFirst(buf)) { // 缓存buf
// cache is full
free(buf); // 清空buf
}
mmapfs的实现类是org.apache.lucene.store.MMapDirectory
mmapfs使用MMapDirectory提供openInput返回一个IndexInput给其他的模块使用
# org.apache.lucene.store.MMapDirectory#openInput
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
ensureOpen();
ensureCanRead(name);
Path path = directory.resolve(name);
try (FileChannel c = FileChannel.open(path, StandardOpenOption.READ)) {
final String resourceDescription = "MMapIndexInput(path=\"" + path.toString() + "\")";
final boolean useUnmap = getUseUnmap(); // useUnmap=true
// 根据ByteBuffer数量选择使用SingleBufferImpl或者MultiBufferImpl包装
return ByteBufferIndexInput.newInstance(resourceDescription,
map(resourceDescription, c, 0, c.size()),
c.size(), chunkSizePower, new ByteBufferGuard(resourceDescription, useUnmap ? CLEANER : null)); // useUnmap为true
}
}
ByteBufferIndexInput.newInstance 就是个空壳
public static ByteBufferIndexInput newInstance(String resourceDescription, ByteBuffer[] buffers, long length, int chunkSizePower, ByteBufferGuard guard) {
if (buffers.length == 1) {
return new SingleBufferImpl(resourceDescription, buffers[0], length, chunkSizePower, guard);
} else {
return new MultiBufferImpl(resourceDescription, buffers, 0, length, chunkSizePower, guard);
}
}
其中SingleBufferImpl和MultiBufferImpl本质都是代理了ByteBuffer对象,所以这些我们直接跳过,重点是这个map方法
方法返回的ByteBuffer数组是MappedByteBuffer,具体的实现类是java.nio.DirectByteBufferR
maxChunkSize的获取规则是 Constants.
JRE_IS_64BIT
? (1 << 30) : (1 << 28);
在64位系统下是1073741824即1024MB
this.chunkSizePower = 31 Integer.numberOfLeadingZeros(maxChunkSize); = 30
这里面的preload一直是false,所以buffer.load()不会触发
# org.apache.lucene.store.MMapDirectory#map final ByteBuffer[] map(String resourceDescription, FileChannel fc, long offset, long length) throws IOException { if ((length >>> chunkSizePower) >= Integer.MAX_VALUE) throw new IllegalArgumentException("RandomAccessFile too big for chunk size: " + resourceDescription); final long chunkSize = 1L << chunkSizePower; // chunkSizePower是30, chunkSize是1G // we always allocate one more buffer, the last one may be a 0 byte one // 根据文件大小和块大小来计算分片数量 final int nrBuffers = (int) (length >>> chunkSizePower) + 1; ByteBuffer buffers[] = new ByteBuffer[nrBuffers]; long bufferStart = 0L; for (int bufNr = 0; bufNr < nrBuffers; bufNr++) { int bufSize = (int) ( (length > (bufferStart + chunkSize)) ? chunkSize : (length - bufferStart) ); MappedByteBuffer buffer; // buffer 是 java.nio.DirectByteBufferR try { buffer = fc.map(MapMode.READ_ONLY, offset + bufferStart, bufSize); } catch (IOException ioe) { throw convertMapFailedIOException(ioe, resourceDescription, bufSize); } if (preload) { // preload=false buffer.load(); } buffers[bufNr] = buffer; bufferStart += bufSize; } return buffers; }
重点看下fileChannel.map方法
beginBlocking() 方法 标记可能会无限期阻塞,需要和endBlocking配合使用
nd.size(fd) 获取文件大小,然后校验文件大小是不是小于位点+块大小,如果是则扩张文件,但是我们是只读模式所以这个逻辑不会走
map0(imode, mapPosition, mapSize) 这段逻辑开启虚拟内存地址和文件之间的映射
nd.duplicateForMapping(fd); 创建了一个新的FileDescriptor
创建新的DirectByteBufferR Util.newMappedByteBufferR(isize, addr + pagePosition, mfd, um);
endBlocking()结束标记阻塞
# sun.nio.ch.FileChannelImpl#map // mode是MapMode.READ_ONLY // position是开始坐标 // size是小文件则是文件大小,大文件则是块大小 public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException { ensureOpen(); if (mode == null) throw new NullPointerException("Mode is null"); if (position < 0L) throw new IllegalArgumentException("Negative position"); if (size < 0L) throw new IllegalArgumentException("Negative size"); if (position + size < 0) throw new IllegalArgumentException("Position + size overflow"); if (size > Integer.MAX_VALUE) throw new IllegalArgumentException("Size exceeds Integer.MAX_VALUE"); int imode = -1; if (mode == MapMode.READ_ONLY) imode = MAP_RO; else if (mode == MapMode.READ_WRITE) imode = MAP_RW; else if (mode == MapMode.PRIVATE) imode = MAP_PV; assert (imode >= 0); if ((mode != MapMode.READ_ONLY) && !writable) throw new NonWritableChannelException(); if (!readable) throw new NonReadableChannelException(); long addr = -1; int ti = -1; try { // 标记可能会一直阻塞,需要和endBlocking配合使用 beginBlocking(); ti = threads.add(); if (!isOpen()) return null; long mapSize; int pagePosition; synchronized (positionLock) { long filesize; do { filesize = nd.size(fd); // 获取文件大小 } while ((filesize == IOStatus.INTERRUPTED) && isOpen()); if (!isOpen()) return null; // 这段逻辑是写入时需要扩张文件,我们是只读模式不会走这段逻辑 if (filesize < position + size) { // Extend file size if (!writable) { throw new IOException("Channel not open for writing " + "- cannot extend file to required size"); } int rv; do { rv = nd.truncate(fd, position + size); } while ((rv == IOStatus.INTERRUPTED) && isOpen()); if (!isOpen()) return null; } // 正常情况size 大于0 if (size == 0) { addr = 0; // a valid file descriptor is not required FileDescriptor dummy = new FileDescriptor(); if ((!writable) || (imode == MAP_RO)) return Util.newMappedByteBufferR(0, 0, dummy, null); else return Util.newMappedByteBuffer(0, 0, dummy, null); } // allocationGranularity 16384 即16K pagePosition = (int)(position % allocationGranularity); long mapPosition = position - pagePosition; mapSize = size + pagePosition; try { // If map0 did not throw an exception, the address is valid // 开启虚拟内存地址 // imode MAP_RO addr = map0(imode, mapPosition, mapSize); } catch (OutOfMemoryError x) { // An OutOfMemoryError may indicate that we've exhausted // memory so force gc and re-attempt map System.gc(); try { Thread.sleep(100); } catch (InterruptedException y) { Thread.currentThread().interrupt(); } try { addr = map0(imode, mapPosition, mapSize); } catch (OutOfMemoryError y) { // After a second OOME, fail throw new IOException("Map failed", y); } } } // synchronized // On Windows, and potentially other platforms, we need an open // file descriptor for some mapping operations. FileDescriptor mfd; try { mfd = nd.duplicateForMapping(fd); } catch (IOException ioe) { unmap0(addr, mapSize); throw ioe; } assert (IOStatus.checkAll(addr)); assert (addr % allocationGranularity == 0); int isize = (int)size; Unmapper um = new Unmapper(addr, mapSize, isize, mfd); if ((!writable) || (imode == MAP_RO)) { return Util.newMappedByteBufferR(isize, addr + pagePosition, mfd, um); } else { return Util.newMappedByteBuffer(isize, addr + pagePosition, mfd, um); } } finally { threads.remove(ti); endBlocking(IOStatus.checkAll(addr)); } }
这段代码的是将文件的一部分和虚拟内存空间映射起来
# sun.nio.ch.FileChannelImpl#map0
// Creates a new mapping
private native long map0(int prot, long position, long length)
throws IOException;
https://github.com/AdoptOpenJDK/openjdk-jdk11/blob/19fb8f93c59dfd791f62d41f332db9e306bc1422/src/java.base/unix/native/libnio/ch/FileChannelImpl.c#L74
map0源码
JNIEXPORT jlong JNICALL Java_sun_nio_ch_FileChannelImpl_map0(JNIEnv *env, jobject this, jint prot, jlong off, jlong len) { void *mapAddress = 0; jobject fdo = (*env)->GetObjectField(env, this, chan_fd); jint fd = fdval(env, fdo); int protections = 0; int flags = 0; if (prot == sun_nio_ch_FileChannelImpl_MAP_RO) { protections = PROT_READ; flags = MAP_SHARED; } else if (prot == sun_nio_ch_FileChannelImpl_MAP_RW) { protections = PROT_WRITE | PROT_READ; flags = MAP_SHARED; } else if (prot == sun_nio_ch_FileChannelImpl_MAP_PV) { protections = PROT_WRITE | PROT_READ; flags = MAP_PRIVATE; } mapAddress = mmap64( 0, /* Let OS decide location */ len, /* Number of bytes to map */ protections, /* File permissions */ flags, /* Changes are shared */ fd, /* File descriptor of mapped file */ off); /* Offset into file */ if (mapAddress == MAP_FAILED) { if (errno == ENOMEM) { JNU_ThrowOutOfMemoryError(env, "Map failed"); return IOS_THROWN; } return handle(env, -1, "Map failed"); } return ((jlong) (unsigned long) mapAddress); }
# sun.nio.ch.Util#newMappedByteBufferR static MappedByteBuffer newMappedByteBufferR(int size, long addr, FileDescriptor fd, Runnable unmapper) { MappedByteBuffer dbb; if (directByteBufferRConstructor == null) initDBBRConstructor(); // 如果构造器为空则初始化构造器 try { // 反射创建新的DirectByteBufferR对象 dbb = (MappedByteBuffer)directByteBufferRConstructor.newInstance( new Object[] { size, addr, fd, unmapper }); } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) { throw new InternalError(e); } return dbb; }
DirectByteBufferR 构造器
protected DirectByteBufferR(int cap, long addr,
FileDescriptor fd,
Runnable unmapper)
{
super(cap, addr, fd, unmapper);
this.isReadOnly = true;
}
DirectByteBufferR父类构造器
protected DirectByteBuffer(int cap, long addr,
FileDescriptor fd,
Runnable unmapper)
{
super(-1, 0, cap, cap, fd);
address = addr;
cleaner = Cleaner.create(this, unmapper);
att = null;
}
MappedByteBuffer(int mark, int pos, int lim, int cap, // package-private
FileDescriptor fd)
{
super(mark, pos, lim, cap);
this.fd = fd;
}
@Override public final byte readByte() throws IOException { try { // curBuf是 java.nio.DirectByteBufferR return guard.getByte(curBuf); // guard 是 org.apache.lucene.store.ByteBufferGuard } catch (BufferUnderflowException e) { do { curBufIndex++; if (curBufIndex >= buffers.length) { throw new EOFException("read past EOF: " + this); } setCurBuf(buffers[curBufIndex]); curBuf.position(0); } while (!curBuf.hasRemaining()); return guard.getByte(curBuf); } catch (NullPointerException npe) { throw new AlreadyClosedException("Already closed: " + this); } }
guard.getByte(curBuf)只是套了一层壳
# org.apache.lucene.store.ByteBufferGuard#getByte(java.nio.ByteBuffer)
public byte getByte(ByteBuffer receiver) {
ensureValid();
return receiver.get();
}
最终调用java.nio.DirectByteBuffer#get()
# java.nio.DirectByteBuffer#get()
public byte get() {
try {
return ((UNSAFE.getByte(ix(nextGetIndex()))));
} finally {
Reference.reachabilityFence(this);
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。