赞
踩
众所周知,jvm的GC会有不小的时间损耗,stop the world会严重影响kafka 生产者的消息发送。如果我们每次消息发送后,对中间生成的实例不做任何处理留给JVM,可能会造成严重的后果。因此Kafka生产者用内存池这种东西来循环利用中间生成的消息缓存bytebuffer。
依据我的理解,就是:标准大小消息的bytebuffer,用过一次后接着用装下一条消息。对于非标准大小的消息,用另外手段分配。
对于大消息手动使用赋值null对于GC有无帮助?看到下面的说明
java对象不再使用时,为什么要赋值为null
直接覆写bytebuffer确实快。但是看完内存池的代码,在这里BB两句:对于大消息,kafka还是要依靠JVM GC。那为什么不在缓存池外直接new bytebuffer,使用完后自己赋值null释放呢,非要来内存池抢东西。
接着看kafka的内存池吧,RecordAccumulator中的append方法是把消息添加进batches里面的相应dq,在添加前需要把消息放进bytebuffer里面,bytebuffer从free里面allocate方法要来的。多个生产者会有线程安全问题,比如这边计算可用内存后别人给用掉了,所以allocate时候要锁住Bufferpool
buffer = free.allocate(size, maxTimeToBlock);
free就是内存池,几个比较重要的属性和解释如下
//总大小 默认32M
private final long totalMemory;
//标准批次大小
private final int poolableSize;
//可用bytebuffer(回收的,一开始没有)
private final Deque<ByteBuffer> free;
//因为内存不足等待的线程队列
private final Deque<Condition> waiters;
//totalMemory - poolableSize*free.size() - 使用中的butebuffer总大小
private long nonPooledAvailableMemory;
allocate方法中,首先会对批次的大小做一个判断,如果大于totalMemory直接报错。
free队列有bytebuffer且size是标准大小,直接安排覆写bytebuffer
if (size == poolableSize && !this.free.isEmpty())
return this.free.pollFirst();
然后 this.nonPooledAvailableMemory + freeListSize 得出可用内存,在能满足size的条件下,freeUp(size)后,跳出try,走 safeAllocateByteBuffer(size),随便找一块内存安排上(这块内存就无法回收到free队列了,只能靠JVM回收),同时nonPooledAvailableMemory扣减size大小
if (this.nonPooledAvailableMemory + freeListSize >= size) {
freeUp(size);
this.nonPooledAvailableMemory -= size;
}
freeUp看下,nonPooledAvailableMemory 顶不住的话,不断释放free来补nonPooledAvailableMemory
private void freeUp(int size) {
while (!this.free.isEmpty() && this.nonPooledAvailableMemory < size)
this.nonPooledAvailableMemory += this.free.pollLast().capacity();
}
这就麻烦了,得在线等在使用的bytebuffer释放
用accumulated记录搞来的内存
先把当前线程制作成Condition放入wait队列
关于Condition
// we are out of memory and will have to block int accumulated = 0; Condition moreMemory = this.lock.newCondition(); try { long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs); this.waiters.addLast(moreMemory); // loop over and over until we have a buffer or have reserved // enough memory to allocate one while (accumulated < size) { long startWaitNs = time.nanoseconds(); long timeNs; boolean waitingTimeElapsed; try { waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS); } finally { long endWaitNs = time.nanoseconds(); timeNs = Math.max(0L, endWaitNs - startWaitNs); recordWaitTime(timeNs); } if (waitingTimeElapsed) { throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms."); } remainingTimeToBlockNs -= timeNs; // check if we can satisfy this request from the free list, // otherwise allocate memory if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) { // just grab a buffer from the free list buffer = this.free.pollFirst(); accumulated = size; } else { // we'll need to allocate memory, but we may only get // part of what we need on this iteration freeUp(size - accumulated); int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory); this.nonPooledAvailableMemory -= got; accumulated += got; } } // Don't reclaim memory on throwable since nothing was thrown accumulated = 0;
看上面的代码,发现free不空直接安排了
看上面的代码,freeUp(size - accumulated),
有可能这还不行,那就算下 Math.min(size - accumulated, this.nonPooledAvailableMemory),看看又榨取了多少内存
accumulated += got, 把榨取来的加到accumulated
跳出后waiters.remove(moreMemory),把刚满足的这个Condition去掉
每次allocate成功后,顺手叫醒等待在waiter中的最早的线程,并且解锁内存池
!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty())判断还有能否榨取的内存,没有就没必要唤醒等待者了。
try {
if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
this.waiters.peekFirst().signal();
} finally {
// Another finally... otherwise find bugs complains
lock.unlock();
}
如果buffer使用过了,要从内存池里面deallocate这个buffer,注意这个append的finally的dellocate针对的不是放入dq这个情况哦,成功放进dq后,buffer指向null
finally里面的deallocate针对是在自建batch前成功塞入dq的情况
if (buffer != null)
free.deallocate(buffer);
appendsInProgress.decrementAndGet();
deallocate 方法也看下,size = 标准就安排进内存池
public void deallocate(ByteBuffer buffer, int size) { lock.lock(); try { if (size == this.poolableSize && size == buffer.capacity()) { buffer.clear(); this.free.add(buffer); } else { this.nonPooledAvailableMemory += size; } Condition moreMem = this.waiters.peekFirst(); if (moreMem != null) moreMem.signal(); } finally { lock.unlock(); } }
看完有几点需要记录下:
nonPooledAvailableMemory 是个long,代表还能从堆空间拿多少内存,其他文章总说未申请空间感觉怪怪的,bufferpool实例化时并不是占用上32MB空间,上限是侵占堆空间32MB
ReentrantLock锁后面需要了解下
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。