赞
踩
ecords = 0; private float actualCompressionRatio = 1; private long maxTimestamp = RecordBatch.NO_TIMESTAMP; private long offsetOfMaxTimestamp = -1; private Long lastOffset = null; private Long firstTimestamp = null; private MemoryRecords builtRecords;
从该类属性字段来看比较多,这里只讲2个关于字节流的字段。
来看看它的初始化构造方法。
public MemoryRecordsBuilder(ByteBuffer buffer,...) { this(new ByteBufferOutputStream(buffer), ...); } public MemoryRecordsBuilder( ByteBufferOutputStream bufferStream, ... int writeLimit) { .... this.initialPosition = bufferStream.position(); this.batchHeaderSizeInBytes = AbstractRecords.recordBatchHeaderSizeInBytes(magic, compressionType); bufferStream.position(initialPosition + batchHeaderSizeInBytes); this.bufferStream = bufferStream; this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic)); } }
从构造函数可以看出,除了基本字段的赋值之外,会做以下3件事情:
看到这里,挺有意思的,不知读者是否意识到这里涉及到 「ByteBuffer」、「bufferStream」 、「appendStream」。
三者的关系是通过「装饰器模式」实现的,即 bufferStream 对 ByteBuffer 装饰实现扩容功能,而 appendStream 又对 bufferStream 装饰实现压缩功能。
来看看它的核心方法。
(1)appendWithOffset()
public Long append(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) { return appendWithOffset(nextSequentialOffset(), timestamp, key, value, headers); } private long nextSequentialOffset() { return lastOffset == null ? baseOffset : lastOffset + 1; } private Long appendWithOffset( long offset, boolean isControlRecord, long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) { try { if (isControlRecord != isControlBatch) throw new ...; if (lastOffset != null && offset <= lastOffset) throw new ...; if (timestamp < 0 && timestamp != RecordBatch.NO_TIMESTAMP) throw new ...; if (magic < RecordBatch.MAGIC_VALUE_V2 && headers != null && headers.length > 0) throw new ...; if (firstTimestamp == null) firstTimestamp = timestamp; if (magic > RecordBatch.MAGIC_VALUE_V1) { appendDefaultRecord(offset, timestamp, key, value, headers); return null; } else { return appendLegacyRecord(offset, timestamp, key, value, magic); } } catch (IOException e) { } }
该方法主要用来根据偏移量追加写消息,会根据消息版本来写对应消息,但需要明确的是 ProducerBatch 对标 V2 版本。
来看看 V2 版本消息写入逻辑。
private void appendDefaultRecord( long offset, long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) throws IOException { ensureOpenForRecordAppend(); int offsetDelta = (int) (offset - baseOffset); long timestampDelta = timestamp - firstTimestamp; int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers); recordWritten(offset, timestamp, sizeInBytes); } private void ensureOpenForRecordAppend() { if (appendStream == CLOSED_STREAM) throw new ...; } private void recordWritten(long offset, long timestamp, int size) { .... numRecords += 1; uncompressedRecordsSizeInBytes += size; lastOffset = offset; if (magic > RecordBatch.MAGIC_VALUE_V0 && timestamp > maxTimestamp) { maxTimestamp = timestamp; offsetOfMaxTimestamp = offset; } }
该方法主要用来写入 V2 版本消息的,主要做以下5件事情:
(2)hasRoomFor()
public boole
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。