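Kafka achieves high throughput largely because the producer sends messages in batches rather than one at a time, which also cuts down on network overhead. The new Kafka Producer is built around an in-memory buffer pool. By default a Producer creates a 32 MB pool (33554432 bytes); the size can be changed with the buffer.memory parameter. The pool hands out memory in blocks whose size is the batch.size passed when the Producer is created, 16384 bytes by default. Each batch holds one block of batch.size bytes, and messages are stored inside that block.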
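To make the default sizing concrete, here is a small sketch that works out how many batch-sized blocks fit in the pool at most. It uses plain java.util.Properties with the two configuration keys named above, so it needs no Kafka dependency. The class and method names (PoolSizing, blocksInPool) are made up for illustration and are not part of Kafka.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka: computes the maximum number of
// batch-sized blocks that fit in the producer's buffer pool.
class PoolSizing {
    static final long DEFAULT_BUFFER_MEMORY = 32L * 1024 * 1024; // 33554432 bytes (32 MB)
    static final int DEFAULT_BATCH_SIZE = 16384;                 // 16 KB

    static long blocksInPool(Properties props) {
        long bufferMemory = Long.parseLong(
                props.getProperty("buffer.memory", String.valueOf(DEFAULT_BUFFER_MEMORY)));
        int batchSize = Integer.parseInt(
                props.getProperty("batch.size", String.valueOf(DEFAULT_BATCH_SIZE)));
        return bufferMemory / batchSize;
    }

    public static void main(String[] args) {
        Properties defaults = new Properties();
        System.out.println(blocksInPool(defaults)); // 2048

        Properties tuned = new Properties();
        tuned.setProperty("buffer.memory", "67108864"); // 64 MB
        tuned.setProperty("batch.size", "32768");       // 32 KB
        System.out.println(blocksInPool(tuned));        // 2048
    }
}
```

With the defaults, the pool can back at most 2048 in-flight batches; doubling both settings keeps that count unchanged while doubling the size of each batch.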
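What does Kafka gain from a buffer pool like this? The new Producer is implemented in Java, and Java has garbage collection. Under high concurrency, constantly creating and then discarding ByteBuffer objects inevitably hurts performance and can even trigger stop-the-world GC pauses that leave the application unresponsive.
The Kafka Producer adds a message by calling the append method of RecordAccumulator, which in turn calls the allocate method of BufferPool to request a ByteBuffer. BufferPool is the memory pool Kafka designed for this purpose. It has two main parts: Deque<ByteBuffer> free and availableMemory. Each ByteBuffer stored in free is 16 KB by default. When RecordAccumulator calls allocate, BufferPool uses the requested size to decide which of the two parts supplies the memory. The batchSize argument passed when constructing RecordAccumulator comes from batch.size, which defaults to 16384 bytes (16 KB), so a Kafka batch is 16 KB by default. If the request is for exactly 16 KB and free is not empty, a buffer is taken straight from Deque<ByteBuffer> free and returned; otherwise the memory is allocated from availableMemory. The figure below shows the structure of BufferPool and the allocation flow: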
The BufferPool class
Its key fields are:
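    // Total memory managed by the BufferPool, 32 MB by default.
    private final long totalMemory;
    // Size of one poolable buffer. RecordAccumulator's constructor calls the BufferPool
    // constructor and passes batchSize for this value, so it equals the size of one batch.
    private final int poolableSize;
    // Multiple threads allocate and return ByteBuffers concurrently, so a lock guards the pool's state.
    private final ReentrantLock lock;
    // Pooled buffers that are idle and ready for reuse.
    private final Deque<ByteBuffer> free;
    // Threads that cannot get enough memory block until other threads release some;
    // their Condition objects are queued here.
    private final Deque<Condition> waiters;
    // Unallocated memory outside the free list that can still be handed out.
    private long availableMemory;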
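To see how free and availableMemory work together, here is a deliberately simplified, single-threaded sketch. It is not Kafka's BufferPool: SimpleBufferPool is a made-up class with no lock and no waiters queue, it throws instead of blocking, and the bodies of freeUp and deallocate are assumptions about how such a pool could recycle memory.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified, single-threaded illustration of the pooling idea.
// NOT Kafka's BufferPool: no lock, no waiters, and it throws instead of blocking.
class SimpleBufferPool {
    private final long totalMemory;
    private final int poolableSize;
    private final Deque<ByteBuffer> free = new ArrayDeque<>();
    private long availableMemory;

    SimpleBufferPool(long totalMemory, int poolableSize) {
        this.totalMemory = totalMemory;
        this.poolableSize = poolableSize;
        this.availableMemory = totalMemory;
    }

    ByteBuffer allocate(int size) {
        if (size > totalMemory)
            throw new IllegalArgumentException("request exceeds the pool's total memory");
        // Fast path: reuse an idle buffer of the standard size.
        if (size == poolableSize && !free.isEmpty())
            return free.pollFirst();
        freeUp(size);
        if (availableMemory < size)
            throw new IllegalStateException("pool exhausted (a blocking pool would wait here)");
        availableMemory -= size;
        return ByteBuffer.allocate(size);
    }

    // Assumed behavior: give pooled buffers back to availableMemory
    // until it covers the request or the free list is empty.
    private void freeUp(int size) {
        while (!free.isEmpty() && availableMemory < size)
            availableMemory += free.pollLast().capacity();
    }

    void deallocate(ByteBuffer buffer) {
        if (buffer.capacity() == poolableSize) {
            buffer.clear();       // reset position and limit before reuse
            free.addLast(buffer); // standard-sized buffers return to the free list
        } else {
            availableMemory += buffer.capacity(); // other sizes are left to the GC
        }
    }

    long availableMemory() { return availableMemory; }
    int freeCount() { return free.size(); }
}
```

With `new SimpleBufferPool(64, 16)`, allocating 16 bytes, returning the buffer, and allocating 16 bytes again yields the same ByteBuffer instance the second time, which is exactly the object churn the pool is meant to avoid.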
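Next, let's look at the allocate method of BufferPool. It allocates a buffer of the given size from the pool. If there is not enough memory to satisfy the request, the method blocks for up to maxTimeToBlockMs and throws a TimeoutException if memory is still unavailable by then.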
    public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
        if (size > this.totalMemory)
            throw new IllegalArgumentException("Attempt to allocate " + size + " bytes, but there is a hard limit of "
                                               + this.totalMemory + " on memory allocations.");
        // Take the lock to synchronize access to the pool.
        this.lock.lock();
        try {
            // check if we have a free buffer of the right size pooled
            // The request is for exactly poolableSize bytes and the free deque holds an idle ByteBuffer.
            if (size == poolableSize && !this.free.isEmpty())
                return this.free.pollFirst();
            // Otherwise (a different size, or an empty free list) continue with the handling below.
            // Every ByteBuffer in the free list is poolableSize bytes, so the list's total size is a simple product.
            // now check if the request is immediately satisfiable with the
            // memory on hand or if we need to block
            int freeListSize = this.free.size() * this.poolableSize;
            // availableMemory plus the memory held in the free list is the total unallocated memory.
            if (this.availableMemory + freeListSize >= size) {
                // To bring availableMemory up to at least size, freeUp() keeps releasing ByteBuffers
                // from the free list until availableMemory can cover this request.
                // we have enough unallocated or pooled memory to immediately
                // satisfy the request
                freeUp(size);
                // Deduct the request from availableMemory.
                this.availableMemory -= size;
                lock.unlock();
                return ByteBuffer.allocate(size);
            } else {
                // we are out of memory and will have to block
                int accumulated = 0;
                ByteBuffer buffer = null;
                Condition moreMemory = this.lock.newCondition();
                long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                // Add this thread's Condition to the waiters queue.
                this.waiters.addLast(moreMemory);
                // loop over and over until we have a buffer or have reserved
                // enough memory to allocate one
                while (accumulated < size) {
                    long startWaitNs = time.nanoseconds();
                    long timeNs;
                    boolean waitingTimeElapsed;
                    try {
                        waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                    } catch (InterruptedException e) {
                        this.waiters.remove(moreMemory);
                        throw e;
                    } finally {
                        long endWaitNs = time.nanoseconds();
                        timeNs = Math.max(0L, endWaitNs - startWaitNs);
                        this.waitTime.record(timeNs, time.milliseconds());
                    }
                    if (waitingTimeElapsed) {
                        this.waiters.remove(moreMemory);
                        throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
                    }

                    remainingTimeToBlockNs -= timeNs;
                    // check if we can satisfy this request from the free list,
                    // otherwise allocate memory
                    if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                        // just grab a buffer from the free list
                        buffer = this.free.pollFirst();
                        accumulated = size;
                    } else {
                        // we'll need to allocate memory, but we may only get
                        // part of what we need on this iteration
                        freeUp(size - accumulated);
                        int got = (int) Math.min(size - accumulated, this.availableMemory);
                        this.availableMemory -= got;
                        accumulated += got;
                    }
                }
                // remove the condition for this thread to let the next thread
                // in line start getting memory
                Condition removed = this.waiters.removeFirst();
                if (removed != moreMemory)
                    throw new IllegalStateException("Wrong condition: this shouldn't happen.");
                // signal any additional waiters if there is more memory left
                // over for them
                if (this.availableMemory > 0 || !this.free.isEmpty()) {
                    if (!this.waiters.isEmpty())
                        this.waiters.peekFirst().signal();
                }
                // unlock and return the buffer
                lock.unlock();
                if (buffer == null)
                    return ByteBuffer.allocate(size);
                else
                    return buffer;
            }
        } finally {
            if (lock.isHeldByCurrentThread())
                lock.unlock();
        }
    }
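The blocking branch of allocate hinges on one pattern: waiting on a Condition with a time budget that shrinks after every wake-up, so that repeated wake-ups cannot extend the total wait. The sketch below isolates that pattern. MemoryGate is a made-up class, and it simplifies in ways the Kafka code does not: it uses a single shared Condition with signalAll instead of a FIFO queue of per-thread Conditions, it reserves memory all at once instead of accumulating it piece by piece, and it throws the checked java.util.concurrent.TimeoutException rather than Kafka's own exception type.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of "wait with a shrinking time budget".
// MemoryGate and its methods are illustrative names, not Kafka classes.
class MemoryGate {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition moreMemory = lock.newCondition();
    private long available;

    MemoryGate(long available) { this.available = available; }

    // Blocks until `size` bytes can be reserved, or throws once maxWaitMs is used up.
    void reserve(long size, long maxWaitMs) throws InterruptedException, TimeoutException {
        lock.lock();
        try {
            long remainingNs = TimeUnit.MILLISECONDS.toNanos(maxWaitMs);
            while (available < size) {
                if (remainingNs <= 0)
                    throw new TimeoutException("no memory within " + maxWaitMs + " ms");
                // awaitNanos returns an estimate of the time still left, so a wake-up
                // that did not free enough memory does not reset the budget.
                remainingNs = moreMemory.awaitNanos(remainingNs);
            }
            available -= size;
        } finally {
            lock.unlock();
        }
    }

    // Returns memory and wakes every waiter so each can re-check its request.
    void release(long size) {
        lock.lock();
        try {
            available += size;
            moreMemory.signalAll();
        } finally {
            lock.unlock();
        }
    }

    long available() {
        lock.lock();
        try {
            return available;
        } finally {
            lock.unlock();
        }
    }
}
```

Kafka's version tracks the elapsed time by hand around await and gives each waiting thread its own Condition, which lets it serve waiters strictly in arrival order; the shared Condition here trades that fairness for brevity.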
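To summarize the logic of allocate: it first tries to reuse an idle buffer of poolableSize from free; failing that, it carves the request out of availableMemory, reclaiming buffers from free if necessary; and only when neither is possible does it queue a Condition in waiters and block, for at most maxTimeToBlockMs, until other threads release memory.
Here is the allocate method again, drawn as a flowchart.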
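The same flow can also be sketched as a pure decision function that reports which branch a request would take, given the pool's state. AllocatePath and its enum are hypothetical names introduced for illustration; the conditions mirror the checks in the allocate code above.

```java
// Hypothetical illustration: classifies which branch of allocate a request takes.
class AllocatePath {
    enum Path { REJECT, REUSE_POOLED, ALLOCATE_NOW, WAIT }

    static Path choose(int size, long totalMemory, int poolableSize,
                       int freeBuffers, long availableMemory) {
        if (size > totalMemory)
            return Path.REJECT;        // allocate throws IllegalArgumentException
        if (size == poolableSize && freeBuffers > 0)
            return Path.REUSE_POOLED;  // free.pollFirst()
        long freeListSize = (long) freeBuffers * poolableSize;
        if (availableMemory + freeListSize >= size)
            return Path.ALLOCATE_NOW;  // freeUp(size), then ByteBuffer.allocate(size)
        return Path.WAIT;              // queue a Condition and wait for memory
    }
}
```

For example, a 40000-byte request against a pool holding two idle 16384-byte buffers and 10000 bytes of availableMemory is served immediately (42768 bytes are reclaimable), while the same request with one idle buffer and 1000 bytes available has to wait.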