赞
踩
上一篇文章介绍过KafkaProduer可以有同步和异步两种方式发送消息,其实两者的底层实现相同,都是通过异步方式实现的。主线程调用send方法发送消息的时候,先将消息放大RecordAccumulator中暂存,然后主线程就可以返回了,这个时候消息并没有真正的发送给Kafka,而是换存在了RecordAccumulator中,之后,主线程通过send方法不断的向RecordAccumulator里面不断追加消息,当达到一定的条件,会唤醒Sender线程发送RecordAccumulator里面的消息。RecordAccumulator中有一个以TopicPartition为key的ConcurrentMap,每个value是ArrayDeque<RecordBatch>,其中缓存了发往对应的TopicPartition的消息。每个RecordBatch拥有一个MemoryRecords对象的引用,他才是消息最终存放的地方。
再MemeoryRecords里面有下面四个字段比较重要:
buffer:用于保存消息数据的java NIO ByteBuffer
writeLimit:记录buffer字段最多可以写入多少个字节的数据
compressor:压缩器,对消息数据进行压缩,然后将压缩的数据输出到buffer。
writable:标记是只读模式还是可写模式
public Compressor(ByteBuffer buffer, CompressionType type) { //保存压缩的类型,和buffer开始的位置 this.type = type; this.initPos = buffer.position(); this.numRecords = 0; this.writtenUncompressed = 0; this.compressionRate = 1; this.maxTimestamp = Record.NO_TIMESTAMP; if (type != CompressionType.NONE) { // for compressed records, leave space for the header and the shallow message metadata // and move the starting position to the value payload offset buffer.position(initPos + Records.LOG_OVERHEAD + Record.RECORD_OVERHEAD); } // 创建合适的输入流 bufferStream = new ByteBufferOutputStream(buffer); //根据压缩类型创建压缩流 appendStream = wrapForOutput(bufferStream, type, COMPRESSION_DEFAULT_BUFFER_SIZE); }
再Compressor的构造方法里面最终会调用wrapForOutput方法为当前的buffer创建指定类型的压缩流
public static DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) { try { switch (type) { case NONE: return new DataOutputStream(buffer); case GZIP: return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize)); case SNAPPY: try { OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer, bufferSize); return new DataOutputStream(stream); } catch (Exception e) { throw new KafkaException(e); } case LZ4: try { OutputStream stream = (OutputStream) lz4OutputStreamSupplier.get().newInstance(buffer); return new DataOutputStream(stream); } catch (Exception e) { throw new KafkaException(e); } default: throw new IllegalArgumentException("Unknown compression type: " + type); } } catch (IOException e) { throw new KafkaException(e); } }
在wrapForOutput方法里面主要也是根据指定的压缩类型创建输出流,所以说,在Compressor中通过利用装饰者模式使buffer有了自动扩容和压缩的功能。
下面我们机选看MemoryRecords里面几个比较重要的方法:
emptyRecords:我们只能通过它来返回MemoryRecords对象
append:首先会判断是否是可写模式,然后调用Compressor的put方法
hasRoomFor:根据Compressor估算已写字节数
close:当有扩容的情况时,MemoryRecords.buffer字段ByteBufferOutputStream.buffer字段所指向的不再是同一个ByteBuffer对象,所以close方法会将MemoryRecords.buffer指向扩容后的对象,同时,设置为只读模式。
sizeInBytes:对于可写返回的是ByteBufferOutputStream.buffer大小,对于只读返回的是MemoryRecords.buffer大小
下面来看一下RecordBatch的核心方法:
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
//判断是否还有空间
if (!this.records.hasRoomFor(key, value)) {
return null;
} else {
//向MemoryRecords里面添加内容
long checksum = this.records.append(offsetCounter++, timestamp, key, value);
//更新统计信息
this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
this.lastAppendTime = now;
//创建FutureRecordMetadata对象
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
timestamp, checksum,
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。