赞
踩
基于kafka2.x 版本
生产者局部架构图
Sender线程run()步骤:场景驱动方式cluster中已有元数据
1、获取元数据
2、判断哪些partitinon有消息要发送,获取分区号对应的broker主机
3、标志还没有拉取到元数据的topic
4、检查与要发送数据的主机网络是否建立好
5、有可能要发送的parttition有多个,有一些parttition的leader partition在同一台服务器上。
按照broker进行分区,同一个broker的partition为同一组
6、放弃超时的batch
7、创建发送消息的请求
8、执行网络操作的这个NetWordClient组件,包括:发送请求、接收响应
run()流程图:
Sender 发送流程中,大部分都是调用 RecordAccumulator 方法来实现其特定逻辑,重点对上述涉及到RecordAccumulator 的方法进行一个详细剖析
RecordAccumulator#ready 该方法主要就是根据缓存区中的消息,判断哪些分区已经达到发送条件。
public ReadyCheckResult ready(Cluster cluster, long nowMs) { Set<Node> readyNodes = new HashSet<>(); long nextReadyCheckDelayMs = Long.MAX_VALUE; Set<String> unknownLeaderTopics = new HashSet<>(); boolean exhausted = this.free.queued() > 0; for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) { // @1 TopicPartition part = entry.getKey(); Deque<ProducerBatch> deque = entry.getValue(); Node leader = cluster.leaderFor(part); // @2 synchronized (deque) { if (leader == null && !deque.isEmpty()) { // @3 // This is a partition for which leader is not known, but messages are available to send. // Note that entries are currently not removed from batches when deque is empty. unknownLeaderTopics.add(part.topic()); } else if (!readyNodes.contains(leader) && !isMuted(p
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。