赞
踩
大家一定都了解Java的线程池,线程池有什么好处呢?如果没有线程池,我们每次创建线程都要新建一个线程,这样对CPU的消耗比较大。那么利用线程池我们可以对已经创建好的线程复用,线程就不用频繁创建和销毁了。
同样,我们的内存池也是这个原理,producerBatch需要空间存储消息的时候,就去缓存池申请一块内存,而不用频繁地创建和销毁内存,也就避免了频繁地GC。
下面的结构图简单说明了BufferPool的组成结构和处理缓存的流程:
整个BufferPool的大小默认为32M,内部内存区域分为两块:固定大小内存块集合free、非池化缓存nonPooledAvailableMemory。固定大小内存块默认大小为16k。当ProducerBatch向BufferPool申请一个大小为size的内存块时,BufferPool会根据size的大小判断由哪个内存区域分配内存块。同时,free和nonPooledAvailableMemory这两块区域的内存可以交换。
接下来,我们通过代码来学习Kafka底层提供的高效的内存池设计。
重要字段如下:
public class BufferPool {
static final String WAIT_TIME_SENSOR_NAME = "bufferpool-wait-time";
private final long totalMemory;//默认32M
private final int poolableSize;//池化大小16k
private final ReentrantLock lock;//分配和回收时用的锁。
private final Deque<ByteBuffer> free;//池化的内存
private final Deque<Condition> waiters;//阻塞线程对应的Condition集合
private long nonPooledAvailableMemory;//非池化可使用的内存
}
接下来,我再来介绍下重要的方法。
allocate()方法是向BufferPool申请ByteBuffer。
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
//1.验证申请的内存是否大于总内存
if (size > this.totalMemory)
throw new IllegalArgumentException("Attempt to allocate " + size
+ " bytes, but there is a hard limit of "
+ this.totalMemory
+ " on memory allocations.");
ByteBuffer buffer = null;
//2.加锁,保证线程安全。
this.lock.lock();
if (this.closed)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。