当前位置:   article > 正文

kafka源码解析(3)生产者解析之RecordAccumulator

recordaccumulator

RecordAccumulator

文章结构
1.RecordAccumulator写入流程
2.batches :CopyOnWriteMap解析
3.线程安全和高并发效率的保证

1.RecordAccumulator写入流程

这是生产者的缓冲组件,可以在生产者的send方法中看到这句

RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs);
  • 1

下面进入append方法看下

在这里插入图片描述
经过分区器后向RecordAccumulator里面的batches缓存,一开始要判断该topic是否已经有队列
batches是concurrentMap实现类,k 是topic+partiton封装的类, v是队列。

在getOrCreateDeque方法中,如果batches没有tp,那就新建一个队列并返回。

为什么是putIfAbsent(里面判断是否存在),而不是直接put,这个是在我了解到:CopyOnWriteMap之后才明白的,后面会说。

private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {
        Deque<ProducerBatch> d = this.batches.get(tp);
        if (d != null)
            return d;
        d = new ArrayDeque<>();
        Deque<ProducerBatch> previous = this.batches.putIfAbsent(tp, d);
        if (previous == null)
            return d;
        else
            return previous;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

至此得到了对应tp的队列。

然后锁住这个队列,只能有一个send线程获得dq对象,往里面tryAppend

synchronized (dq) {
                if (closed)
                    throw new KafkaException("Producer closed while send in progress");
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                if (appendResult != null)
                    return appendResult;
            }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

再进入tryAppend看下,取deque最后一个元素,如果有,那就试着加进去(有可能因为size问题或者根本没有元素不成功),返回的future就是空的。如果最后一个元素不是空且新消息的size没问题,那就进last.tryAppend。因为新数据没有初始化RecordBuilder,还是不会添加成功。具体可以去看ProducerBatch类的tryAppend(不是RecordAccumulator的tryAppend哦,差点看混了)

总结下

如果队列有最后一个batch,且batch有地方容纳消息,那就加进去,然后就结束了
如果队列有最后一个batch,但是batch没地方了,last.tryAppend返回空,last关闭写入,继续往后走
如果队列没有最后一个batch(空),往后走

private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                         Callback callback, Deque<ProducerBatch> deque) {
        ProducerBatch last = deque.peekLast();
        if (last != null) {
            FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
            if (future == null)
                last.closeForRecordAppends();
            else
                return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
        }
        return null;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

下一步估计消息大小,给内存分配计算,取batchSize和消息最大值。

int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, 
compression, key, value, headers));  
buffer = free.allocate(size, maxTimeToBlock);      
  • 1
  • 2
  • 3

分配好内存后再次锁住dq开始搞事情
又一次tryAppend(这步真没看懂作用,难道是free.allocate改变了什么,后续明白了会回补这块)

下一步新建刚才估算大小尺寸的recordsBuilder,封装进batch,batch装消息

dq装batch
至此消息成功装进RecordAccumulator

synchronized (dq) {
                // Need to check if producer is closed again after grabbing the dequeue lock.
                if (closed)
                    throw new KafkaException("Producer closed while send in progress");

                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                if (appendResult != null) {
                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                    return appendResult;
                }

                MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
                ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));

                dq.addLast(batch);
                incomplete.add(batch);

                // Don't deallocate this buffer in the finally block as it's being used in the record batch
                buffer = null;
                return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
            }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

2. batches : CopyOnWriteMap

实现的ConcurrentMap方法,用读写分离来实现线程安全,多个线程在读取这个map时候,会得到这个指向这个map的指针。如果有线程想修改map内容,系统就复制一份map,在这个线程修改好以后,把新的指针赋给ConcurrentMap实现类。可以看到下面的put方法是 synchronized修饰的,因此同一时间只能有一个线程修改内容。在修改的时候别的线程依然可以用老的指针读取。这个非常适合读多写少的场景。

public synchronized V put(K k, V v) {
        Map<K, V> copy = new HashMap<K, V>(this.map);
        V prev = copy.put(k, v);
        this.map = Collections.unmodifiableMap(copy);
        return prev;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

每个消息都会走一次getOrCreateDeque方法,batches随便读,即使有新的tp来了去修改batches时候也能访问。消息的数量远远多于tp数量,因此这个结构划算。

前面留下的疑问,putIfAbsent而不是put,假设现在该tp是新出现的的,多个线程都get到tp所在的dq返回null之后都去尝试向batches放新的队列,put操作是synchronized的,因此第一个获得锁的线程添加好之后,该dq建好了,别的线程要判断下。

private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {
        Deque<ProducerBatch> d = this.batches.get(tp);
        if (d != null)
            return d;
        d = new ArrayDeque<>();
        Deque<ProducerBatch> previous = this.batches.putIfAbsent(tp, d);
        if (previous == null)
            return d;
        else
            return previous;
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

3. 线程安全和高并发效率的保证

batches新增dq方法是synchronized只限一位,copyonwrite复制map内容写好后返回新map指针,不锁老map

因为不锁老map多个线程拿dq指针时候不受限制

写dq时候锁dq对象,但是map从来不锁上,因此dq指针随便拿

自己画图展现了下多线程效率
线程1在添加键值对时候,
线程3在取dq2
线程2在改dq3,dq3锁上了
线程4在拿dq3指针

这些动作同时都能做,太牛了!
在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/618392
推荐阅读
相关标签
  

闽ICP备14008679号