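When the main thread calls KafkaProducer.send(), the message is first placed into the RecordAccumulator for temporary storage, and the main thread can then return from send() immediately. At that point the message has not actually been sent to Kafka; it is only buffered in the RecordAccumulator. Once the buffered data meets certain conditions (for example, a batch becomes full or a new batch is created), the Sender thread is woken up to send the messages held in the RecordAccumulator.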
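The hand-off described above is essentially a producer/consumer pattern. The sketch below is a hypothetical illustration, not Kafka code. The class `MiniSendPath`, its `threshold` parameter and the single shared list are assumptions. They stand in for KafkaProducer.send(), the RecordAccumulator and the Sender thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: the caller thread only buffers a message and returns at once;
// a separate "sender" thread is woken when the buffer reaches a size threshold.
class MiniSendPath {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition ready = lock.newCondition();
    private final List<String> buffer = new ArrayList<>();
    private final int threshold;

    MiniSendPath(int threshold) {
        this.threshold = threshold;
    }

    // Called by the "main" thread: append and return without any network I/O.
    void send(String message) {
        lock.lock();
        try {
            buffer.add(message);
            if (buffer.size() >= threshold) {
                ready.signal(); // wake the sender thread
            }
        } finally {
            lock.unlock();
        }
    }

    // Called by the "sender" thread: wait until the threshold is reached
    // (or the timeout expires), then take everything that is buffered.
    List<String> awaitAndDrain(long timeoutMs) throws InterruptedException {
        lock.lock();
        try {
            long remainingNs = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
            while (buffer.size() < threshold && remainingNs > 0) {
                remainingNs = ready.awaitNanos(remainingNs);
            }
            List<String> drained = new ArrayList<>(buffer);
            buffer.clear();
            return drained;
        } finally {
            lock.unlock();
        }
    }
}
```

The caller of send() never waits for the network. Only the sender thread blocks, and only until enough data has accumulated or the timeout expires.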
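RecordAccumulator caches our data in a ConcurrentMap. In the source, the concrete class is Kafka's own CopyOnWriteMap. The key is the partition (TopicPartition). The value is the queue of message batches for that partition, a Deque<RecordBatch> implemented with ArrayDeque. Each RecordBatch, in turn, stores its records in a MemoryRecords object. The data structure is shown in the figure above.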
With a rough picture of the RecordAccumulator's data structure in mind, let's start the analysis from the bottom layer, MemoryRecords.
MemoryRecords represents a collection of messages and stores them in an underlying ByteBuffer. Here is a brief look at its fields:
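```java
// Compressor used only for appending: compresses messages and writes the compressed data into the buffer
private final Compressor compressor;
// Maximum number of bytes that may be written into the buffer
private final int writeLimit;
// Initial capacity of the buffer; only used when deallocating writable records
private final int initialCapacity;
// Underlying buffer used for reading; null while the records are still writable
private ByteBuffer buffer;
// Whether this MemoryRecords is writable or read-only; it is switched to read-only before it is sent
private boolean writable;
```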
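The Compressor's compression type is set by the compression.type configuration parameter. Besides no compression, the version discussed here mainly supports three codecs: GZIP, SNAPPY and LZ4.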
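For reference, the producer settings that feed the structures discussed in this article can be collected in a java.util.Properties object, as below. This is a minimal sketch. The broker address is a placeholder, and the class name `AccumulatorConfigExample` is hypothetical. The comments map each setting to the field it controls in the version discussed here: batch.size to the batch buffer size, buffer.memory to the BufferPool's total memory, and so on.

```java
import java.util.Properties;

// Hypothetical helper: builds producer settings that feed the RecordAccumulator.
// Only java.util.Properties is used, so no Kafka dependency is needed to run it.
class AccumulatorConfigExample {
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("compression.type", "lz4");   // Compressor codec: none, gzip, snappy or lz4
        props.put("batch.size", "16384");       // RecordBatch buffer size (BufferPool poolableSize)
        props.put("linger.ms", "5");            // how long ready() waits before a non-full batch is sendable
        props.put("buffer.memory", "33554432"); // BufferPool totalMemory
        props.put("max.block.ms", "60000");     // upper bound for blocking in BufferPool.allocate()
        return props;
    }
}
```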
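The constructor of MemoryRecords is private, so instances can only be obtained through static factory methods such as emptyRecords(). Its main methods are append() and close().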
```java
public long append(long offset, long timestamp, byte[] key, byte[] value) {
    if (!writable)
        throw new IllegalStateException("Memory records is not writable");

    int size = Record.recordSize(key, value);
    // offset
    compressor.putLong(offset);
    // size of the record
    compressor.putInt(size);
    // write the record itself; returns its CRC checksum
    long crc = compressor.putRecord(timestamp, key, value);
    // update the Compressor's statistics with the number of bytes just written
    compressor.recordWritten(size + Records.LOG_OVERHEAD);
    // return the record's CRC (not the amount of data written)
    return crc;
}
```
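close() points the MemoryRecords.buffer field at the ByteBuffer that the Compressor wrote into, which may have been expanded. It also sets writable to false, which switches the records to read-only mode.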
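To make the writable/read-only lifecycle concrete, here is a hypothetical, heavily simplified stand-in for MemoryRecords. It only mirrors the 12-byte offset-plus-size prefix (Records.LOG_OVERHEAD) that append() writes before each record. Several details are assumptions: the class name `SketchRecords`, the raw-payload record format, and using java.util.zip.CRC32 over the payload. There is also no compression.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Hypothetical, simplified stand-in for MemoryRecords: each entry is written as
// [offset: 8 bytes][size: 4 bytes][payload], mirroring the offset/size prefix
// (LOG_OVERHEAD = 12 bytes) that append() writes before each record.
// Real Kafka records carry more header fields and may be compressed; this does not.
// Unlike Kafka's Compressor, this buffer never grows: appending without room
// throws BufferOverflowException.
class SketchRecords {
    static final int LOG_OVERHEAD = 8 + 4;

    private final ByteBuffer buffer;
    private boolean writable = true;

    SketchRecords(int capacity) {
        this.buffer = ByteBuffer.allocate(capacity);
    }

    boolean hasRoomFor(byte[] payload) {
        return writable && buffer.remaining() >= LOG_OVERHEAD + payload.length;
    }

    // Returns the CRC32 of the payload, as append() returns the record's CRC.
    long append(long offset, byte[] payload) {
        if (!writable)
            throw new IllegalStateException("Memory records is not writable");
        buffer.putLong(offset);
        buffer.putInt(payload.length);
        buffer.put(payload);
        CRC32 crc = new CRC32();
        crc.update(payload);
        return crc.getValue();
    }

    // Seal the buffer: switch to read-only mode and flip it for reading.
    void close() {
        if (writable) {
            buffer.flip();
            writable = false;
        }
    }

    int sizeInBytes() {
        return writable ? buffer.position() : buffer.limit();
    }

    boolean isWritable() {
        return writable;
    }
}
```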
Moving up one layer, each RecordBatch wraps one MemoryRecords object. It also holds a number of control and statistics fields, briefly introduced below.
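```java
// number of Records stored
public int recordCount = 0;
// size in bytes of the largest Record
public int maxRecordSize = 0;
// number of attempts to send this RecordBatch
public volatile int attempts = 0;
// creation timestamp of this RecordBatch
public final long createdMs;
// timestamp at which the Sender drained this RecordBatch
public long drainedMs;
// timestamp of the last send attempt
public long lastAttemptMs;
// reference to the MemoryRecords object where the messages are actually stored
public final MemoryRecords records;
// all messages buffered in this RecordBatch are sent to this TopicPartition
public final TopicPartition topicPartition;
// Future-like object that tracks the state of this RecordBatch
public final ProduceRequestResult produceFuture;
// timestamp of the last append to this RecordBatch
public long lastAppendTime;
// list of Thunks, i.e. the queue of per-message callbacks; Thunk.callback points to the Callback of the corresponding message
private final List<Thunk> thunks;
// relative offset of the next message inside this RecordBatch
private long offsetCounter = 0L;
// whether this batch is being retried; a RecordBatch whose send failed is sent again
private boolean retry;
```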
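Let's look at the flow chart for what happens when a response is received.
The chart is drawn around RecordBatch.done(). ProduceRequestResult provides Future-like behavior. It uses a CountDownLatch with a count of 1 to mark whether this RecordBatch has finished sending. When the response for the RecordBatch has been handled, ProduceRequestResult.done() is called. Its error field records whether the batch completed normally. Then CountDownLatch.countDown() wakes up every thread waiting for this batch to complete. Here is the source: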
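```java
/**
 * Mark this request as complete and unblock any threads waiting on its completion.
 * error indicates whether it completed normally (null) or exceptionally.
 */
public void done(TopicPartition topicPartition, long baseOffset, RuntimeException error) {
    this.topicPartition = topicPartition;
    this.baseOffset = baseOffset;
    this.error = error;
    // count the latch down from 1 to 0, releasing all waiting threads
    this.latch.countDown();
}
```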
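The latch mechanism can be reproduced in a few lines. The sketch below is a simplification built on assumptions: the class `SketchProduceResult` and its `await`/`completed` methods are hypothetical. It shows only how a CountDownLatch with a count of 1 turns a one-time completion into something any number of threads can wait on.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical simplification of ProduceRequestResult: a one-shot result whose
// completion is signalled through a CountDownLatch with count 1.
class SketchProduceResult {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile long baseOffset = -1L;
    private volatile RuntimeException error;

    // Called once when the broker response (or a failure) for the batch arrives.
    void done(long baseOffset, RuntimeException error) {
        this.baseOffset = baseOffset;
        this.error = error;
        latch.countDown(); // releases every thread blocked in await()
    }

    // Blocks until done() has been called or the timeout expires.
    boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return latch.await(timeout, unit);
    }

    boolean completed() {
        return latch.getCount() == 0L;
    }

    long baseOffset() {
        return baseOffset;
    }

    RuntimeException error() {
        return error;
    }
}
```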
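ProduceRequestResult has a baseOffset field. It holds the offset that the broker assigned to the first message of this RecordBatch. Why is this value needed? After a message is sent, we need to know its offset on the broker. Rather than tracking the offset of every message in the RecordBatch, it is enough to know the offset of the first one. Adding a message's own relative offset within the RecordBatch then gives its offset in the broker's partition. For example, with a baseOffset of 1000, the message at relative offset 2 ends up at offset 1002.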
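A Thunk wraps the callback of one message, and RecordBatch keeps a list of them as its callback queue. The Thunk's callback field points to the Callback object of the corresponding message.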
```java
/**
 * A callback and the associated FutureRecordMetadata argument to pass to it.
 * (An inner class of RecordBatch.)
 */
final private static class Thunk {
    final Callback callback;
    final FutureRecordMetadata future;

    public Thunk(Callback callback, FutureRecordMetadata future) {
        this.callback = callback;
        this.future = future;
    }
}
```
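Thunk also holds a FutureRecordMetadata field. That class has two key fields:
```java
// points to the produceFuture field of the RecordBatch that holds the message
private final ProduceRequestResult result;
// relative offset of the message within its RecordBatch
private final long relativeOffset;
```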
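FutureRecordMetadata delegates all of its basic operations to the corresponding methods of ProduceRequestResult. Once the producer receives the response for a message, FutureRecordMetadata.get() returns a RecordMetadata object. It contains the offset, timestamp and other data, which user-defined Callbacks can use.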
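Here is a minimal sketch of that delegation, under stated assumptions. `BatchResult` and `RecordFuture` are hypothetical stand-ins for ProduceRequestResult and FutureRecordMetadata. Only the offset is modelled; there is no timestamp, checksum or error. The sketch shows why one batch-level baseOffset is enough, since each record simply adds its own relativeOffset.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of the delegation idea behind FutureRecordMetadata:
// every record of a batch shares one batch-level result, and each record only
// remembers its relative offset inside the batch.
class BatchResult {
    final CountDownLatch latch = new CountDownLatch(1);
    volatile long baseOffset = -1L;

    void done(long baseOffset) {
        this.baseOffset = baseOffset;
        latch.countDown();
    }
}

class RecordFuture {
    private final BatchResult result;   // shared with the other records of the batch
    private final long relativeOffset;  // position of this record inside the batch

    RecordFuture(BatchResult result, long relativeOffset) {
        this.result = result;
        this.relativeOffset = relativeOffset;
    }

    // Waits on the shared batch result, then derives this record's absolute offset.
    long get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        if (!result.latch.await(timeout, unit))
            throw new TimeoutException("batch not completed in time");
        return result.baseOffset + relativeOffset;
    }
}
```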
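Next, let's look at the core method of RecordBatch, tryAppend(). It appends a record to the current record set and returns a FutureRecordMetadata that carries the record's relative offset within that set. It returns null if there is not enough room.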
```java
/**
 * Append the record to the current record set and return the relative offset within that record set
 *
 * @return The RecordSend corresponding to this record or null if there isn't sufficient room.
 */
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
    // not enough room left; note that hasRoomFor() is only an estimate, not an exact value
    if (!this.records.hasRoomFor(key, value)) {
        return null;
    } else {
        // append to MemoryRecords; offsetCounter is the offset relative to this RecordBatch
        long checksum = this.records.append(offsetCounter++, timestamp, key, value);
        this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
        this.lastAppendTime = now;
        // (statistics updates omitted)
        // create the FutureRecordMetadata for this record
        FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                               timestamp, checksum,
                                                               key == null ? -1 : key.length,
                                                               value == null ? -1 : value.length);
        // wrap the user Callback and the FutureRecordMetadata in a Thunk and keep it in thunks
        if (callback != null)
            thunks.add(new Thunk(callback, future));
        this.recordCount++;
        return future;
    }
}
```
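The essence of tryAppend() is simple: check for room and hand out the next relative offset, or return null so the caller allocates a new batch. The hypothetical `SketchBatch` below captures only that contract. A fixed byte budget stands in for MemoryRecords.hasRoomFor(), which in Kafka is an estimate over possibly compressed data.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified RecordBatch: a fixed byte budget stands in for
// MemoryRecords.hasRoomFor(), and offsetCounter hands out relative offsets.
class SketchBatch {
    private final int capacityBytes;
    private int usedBytes = 0;
    private long offsetCounter = 0L;
    private final List<byte[]> records = new ArrayList<>();

    SketchBatch(int capacityBytes) {
        this.capacityBytes = capacityBytes;
    }

    // Returns the relative offset of the appended record, or null when the
    // batch has no room left (the caller must then allocate a new batch).
    Long tryAppend(byte[] value) {
        if (usedBytes + value.length > capacityBytes)
            return null;
        records.add(value);
        usedBytes += value.length;
        return offsetCounter++;
    }

    int recordCount() {
        return records.size();
    }
}
```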
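RecordBatch.done() is called in three cases: the RecordBatch receives a successful response, it times out, or the producer is closed. done() invokes all the callbacks. Once every message in the RecordBatch has been handled, it also calls produceFuture.done().
The source is as follows: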
```java
/**
 * Complete the request
 * Called when the RecordBatch receives a normal response, times out, or the producer is closed.
 *
 * @param baseOffset The base offset of the messages assigned by the server
 * @param timestamp The timestamp returned by the broker.
 * @param exception The exception that occurred (or null if the request was successful)
 */
public void done(long baseOffset, long timestamp, RuntimeException exception) {
    log.trace("Produced messages to topic-partition {} with base offset offset {} and error: {}.",
              topicPartition, baseOffset, exception);
    // execute callbacks: invoke the Callback of every message
    for (int i = 0; i < this.thunks.size(); i++) {
        try {
            Thunk thunk = this.thunks.get(i);
            // completed normally
            if (exception == null) {
                // If the timestamp returned by server is NoTimestamp, that means CreateTime is used. Otherwise LogAppendTime is used.
                // wrap the broker's reply (offset and timestamp) and the message's other metadata into a RecordMetadata
                RecordMetadata metadata = new RecordMetadata(this.topicPartition, baseOffset, thunk.future.relativeOffset(),
                                                             timestamp == Record.NO_TIMESTAMP ? thunk.future.timestamp() : timestamp,
                                                             thunk.future.checksum(),
                                                             thunk.future.serializedKeySize(),
                                                             thunk.future.serializedValueSize());
                // invoke the user-defined callback of this message
                thunk.callback.onCompletion(metadata, null);
            } else {
                // an error occurred; note that the first argument is null
                thunk.callback.onCompletion(null, exception);
            }
        } catch (Exception e) {
            log.error("Error executing user-provided callback on message for topic-partition {}:", topicPartition, e);
        }
    }
    // mark the whole RecordBatch as complete by calling produceFuture.done()
    this.produceFuture.done(topicPartition, baseOffset, exception);
}
```
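One design point in the loop above is worth isolating. Each user callback runs inside its own try/catch, so a misbehaving callback cannot prevent the others, or produceFuture.done(), from running. Below is a hypothetical sketch of just that pattern. `SketchCallback` and `SketchThunks` are illustrative names, and plain offsets stand in for RecordMetadata.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the callback loop in RecordBatch.done(): every stored
// (callback, relativeOffset) pair is invoked, and an exception thrown by one user
// callback is caught so the remaining callbacks still run.
interface SketchCallback {
    void onCompletion(Long offset, Exception exception);
}

class SketchThunks {
    private final List<SketchCallback> callbacks = new ArrayList<>();
    private final List<Long> relativeOffsets = new ArrayList<>();

    void add(SketchCallback callback, long relativeOffset) {
        callbacks.add(callback);
        relativeOffsets.add(relativeOffset);
    }

    // Returns how many callbacks threw, purely so the behaviour is observable.
    int done(long baseOffset, RuntimeException error) {
        int failures = 0;
        for (int i = 0; i < callbacks.size(); i++) {
            try {
                if (error == null)
                    callbacks.get(i).onCompletion(baseOffset + relativeOffsets.get(i), null);
                else
                    callbacks.get(i).onCompletion(null, error); // no metadata on failure
            } catch (Exception e) {
                failures++; // Kafka logs the error here instead of counting it
            }
        }
        return failures;
    }
}
```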
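Creating and releasing ByteBuffers is relatively expensive. To use memory efficiently, almost every mature framework or tool has its own memory management mechanism, and Kafka uses BufferPool to reuse ByteBuffers. Each BufferPool manages ByteBuffers of only one specific size, given by the poolableSize field. ByteBuffers of other sizes are not cached in the pool. For example, when a single message is larger than what one batch buffer can hold, a ByteBuffer cached in BufferPool cannot be reused. A separate ByteBuffer is allocated instead. Once it has been used, it is not returned to BufferPool but simply discarded and reclaimed by the GC, though its size is still credited back to the pool's available memory. The core fields of BufferPool are: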
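```java
// total size of the pool
private final long totalMemory;
// the one buffer size this pool caches and reuses (batch.size)
private final int poolableSize;
// ByteBuffers are allocated and released by multiple threads concurrently, so a lock guards the pool
private final ReentrantLock lock;
// cached ByteBuffers of poolableSize
private final Deque<ByteBuffer> free;
// threads blocked because not enough memory is available; what is queued is each thread's Condition
private final Deque<Condition> waiters;
// memory not held in any ByteBuffer: totalMemory minus the buffers cached in free and the buffers in use
private long availableMemory;
```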
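The relationship between these fields can be checked with a small single-threaded model. This sketch is hypothetical: `SketchPoolAccounting` is not Kafka code, and it returns null where the real allocate() would block. Memory is always split into availableMemory, buffers cached in free, and buffers handed out. On release, only buffers of poolableSize are cached.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, single-threaded sketch of BufferPool's memory accounting:
// totalMemory = availableMemory + (buffers cached in free) + (buffers handed out).
// Only buffers of exactly poolableSize are cached; others are dropped for the GC,
// but their size is still returned to availableMemory.
class SketchPoolAccounting {
    private final int poolableSize;
    private final Deque<ByteBuffer> free = new ArrayDeque<>();
    private long availableMemory;

    SketchPoolAccounting(long totalMemory, int poolableSize) {
        this.availableMemory = totalMemory;
        this.poolableSize = poolableSize;
    }

    // Non-blocking allocation; returns null instead of waiting when memory is short.
    ByteBuffer tryAllocate(int size) {
        if (size == poolableSize && !free.isEmpty())
            return free.pollFirst();                 // reuse a cached buffer
        long freeListSize = (long) free.size() * poolableSize;
        if (availableMemory + freeListSize < size)
            return null;                             // the real allocate() would block here
        while (availableMemory < size)               // what freeUp() does
            availableMemory += free.pollLast().capacity();
        availableMemory -= size;
        return ByteBuffer.allocate(size);
    }

    void deallocate(ByteBuffer buffer) {
        if (buffer.capacity() == poolableSize) {
            buffer.clear();
            free.add(buffer);                        // cached for reuse
        } else {
            availableMemory += buffer.capacity();    // dropped; memory returned to the budget
        }
    }

    long availableMemory() {
        return availableMemory;
    }

    int cachedBuffers() {
        return free.size();
    }
}
```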
Let's focus on BufferPool.allocate(), the method that requests a ByteBuffer from the pool:
```java
/**
 * Allocate a ByteBuffer of the given size from the pool. If the pool does not have enough memory,
 * or the pool is in a blocking state, the calling thread blocks.
 */
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
    if (size > this.totalMemory)
        throw new IllegalArgumentException("Attempt to allocate " + size
                                           + " bytes, but there is a hard limit of "
                                           + this.totalMemory
                                           + " on memory allocations.");
    // acquire the lock
    this.lock.lock();
    try {
        // a ByteBuffer of poolableSize is requested and free has one cached: reuse it
        if (size == poolableSize && !this.free.isEmpty())
            return this.free.pollFirst();

        // otherwise (size is not poolableSize, or free is empty):
        // every buffer in free has poolableSize, so the whole free list is easy to size
        int freeListSize = this.free.size() * this.poolableSize;
        // available memory plus memory that can be released from free covers this request
        if (this.availableMemory + freeListSize >= size) {
            // freeUp() keeps releasing ByteBuffers from free until availableMemory covers the request
            freeUp(size);
            // reserve the memory
            this.availableMemory -= size;
            lock.unlock();
            return ByteBuffer.allocate(size);
        } else {
            // not enough memory, so we have to block
            int accumulated = 0;
            ByteBuffer buffer = null;
            // queue this thread's Condition in waiters
            Condition moreMemory = this.lock.newCondition();
            long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
            this.waiters.addLast(moreMemory);
            // loop until we have gathered enough memory
            while (accumulated < size) {
                long startWaitNs = time.nanoseconds();
                long timeNs;
                boolean waitingTimeElapsed;
                try {
                    // block
                    waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                } catch (InterruptedException e) {
                    // interrupted: remove this thread's Condition
                    this.waiters.remove(moreMemory);
                    throw e;
                } finally {
                    long endWaitNs = time.nanoseconds();
                    timeNs = Math.max(0L, endWaitNs - startWaitNs);
                    // record the time spent blocked
                    this.waitTime.record(timeNs, time.milliseconds());
                }
                // timed out: fail
                if (waitingTimeElapsed) {
                    this.waiters.remove(moreMemory);
                    throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
                }

                remainingTimeToBlockNs -= timeNs;
                // a poolableSize ByteBuffer is requested and free now has one cached
                if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                    buffer = this.free.pollFirst();
                    accumulated = size;
                } else {
                    // take part of the memory now and keep waiting for the rest
                    freeUp(size - accumulated);
                    int got = (int) Math.min(size - accumulated, this.availableMemory);
                    this.availableMemory -= got;
                    accumulated += got;
                }
            }

            // allocation succeeded: remove our Condition
            Condition removed = this.waiters.removeFirst();
            if (removed != moreMemory)
                throw new IllegalStateException("Wrong condition: this shouldn't happen.");

            // if memory is still left over, wake up the next waiting thread
            if (this.availableMemory > 0 || !this.free.isEmpty()) {
                if (!this.waiters.isEmpty())
                    this.waiters.peekFirst().signal();
            }

            // unlock
            lock.unlock();
            if (buffer == null)
                return ByteBuffer.allocate(size);
            else
                return buffer;
        }
    } finally {
        // make sure the lock is released
        if (lock.isHeldByCurrentThread())
            lock.unlock();
    }
}
```
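Correspondingly, when memory is released in deallocate(), the thread at the head of waiters, which is blocked waiting for space, is woken up.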
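The waiting scheme can be sketched separately from the memory bookkeeping. The hypothetical `SketchMemoryGate` below follows allocate() and deallocate() in two ways: it keeps one Condition per blocked thread in a FIFO deque, and it signals only the head. As a simplifying assumption, a waiter takes its whole request at once instead of accumulating memory piece by piece. There are no ByteBuffers or metrics.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of the waiting scheme in BufferPool.allocate()/deallocate():
// each blocked thread parks on its own Condition, kept in a FIFO deque, and a
// release wakes only the thread at the head.
class SketchMemoryGate {
    private final ReentrantLock lock = new ReentrantLock();
    private final Deque<Condition> waiters = new ArrayDeque<>();
    private long availableMemory;

    SketchMemoryGate(long totalMemory) {
        this.availableMemory = totalMemory;
    }

    void acquire(long size, long maxBlockMs) throws InterruptedException, TimeoutException {
        lock.lock();
        try {
            if (waiters.isEmpty() && availableMemory >= size) {
                availableMemory -= size; // fast path: enough memory, nobody queued
                return;
            }
            Condition moreMemory = lock.newCondition();
            waiters.addLast(moreMemory);
            long remainingNs = TimeUnit.MILLISECONDS.toNanos(maxBlockMs);
            try {
                // wait until we are at the head of the queue and the request fits
                while (waiters.peekFirst() != moreMemory || availableMemory < size) {
                    if (remainingNs <= 0)
                        throw new TimeoutException("Failed to allocate memory within " + maxBlockMs + " ms.");
                    remainingNs = moreMemory.awaitNanos(remainingNs);
                }
                availableMemory -= size;
            } finally {
                waiters.remove(moreMemory);
                // pass the turn on if memory is still left for the next waiter
                if (availableMemory > 0 && !waiters.isEmpty())
                    waiters.peekFirst().signal();
            }
        } finally {
            lock.unlock();
        }
    }

    void release(long size) {
        lock.lock();
        try {
            availableMemory += size;
            Condition head = waiters.peekFirst();
            if (head != null)
                head.signal(); // wake exactly one blocked thread, as deallocate() does
        } finally {
            lock.unlock();
        }
    }

    long availableMemory() {
        lock.lock();
        try {
            return availableMemory;
        } finally {
            lock.unlock();
        }
    }
}
```

Removing a thread's Condition in a finally block and then signalling the next waiter keeps a timed-out or interrupted thread from swallowing a wakeup meant for the queue.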
With the first three classes covered, let's look at the implementation of RecordAccumulator, starting with its key fields:
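```java
// size of the ByteBuffer underlying each RecordBatch
private final int batchSize;
// compression type
private final CompressionType compression;
// the BufferPool
private final BufferPool free;
// messages buffered per destination TopicPartition; Deque is not thread-safe, so appending
// messages or draining RecordBatches requires synchronizing on it
private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
// RecordBatches that have not finished sending, backed by a Set<RecordBatch>
private final IncompleteRecordBatches incomplete;
// The following variables are only accessed by the sender thread, so we don't need to protect them.
// partitions muted by the Sender (keeps at most one in-flight batch per partition when ordering must be guaranteed)
private final Set<TopicPartition> muted;
// when drain() exports RecordBatches in bulk, drainIndex remembers where the previous pass stopped
// so the next pass resumes there, preventing starvation
private int drainIndex;
```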
Here is a brief flow chart.
Now let's look at the source:
```java
public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Callback callback,
                                 long maxTimeToBlock) throws InterruptedException {
    // We keep track of the number of appending thread to make sure we do not miss batches in
    // abortIncompleteBatches().
    // (counts the threads currently appending to the RecordAccumulator)
    appendsInProgress.incrementAndGet();
    try {
        // check if we have an in-progress batch
        // Step 1: find (or create) the Deque for this TopicPartition
        Deque<RecordBatch> dq = getOrCreateDeque(tp);
        // Step 2: lock the Deque
        synchronized (dq) {
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            // Step 3: try to append the Record to the last RecordBatch in the Deque
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
            // Step 4: return directly if the append succeeded
            if (appendResult != null)
                return appendResult;
        } // Step 5: leave the synchronized block, releasing the lock

        // we don't have an in-progress record batch try to allocate a new batch
        // Step 6: the append failed, so request a new buffer from BufferPool (may block)
        int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
        ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
        // Step 7: lock the Deque again; another thread may have added a batch while we were allocating
        synchronized (dq) {
            // Need to check if producer is closed again after grabbing the dequeue lock.
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            // retry tryAppend() now that the Deque is locked again
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
            if (appendResult != null) {
                // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                // Step 8: the append succeeded, so release the buffer allocated in step 6 and return
                free.deallocate(buffer);
                return appendResult;
            }
            // Step 9: create a new RecordBatch, append the Record to it and add it to the Deque
            MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
            RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
            dq.addLast(batch);
            // Step 10: add the new RecordBatch to the incomplete set
            incomplete.add(batch);
            // Step 11: return a RecordAppendResult
            return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
        } // Step 12: leave the synchronized block, releasing the lock
    } finally {
        appendsInProgress.decrementAndGet();
    }
}
```
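Why lock twice? One reason is to keep the lock scope small. The other is that requesting a new ByteBuffer from BufferPool may block, and the Deque lock must not be held while blocking. Otherwise, no other thread could append to this partition in the meantime. And why lock again, and retry tryAppend(), before adding a new RecordBatch to the Deque? Because several threads may try to add a new RecordBatch node at the same time, as shown in the figure below.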
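Threads 1 and 2 both see that RecordBatch2 is full, and each appends a new RecordBatch to the tail of the queue. This wastes resources by leaving two partially filled batches. That is why the lock is required, together with the re-check through tryAppend().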
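The double-locking pattern can be tried out with a simplified, hypothetical `SketchAccumulator`. In it, a batch is just a List<String> with a record-count limit, and allocation is a plain `new ArrayList<>()`. A ConcurrentHashMap with computeIfAbsent() stands in for Kafka's getOrCreateDeque().

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of the locking pattern in RecordAccumulator.append():
// try to append under the Deque lock, allocate outside the lock (allocation may
// block), then lock again and re-check before creating a new batch.
class SketchAccumulator {
    private final ConcurrentMap<String, Deque<List<String>>> batches = new ConcurrentHashMap<>();
    private final int batchCapacity;

    SketchAccumulator(int batchCapacity) {
        this.batchCapacity = batchCapacity;
    }

    private Deque<List<String>> getOrCreateDeque(String partition) {
        return batches.computeIfAbsent(partition, p -> new ArrayDeque<>());
    }

    // Appends to the last batch if it still has room; caller must hold the Deque lock.
    private boolean tryAppend(String record, Deque<List<String>> dq) {
        List<String> last = dq.peekLast();
        if (last == null || last.size() >= batchCapacity)
            return false;
        last.add(record);
        return true;
    }

    void append(String partition, String record) {
        Deque<List<String>> dq = getOrCreateDeque(partition);
        synchronized (dq) {                 // first lock: fast path
            if (tryAppend(record, dq))
                return;
        }
        // Stand-in for free.allocate(): done without holding the Deque lock.
        List<String> newBatch = new ArrayList<>(batchCapacity);
        synchronized (dq) {                 // second lock: re-check
            if (tryAppend(record, dq))
                return;                     // another thread added a batch; drop ours
            newBatch.add(record);
            dq.addLast(newBatch);
        }
    }

    int batchCount(String partition) {
        Deque<List<String>> dq = getOrCreateDeque(partition);
        synchronized (dq) {
            return dq.size();
        }
    }
}
```

Because a new batch is created only after the last one is re-checked under the lock and found full, every batch except possibly the last ends up full. With 8 threads appending 100 records each and a limit of 10 records per batch, the result is exactly 80 batches.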
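Before messages are sent to the broker, RecordAccumulator.ready() is called to find the nodes in the cluster that meet the sending conditions. Here is the flow chart: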
In short, ready() decides for each Deque whether it should be sent. If so, it looks up the leader Node of the key, i.e. the TopicPartition, and records it. Let's look at the source:
```java
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
    // Nodes that messages can be sent to
    Set<Node> readyNodes = new HashSet<>();
    // delay until ready() should be called again
    long nextReadyCheckDelayMs = Long.MAX_VALUE;
    // whether the metadata has partitions whose leader replica is unknown
    boolean unknownLeadersExist = false;
    // whether any thread is blocked waiting for BufferPool to free memory
    boolean exhausted = this.free.queued() > 0;
    // iterate over batches and check the Node hosting each partition's leader replica
    for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
        TopicPartition part = entry.getKey();
        Deque<RecordBatch> deque = entry.getValue();
        // find the Node hosting the partition's leader replica
        Node leader = cluster.leaderFor(part);
        // no known leader: messages certainly cannot be sent
        if (leader == null) {
            // mark it; this will later trigger a metadata update
            unknownLeadersExist = true;
        } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
            // read the Deque under its lock
            synchronized (deque) {
                // only the first RecordBatch of the Deque is examined
                RecordBatch batch = deque.peekFirst();
                if (batch != null) {
                    // compute the conditions below
                    boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
                    long waitedTimeMs = nowMs - batch.lastAttemptMs;
                    long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                    long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                    // condition 1: the Deque holds more than one batch, or its first batch is full
                    boolean full = deque.size() > 1 || batch.records.isFull();
                    // condition 2: the batch has waited at least lingerMs (or retryBackoffMs when retrying)
                    boolean expired = waitedTimeMs >= timeToWaitMs;
                    // conditions 3-5: threads are waiting for BufferPool memory (exhausted),
                    // the producer is closing (closed), or a flush is in progress
                    boolean sendable = full || expired || exhausted || closed || flushInProgress();
                    if (sendable && !backingOff) {
                        readyNodes.add(leader);
                    } else {
                        // remember when ready() should check again
                        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                    }
                }
            }
        }
    }
    return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);
}
```
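Stripped of locking and iteration, the per-partition decision in ready() is a pure function of a few numbers. The hypothetical helper below restates it. Its name and the -1 "ready" return value are illustrative conventions, not Kafka API. Written this way, the interaction between lingerMs and retryBackoffMs is easy to check.

```java
// Hypothetical pure-function restatement of the per-partition check inside ready():
// it returns -1 when the leader of this partition is ready to receive data,
// otherwise the number of milliseconds until the batch should be checked again.
final class SketchReadyCheck {
    private SketchReadyCheck() { }

    static long check(int attempts, long lastAttemptMs, long nowMs,
                      long retryBackoffMs, long lingerMs,
                      int dequeSize, boolean batchFull,
                      boolean exhausted, boolean closed, boolean flushing) {
        boolean backingOff = attempts > 0 && lastAttemptMs + retryBackoffMs > nowMs;
        long waitedTimeMs = nowMs - lastAttemptMs;
        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
        boolean full = dequeSize > 1 || batchFull;                      // condition 1
        boolean expired = waitedTimeMs >= timeToWaitMs;                 // condition 2
        boolean sendable = full || expired || exhausted || closed || flushing; // conditions 3-5
        return (sendable && !backingOff) ? -1L : timeLeftMs;
    }
}
```

Take a fresh batch last touched at t=100 ms with lingerMs=5. A check at t=102 returns 3, meaning check again in 3 ms. A check at t=105 returns -1, meaning ready. A retried batch still inside its backoff window is held back even when it is full.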
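Before the actual send, the RecordBatches are regrouped into a Map<Integer, List<RecordBatch>>. Its key is the NodeId and its value is the list of RecordBatches to send to that node. drain() is called by the Sender thread, and its core logic is a mapping conversion. It turns the TopicPartition → RecordBatch-queue mapping held by RecordAccumulator into a NodeId → RecordBatch-list mapping. The reason is that, at the network I/O layer, the producer sends data to Nodes. It only sets up connections to Nodes and sends data over them, without caring which TopicPartition the data belongs to. The upper-level business logic of KafkaProducer, in contrast, produces data per TopicPartition. It only cares which TopicPartition the data goes to, not which Node hosts that TopicPartition.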
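The change of key can be illustrated on plain collections. In this hypothetical sketch, partitions and batches are strings, and `leaderOf` plays the role of the Cluster metadata. Unlike drain(), which walks each ready node's partitions, it walks the partitions once, but the resulting NodeId-keyed map has the same shape.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the mapping done by drain(): batches are produced per
// partition, but requests are sent per broker node, so the queues are regrouped
// by the node id of each partition's leader. Each queue contributes only its
// first batch, as drain() does.
final class SketchRegroup {
    private SketchRegroup() { }

    static Map<Integer, List<String>> byNode(Map<String, List<String>> batchesByPartition,
                                             Map<String, Integer> leaderOf) {
        Map<Integer, List<String>> result = new HashMap<>();
        for (Map.Entry<String, List<String>> e : batchesByPartition.entrySet()) {
            Integer node = leaderOf.get(e.getKey());
            if (node == null || e.getValue().isEmpty())
                continue; // unknown leader or nothing queued: skip this partition
            result.computeIfAbsent(node, n -> new ArrayList<>()).add(e.getValue().get(0));
        }
        return result;
    }
}
```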
```java
public Map<Integer, List<RecordBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
    if (nodes.isEmpty())
        return Collections.emptyMap();
    // the converted result
    Map<Integer, List<RecordBatch>> batches = new HashMap<>();
    // iterate over the given set of ready Nodes
    for (Node node : nodes) {
        int size = 0;
        // partitions whose leader is on this Node
        List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
        // RecordBatches to send to this Node
        List<RecordBatch> ready = new ArrayList<>();
        // drainIndex is an index into parts that remembers where the last pass stopped, so the next pass resumes there.
        // Always starting from index 0 could mean only the first few partitions ever get sent, starving the others.
        int start = drainIndex = drainIndex % parts.size();
        do {
            // details of the partition
            PartitionInfo part = parts.get(drainIndex);
            TopicPartition tp = new TopicPartition(part.topic(), part.partition());
            if (!muted.contains(tp)) {
                // the RecordBatch queue of this partition
                Deque<RecordBatch> deque = getDeque(new TopicPartition(part.topic(), part.partition()));
                if (deque != null) {
                    synchronized (deque) {
                        // the first RecordBatch in the queue
                        RecordBatch first = deque.peekFirst();
                        if (first != null) {
                            boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
                            // only drain the batch if it is not in its backoff period
                            if (!backoff) {
                                if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
                                    // the request is full (maxSize is normally the maximum request size): stop the loop
                                    break;
                                } else {
                                    // take one RecordBatch from the queue and put it into ready;
                                    // only one RecordBatch is taken per TopicPartition
                                    RecordBatch batch = deque.pollFirst();
                                    // close the Compressor and its output stream, making MemoryRecords read-only
                                    batch.records.close();
                                    size += batch.records.sizeInBytes();
                                    ready.add(batch);
                                    batch.drainedMs = now;
                                }
                            }
                        }
                    }
                }
            }
            // advance drainIndex
            this.drainIndex = (this.drainIndex + 1) % parts.size();
        } while (start != drainIndex);
        // record the NodeId -> RecordBatch list mapping
        batches.put(node.id(), ready);
    }
    return batches;
}
```
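The fairness rule can be isolated as follows. This hypothetical `SketchDrainer` ignores muting, backoff and the removal of batches, and models each partition only by the size of its first batch. It keeps two things from drain(): the persistent drainIndex, and the break that happens before the index advances when a request is full. As a result, the next call resumes at the partition that did not fit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of drain()'s fairness rule for one node: the scan over the
// node's partitions starts at a persistent drainIndex instead of at 0, and stops
// adding batches once the request would exceed maxSize. Each entry of batchSizes
// is the size of the first batch queued for that partition.
class SketchDrainer {
    private int drainIndex = 0;

    List<Integer> drain(int[] batchSizes, int maxSize) {
        List<Integer> picked = new ArrayList<>(); // indexes of drained partitions
        if (batchSizes.length == 0)
            return picked;
        int size = 0;
        int start = drainIndex = drainIndex % batchSizes.length;
        do {
            int batch = batchSizes[drainIndex];
            if (size + batch > maxSize && !picked.isEmpty())
                break; // request is full; the next call resumes from this partition
            size += batch;
            picked.add(drainIndex);
            drainIndex = (drainIndex + 1) % batchSizes.length;
        } while (start != drainIndex);
        return picked;
    }
}
```

With four partitions of 4 bytes each and maxSize 8, the first call returns partitions [0, 1] and the second returns [2, 3]. Starting every call at index 0 would instead return [0, 1] each time.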
That completes our tour of RecordAccumulator, the component that buffers messages.
References:
1. Apache Kafka源码剖析 (Apache Kafka Source Code Analysis), Xu Junming (徐郡明)