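Kafka Producer sends data in units of ProducerBatch, and the data in a ProducerBatch is stored in a ByteBuffer. When the producer has a very large volume of data to send, ByteBuffers would be allocated frequently and without any limit, which can cause an OOM. In addition, each ByteBuffer is released once its data has been sent, which triggers frequent Full GCs and hurts Kafka's performance.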
To address this, a memory pool was designed for the Kafka Producer side.
[Figure: KafkaProducer send flow]
[Figure: KafkaProducer data structures]
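Why is the free pool needed?
Creating an object with new involves allocating space for the object, setting up references (to constants in the constant pool, to objects on the heap) and so on, which is relatively expensive. Taking an already-instantiated object out of a container skips those steps and avoids repeated instantiation. To keep ByteBuffers instantiated ahead of time, they must all share one fixed size, namely poolableSize.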
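The reuse idea can be sketched in a few lines. This is only an illustration under simplifying assumptions, not Kafka's code: the class FreeListSketch and its methods take, giveBack and pooledCount are hypothetical names, and locking and memory limits are left out.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical illustration class; not part of Kafka.
class FreeListSketch {
    private final int poolableSize;
    // Buffers parked here stay reachable, so they are reused instead of collected.
    private final Deque<ByteBuffer> free = new ArrayDeque<>();

    FreeListSketch(int poolableSize) {
        this.poolableSize = poolableSize;
    }

    // Reuse a pooled buffer when one is available, otherwise allocate a new one.
    ByteBuffer take() {
        ByteBuffer pooled = free.pollFirst();
        return pooled != null ? pooled : ByteBuffer.allocate(poolableSize);
    }

    // Only buffers of exactly poolableSize go back into the pool.
    void giveBack(ByteBuffer buffer) {
        if (buffer.capacity() == poolableSize) {
            buffer.clear(); // reset position and limit for the next user
            free.addLast(buffer);
        }
    }

    int pooledCount() {
        return free.size();
    }
}
```

A buffer handed back with giveBack is the same object that the next take returns, so no new allocation happens for it.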
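Why is the availableMemory pool needed?
The ByteBuffers in the free pool all have the same fixed size, and the data sent by KafkaProducer does not always fit into one of them. So when size > poolableSize, the ByteBuffer has to be allocated from the availableMemory pool. (In the source, any request whose size differs from poolableSize, or one made while the free pool is empty, takes this path.)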
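The choice between the two pools can be shown with a non-blocking sketch. It assumes a single thread and returns null where the real pool would wait; NonBlockingPoolSketch, tryAllocate and release are hypothetical names, while poolableSize, free and nonPooledAvailableMemory follow the fields in the source.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, single-threaded illustration; not part of Kafka.
class NonBlockingPoolSketch {
    private final int poolableSize;
    private final Deque<ByteBuffer> free = new ArrayDeque<>();
    private long nonPooledAvailableMemory;

    NonBlockingPoolSketch(long totalMemory, int poolableSize) {
        this.poolableSize = poolableSize;
        this.nonPooledAvailableMemory = totalMemory;
    }

    // Returns null when the pool cannot satisfy the request right now.
    ByteBuffer tryAllocate(int size) {
        // Path 1: a request of exactly poolableSize reuses a pooled buffer.
        if (size == poolableSize && !free.isEmpty())
            return free.pollFirst();
        long freeListBytes = (long) free.size() * poolableSize;
        if (nonPooledAvailableMemory + freeListBytes < size)
            return null; // the real pool would block here
        // Path 2: convert pooled buffers back into plain byte budget if needed.
        while (!free.isEmpty() && nonPooledAvailableMemory < size)
            nonPooledAvailableMemory += free.pollLast().capacity();
        nonPooledAvailableMemory -= size;
        return ByteBuffer.allocate(size);
    }

    void release(ByteBuffer buffer) {
        if (buffer.capacity() == poolableSize) {
            buffer.clear();
            free.addLast(buffer);
        } else {
            nonPooledAvailableMemory += buffer.capacity();
        }
    }

    long nonPooledAvailableMemory() {
        return nonPooledAvailableMemory;
    }

    int freeCount() {
        return free.size();
    }
}
```

Note that an oversized request can drain the free pool: pooled buffers are dropped and their capacity is credited to nonPooledAvailableMemory until the request fits.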
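Glossary:
free: the pool that holds ByteBuffers whose size equals poolableSize;
availableMemory: the number of bytes left in the memory pool after subtracting the free pool and the ByteBuffers already handed out. Physically this memory is ordinary JVM heap; the nonPooledAvailableMemory field merely tracks how many more bytes may be allocated from the heap;
totalMemory: the total number of ByteBuffer bytes the memory pool may hand out;
poolableSize: the fixed size of each ByteBuffer in the free pool;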
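To relate these terms to producer settings: as far as I know, in the Kafka Java client totalMemory comes from buffer.memory, poolableSize from batch.size, and the time allocate() may block is bounded by max.block.ms. The sketch below uses the documented default values; the class and method names are hypothetical, and only java.util.Properties is used so that it runs without the Kafka client on the classpath.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of Kafka.
class ProducerMemoryConfigSketch {
    static Properties memoryRelatedConfig() {
        Properties props = new Properties();
        props.put("buffer.memory", "33554432"); // totalMemory: 32 MiB (default)
        props.put("batch.size", "16384");       // poolableSize: 16 KiB (default)
        props.put("max.block.ms", "60000");     // upper bound on blocking (default)
        return props;
    }

    // How many poolableSize buffers fit into totalMemory.
    static long pooledBuffersThatFit(Properties props) {
        long totalMemory = Long.parseLong(props.getProperty("buffer.memory"));
        int poolableSize = Integer.parseInt(props.getProperty("batch.size"));
        return totalMemory / poolableSize;
    }
}
```

With the defaults, pooledBuffersThatFit returns 2048, so at most 2048 full-size batches can be buffered before allocate() starts to block.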
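Steps:
After a batch has been sent and its response handled (Sender#handleProduceResponse), its ByteBuffer is released. The released bytes return to the free pool or to the availableMemory pool. A waiting allocation tallies the bytes it has reclaimed in the accumulated counter, and once accumulated >= size the allocation goes ahead.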
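The waiting side of this step can be sketched as follows. It is a simplified model under stated assumptions: there is no free pool, only a byte counter; BlockingPoolSketch is a hypothetical class; and it throws java.util.concurrent.TimeoutException, whereas Kafka uses its own exception type.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration; not part of Kafka.
class BlockingPoolSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Deque<Condition> waiters = new ArrayDeque<>();
    private long availableMemory;

    BlockingPoolSketch(long totalMemory) {
        this.availableMemory = totalMemory;
    }

    ByteBuffer allocate(int size, long maxTimeToBlockMs)
            throws InterruptedException, TimeoutException {
        lock.lock();
        try {
            if (availableMemory >= size) {
                availableMemory -= size;
            } else {
                int accumulated = 0; // bytes reserved so far for this request
                Condition moreMemory = lock.newCondition();
                waiters.addLast(moreMemory);
                try {
                    long remainingNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                    while (accumulated < size) {
                        // awaitNanos returns the time left; <= 0 means the wait timed out
                        remainingNs = moreMemory.awaitNanos(remainingNs);
                        if (remainingNs <= 0)
                            throw new TimeoutException("no memory within " + maxTimeToBlockMs + " ms");
                        long got = Math.min(size - accumulated, availableMemory);
                        availableMemory -= got;
                        accumulated += (int) got;
                    }
                    accumulated = 0; // success: keep the reserved bytes
                } finally {
                    availableMemory += accumulated; // failure: hand partial bytes back
                    waiters.remove(moreMemory);
                }
            }
        } finally {
            try {
                // Pass leftover memory on to the next waiter, if any.
                if (availableMemory > 0 && !waiters.isEmpty())
                    waiters.peekFirst().signal();
            } finally {
                lock.unlock();
            }
        }
        return ByteBuffer.allocate(size);
    }

    void deallocate(ByteBuffer buffer) {
        lock.lock();
        try {
            availableMemory += buffer.capacity();
            Condition first = waiters.peekFirst();
            if (first != null)
                first.signal();
        } finally {
            lock.unlock();
        }
    }

    long availableMemory() {
        lock.lock();
        try {
            return availableMemory;
        } finally {
            lock.unlock();
        }
    }
}
```

Because the waiter takes whatever has been released each time it wakes up, a large request can be satisfied by several smaller releases rather than needing one release of the full size.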
Source code:
public class BufferPool {
    // Excerpt: the constructor, the lock, time and waitTime fields, and freeSize() are omitted.

    static final String WAIT_TIME_SENSOR_NAME = "bufferpool-wait-time";

    // Maximum number of bytes the pool may hand out
    private final long totalMemory;
    // Size of each ByteBuffer in the free pool
    private final int poolableSize;
    // The free pool is held in a deque, so its ByteBuffers stay reachable and are not garbage-collected
    private final Deque<ByteBuffer> free;
    // Conditions of the threads waiting for allocated ByteBuffers to be released
    private final Deque<Condition> waiters;
    // Tracks the size of the availableMemory pool
    private long nonPooledAvailableMemory;

    public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
        // A request larger than the whole pool can never succeed, so throw.
        // The limit can be raised through the KafkaProducer configuration.
        if (size > this.totalMemory)
            throw new IllegalArgumentException("Attempt to allocate " + size
                    + " bytes, but there is a hard limit of "
                    + this.totalMemory
                    + " on memory allocations.");

        ByteBuffer buffer = null;
        this.lock.lock();
        try {
            // size == poolableSize: take a buffer straight from the free pool
            if (size == poolableSize && !this.free.isEmpty())
                return this.free.pollFirst();

            int freeListSize = freeSize() * this.poolableSize;
            // The free pool and the availableMemory pool together hold enough bytes
            if (this.nonPooledAvailableMemory + freeListSize >= size) {
                // If the availableMemory pool alone is too small, drop ByteBuffers
                // from the free pool and credit their bytes to availableMemory
                freeUp(size);
                this.nonPooledAvailableMemory -= size;
            } else {
                // Counts the bytes reclaimed so far from released ByteBuffers
                int accumulated = 0;
                // Condition this thread waits on
                Condition moreMemory = this.lock.newCondition();
                try {
                    long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                    this.waiters.addLast(moreMemory);
                    // Loop until enough bytes have been released
                    while (accumulated < size) {
                        long startWaitNs = time.nanoseconds();
                        long timeNs;
                        boolean waitingTimeElapsed;
                        try {
                            // await returns false on timeout and true when signalled
                            waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                        } finally {
                            long endWaitNs = time.nanoseconds();
                            timeNs = Math.max(0L, endWaitNs - startWaitNs);
                            this.waitTime.record(timeNs, time.milliseconds());
                        }

                        // Timed out: throw
                        if (waitingTimeElapsed) {
                            throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
                        }

                        remainingTimeToBlockNs -= timeNs;

                        // A released ByteBuffer of size poolableSize goes back to the free pool,
                        // so the buffer we need may now be available there
                        if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                            buffer = this.free.pollFirst();
                            accumulated = size;
                        } else {
                            // freeUp moves enough bytes into the availableMemory pool.
                            // Released buffers reach it by one of two routes:
                            //   size == poolableSize: ByteBuffer -> free -> availableMemory
                            //   size != poolableSize: ByteBuffer -> availableMemory
                            // See deallocate() for details
                            freeUp(size - accumulated);
                            int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
                            this.nonPooledAvailableMemory -= got;
                            accumulated += got;
                        }
                    }
                    // Success: nothing to give back in the finally block
                    accumulated = 0;
                } finally {
                    // On failure, return the partially accumulated bytes
                    this.nonPooledAvailableMemory += accumulated;
                    this.waiters.remove(moreMemory);
                }
            }
        } finally {
            try {
                // If memory is still available, wake the next waiter
                if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
                    this.waiters.peekFirst().signal();
            } finally {
                lock.unlock();
            }
        }

        if (buffer == null)
            // Allocate from the availableMemory pool: nonPooledAvailableMemory has
            // already been debited, so allocate the ByteBuffer on the heap
            return safeAllocateByteBuffer(size);
        else
            return buffer;
    }

    private ByteBuffer safeAllocateByteBuffer(int size) {
        boolean error = true;
        try {
            // Allocate the memory
            ByteBuffer buffer = allocateByteBuffer(size);
            error = false;
            return buffer;
        } finally {
            // If the allocation failed, return the reserved bytes and wake a waiter
            if (error) {
                this.lock.lock();
                try {
                    this.nonPooledAvailableMemory += size;
                    if (!this.waiters.isEmpty())
                        this.waiters.peekFirst().signal();
                } finally {
                    this.lock.unlock();
                }
            }
        }
    }

    // Allocate the memory
    protected ByteBuffer allocateByteBuffer(int size) {
        return ByteBuffer.allocate(size);
    }

    // Drops ByteBuffers from the free pool and credits their capacity to the
    // availableMemory pool until it holds at least size bytes or free is empty
    private void freeUp(int size) {
        while (!this.free.isEmpty() && this.nonPooledAvailableMemory < size)
            this.nonPooledAvailableMemory += this.free.pollLast().capacity();
    }

    public void deallocate(ByteBuffer buffer, int size) {
        lock.lock();
        try {
            // size == poolableSize: the released ByteBuffer goes back to the free pool
            if (size == this.poolableSize && size == buffer.capacity()) {
                buffer.clear();
                this.free.add(buffer);
            } else {
                // size != poolableSize: the released bytes go back to the availableMemory pool
                this.nonPooledAvailableMemory += size;
            }
            // Wake the first waiter, if any
            Condition moreMem = this.waiters.peekFirst();
            if (moreMem != null)
                moreMem.signal();
        } finally {
            lock.unlock();
        }
    }
}
The memory pool in Flink works on much the same principle as Kafka's, so the ideas here carry over.