赞
踩
转载:Kafka源码阅读(二):Producer Metadata概述及源码分析
什么是metadata?
metadata指Kafka集群的元数据,包含了Kafka集群的各种信息,例如如:
metadata应用场景
metadata在Kafka中无疑是非常重要的,很多场景中都需要从metadata中获取数据或更新数据,例如:
LeastLoadedNode
指Kafka集群中所有node中负载最小的那一个node
,它是由每个node再InFlightRequests中还未确定的请求数决定的,未确定的请求越少则负载越小。如上图所示,node1即为LeastLoadedNode。
当客户端中没有需要使用的元数据信息时,比如没有指定的主题信息或者超过了rnetadata .rnax.age.rns
配置的时间还没有更新元数据就会进行元数据的强制更新。
元数据的更新操作是在客户端内部进行的
,对客户端的外部使用者不可见。当需要更新元数据时,会先挑选出LeastLoadedNode
,然后向这个node发送MetadataRequest
来获取具体的元数据信息。
创建完成MetadataRequest
后,该请求也会放入InFlightRequests
中,因此更新元数据与发送消息一样都是由Sender线程负责的,但是主线程也会读取元数据信息,因此这些操作都会通过synchronized和final来保证数据一致性。
上一篇博文中KafkaProducer发送消息的doSend()方法中调用了waitOnMetadata()方法来等待更新元数据,那么Kafka是如何等待更新元数据的呢?接下来就让我们通过阅读源码来分析一下这其中的一些细节。在开始分析源码之前我们先看下Cluster对象和Metadata对象中的主要属性,以便更好的理解代码。
// 该Metadata对象会被主线程和Sender线程共享, 当metadata不包含我们所需要的数据时会发送``MetadataRequest``来同步数据。 // ProducerMetadata继承了Metadata类 public class Metadata implements Closeable { private final Logger log; private final Map<String, Long> topics = new HashMap<>(); // topic和过期时间的对应关系 private final long refreshBackoffMs;// retry.backoff.ms: 默认值为100ms,它用来设定两次重试之间的时间间隔,避免无效的频繁重试. private final long metadataExpireMs;// metadata.max.age.ms: 默认值为300000,如果在这个时间内元数据没有更新的话会被 强制更新. private int updateVersion; // 更新版本号,每更新成功1次,version自增1,主要是用于判断metadata是否更新 private int requestVersion; // 请求版本号,没发送一次请求,version自增1 private long lastRefreshMs; // 上一次更新的时间(包含更新失败) private long lastSuccessfulRefreshMs; // 上一次更新成功的时间 private KafkaException fatalException; private Set<String> invalidTopics; // 非法的topics private Set<String> unauthorizedTopics; // 未认证的topics private MetadataCache cache = MetadataCache.empty(); private boolean needUpdate; private final ClusterResourceListeners clusterResourceListeners; // 会收到metadata updates的Listener列表 private boolean isClosed; private final Map<TopicPartition, Integer> lastSeenLeaderEpochs; // 存储Partition最近一次的leaderEpoch }
// 保存了Kafka集群中部分nodes、topics和partitions的信息
public final class Cluster {
private final boolean isBootstrapConfigured;
private final List<Node> nodes;
private final Set<String> unauthorizedTopics; // 未认证的topics
private final Set<String> invalidTopics; // 非法的topics
private final Set<String> internalTopics; // kafka内置的topics
private final Node controller;
private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition; // partition对应的信息,如:leader所在节点、所有的副本、ISR中的副本、offline的副本
private final Map<String, List<PartitionInfo>> partitionsByTopic; // topic和partition信息的对应关系
private final Map<String, List<PartitionInfo>> availablePartitionsByTopic; // topic和可用partition(leader不为null)的对应关系
private final Map<Integer, List<PartitionInfo>> partitionsByNode; // node和partition信息的对应关系
private final Map<Integer, Node> nodesById; //节点id与节点的对应关系
private final ClusterResource clusterResource; //集群信息,里面只有一个clusterId
}
了解Cluster对象和 Metadata对象的基本信息之后,接下来将正式进入分析代码阶段。
waitOnMetadata() // 等待更新集群的元数据 private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException { // 获取缓存中的cluster信息 Cluster cluster = metadata.fetch(); // 判断给定的topic在当前集群中是不是非法的(若果topic的partition没有leader,则认为该topic是invalid) if (cluster.invalidTopics().contains(topic)) throw new InvalidTopicException(topic); // 将topic添加到metadata的topics列表,并将过期时间重置为-1; 如果topics列表中不存在当前topic, // 则强制更新metadata并将requestVersion加1,同时将lastRefreshMs设为0,将needUpdate设为true metadata.add(topic); // 获取给定topic的分区数 Integer partitionsCount = cluster.partitionCountForTopic(topic); // 如果从缓存中获取的cluster中有partition,并且ProducerRecord中没有指定partition或者ProducerRecord中指定的partition在已知的partition范围内,则返回缓存中的cluster信息 if (partitionsCount != null && (partition == null || partition < partitionsCount)) return new ClusterAndWaitTime(cluster, 0); long begin = time.milliseconds(); long remainingWaitMs = maxWaitMs; // maxWaitMs: 等待更新metadata的最长时间 long elapsed; // 更新过程中已经消耗的时间 // 一直等待metadata更新,除非metadata中含有我们所需要的topic和partition信息,或者超过最大的等待时间 do { if (partition != null) { log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic); } else { log.trace("Requesting metadata update for topic {}.", topic); } // 参考上面介绍 metadata.add(topic); // 获取上一次更新的version,并将needUpdate设为true,强制更新 int version = metadata.requestUpdate(); // 唤醒Sender线程,Sender线程又会唤醒NetworkClient线程,并发送updateMetadataRequest请求 sender.wakeup(); try { // 一直等待更新metadata,直到当前的updateVersion大于上一次的updateVersion或者timeout(方法内部会不断的获取最新的updateVersion) metadata.awaitUpdate(version, remainingWaitMs); } catch (TimeoutException ex) { // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs throw new TimeoutException( String.format("Topic %s not present in metadata after %d ms.", topic, maxWaitMs)); } // 从缓存中获取最新的cluster信息 cluster = metadata.fetch(); elapsed = time.milliseconds() - begin; // 如果等待时间超过设定的最大等待时长,则抛出异常结束等待 if (elapsed >= maxWaitMs) { throw new TimeoutException(partitionsCount == null ? String.format("Topic %s not present in metadata after %d ms.", topic, maxWaitMs) : String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.", partition, topic, partitionsCount, maxWaitMs)); } metadata.maybeThrowExceptionForTopic(topic); remainingWaitMs = maxWaitMs - elapsed; // 计算可以等待的剩余时间 partitionsCount = cluster.partitionCountForTopic(topic); // 重新获取partition数 } while (partitionsCount == null || (partition != null && partition >= partitionsCount)); return new ClusterAndWaitTime(cluster, elapsed); }
总结一下上面这段代码:
do ....while
循环中进行,直到metadata中含有所需partition的信息,该循环中主要做了一下事情:
metadata.requestUpdate()
方法来获取updateVersion
,即上一次更新成功时的version
,并将needUpdate设为true,强制更新;sender.wakeup()
方法来唤醒Sender
线程,Sender
线程中又会唤醒NetworkClient
线程,在NetworkClient
中会对UpdateMetadataRequest
请求进行操作,待会下面会详细介绍;metadata.awaitUpdate(version, remainingWaitMs)
方法来等待metadata
的更新,通过比较当前的updateVersion
与步骤1中获取的updateVersion来判断是否更新成功;上面提到过需要更新metadata时会调用sender.wakeup()
方法来唤醒Sender线程,Sender线程中又会唤醒NetworkClient
线程,在NetworkClient
中会对UpdateMetadataRequest
请求进行操作,在NetworkClient中真正处理请求的是NetworkClient.poll()方
法,接下来让我们通过分析源码来看下NetworkClient是如何处理请求的。
public List<ClientResponse> poll(long timeout, long now) { // 判断当前NetworkClient是否是处于active状态 ensureActive(); // 判断是否有打断的响应(比如UnsupportedVersionException),如果有的话立即处理 if (!abortedSends.isEmpty()) { // If there are aborted sends because of unsupported version exceptions or disconnects, // handle them immediately without waiting for Selector#poll. List<ClientResponse> responses = new ArrayList<>(); handleAbortedSends(responses); completeResponses(responses); return responses; } // 判断是否需要更新metadata,如果需要则更新,返回值为可以等待更新的时间,待会下面会详细介绍 long metadataTimeout = metadataUpdater.maybeUpdate(now); try { // 进行I/O的读写操作,这里先不展开,有机会再详细介绍 this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs)); } catch (IOException e) { log.error("Unexpected error during I/O", e); } // process completed actions long updatedNow = this.time.milliseconds(); List<ClientResponse> responses = new ArrayList<>(); // 处理已经发送完成的request,如果请求不需要response则将response设为null handleCompletedSends(responses, updatedNow); // 处理已经接收完成的response,并根据接收的response更新responses列表,包括metadata的更新 // 待会下面会详细介绍 handleCompletedReceives(responses, updatedNow); handleDisconnections(responses, updatedNow); // 内部会触发强制更新metadata handleConnections(); handleInitiateApiVersionRequests(updatedNow); handleTimedOutRequests(responses, updatedNow); // 内部会触发强制更新metadata completeResponses(responses); return responses; }
接下来看一下metadata是如何更新的
public long maybeUpdate(long now) { // 获取下一次更新的时间,如果needUpdate=true,则返回0,即马上更新;否则返回剩余的过期时间 long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now); // 计算需要等待的时间,如果有正在处理的请求,则返回默认的请求间隔时间,否则返回0 long waitForMetadataFetch = hasFetchInProgress() ? defaultRequestTimeoutMs : 0; long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch); // 大于0说明还需等待一段时间才能更新 if (metadataTimeout > 0) { return metadataTimeout; } //获取最小负载节点,概述里已经讲的很清楚了,这里就不在细看. Node node = leastLoadedNode(now); if (node == null) { log.debug("Give up sending metadata request since no node is available"); return reconnectBackoffMs; // 返回等待创建连接所需时间 } return maybeUpdate(now, node); } private long maybeUpdate(long now, Node node) { String nodeConnectionId = node.idString(); // 判断当前node节点是否已经ready,并且支持发送更多请求(即inFlightRequests是否有未处理的request或者给队列是否达到最大size) if (canSendRequest(nodeConnectionId, now)) { // 该请求会更新当前metadata中包含的所有topic Metadata.MetadataRequestAndVersion requestAndVersion = metadata.newMetadataRequestAndVersion(); this.inProgressRequestVersion = requestAndVersion.requestVersion; MetadataRequest.Builder metadataRequest = requestAndVersion.requestBuilder; log.debug("Sending metadata request {} to node {}", metadataRequest, node); // 调用NetworkClient的doSend方法,发送更新metadata请求 sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now); return defaultRequestTimeoutMs; } // If there's any connection establishment underway, wait until it completes. This prevents // the client from unnecessarily connecting to additional nodes while a previous connection // attempt has not been completed. if (isAnyNodeConnecting()) { // Strictly the timeout we should return here is "connect timeout", but as we don't // have such application level configuration, using reconnect backoff instead. return reconnectBackoffMs; } if (connectionStates.canConnect(nodeConnectionId, now)) { // We don't have a connection to this node right now, make one log.debug("Initialize connection to node {} for sending metadata request", node); initiateConnect(node, now); return reconnectBackoffMs; }
总结一下上面几个方法所做的事情:
该博文的源码是基于Kafka 2.3.0
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。