当前位置:   article > 正文

syslog 向内存中缓存_Kafka源码解析之内存池原理

syslog 缓存大小

一、内存池原理简介

e33499d6f9ad3f275ac269b656efba66.png

    Kafka发送消息的流程如上所示,待发送的消息封装成一个个的批次对象RecordBatch,然后该批次对象被放到对应的Deque队列中,这里每个topic的每个分区对应一个Deque队列。之后Sender线程会从队列中取出RecordBatch对象,然后封装成ProducerRequest对象发送给服务端。所以RecordBatch是存储数据的对象,那么RecordBatch是怎么分配的呢?如果每次都创建一个新的RecordBatch对象,用完之后通过GC回收,那么在Kafka高吞吐的场景下,势必会创建大量的RecordBatch对象,增加GC的频率,而进行GC时对应的STW时间就会增加,从而使吞吐量降低。

    Kafka为了解决这个问题,巧妙地运用了内存池。创建RecordBatch对象时,会从内存池中获取对应的内存,用完之后再将这部分内存归还给内存池,因此这部分内存可以循环利用而不必经过GC。

二、关键的几个类

    1.RecordAccumulator:暂存消息的类。生产者主线程生产的消息并不会直接发送给服务端,而是先放到RecordAccumulator对象中进行缓存,当满足一定条件时,由Sender线程从该对象中取出消息进行封装并发送给服务端。该类的属性和构造方法如下:

public final class RecordAccumulator {    private volatile boolean closed;    private final AtomicInteger flushesInProgress;    private final AtomicInteger appendsInProgress;    private final int batchSize;//一个批次的大小    private final CompressionType compression;//压缩类型    private final long lingerMs;    private final long retryBackoffMs;    private final BufferPool free;//缓存对象    private final Time time;    private final ConcurrentMap> batches;    private final IncompleteRecordBatches incomplete;    private final Set muted;    private int drainIndex;    public RecordAccumulator(int batchSize, //批次大小默认16k                             long totalSize,//缓存大小默认32M                             CompressionType compression,                             long lingerMs,                             long retryBackoffMs,                             Metrics metrics,                             Time time) {        this.drainIndex = 0;        this.closed = false;        this.flushesInProgress = new AtomicInteger(0);        this.appendsInProgress = new AtomicInteger(0);        this.batchSize = batchSize;        this.compression = compression;        this.lingerMs = lingerMs;        this.retryBackoffMs = retryBackoffMs;        this.batches = new CopyOnWriteMap<>();        String metricGrpName = "producer-metrics";        this.free = new BufferPool(totalSize, batchSize, metrics, time, metricGrpName);        this.incomplete = new IncompleteRecordBatches();        this.muted = new HashSet<>();        this.time = time;        registerMetrics(metrics, metricGrpName);    }}
    重点看三个属性:
  • batchSize:批次的大小,默认为16K(由参数batch.size设置)
  • free:缓存对象。
  • batches:一个map,key是消息发往的topic和partition,而value是待发送的消息队列
    构造方法中,totalSize为缓存的总大小,默认为32M(由参数buffer.memory设置)。          2.BufferPool:缓存对应的类。该类的属性如下:
public final class BufferPool {    private final long totalMemory;//缓存的总大小    private final int poolableSize;//默认的批次大小    private final ReentrantLock lock;    private final Deque free;//内存池    private final Deque waiters;//存放Condition对象的队列    private long availableMemory;//未申请的内存空间    private final Metrics metrics;    private final Time time;    private final Sensor waitTime;}
    重点属性:
  • totalMemory:缓存的总内存大小
  •  poolableSize:默认的批次大小
  • free:注意区别这个free和RecordAccumulator中的free,此处的free就是内存池,是一个存放ByteBuffer对象的队列。
  • availableMemory:未申请使用的缓存空间。
下面我们看一下缓存中的内存分配:

637b7264857becbfea66409cbcaa1ee8.png

三、向内存池申请内存步骤详解

    向内存池申请内存对应的是BufferPool类的allocate方法,该方法有两个参数:①int类型的size,即申请的内存大小;②long类型的maxTimeToBlockMs,即最长等待的时间,单位毫秒。具体方法如下:

public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {        //判断消息占用的空间是否大于内存池总容量32M        if (size > this.totalMemory)            throw new IllegalArgumentException("Attempt to allocate " + size                                               + " bytes, but there is a hard limit of "                                               + this.totalMemory                                               + " on memory allocations.");        this.lock.lock();        try {            // check if we have a free buffer of the right size pooled            //判断申请的内存大小是否为默认批次的内存大小(16k),且是否存在已申请但未使用的批次            //如果申请内存大小等于批次默认值,且存在已申请未使用的批次,那么返回已申请未使用的第一个批次,free是一个队列            if (size == poolableSize && !this.free.isEmpty())                return this.free.pollFirst();            // now check if the request is immediately satisfiable with the            // memory on hand or if we need to block            //poolableSize为默认批次大小,freeListSize为已申请但未使用的内存大小            int freeListSize = this.free.size() * this.poolableSize;            //size是本次我们要申请的内存            //this.availableMemory + freeListSize:目前可用的总内存            if (this.availableMemory + freeListSize >= size) {                // we have enough unallocated or pooled memory to immediately                // satisfy the request                freeUp(size);                //进行内存的扣减,此时的可用内存已满足申请内存                this.availableMemory -= size;                lock.unlock();                //分配内存                return ByteBuffer.allocate(size);            } else {                //如果可用内存无法满足申请的内存值,只能等待释放内存                //定义一个变量来记录释放的内存                int accumulated = 0;                ByteBuffer buffer = null;                //创建一个Condition 对象                Condition moreMemory = this.lock.newCondition();                long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);                //waiters是一个队列,里面存放来Condition对象,只要队列中有对象,说明此时内存是不够分配的,                this.waiters.addLast(moreMemory);                // loop over and over until we have a buffer or have reserved                // enough memory to allocate one                /**                 * 如果内存不够,总的分配思路就是一点点地分配,有一点分配一点                 */                //只要累计释放的内存小于申请的内存,就继续循环                while (accumulated < size) {                    long startWaitNs = time.nanoseconds();                    long timeNs;                    boolean waitingTimeElapsed;                    try {                        //condition对象调用await方法进行等待,结束等待有两种情况:1。时间到了;2。被人唤醒了(内存池回收内存时,会唤醒)                        waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);                    } catch (InterruptedException e) {                        this.waiters.remove(moreMemory);                        throw e;                    } finally {                        long endWaitNs = time.nanoseconds();                        timeNs = Math.max(0L, endWaitNs - startWaitNs);                        this.waitTime.record(timeNs, time.milliseconds());                    }                    //如果是等待时间到了,那么就把这个condition对象从队列中移除,并抛出异常                    if (waitingTimeElapsed) {                        this.waiters.remove(moreMemory);                        throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");                    }                    remainingTimeToBlockNs -= timeNs;                    // check if we can satisfy this request from the free list,                    // otherwise allocate memory                    //检查是否可以满足申请的内存;如果还没有释放内存,且申请内存大小正好等于批次大小,且已申请未使用队列中有批次                    if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {                        // just grab a buffer from the free list                        //那么就直接从已申请未使用队列中拿出一个批次的内存                        buffer = this.free.pollFirst();                        //释放内存就等于批次大小                        accumulated = size;                    } else {//如果可用内存小于申请内存                        // we'll need to allocate memory, but we may only get                        // part of what we need on this iteration                        //从已申请未使用内存中拿出批次,大小为申请内存-释放内存                        freeUp(size - accumulated);                        int got = (int) Math.min(size - accumulated, this.availableMemory);                        //更新可用内存和释放内存                        this.availableMemory -= got;                        accumulated += got;                    }                }                // remove the condition for this thread to let the next thread                // in line start getting memory                //当释放的内存大于等于申请内存,就把之前的condition从队列中移除                Condition removed = this.waiters.removeFirst();                if (removed != moreMemory)                    throw new IllegalStateException("Wrong condition: this shouldn't happen.");                // signal any additional waiters if there is more memory left                // over for them                if (this.availableMemory > 0 || !this.free.isEmpty()) {                    if (!this.waiters.isEmpty())                        this.waiters.peekFirst().signal();                }                // unlock and return the buffer                lock.unlock();                if (buffer == null)                    return ByteBuffer.allocate(size);                else                    return buffer;            }        } finally {            if (lock.isHeldByCurrentThread())                lock.unlock();        }    }

1.判断申请的内存是否超过了缓存的总大小,如果超出,则抛异常。

if (size > this.totalMemory)            throw new IllegalArgumentException("Attempt to allocate " + size                                               + " bytes, but there is a hard limit of "                                               + this.totalMemory                                               + " on memory allocations.");

2.加锁

this.lock.lock();

3.判断申请内存大小是否为默认批次大小,且内存池中还有内存对象。如果均满足,直接返回内存池队列中的第一个ByteBuffer对象。

if (size == poolableSize && !this.free.isEmpty())    return this.free.pollFirst();

4.如果不满足,则计算内存池占用的内存大小。

//poolableSize为默认批次大小,freeListSize为内存池占用内存大小int freeListSize = this.free.size() * this.poolableSize
5.判断可用内存是否大于申请内存,如果大于,释放内存池中的内存,直到availableMemory大于申请内存。然后availableMemory进行内存扣减,解锁,最后返回一个容量为申请内存大小的ByteBuffer对象。
if (this.availableMemory + freeListSize >= size) {    freeUp(size);    //进行内存的扣减,此时的可用内存已满足申请内存    this.availableMemory -= size;    lock.unlock();    //分配内存    return ByteBuffer.allocate(size);}
freeUp 方法:
//如果未申请使用内存不满足申请内存,那么就从内存池末尾,//不断取出ByteBuffer对象释放内存,直到未申请内存满足申请内存private void freeUp(int size) {    while (!this.free.isEmpty() && this.availableMemory < size)         this.availableMemory += this.free.pollLast().capacity();}

6.如果可用内存小于申请内存,只能等待释放内存。这里定义一个变量accumulated 用来累计释放的内存,且创建一个Condition对象,然后放到等待队列末尾。

int accumulated = 0;Condition moreMemory = this.lock.newCondition();this.waiters.addLast(moreMemory);

7.如果累计释放的内存小于申请内存,就一直循环等待内存释放。这里调用Condition对象moreMemory 的await方法进行等待,结束等待有两种情况:①等待时间到了;②被唤醒。如果是时间到了,就将moreMemory从等待队列移除,然后抛出异常。

while (accumulated < size) {    long startWaitNs = time.nanoseconds();    long timeNs;    boolean waitingTimeElapsed;    try {        //condition对象调用await方法进行等待,结束等待有两种情况:1。时间到了;2。被人唤醒了(内存池回收内存时,会唤醒)        waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);    } catch (InterruptedException e) {        this.waiters.remove(moreMemory);        throw e;    } finally {        long endWaitNs = time.nanoseconds();        timeNs = Math.max(0L, endWaitNs - startWaitNs);        this.waitTime.record(timeNs, time.milliseconds());    }    //如果是等待时间到了,那么就把这个condition对象从队列中移除,并抛出异常    if (waitingTimeElapsed) {         this.waiters.remove(moreMemory);         throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");    }

8.如果是被唤醒的,说明有内存被归还了。判断申请的内存是否正好等于内存池中一个批次的大小,且内存池中有ByteBuffer对象。如果是,则直接从内存池头部取出一个ByteBuffer对象,更新累计释放内存变量accumulated 。

if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {        //那么就直接从内存池头部拿出一个ByteBuffer对象        buffer = this.free.pollFirst();        //累计释放内存就等于申请内存        accumulated = size;}

9.如果可用内存还不够申请内存,则一点点地分配。即释放一点,就分配一点,直到累计释放的内存达到申请内存。

else {    //只有内存池有ByteBuffer对象,就释放其内存    freeUp(size - accumulated);    int got = (int) Math.min(size - accumulated, this.availableMemory);    //更新未申请内存,即只要有未申请的内存,就把它分配出去    this.availableMemory -= got;    //更新累计释放内存,只要有未申请的内存,就拿过来进行累计    accumulated += got;}

10.当累计释放内存大于申请内存时,将之前放到等待队列中的Condition对象移除。

Condition removed = this.waiters.removeFirst();

11.如果分配了申请内存之后,还有未分配内存,或者内存池不为空;同时等待队列不为空,则从等待队列中取出一个Condition对象,并唤醒。

if (this.availableMemory > 0 || !this.free.isEmpty()) {    if (!this.waiters.isEmpty())        this.waiters.peekFirst().signal();}

12.解锁,并返回申请的内存。如果buffer为null,说明申请内存大于默认批次16K,返回实际申请的内存;buffer不为null,说明申请内存为默认的批次大小,直接返回。

lock.unlock();if (buffer == null)    return ByteBuffer.allocate(size);else    return buffer;
四、向内存池归还内存步骤详解

    向内存池归还内存对应的是BufferPool类的deallocate方法,该方法有两个参数:①ByteBuffer类型的buffer;②int类型的size,即归还的内存大小。具体方法如下:

public void deallocate(ByteBuffer buffer, int size) {    lock.lock();    try {        //如果申请的批次大小和默认大小一致,且大小正好等于buffer的容量,则直接清空buffer,然后把这个buffer放回内存池        //TODO 内存池只能回收和默认批次大小一致的内存,不一致的只能等待GC进行回收        if (size == this.poolableSize && size == buffer.capacity()) {            buffer.clear();            this.free.add(buffer);        } else {            //否则,可用内存增加归还内存大小            this.availableMemory += size;        }        //归还内存后,取出等待队列中的第一个Condition对象,然后唤醒等待内存释放的线程        Condition moreMem = this.waiters.peekFirst();        if (moreMem != null)            moreMem.signal();    } finally {        lock.unlock();    }}

1.加锁

lock.lock();

2.如果归还的内存大小正好等于默认的批次大小16k,且buffer的容量正好也为16K,则直接将buffer清空,然后将其放回内存池队列中。

if (size == this.poolableSize && size == buffer.capacity()) {    buffer.clear();    this.free.add(buffer);}

3.否则,直接在未申请内存上增加size大小的内存,此时不会将内存归还到内存池中。

else {    //否则,可用内存增加size大小    this.availableMemory += size;}

注意:从上面的代码可以看出,只有释放的内存大小正好等于批次大小时,才会归还给内存池。否则,是直接增加了availableMemory未申请内存值,之前申请的内存无法复用,只能等待GC进行回收。

4.如果等待队列waiters不为空,则取出头部的Condition对象,然后唤醒。

Condition moreMem = this.waiters.peekFirst();    if (moreMem != null)        moreMem.signal();

总结:

  • 内存池的设计是为了复用内存,减少GC,从而提高吞吐量。

  • 批次的默认大小为16K,缓存的默认大小为32M。

  • 缓存包含已用内存、内存池内存和未申请使用内存三部分。

  • 只有归还的内存大小等于批次大小时,内存池才可以进行回收并复用;否则要等待GC进行回收。所以生产中要根据单条消息的大小调整批次大小(由参数batch.size设置)。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/626070
推荐阅读
相关标签
  

闽ICP备14008679号