赞
踩
一、内存池原理简介
Kafka发送消息的流程如上所示,待发送的消息封装成一个个的批次对象RecordBatch,然后该批次对象被放到对应的Deque队列中,这里每个topic的每个分区对应一个Deque队列。之后Sender线程会从队列中取出RecordBatch对象,然后封装成ProducerRequest对象发送给服务端。所以RecordBatch是存储数据的对象,那么RecordBatch是怎么分配的呢?如果每次都创建一个新的RecordBatch对象,用完之后通过GC回收,那么在Kafka高吞吐的场景下,势必会创建大量的RecordBatch对象,增加GC的频率,而进行GC时对应的STW时间就会增加,从而使吞吐量降低。
Kafka为了解决这个问题,巧妙地运用了内存池。创建RecordBatch对象时,会从内存池中获取对应的内存,用完之后再将这部分内存归还给内存池,因此这部分内存可以循环利用而不必经过GC。
二、关键的几个类
1.RecordAccumulator:暂存消息的类。生产者主线程生产的消息并不会直接发送给服务端,而是先放到RecordAccumulator对象中进行缓存,当满足一定条件时,由Sender线程从该对象中取出消息进行封装并发送给服务端。该类的属性和构造方法如下:
public final class RecordAccumulator { private volatile boolean closed; private final AtomicInteger flushesInProgress; private final AtomicInteger appendsInProgress; private final int batchSize;//一个批次的大小 private final CompressionType compression;//压缩类型 private final long lingerMs; private final long retryBackoffMs; private final BufferPool free;//缓存对象 private final Time time; private final ConcurrentMap> batches; private final IncompleteRecordBatches incomplete; private final Set muted; private int drainIndex; public RecordAccumulator(int batchSize, //批次大小默认16k long totalSize,//缓存大小默认32M CompressionType compression, long lingerMs, long retryBackoffMs, Metrics metrics, Time time) { this.drainIndex = 0; this.closed = false; this.flushesInProgress = new AtomicInteger(0); this.appendsInProgress = new AtomicInteger(0); this.batchSize = batchSize; this.compression = compression; this.lingerMs = lingerMs; this.retryBackoffMs = retryBackoffMs; this.batches = new CopyOnWriteMap<>(); String metricGrpName = "producer-metrics"; this.free = new BufferPool(totalSize, batchSize, metrics, time, metricGrpName); this.incomplete = new IncompleteRecordBatches(); this.muted = new HashSet<>(); this.time = time; registerMetrics(metrics, metricGrpName); }}
重点看三个属性:
public final class BufferPool { private final long totalMemory;//缓存的总大小 private final int poolableSize;//默认的批次大小 private final ReentrantLock lock; private final Deque free;//内存池 private final Deque waiters;//存放Condition对象的队列 private long availableMemory;//未申请的内存空间 private final Metrics metrics; private final Time time; private final Sensor waitTime;}
重点属性:
向内存池申请内存对应的是BufferPool类的allocate方法,该方法有两个参数:①int类型的size,即申请的内存大小;②long类型的maxTimeToBlockMs,即最长等待的时间,单位毫秒。具体方法如下:
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException { //判断消息占用的空间是否大于内存池总容量32M if (size > this.totalMemory) throw new IllegalArgumentException("Attempt to allocate " + size + " bytes, but there is a hard limit of " + this.totalMemory + " on memory allocations."); this.lock.lock(); try { // check if we have a free buffer of the right size pooled //判断申请的内存大小是否为默认批次的内存大小(16k),且是否存在已申请但未使用的批次 //如果申请内存大小等于批次默认值,且存在已申请未使用的批次,那么返回已申请未使用的第一个批次,free是一个队列 if (size == poolableSize && !this.free.isEmpty()) return this.free.pollFirst(); // now check if the request is immediately satisfiable with the // memory on hand or if we need to block //poolableSize为默认批次大小,freeListSize为已申请但未使用的内存大小 int freeListSize = this.free.size() * this.poolableSize; //size是本次我们要申请的内存 //this.availableMemory + freeListSize:目前可用的总内存 if (this.availableMemory + freeListSize >= size) { // we have enough unallocated or pooled memory to immediately // satisfy the request freeUp(size); //进行内存的扣减,此时的可用内存已满足申请内存 this.availableMemory -= size; lock.unlock(); //分配内存 return ByteBuffer.allocate(size); } else { //如果可用内存无法满足申请的内存值,只能等待释放内存 //定义一个变量来记录释放的内存 int accumulated = 0; ByteBuffer buffer = null; //创建一个Condition 对象 Condition moreMemory = this.lock.newCondition(); long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs); //waiters是一个队列,里面存放来Condition对象,只要队列中有对象,说明此时内存是不够分配的, this.waiters.addLast(moreMemory); // loop over and over until we have a buffer or have reserved // enough memory to allocate one /** * 如果内存不够,总的分配思路就是一点点地分配,有一点分配一点 */ //只要累计释放的内存小于申请的内存,就继续循环 while (accumulated < size) { long startWaitNs = time.nanoseconds(); long timeNs; boolean waitingTimeElapsed; try { //condition对象调用await方法进行等待,结束等待有两种情况:1。时间到了;2。被人唤醒了(内存池回收内存时,会唤醒) waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS); } catch (InterruptedException e) { this.waiters.remove(moreMemory); throw e; } finally { long endWaitNs = time.nanoseconds(); timeNs = Math.max(0L, endWaitNs - startWaitNs); this.waitTime.record(timeNs, time.milliseconds()); } //如果是等待时间到了,那么就把这个condition对象从队列中移除,并抛出异常 if (waitingTimeElapsed) { this.waiters.remove(moreMemory); throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms."); } remainingTimeToBlockNs -= timeNs; // check if we can satisfy this request from the free list, // otherwise allocate memory //检查是否可以满足申请的内存;如果还没有释放内存,且申请内存大小正好等于批次大小,且已申请未使用队列中有批次 if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) { // just grab a buffer from the free list //那么就直接从已申请未使用队列中拿出一个批次的内存 buffer = this.free.pollFirst(); //释放内存就等于批次大小 accumulated = size; } else {//如果可用内存小于申请内存 // we'll need to allocate memory, but we may only get // part of what we need on this iteration //从已申请未使用内存中拿出批次,大小为申请内存-释放内存 freeUp(size - accumulated); int got = (int) Math.min(size - accumulated, this.availableMemory); //更新可用内存和释放内存 this.availableMemory -= got; accumulated += got; } } // remove the condition for this thread to let the next thread // in line start getting memory //当释放的内存大于等于申请内存,就把之前的condition从队列中移除 Condition removed = this.waiters.removeFirst(); if (removed != moreMemory) throw new IllegalStateException("Wrong condition: this shouldn't happen."); // signal any additional waiters if there is more memory left // over for them if (this.availableMemory > 0 || !this.free.isEmpty()) { if (!this.waiters.isEmpty()) this.waiters.peekFirst().signal(); } // unlock and return the buffer lock.unlock(); if (buffer == null) return ByteBuffer.allocate(size); else return buffer; } } finally { if (lock.isHeldByCurrentThread()) lock.unlock(); } }
1.判断申请的内存是否超过了缓存的总大小,如果超出,则抛异常。
if (size > this.totalMemory) throw new IllegalArgumentException("Attempt to allocate " + size + " bytes, but there is a hard limit of " + this.totalMemory + " on memory allocations.");
2.加锁
this.lock.lock();
3.判断申请内存大小是否为默认批次大小,且内存池中还有内存对象。如果均满足,直接返回内存池队列中的第一个ByteBuffer对象。
if (size == poolableSize && !this.free.isEmpty()) return this.free.pollFirst();
4.如果不满足,则计算内存池占用的内存大小。
//poolableSize为默认批次大小,freeListSize为内存池占用内存大小int freeListSize = this.free.size() * this.poolableSize
5.判断可用内存是否大于申请内存,如果大于,释放内存池中的内存,直到availableMemory大于申请内存。然后availableMemory进行内存扣减,解锁,最后返回一个容量为申请内存大小的ByteBuffer对象。
if (this.availableMemory + freeListSize >= size) { freeUp(size); //进行内存的扣减,此时的可用内存已满足申请内存 this.availableMemory -= size; lock.unlock(); //分配内存 return ByteBuffer.allocate(size);}
freeUp
方法:
//如果未申请使用内存不满足申请内存,那么就从内存池末尾,//不断取出ByteBuffer对象释放内存,直到未申请内存满足申请内存private void freeUp(int size) { while (!this.free.isEmpty() && this.availableMemory < size) this.availableMemory += this.free.pollLast().capacity();}
6.如果可用内存小于申请内存,只能等待释放内存。这里定义一个变量accumulated 用来累计释放的内存,且创建一个Condition对象,然后放到等待队列末尾。
int accumulated = 0;Condition moreMemory = this.lock.newCondition();this.waiters.addLast(moreMemory);
7.如果累计释放的内存小于申请内存,就一直循环等待内存释放。这里调用Condition对象moreMemory 的await方法进行等待,结束等待有两种情况:①等待时间到了;②被唤醒。如果是时间到了,就将moreMemory从等待队列移除,然后抛出异常。
while (accumulated < size) { long startWaitNs = time.nanoseconds(); long timeNs; boolean waitingTimeElapsed; try { //condition对象调用await方法进行等待,结束等待有两种情况:1。时间到了;2。被人唤醒了(内存池回收内存时,会唤醒) waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS); } catch (InterruptedException e) { this.waiters.remove(moreMemory); throw e; } finally { long endWaitNs = time.nanoseconds(); timeNs = Math.max(0L, endWaitNs - startWaitNs); this.waitTime.record(timeNs, time.milliseconds()); } //如果是等待时间到了,那么就把这个condition对象从队列中移除,并抛出异常 if (waitingTimeElapsed) { this.waiters.remove(moreMemory); throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms."); }
8.如果是被唤醒的,说明有内存被归还了。判断申请的内存是否正好等于内存池中一个批次的大小,且内存池中有ByteBuffer对象。如果是,则直接从内存池头部取出一个ByteBuffer对象,更新累计释放内存变量accumulated 。
if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) { //那么就直接从内存池头部拿出一个ByteBuffer对象 buffer = this.free.pollFirst(); //累计释放内存就等于申请内存 accumulated = size;}
9.如果可用内存还不够申请内存,则一点点地分配。即释放一点,就分配一点,直到累计释放的内存达到申请内存。
else { //只有内存池有ByteBuffer对象,就释放其内存 freeUp(size - accumulated); int got = (int) Math.min(size - accumulated, this.availableMemory); //更新未申请内存,即只要有未申请的内存,就把它分配出去 this.availableMemory -= got; //更新累计释放内存,只要有未申请的内存,就拿过来进行累计 accumulated += got;}
10.当累计释放内存大于申请内存时,将之前放到等待队列中的Condition对象移除。
Condition removed = this.waiters.removeFirst();
11.如果分配了申请内存之后,还有未分配内存,或者内存池不为空;同时等待队列不为空,则从等待队列中取出一个Condition对象,并唤醒。
if (this.availableMemory > 0 || !this.free.isEmpty()) { if (!this.waiters.isEmpty()) this.waiters.peekFirst().signal();}
12.解锁,并返回申请的内存。如果buffer为null,说明申请内存大于默认批次16K,返回实际申请的内存;buffer不为null,说明申请内存为默认的批次大小,直接返回。
lock.unlock();if (buffer == null) return ByteBuffer.allocate(size);else return buffer;
四、向内存池归还内存步骤详解
向内存池归还内存对应的是BufferPool类的deallocate方法,该方法有两个参数:①ByteBuffer类型的buffer;②int类型的size,即归还的内存大小。具体方法如下:
public void deallocate(ByteBuffer buffer, int size) { lock.lock(); try { //如果申请的批次大小和默认大小一致,且大小正好等于buffer的容量,则直接清空buffer,然后把这个buffer放回内存池 //TODO 内存池只能回收和默认批次大小一致的内存,不一致的只能等待GC进行回收 if (size == this.poolableSize && size == buffer.capacity()) { buffer.clear(); this.free.add(buffer); } else { //否则,可用内存增加归还内存大小 this.availableMemory += size; } //归还内存后,取出等待队列中的第一个Condition对象,然后唤醒等待内存释放的线程 Condition moreMem = this.waiters.peekFirst(); if (moreMem != null) moreMem.signal(); } finally { lock.unlock(); }}
1.加锁
lock.lock();
2.如果归还的内存大小正好等于默认的批次大小16k,且buffer的容量正好也为16K,则直接将buffer清空,然后将其放回内存池队列中。
if (size == this.poolableSize && size == buffer.capacity()) { buffer.clear(); this.free.add(buffer);}
3.否则,直接在未申请内存上增加size大小的内存,此时不会将内存归还到内存池中。
else { //否则,可用内存增加size大小 this.availableMemory += size;}
注意:从上面的代码可以看出,只有释放的内存大小正好等于批次大小时,才会归还给内存池。否则,是直接增加了availableMemory未申请内存值,之前申请的内存无法复用,只能等待GC进行回收。
4.如果等待队列waiters不为空,则取出头部的Condition对象,然后唤醒。
Condition moreMem = this.waiters.peekFirst(); if (moreMem != null) moreMem.signal();
总结:
内存池的设计是为了复用内存,减少GC,从而提高吞吐量。
批次的默认大小为16K,缓存的默认大小为32M。
缓存包含已用内存、内存池内存和未申请使用内存三部分。
只有归还的内存大小等于批次大小时,内存池才可以进行回收并复用;否则要等待GC进行回收。所以生产中要根据单条消息的大小调整批次大小(由参数batch.size设置)。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。