当前位置:   article > 正文

Kafka核心源码分析-生产者-RecordAccumulator_kafka源码recordaccutor

kafka源码recordaccutor

1.简单介绍

主线程调用KafkaProducer.send()方法发送消息的时候,先将消息放到RecordAccumulator中暂存,然后主线程就可以从send()方法中返回了,此时消息并没有真正发送给Kafka,而是在RecordAccumulator中暂存,当RecordAccumulator中存储量达到一个条件后,便会唤醒Sender线程发送RecordAccumulator中的消息。
在这里插入图片描述
RecordAccumulator中用一个ConcurrentHashMap来缓存我们的数据,其中key保存的分区的信息,value保存的是具体的消息记录,value使用ArrayQueue来保存,RecordBatch又使用MemoryRecords来保存。数据结构如上图。

2.源码分析

大体了解了RecordAccumulator的数据结构后,我们从底层的MemoryRecords开始分析。

2.1 MemoryRecords

MemoryRecords表示的是对多个消息的集合。底层是使用ByteBuffer来存储数据的。简单介绍一下属性

	// 仅用于附加的压缩器,对消息进行压缩,将压缩后的数据输出到buffer
    private final Compressor compressor;

    // 记录buffer字段最多可以写入多少个字节的数据
    private final int writeLimit;

    // 初始缓冲区的容量,仅用于可写记录的取消分配
    private final int initialCapacity;

    // 用于读取的底层缓冲区; 当记录仍然可写时,它为null
    private ByteBuffer buffer;

    // 此MemoryRecords是可读还是可写模式,在MemoryRecords发送前,是只读模式
    private boolean writable;

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

Compressor的压缩类型是由"compress.type"配置参数指定的,目前版本主要支持GZIP,SNAPPY,LZ4三种压缩方式。

MemoryRecords的构造方法是私有的,只能通过empty()方法得到其对象。主要的方法有append(),close()

public long append(long offset, long timestamp, byte[] key, byte[] value) {
        if (!writable)
            throw new IllegalStateException("Memory records is not writable");

        int size = Record.recordSize(key, value);
        //偏移量
        compressor.putLong(offset);
        //消息的size
        compressor.putInt(size);
        //写入数据
        long crc = compressor.putRecord(timestamp, key, value);
        //指明此消息体的大小
        compressor.recordWritten(size + Records.LOG_OVERHEAD);
        //返回数据量大小
        return crc;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

close()方法中,会将MemoryRecords.buffer字段指向扩容后的ByteBuffer对象,同时将writable设置为false,即只读模式。

2.2 RecordBatch

我们继续分析上层的结构,,每个RecordBatch对象中封装了一个MemoryRecords对象,除此之外,还封装了很多控制信息和统计信息,下面简单介绍一下。

	//记录了保存的Record个数
    public int recordCount = 0;
    //最大的Record的字节数
    public int maxRecordSize = 0;
    //尝试发送当前RecordBatch的次数
    public volatile int attempts = 0;
    public final long createdMs;
    public long drainedMs;
    //最后一次尝试发送的时间戳
    public long lastAttemptMs;
    //拥有一个MemoryRecords的引用,这个才是消息真正存放的地方,指向存储数据的MemoryRecords对象
    public final MemoryRecords records;
    //当前RecordBatch中缓存的消息,都会发送给此TopicPartition
    public final TopicPartition topicPartition;
    //标识RecordBatch状态的Future对象
    public final ProduceRequestResult produceFuture;
    //最后一次向RecordBatch追加消息的时间戳
    public long lastAppendTime;
    //Thunk对象的集合,可以理解为消息的回调对象对列,Thunk中的callback字段就指向对应消息的Callback对象
    private final List<Thunk> thunks;
    //用来记录某消息在RecordBatch中的偏移量
    private long offsetCounter = 0L;
    //是否正在重试,如果RecordBatch中的数据发送失败,则会重新尝试发送
    private boolean retry;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

我们来看下接收到响应时的流程图
在这里插入图片描述
以RecordBatch中的done()方法画的,ProduceRequestResult这个类实现了类似Future的功能,使用count值为1的CountdownLatch对象,来标记此RecordBatch是否发送完成。当RecordBatch的消息被全部响应,是会调用ProduceRequestResult.done()方法的,用error字段标识是否正常完成,之后调用CountdownLatch.countDown(),唤醒所有等待此消息发送完成的线程。简单看下源码

   /**
     * 将此请求标记为已完成,并取消阻止所有等待其完成的线程。
     * error表示是正常完成还是异常完成
     *
     */
    public void done(TopicPartition topicPartition, long baseOffset, RuntimeException error) {
        this.topicPartition = topicPartition;
        this.baseOffset = baseOffset;
        this.error = error;
        //countDown-1
        this.latch.countDown();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

ProduceRequestResult中有一个属性,baseOffset,这里保存的是服务端为此RecordBatch中第一条消息分配的offset的值,为什么需要这个值?因为我们在发送完消息后,需要知道这个消息的在服务端的offset,这样我们就不需要知道这个RecordBatch中所有消息的offset了,只需要知道第一个消息的offset,再集合自身在RecordBatch中的相对偏移量,就可以计算出来当前消息在服务端分区中的偏移量了。

Thunk指的是消息的回调对象队列,Thunk中的callback字段就指向对应消息的Callback对象

 /**
     * A callback and the associated FutureRecordMetadata argument to pass to it.
     * RecordBatch的内部类
     * 回调以及要传递给它的关联的FutureRecordMetadata参数。
     */
    final private static class Thunk {
        final Callback callback;

        final FutureRecordMetadata future;

        public Thunk(Callback callback, FutureRecordMetadata future) {
            this.callback = callback;
            this.future = future;
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

在这个类中还有一个FutureRecordMetadata的属性,这个类有两个关键字段,

	//指向对应消息所在RecordBatch的produceFuture字段
    private final ProduceRequestResult result;
    //记录了对应消息在RecordBatch中的偏移量
    private final long relativeOffset;
  • 1
  • 2
  • 3
  • 4

这个类的基本操作,都是委托给了ProduceRequestResult对应的方法,当生产者收到某条消息的响应时,FutureRecordMetadata.get()方法就会返回RecordMetadata对象,这个对象包含offset,时间戳等数据,可供用户自定义Callback使用。

下面我们看看RecordBatch类的核心方法,tryAppend(),主要功能是追加到当前记录集并返回该记录集内的相对偏移量

/**
     * Append the record to the current record set and return the relative offset within that record set
     * 将记录追加到当前记录集并返回该记录集内的相对偏移量
     * 
     * @return The RecordSend corresponding to this record or null if there isn't sufficient room.
     * 与此记录对应的RecordSend;如果没有足够的空间,则为null。
     */
    public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
        //估算剩余空间不足,前面说过,这不是一个准确值
        if (!this.records.hasRoomFor(key, value)) {
            return null;
        } else {
            //向MemoryRecords中添加数据,注意,offsetCounter是在RecordBatch中的偏移量
            long checksum = this.records.append(offsetCounter++, timestamp, key, value);
            this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
            this.lastAppendTime = now;
            //更新统计信息(省略)
            //创建FutureRecordMetadata对象
            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                   timestamp, checksum,
                                                                   key == null ? -1 : key.length,
                                                                   value == null ? -1 : value.length);
            //将用户自定义的CallBack和FutureRecordMetadata封装成Thunk,保存在Thunks中
            if (callback != null)
                thunks.add(new Thunk(callback, future));
            this.recordCount++;
            return future;
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

当RecordBatch成功收到响应、或超时、或关闭生产者时,都会调用RecordBatch的done()方法。在done()方法中,会回调全部的callback,当整个RecordBatch中的消息全部处理完后,还会调用produceFuture.done()方法。

源码如下

/**
     * Complete the request
     * 完成请求
     * 当RecordBatch成功收到正常响应、或超时、或关闭生产者时,都会调用此方法
     * @param baseOffset The base offset of the messages assigned by the server
     * @param timestamp The timestamp returned by the broker.
     * @param exception The exception that occurred (or null if the request was successful)
     */
    public void done(long baseOffset, long timestamp, RuntimeException exception) {
        log.trace("Produced messages to topic-partition {} with base offset offset {} and error: {}.",
                  topicPartition,
                  baseOffset,
                  exception);
        // execute callbacks
        // 回调全部消息的Callback方法
        for (int i = 0; i < this.thunks.size(); i++) {
            try {
                Thunk thunk = this.thunks.get(i);
                //正常处理完成
                if (exception == null) {
                    // If the timestamp returned by server is NoTimestamp, that means CreateTime is used. Otherwise LogAppendTime is used.
                    // 将服务端返回的信息(offset和timstamp)和消息的其他信息封装成RecordMetadata
                    RecordMetadata metadata = new RecordMetadata(this.topicPartition,  baseOffset, thunk.future.relativeOffset(),
                                                                 timestamp == Record.NO_TIMESTAMP ? thunk.future.timestamp() : timestamp,
                                                                 thunk.future.checksum(),
                                                                 thunk.future.serializedKeySize(),
                                                                 thunk.future.serializedValueSize());
                    //调用消息对应的自定义callback
                    thunk.callback.onCompletion(metadata, null);
                //处理过程出现异常的情况,注意,第一个参数为null
                } else {
                    thunk.callback.onCompletion(null, exception);
                }
            } catch (Exception e) {
                log.error("Error executing user-provided callback on message for topic-partition {}:", topicPartition, e);
            }
        }
        // 标识整个RecordBatch都已经处理完成,并调用produceFutur的done方法
        this.produceFuture.done(topicPartition, baseOffset, exception);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

2.3 BufferPool

ByteBuffer的创建和释放是比较消耗资源的,为了实现内存的搞笑利用,基本上每个成熟的框架或工具,都有一套内存管理机制。Kafka使用BufferPool来实现ByteBuffer的复用。每个BufferPool对象只针对特定大小(由poolableSize字段指定)的ByteBuffer进行管理,对于其他大小的ByteBuffer并不会缓存进BufferPool。但是也有例外,当一条消息的字节数大于MemoryRecords时,就不会复用BufferPool中缓存的ByteBuffer,而是额外分配ByteBuffer,在它被使用完后也不会放入BufferPool进行管理,而是直接丢弃,由GC回收。下面看下BufferPool的核心字段。

	//记录了整个Pool的大小
    private final long totalMemory;
    private final int poolableSize;
    //因为会有多线程并发分配和回收ByteBuffer,所以使用锁控制并发,保证线程安全
    private final ReentrantLock lock;
    //缓存了指定大小的ByteBuffer对象
    private final Deque<ByteBuffer> free;
    //记录因申请不到足够的空间而阻塞的线程,此队列中实际记录的是阻塞线程对应的Condition对象
    private final Deque<Condition> waiters;
    //记录了可用空间的大小,这个空间是totalMemory减去free列表中全部ByteBuffer的大小
    private long availableMemory;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

我们重点看下从缓冲池中申请ByteBuffer的方法BufferPool.allcote()方法

/**
  *从缓冲池中申请指定size的ByteBuffer,当缓冲池中空间不足,或者缓冲池是block状态时,就会阻塞调用线程
  */
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
        if (size > this.totalMemory)
            throw new IllegalArgumentException("Attempt to allocate " + size
                                               + " bytes, but there is a hard limit of "
                                               + this.totalMemory
                                               + " on memory allocations.");
        //加锁同步
        this.lock.lock();
        try {
            // 检查,请求的是poolableSize指定大小的ByteBuffer,且free中有空闲的ByteBuffer
            if (size == poolableSize && !this.free.isEmpty())
                return this.free.pollFirst();

            // 当申请的空间大小不是poolableSize,则执行下面的处理
            // free队列中多是poolableSize大小的ByteBuffer,可以直接计算整个free队列的空间
            int freeListSize = this.free.size() * this.poolableSize;
            //如果现在可用的内存+即将释放的内存之和大于此次请求的size
            if (this.availableMemory + freeListSize >= size) {
                // 为了让availableMemory>size,freeUp()方法会从free队列中不断释放ByteBuffer,直到avaliableMemory满足这次申请
                freeUp(size);
                //减小avaliableMemory
                this.availableMemory -= size;
                lock.unlock();
                return ByteBuffer.allocate(size);
            } else {
                // 没有足够的内存,所以这里阻塞住
                int accumulated = 0;
                ByteBuffer buffer = null;
                //将Condition添加到waiters中
                Condition moreMemory = this.lock.newCondition();
                long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                this.waiters.addLast(moreMemory);
                // 循环等待,知道我们获得足够的内存
                while (accumulated < size) {
                    long startWaitNs = time.nanoseconds();
                    long timeNs;
                    boolean waitingTimeElapsed;
                    try {
                        //阻塞
                        waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                    } catch (InterruptedException e) {
                        //发生异常,移除此线程对应的Condition
                        this.waiters.remove(moreMemory);
                        throw e;
                    } finally {
                        long endWaitNs = time.nanoseconds();
                        timeNs = Math.max(0L, endWaitNs - startWaitNs);
                        //统计阻塞时间
                        this.waitTime.record(timeNs, time.milliseconds());
                    }
                    //超时报错
                    if (waitingTimeElapsed) {
                        this.waiters.remove(moreMemory);
                        throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
                    }

                    remainingTimeToBlockNs -= timeNs;
                    // 如果请求的是poolableSize大小的ByteBuffer,且free中有空闲的ByteBuffer
                    if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                        buffer = this.free.pollFirst();
                        accumulated = size;
                    //先分配一部分空间,并继续等待空闲空间
                    } else {
                        freeUp(size - accumulated);
                        int got = (int) Math.min(size - accumulated, this.availableMemory);
                        this.availableMemory -= got;
                        accumulated += got;
                    }
                }

                //已经成功分配空间,移除Codition
                Condition removed = this.waiters.removeFirst();
                if (removed != moreMemory)
                    throw new IllegalStateException("Wrong condition: this shouldn't happen.");

                // 要是还有空余空间,就唤醒下一个线程
                if (this.availableMemory > 0 || !this.free.isEmpty()) {
                    if (!this.waiters.isEmpty())
                        this.waiters.peekFirst().signal();
                }

                // 解锁
                lock.unlock();
                if (buffer == null)
                    return ByteBuffer.allocate(size);
                else
                    return buffer;
            }
        } finally {
            //解锁
            if (lock.isHeldByCurrentThread())
                lock.unlock();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97

相应的在释放资源时,会唤醒一个因空间不足而阻塞的线程
在这里插入图片描述

2.4 RecordAccumulator

介绍完了前面3个,我们来看下RecordAccumulator的实现,先了解一下关键字

    //指定每个RecordBatch底层ByteBuffer的大小
    private final int batchSize;
    //压缩类型
    private final CompressionType compression;
    //BufferPool对象
    private final BufferPool free;
    //缓存了发往对应TopicPartition的消息,Deque是非线程安全的集合,追加消息或发送RecordBatch的时候,需要加锁同步
    private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
    //未发送完成的RecordBatch集合,底层通过Set<RecordBatch>集合实现
    private final IncompleteRecordBatches incomplete;
    // The following variables are only accessed by the sender thread, so we don't need to protect them.
    private final Set<TopicPartition> muted;
    //使用drain方法批量导出RecordBatch时,为了防止饥饿,使用drainIndex记录上次发送停止时的位置,下次继续从此位置开发发送
    private int drainIndex;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

我们简单看下流程图
在这里插入图片描述
下面我们看下源码

public RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Callback callback,
                                     long maxTimeToBlock) throws InterruptedException {
        // We keep track of the number of appending thread to make sure we do not miss batches in
        // abortIncompleteBatches().
        // 统计正在向RecordAccumulator中追加数据的线程数,确保不会错过批次abortIncompleteBatches
        appendsInProgress.incrementAndGet();
        try {
            // check if we have an in-progress batch
            //步骤一:查找TopicPartition对应的Queue
            Deque<RecordBatch> dq = getOrCreateDeque(tp);
            //步骤二:对Deque对象加锁
            synchronized (dq) {
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                //步骤3:向Deque中最后一个RecordBatch追加Record
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                //步骤4:追加成功则直接返回
                if (appendResult != null)
                    return appendResult;
            }
            //步骤5:sync块结束,解锁

            // we don't have an in-progress record batch try to allocate a new batch
            // 步骤6:追加失败,从BufferPool中申请新空间
            int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
            ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
            //防止多个线程并发向BufferPool申请空间,造成内部碎片
            synchronized (dq) {
                // Need to check if producer is closed again after grabbing the dequeue lock.
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");

                //对Deque加锁后,再次调用tryAppend()方法,尝试追加Record
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                if (appendResult != null) {
                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                    // 步骤8:追加成功,则返回
                    // 步骤9:释放步骤7中申请的新空间
                    free.deallocate(buffer);
                    return appendResult;
                }
                MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
                RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
                //步骤9:在新创建的RecordBatch中追加Record,并将其添加到batches集合中
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));

                dq.addLast(batch);
                //步骤10:将新建的RecordBatch追加到incomplete集合
                incomplete.add(batch);
                //步骤11:返回RecordAppendResult
                return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
            }//步骤12:sync块结束,解锁
        } finally {
            appendsInProgress.decrementAndGet();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61

会啥会需要加两次锁呢?一个是要减少锁的粒度,一个是在向BufferPool申请新的ByteBuffer的时候,可能会阻塞。那我们再往Deque中追加RecordBatch时为啥要加锁呢?因为可能会出现多个线程同时申请添加RecordBatch这个节点的情况。如下图
在这里插入图片描述
线程1和线程2都检测到RecordBatch2已经满了,同时向队列的尾部新增了一个RecordBatch,这时就会出现资源浪费的情况,所以我们必须要加锁操作

在消息发送给服务端前,会先调用RecordAccumulator.ready()方法,获取集群中符合发送条件的节点集合。下面是流程图

流程简单来说,就是判断这个Deque是否需要发送,如果需要发送的话,就获取Key也就是TopicPartition对应的leader节点的NodeId并保存下来,下面我看下源码

public ReadyCheckResult ready(Cluster cluster, long nowMs) {
        //用来记录可以向哪些Node节点发送消息
        Set<Node> readyNodes = new HashSet<>();
        //记录下次需要调用ready()方法的时间间隔
        long nextReadyCheckDelayMs = Long.MAX_VALUE;
        //根据Metadata元数据中是否有找不到Leader副本的分区
        boolean unknownLeadersExist = false;

        //是否有线程在阻塞等待BufferPool释放空间
        boolean exhausted = this.free.queued() > 0;
        //遍历batches集合,对其中每个分区的Leader副本所在的Node都进行判断
        for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
            TopicPartition part = entry.getKey();
            Deque<RecordBatch> deque = entry.getValue();

            //查找分区的Leader副本所在的Node
            Node leader = cluster.leaderFor(part);
            //根据Cluster的信息检查Leader,Leader找不到,肯定不能发送消息
            if (leader == null) {
                //标记为true,之后会触发Metadata的更新
                unknownLeadersExist = true;
            } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
                //加锁读取Deque中的元素
                synchronized (deque) {
                    //只去Deque中的第一个RecordBatch
                    RecordBatch batch = deque.peekFirst();
                    if (batch != null) {
                        //通过计算得到下面5个条件,具体计算过程略
                        boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
                        long waitedTimeMs = nowMs - batch.lastAttemptMs;
                        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                        //条件一:
                        boolean full = deque.size() > 1 || batch.records.isFull();
                        //条件二:
                        boolean expired = waitedTimeMs >= timeToWaitMs;
                        //条件三:条件四:条件五:
                        boolean sendable = full || expired || exhausted || closed || flushInProgress();
                        if (sendable && !backingOff) {
                            readyNodes.add(leader);
                        } else {
                            //记录下次需要调用ready()方法检查的时间间隔
                            nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                        }
                    }
                }
            }
        }

        return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

实际发送前,还要对RecordBatch进行聚合,返回Map<Integer,List>的集合,key是NodeId,value是待发送的RecordBatch集合。这个drain()方法是由Sender线程调用的,核心逻辑是进行映射的转换:将RecordAccumulator记录中的TopicPartition->RecordBatch的集合的映射,转换成了NodeId->RecordBatch集合的映射,因为在网络I/O层面,生产者是面向Node节点发送数据,它只建立到Node的连接并发送数据,并不关心这些数据属于哪个TopicPartition,而在KafkaProducer的上层业务逻辑中,则是按照TopicPartition的方式产生数据,只关心发送到哪个TopicPartition,并不关心这些TopicPartition在哪个Node节点上。

public Map<Integer, List<RecordBatch>> drain(Cluster cluster,
                                                 Set<Node> nodes,
                                                 int maxSize,
                                                 long now) {
        if (nodes.isEmpty())
            return Collections.emptyMap();

        //转换后的结果
        Map<Integer, List<RecordBatch>> batches = new HashMap<>();
        //遍历指定的ready Node集合
        for (Node node : nodes) {
            int size = 0;
            //获取当前Node上的分区集合
            List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
            //记录要发送的RecordBatch
            List<RecordBatch> ready = new ArrayList<>();
            //drainIndex是batches的下标,记录上次发送停止的位置,下次继续从此位置开始发送
            //若一直从索引0的队列开始发送,可能会出现一直只发送前几个分区的消息的情况,造成其他分区饥饿
            int start = drainIndex = drainIndex % parts.size();
            do {
                //获取分区的详细情况
                PartitionInfo part = parts.get(drainIndex);
                TopicPartition tp = new TopicPartition(part.topic(), part.partition());
                if (!muted.contains(tp)) {
                    //获取对应的RecordBatch队列
                    Deque<RecordBatch> deque = getDeque(new TopicPartition(part.topic(), part.partition()));
                    if (deque != null) {
                        synchronized (deque) {
                            //获取队列中第一个RecordBatch
                            RecordBatch first = deque.peekFirst();
                            if (first != null) {
                                boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
                                //如果不在退避期内,则仅排空该批次。
                                if (!backoff) {
                                    if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
                                        //数据量已满,结束循环,一般是一个请求的大小
                                        break;
                                    } else {
                                        //从队列中获取一个RecordBatch,并将这个RecordBatch放到ready集合中
                                        //每个TopicPartition只取一个RecordBatch
                                        RecordBatch batch = deque.pollFirst();
                                        //关闭Compressor及底层输出流,并将MemoryRecords设置为只读
                                        batch.records.close();
                                        size += batch.records.sizeInBytes();
                                        ready.add(batch);
                                        batch.drainedMs = now;
                                    }
                                }
                            }
                        }
                    }
                }
                //更新drainIndex
                this.drainIndex = (this.drainIndex + 1) % parts.size();
            } while (start != drainIndex);
            //记录NodeId与RecordBatch的对应关系
            batches.put(node.id(), ready);
        }
        return batches;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60

至此缓存消息的RecordAccumulator就讲完了。
本文参考:
1.Apache Kafka源码剖析,徐郡明

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/595897
推荐阅读
相关标签
  

闽ICP备14008679号