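Rather than keeping objects directly on the heap, Flink serializes them into pre-allocated MemorySegments. A MemorySegment is a fixed-length block of memory (32 KB by default) and is the smallest unit of memory allocation in Flink. It provides efficient read and write methods, and its backing memory can be either an on-heap byte[] or an off-heap ByteBuffer. A MemorySegment can be thought of as Flink's counterpart to the ByteBuffer in Java NIO. Flink also implements Java's java.io.DataInput and java.io.DataOutput interfaces, as AbstractPagedInputView and AbstractPagedOutputView respectively, which expose a logical view for operating on multiple consecutive MemorySegments.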
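The idea of a logical view over several fixed-size pages can be illustrated with a minimal sketch. It is not Flink's AbstractPagedOutputView; the class name PagedViewSketch and its methods are hypothetical, and plain java.nio.ByteBuffer instances stand in for MemorySegments.

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch: a logical view that reads and writes longs across fixed-size pages. */
public class PagedViewSketch {
    private final ByteBuffer[] pages; // stand-ins for MemorySegments
    private final int pageSize;

    public PagedViewSketch(int numPages, int pageSize) {
        this.pageSize = pageSize;
        this.pages = new ByteBuffer[numPages];
        for (int i = 0; i < numPages; i++) {
            // Heap-backed pages; ByteBuffer.allocateDirect would give off-heap pages instead.
            pages[i] = ByteBuffer.allocate(pageSize);
        }
    }

    /** Maps a logical position to (page, offset) and writes one byte. */
    public void putByte(long position, byte value) {
        pages[(int) (position / pageSize)].put((int) (position % pageSize), value);
    }

    public byte getByte(long position) {
        return pages[(int) (position / pageSize)].get((int) (position % pageSize));
    }

    /** Writes a long in big-endian order, byte by byte, so it may span a page boundary. */
    public void putLong(long position, long value) {
        for (int i = 0; i < 8; i++) {
            putByte(position + i, (byte) (value >>> (56 - 8 * i)));
        }
    }

    /** Reads a long that was written with putLong, reassembling it across pages if needed. */
    public long getLong(long position) {
        long result = 0;
        for (int i = 0; i < 8; i++) {
            result = (result << 8) | (getByte(position + i) & 0xFFL);
        }
        return result;
    }
}
```

With two 16-byte pages, a long written at position 12 occupies the last four bytes of the first page and the first four bytes of the second, yet the caller only sees one continuous address space.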
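Note that the three parts of memory described above are not all on the JVM heap, because the memory backing a MemorySegment can be on-heap or off-heap (that is, not managed by the JVM). Network Buffers are allocated off-heap, whereas Managed Memory can be configured to be either on-heap or off-heap. Also note that Managed Memory is mainly used in batch mode. In streaming mode this memory is not pre-allocated, so the memory left free is available to user-defined functions.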
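Flink can process arbitrary Java or Scala objects without requiring them to implement a specific interface. For Flink programs written in Java, Flink uses reflection to obtain the return types of user-defined functions; for programs written in Scala, the Scala compiler analyzes those return types. Every data type is described by a TypeInformation, whose main categories are discussed below.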
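From a TypeInformation you can obtain the TypeSerializer for the corresponding data type. For BasicTypeInfo, Flink ships a matching serializer. For WritableTypeInfo, Flink delegates serialization and deserialization to the write() and readFields() methods of the Hadoop Writable interface. For GenericTypeInfo, Flink uses Kryo by default. TupleTypeInfo, CaseClassTypeInfo, and PojoTypeInfo are composite types, so serialization is simply delegated to the serializers of their members.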
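The delegation used by composite types can be sketched as follows. This is a simplified illustration, not Flink's TypeSerializer API: the Serializer interface, Pair, and PairSerializer are hypothetical names, and only java.io classes are used.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class CompositeSerializerSketch {
    /** Hypothetical, simplified stand-in for a type serializer. */
    interface Serializer<T> {
        void serialize(T value, DataOutput out) throws IOException;
        T deserialize(DataInput in) throws IOException;
    }

    /** Serializer for a basic type: int. */
    static final Serializer<Integer> INT = new Serializer<Integer>() {
        public void serialize(Integer value, DataOutput out) throws IOException { out.writeInt(value); }
        public Integer deserialize(DataInput in) throws IOException { return in.readInt(); }
    };

    /** Serializer for a basic type: String. */
    static final Serializer<String> STRING = new Serializer<String>() {
        public void serialize(String value, DataOutput out) throws IOException { out.writeUTF(value); }
        public String deserialize(DataInput in) throws IOException { return in.readUTF(); }
    };

    /** A two-field composite value, comparable to a Tuple2. */
    static final class Pair<A, B> {
        final A first;
        final B second;
        Pair(A first, B second) { this.first = first; this.second = second; }
    }

    /** Composite serializer: it only delegates to the serializers of its fields, in order. */
    static final class PairSerializer<A, B> implements Serializer<Pair<A, B>> {
        private final Serializer<A> firstSerializer;
        private final Serializer<B> secondSerializer;

        PairSerializer(Serializer<A> firstSerializer, Serializer<B> secondSerializer) {
            this.firstSerializer = firstSerializer;
            this.secondSerializer = secondSerializer;
        }

        public void serialize(Pair<A, B> value, DataOutput out) throws IOException {
            firstSerializer.serialize(value.first, out);
            secondSerializer.serialize(value.second, out);
        }

        public Pair<A, B> deserialize(DataInput in) throws IOException {
            A first = firstSerializer.deserialize(in);
            B second = secondSerializer.deserialize(in);
            return new Pair<>(first, second);
        }
    }

    /** Serializes a pair to bytes and reads it back. */
    static Pair<Integer, String> roundTrip(Pair<Integer, String> input) {
        PairSerializer<Integer, String> serializer = new PairSerializer<>(INT, STRING);
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            serializer.serialize(input, new DataOutputStream(bytes));
            return serializer.deserialize(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the composite serializer writes its fields back to back, the binary layout is fully determined by the member serializers and no per-record type tag is needed.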
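In early versions, MemorySegment was backed only by heap memory, so a single MemorySegment implementation was enough. Once off-heap support was introduced, the obvious approach would have been to add a second implementation based on off-heap memory. That, however, runs into a JIT optimization problem. With only one MemorySegment type loaded, the JIT compiler can use Class Hierarchy Analysis (CHA) to determine the concrete target of each method call, so calls can be de-virtualized and inlined to improve performance. As soon as two implementations exist and are used side by side, the JIT compiler can no longer apply these optimizations, which leads to a performance difference of roughly 2.7x. Flink therefore takes two measures: 1) ensure that only one MemorySegment implementation is loaded; 2) provide a single MemorySegment implementation that can manage both heap and off-heap memory, so that the frequently called MemorySegment methods can still be optimized by the JIT. The MemorySegment source code is as follows: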
- public abstract class MemorySegment {
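- protected final byte[] heapMemory; // reference to the heap memory; null for off-heap segments
- protected long address; // off-heap address, or the byte[] base offset for heap segments
- // Creates a MemorySegment backed by heap memory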
- MemorySegment(byte[] buffer, Object owner) {
- if (buffer == null) {
- throw new NullPointerException("buffer");
- }
- this.heapMemory = buffer;
- this.address = BYTE_ARRAY_BASE_OFFSET;
- this.size = buffer.length;
- this.addressLimit = this.address + this.size;
- this.owner = owner;
- }
- // Creates a MemorySegment backed by off-heap memory
- MemorySegment(long offHeapAddress, int size, Object owner) {
- if (offHeapAddress <= 0) {
- throw new IllegalArgumentException("negative pointer or size");
- }
- if (offHeapAddress >= Long.MAX_VALUE - Integer.MAX_VALUE) {
- // this is necessary to make sure the collapsed checks are safe against numeric overflows
- throw new IllegalArgumentException("Segment initialized with too large address: " + offHeapAddress
- + " ; Max allowed address is " + (Long.MAX_VALUE - Integer.MAX_VALUE - 1));
- }
- this.heapMemory = null;
- this.address = offHeapAddress;
- this.addressLimit = this.address + size;
- this.size = size;
- this.owner = owner;
- }
- public boolean isOffHeap() {
- return heapMemory == null;
- }
- public final long getLong(int index) {
- final long pos = address + index;
- if (index >= 0 && pos <= addressLimit - 8) {
- return UNSAFE.getLong(heapMemory, pos); // the key to handling both heap and off-heap memory in one implementation
- }
- else if (address > addressLimit) {
- throw new IllegalStateException("segment has been freed");
- }
- else {
- // index is in fact invalid
- throw new IndexOutOfBoundsException();
- }
- }
- //.........
- }
- public final class HybridMemorySegment extends MemorySegment {
- private final ByteBuffer offHeapBuffer; // the off-heap memory; null for heap segments
- private final Runnable cleaner; // The cleaner is called to free the underlying native memory.
- // Initializes a segment backed by off-heap memory
- HybridMemorySegment(@Nonnull ByteBuffer buffer, @Nullable Object owner, @Nullable Runnable cleaner) {
- super(checkBufferAndGetAddress(buffer), buffer.capacity(), owner);
- this.offHeapBuffer = buffer;
- this.cleaner = cleaner;
- }
- // Initializes a segment backed by heap memory
- HybridMemorySegment(byte[] buffer, Object owner) {
- super(buffer, owner);
- this.offHeapBuffer = null;
- this.cleaner = null;
- }
- //.........
- }
- public final class HeapMemorySegment extends MemorySegment {
- private byte[] memory;
- HeapMemorySegment(byte[] memory, Object owner) {
- super(Objects.requireNonNull(memory), owner);
- this.memory = memory;
- }
- //......
- }

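The same code can handle both heap and off-heap memory because some sun.misc.Unsafe methods behave differently depending on the object reference they receive, for example sun.misc.Unsafe.getLong(Object reference, long offset). When reference is not null, the method takes the address of that object, adds offset, and reads 8 bytes from the resulting relative address. When reference is null, offset is treated as the absolute address to operate on. By controlling the value of the object reference, a single implementation can therefore manage both off-heap and heap memory.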
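This behavior can be tried out in isolation with a small sketch. It assumes a JDK on which sun.misc.Unsafe is still reachable through reflection (newer JDKs may print deprecation warnings); the class UnsafeAddressingSketch and its methods are hypothetical and not part of Flink.

```java
import java.lang.reflect.Field;
import sun.misc.Unsafe;

public class UnsafeAddressingSketch {
    private static final Unsafe UNSAFE = loadUnsafe();
    // Offset of element 0 inside a byte[] object, relative to the object's start.
    private static final long BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);

    private static Unsafe loadUnsafe() {
        try {
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            return (Unsafe) field.get(null);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("sun.misc.Unsafe is not accessible on this JDK", e);
        }
    }

    /** Single code path: a null base means that offset is an absolute address. */
    static long readLong(Object base, long offset) {
        return UNSAFE.getLong(base, offset);
    }

    static void writeLong(Object base, long offset, long value) {
        UNSAFE.putLong(base, offset, value);
    }

    /** Heap case: base is the byte[], offset is relative to the array object. */
    static long heapRoundTrip(long value) {
        byte[] heap = new byte[16];
        writeLong(heap, BYTE_ARRAY_BASE_OFFSET + 8, value);
        return readLong(heap, BYTE_ARRAY_BASE_OFFSET + 8);
    }

    /** Off-heap case: base is null, offset is the absolute native address. */
    static long offHeapRoundTrip(long value) {
        long address = UNSAFE.allocateMemory(16);
        try {
            writeLong(null, address + 8, value);
            return readLong(null, address + 8);
        } finally {
            UNSAFE.freeMemory(address);
        }
    }
}
```

Both round trips go through the same readLong and writeLong methods; only the (base, offset) pair differs, which mirrors how the heapMemory and address fields are set by the two MemorySegment constructors.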
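Since HybridMemorySegment can manage both heap and off-heap memory, why is HeapMemorySegment still needed? The reason is that if all MemorySegments are allocated on the heap, HeapMemorySegment performs better than HybridMemorySegment. In practice, however, the MemorySegments used for Flink's network buffers are always allocated off-heap, so HeapMemorySegment is no longer used in Flink; see FLINK-7310 (always use the HybridMemorySegment) for details. A MemorySegment is usually not constructed directly but created through MemorySegmentFactory, as follows: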
- public final class MemorySegmentFactory { // A factory for (hybrid) memory segments ({@link HybridMemorySegment}).
- // Allocating heap memory
- public static MemorySegment wrap(byte[] buffer) { // wraps an existing byte[] as a heap memory region
- return new HybridMemorySegment(buffer, null);
- }
- public static MemorySegment allocateUnpooledSegment(int size) { // allocates heap memory of the given size (new byte[size]); implemented below
- return allocateUnpooledSegment(size, null);
- }
- public static MemorySegment allocateUnpooledSegment(int size, Object owner) {
- return new HybridMemorySegment(new byte[size], owner);
- }
- // Allocating off-heap memory
- public static MemorySegment allocateUnpooledOffHeapMemory(int size) { // allocates off-heap memory of the given size (ByteBuffer.allocateDirect(size)); implemented below
- return allocateUnpooledOffHeapMemory(size, null);
- }
- public static MemorySegment allocateUnpooledOffHeapMemory(int size, Object owner) {
- ByteBuffer memory = ByteBuffer.allocateDirect(size);
- return new HybridMemorySegment(memory, owner, null);
- }
- public static MemorySegment allocateOffHeapUnsafeMemory(int size, Object owner) {
- long address = MemoryUtils.allocateUnsafe(size);
- ByteBuffer offHeapBuffer = MemoryUtils.wrapUnsafeMemoryWithByteBuffer(address, size);
- return new HybridMemorySegment(offHeapBuffer, owner, MemoryUtils.createMemoryGcCleaner(offHeapBuffer, address));
- }
- public static MemorySegment wrapOffHeapMemory(ByteBuffer memory) {
- return new HybridMemorySegment(memory, null, null);
- }
- }

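As mentioned in the discussion of the TaskManager memory layout, TaskManager memory is divided into three main parts, of which Network Buffers and Managed Memory are both collections of MemorySegments. The following describes how each of these two is managed, starting with NetworkBufferPool for Network Buffers.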
- public class NetworkBufferPool implements BufferPoolFactory, MemorySegmentProvider, AvailabilityProvider {
- private final int totalNumberOfMemorySegments;
- private final int memorySegmentSize;
- // Queue of all available MemorySegments
- private final ArrayDeque<MemorySegment> availableMemorySegments;
- // ---- Managed buffer pools ----------------------------------------------
- private final Object factoryLock = new Object();
- private final Set<LocalBufferPool> allBufferPools = new HashSet<>();
- private int numTotalRequiredBuffers;
- private final int numberOfSegmentsToRequest;
- private final Duration requestSegmentsTimeout;
- /**
- * Allocates all {@link MemorySegment} instances managed by this pool.
- */
- public NetworkBufferPool(
- int numberOfSegmentsToAllocate,
- int segmentSize,
- int numberOfSegmentsToRequest,
- Duration requestSegmentsTimeout) {
- this.totalNumberOfMemorySegments = numberOfSegmentsToAllocate;
- this.memorySegmentSize = segmentSize;
- checkArgument(numberOfSegmentsToRequest > 0, "The number of required buffers should be larger than 0.");
- this.numberOfSegmentsToRequest = numberOfSegmentsToRequest;
- Preconditions.checkNotNull(requestSegmentsTimeout);
- checkArgument(requestSegmentsTimeout.toMillis() > 0, "The timeout for requesting exclusive buffers should be positive.");
- this.requestSegmentsTimeout = requestSegmentsTimeout;
- final long sizeInLong = (long) segmentSize;
- try {
- this.availableMemorySegments = new ArrayDeque<>(numberOfSegmentsToAllocate); // queue of all available MemorySegments
- } catch (OutOfMemoryError err) {
- throw new OutOfMemoryError("Could not allocate buffer queue of length " + numberOfSegmentsToAllocate + " - " + err.getMessage());
- }
- try {
- for (int i = 0; i < numberOfSegmentsToAllocate; i++) { // all MemorySegments used by NetworkBufferPool are off-heap
- availableMemorySegments.add(MemorySegmentFactory.allocateUnpooledOffHeapMemory(segmentSize, null));
- }
- }
- catch (OutOfMemoryError err) {
- // ............
- }
- }
- // ......
- }
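The pooling pattern in the constructor above (allocate every off-heap segment up front, then hand segments out and take them back) can be sketched without Flink. This is a minimal illustration under simplifying assumptions: FixedBufferPoolSketch is a hypothetical class, it uses ByteBuffer in place of MemorySegment, and it leaves out LocalBufferPool redistribution and request timeouts.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

/** Hypothetical sketch of a fixed-size pool of off-heap buffers. */
public class FixedBufferPoolSketch {
    private final ArrayDeque<ByteBuffer> available;
    private final int segmentSize;

    public FixedBufferPoolSketch(int numSegments, int segmentSize) {
        this.segmentSize = segmentSize;
        this.available = new ArrayDeque<>(numSegments);
        for (int i = 0; i < numSegments; i++) {
            // All segments are allocated off-heap, once, at construction time.
            available.add(ByteBuffer.allocateDirect(segmentSize));
        }
    }

    /** Returns a free segment, or null if the pool is exhausted. */
    public synchronized ByteBuffer request() {
        return available.poll();
    }

    /** Puts a segment back into the pool so it can be reused. */
    public synchronized void recycle(ByteBuffer segment) {
        if (!segment.isDirect() || segment.capacity() != segmentSize) {
            throw new IllegalArgumentException("segment does not belong to this pool");
        }
        segment.clear(); // reset position and limit; the contents are not zeroed
        available.add(segment);
    }

    public synchronized int availableCount() {
        return available.size();
    }
}
```

Allocating everything at startup means the pool never grows afterwards: a caller that finds it empty has to wait for another caller to recycle a segment.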

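MemoryManager is the class that manages Managed Memory. This memory is mainly used in batch mode and is not allocated in streaming mode. MemoryManager manages all of its MemorySegments through the inner MemoryPool abstraction. Managing Managed Memory is simpler than managing Network Buffers, because the extra Buffer wrapper layer is not needed. The source code is as follows: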
- public class MemoryManager {
- // Manages all MemorySegments
- private final MemoryPool memoryPool; // The memory pool from which we draw memory segments. Specific to on-heap or off-heap memory
- private final HashMap<Object, Set<MemorySegment>> allocatedSegments; // Memory segments allocated per memory owner.
- public MemoryManager(long memorySize, int numberOfSlots, int pageSize,
- MemoryType memoryType, boolean preAllocateMemory) {
- // sanity checks
- // ......
- this.memoryType = memoryType;
- this.memorySize = memorySize;
- this.numberOfSlots = numberOfSlots;
- // assign page size and bit utilities
- this.pageSize = pageSize;
- this.roundingMask = ~((long) (pageSize - 1));
- final long numPagesLong = memorySize / pageSize;
- if (numPagesLong > Integer.MAX_VALUE) {
- throw new IllegalArgumentException("The given number of memory bytes (" + memorySize
- + ") corresponds to more than MAX_INT pages.");
- }
- this.totalNumPages = (int) numPagesLong; // total number of available MemorySegments
- if (this.totalNumPages < 1) {
- throw new IllegalArgumentException("The given amount of memory amounted to less than one page.");
- }
- this.allocatedSegments = new HashMap<Object, Set<MemorySegment>>();
- this.isPreAllocated = preAllocateMemory;
- this.numNonAllocatedPages = preAllocateMemory ? 0 : this.totalNumPages;
- final int memToAllocate = preAllocateMemory ? this.totalNumPages : 0; // whether to pre-allocate memory; streaming mode does not pre-allocate
- switch (memoryType) {
- case HEAP: // on-heap memory
- this.memoryPool = new HybridHeapMemoryPool(memToAllocate, pageSize);
- break;
- case OFF_HEAP: // off-heap memory
- if (!preAllocateMemory) {
- LOG.warn("It is advisable to set 'taskmanager.memory.preallocate' to true when" +
- " the memory type 'taskmanager.memory.off-heap' is set to true.");
- }
- this.memoryPool = new HybridOffHeapMemoryPool(memToAllocate, pageSize);
- break;
- default:
- throw new IllegalArgumentException("unrecognized memory type: " + memoryType);
- }
- // ......
- }
- abstract static class MemoryPool {
- abstract int getNumberOfAvailableMemorySegments();
- abstract MemorySegment allocateNewSegment(Object owner);
- abstract MemorySegment requestSegmentFromPool(Object owner);
- abstract void returnSegmentToPool(MemorySegment segment);
- abstract void clear();
- }
- static final class HybridHeapMemoryPool extends MemoryPool {
- private final ArrayDeque<byte[]> availableMemory; // The collection of available memory segments.
- private final int segmentSize;
- HybridHeapMemoryPool(int numInitialSegments, int segmentSize) {
- this.availableMemory = new ArrayDeque<>(numInitialSegments);
- this.segmentSize = segmentSize;
- for (int i = 0; i < numInitialSegments; i++) {
- this.availableMemory.add(new byte[segmentSize]); // on-heap pages are plain byte arrays
- }
- }
- }
- static final class HybridOffHeapMemoryPool extends MemoryPool {
- private final ArrayDeque<ByteBuffer> availableMemory; // The collection of available memory segments.
- private final int segmentSize;
- HybridOffHeapMemoryPool(int numInitialSegments, int segmentSize) {
- this.availableMemory = new ArrayDeque<>(numInitialSegments);
- this.segmentSize = segmentSize;
- for (int i = 0; i < numInitialSegments; i++) {
- this.availableMemory.add(ByteBuffer.allocateDirect(segmentSize)); // off-heap pages use direct ByteBuffers
- }
- }
- }
- }
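The page accounting in the constructor above (memorySize / pageSize pages, which are either pre-allocated or created lazily and tracked per owner) can be sketched as follows. ManagedMemorySketch and its method names are hypothetical; the sketch covers heap pages only and simplifies the real MemoryManager considerably.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of page accounting with optional pre-allocation. */
public class ManagedMemorySketch {
    private final int pageSize;
    private final boolean preAllocated;
    private final ArrayDeque<byte[]> freePages = new ArrayDeque<>();
    private final Map<Object, Set<byte[]>> allocatedByOwner = new HashMap<>();
    private int numNonAllocatedPages; // only used when pages are created lazily

    public ManagedMemorySketch(long memorySize, int pageSize, boolean preAllocate) {
        long numPagesLong = memorySize / pageSize; // any remainder smaller than one page is unused
        if (numPagesLong < 1 || numPagesLong > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("memory size must yield between 1 and MAX_INT pages");
        }
        this.pageSize = pageSize;
        this.preAllocated = preAllocate;
        this.numNonAllocatedPages = preAllocate ? 0 : (int) numPagesLong;
        for (int i = 0; preAllocate && i < numPagesLong; i++) {
            freePages.add(new byte[pageSize]);
        }
    }

    public synchronized int availablePages() {
        return preAllocated ? freePages.size() : numNonAllocatedPages;
    }

    /** Hands out numPages pages and records them under the given owner. */
    public synchronized List<byte[]> allocatePages(Object owner, int numPages) {
        if (numPages > availablePages()) {
            throw new IllegalStateException("not enough free pages: requested " + numPages);
        }
        Set<byte[]> owned = allocatedByOwner.computeIfAbsent(owner, k -> new HashSet<>());
        List<byte[]> result = new ArrayList<>(numPages);
        for (int i = 0; i < numPages; i++) {
            byte[] page = preAllocated ? freePages.poll() : new byte[pageSize];
            owned.add(page); // arrays hash by identity, so each page is tracked individually
            result.add(page);
        }
        if (!preAllocated) {
            numNonAllocatedPages -= numPages;
        }
        return result;
    }

    /** Releases every page held by the owner. */
    public synchronized void releaseAll(Object owner) {
        Set<byte[]> owned = allocatedByOwner.remove(owner);
        if (owned == null) {
            return;
        }
        if (preAllocated) {
            freePages.addAll(owned);            // return pages to the pool
        } else {
            numNonAllocatedPages += owned.size(); // drop pages and let the GC reclaim them
        }
    }
}
```

In the lazy mode nothing is reserved until an operator asks for pages, which matches the statement that streaming jobs leave this memory available to user code.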
