当前位置:   article > 正文

Kafka源码之RecordAccumulator分析_kafka recordaccumulator

kafka recordaccumulator

上一篇文章介绍过KafkaProduer可以有同步和异步两种方式发送消息,其实两者的底层实现相同,都是通过异步方式实现的。主线程调用send方法发送消息的时候,先将消息放大RecordAccumulator中暂存,然后主线程就可以返回了,这个时候消息并没有真正的发送给Kafka,而是换存在了RecordAccumulator中,之后,主线程通过send方法不断的向RecordAccumulator里面不断追加消息,当达到一定的条件,会唤醒Sender线程发送RecordAccumulator里面的消息。RecordAccumulator中有一个以TopicPartition为key的ConcurrentMap,每个value是ArrayDeque<RecordBatch>,其中缓存了发往对应的TopicPartition的消息。每个RecordBatch拥有一个MemoryRecords对象的引用,他才是消息最终存放的地方。
再MemeoryRecords里面有下面四个字段比较重要:
buffer:用于保存消息数据的java NIO ByteBuffer
writeLimit:记录buffer字段最多可以写入多少个字节的数据
compressor:压缩器,对消息数据进行压缩,然后将压缩的数据输出到buffer。
writable:标记是只读模式还是可写模式

public Compressor(ByteBuffer buffer, CompressionType type) {
		//保存压缩的类型,和buffer开始的位置
        this.type = type;
        this.initPos = buffer.position();

        this.numRecords = 0;
        this.writtenUncompressed = 0;
        this.compressionRate = 1;
        this.maxTimestamp = Record.NO_TIMESTAMP;

        if (type != CompressionType.NONE) {
            // for compressed records, leave space for the header and the shallow message metadata
            // and move the starting position to the value payload offset
            buffer.position(initPos + Records.LOG_OVERHEAD + Record.RECORD_OVERHEAD);
        }

        // 创建合适的输入流
        bufferStream = new ByteBufferOutputStream(buffer);
        //根据压缩类型创建压缩流
        appendStream = wrapForOutput(bufferStream, type, COMPRESSION_DEFAULT_BUFFER_SIZE);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

再Compressor的构造方法里面最终会调用wrapForOutput方法为当前的buffer创建指定类型的压缩流

public static DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) {
        try {
            switch (type) {
                case NONE:
                    return new DataOutputStream(buffer);
                case GZIP:
                    return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
                case SNAPPY:
                    try {
                        OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer, bufferSize);
                        return new DataOutputStream(stream);
                    } catch (Exception e) {
                        throw new KafkaException(e);
                    }
                case LZ4:
                    try {
                        OutputStream stream = (OutputStream) lz4OutputStreamSupplier.get().newInstance(buffer);
                        return new DataOutputStream(stream);
                    } catch (Exception e) {
                        throw new KafkaException(e);
                    }
                default:
                    throw new IllegalArgumentException("Unknown compression type: " + type);
            }
        } catch (IOException e) {
            throw new KafkaException(e);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

在wrapForOutput方法里面主要也是根据指定的压缩类型创建输出流,所以说,在Compressor中通过利用装饰者模式使buffer有了自动扩容和压缩的功能。
下面我们机选看MemoryRecords里面几个比较重要的方法:
emptyRecords:我们只能通过它来返回MemoryRecords对象
append:首先会判断是否是可写模式,然后调用Compressor的put方法
hasRoomFor:根据Compressor估算已写字节数
close:当有扩容的情况时,MemoryRecords.buffer字段ByteBufferOutputStream.buffer字段所指向的不再是同一个ByteBuffer对象,所以close方法会将MemoryRecords.buffer指向扩容后的对象,同时,设置为只读模式。
sizeInBytes:对于可写返回的是ByteBufferOutputStream.buffer大小,对于只读返回的是MemoryRecords.buffer大小
下面来看一下RecordBatch的核心方法:

public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
		//判断是否还有空间
        if (!this.records.hasRoomFor(key, value)) {
            return null;
        } else {
        	//向MemoryRecords里面添加内容
            long checksum = this.records.append(offsetCounter++, timestamp, key, value);
           //更新统计信息
            this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
            this.lastAppendTime = now;
            //创建FutureRecordMetadata对象
            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                   timestamp, checksum,
                                  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/618325
推荐阅读
相关标签
  

闽ICP备14008679号