当前位置:   article > 正文

Kafka生产者之Sender分析_kafka drainbatchesforonenode

kafka drainbatchesforonenode

基于kafka2.x 版本

生产者局部架构图

 

image.png

Sender线程run()步骤:场景驱动方式cluster中已有元数据

1、获取元数据

2、判断哪些partitinon有消息要发送,获取分区号对应的broker主机

3、标志还没有拉取到元数据的topic

4、检查与要发送数据的主机网络是否建立好

5、有可能要发送的parttition有多个,有一些parttition的leader partition在同一台服务器上。

按照broker进行分区,同一个broker的partition为同一组

6、放弃超时的batch

7、创建发送消息的请求

8、执行网络操作的这个NetWordClient组件,包括:发送请求、接收响应

run()流程图:

image.png

Sender 发送流程中,大部分都是调用 RecordAccumulator 方法来实现其特定逻辑,重点对上述涉及到RecordAccumulator 的方法进行一个详细剖析

RecordAccumulator#ready 该方法主要就是根据缓存区中的消息,判断哪些分区已经达到发送条件。

public ReadyCheckResult ready(Cluster cluster, long nowMs) {
    Set<Node> readyNodes = new HashSet<>();
    long nextReadyCheckDelayMs = Long.MAX_VALUE;
    Set<String> unknownLeaderTopics = new HashSet<>();

    boolean exhausted = this.free.queued() > 0;
    for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {   // @1
        TopicPartition part = entry.getKey();
        Deque<ProducerBatch> deque = entry.getValue();

        Node leader = cluster.leaderFor(part);   // @2
        synchronized (deque) {
            if (leader == null && !deque.isEmpty()) {   // @3
                // This is a partition for which leader is not known, but messages are available to send.
                // Note that entries are currently not removed from batches when deque is empty.
                unknownLeaderTopics.add(part.topic());
            } else if (!readyNodes.contains(leader) && !isMuted(p
本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号