当前位置:   article > 正文

Kafka生产者源码解析(三)——Sender(1)

Kafka生产者源码解析(三)——Sender(1)

private long sendProducerData(long now) {

//获取元数据集群信息

Cluster cluster = metadata.fetch();

//筛选出可以向哪些Node节点发送消息

RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

//如果存在某些主题对应的节点找不到的情况,则标记需要更新Kafka集群信息

if (!result.unknownLeaderTopics.isEmpty()) {

……………………

this.metadata.requestUpdate();

}

//遍历之前筛选好的Node节点

Iterator iter = result.readyNodes.iterator();

while (iter.hasNext()) {

Node node = iter.next();

//调用client.ready()方法检查每个节点的网络I/O是否符合发送消息的条件,将不符合的节点移除

if (!this.client.ready(node, now)) {

iter.remove();

notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));

}

}

//把<分区, 消息队列>的映射关系转换成<节点, 消息队列>的映射关系

Map<Integer, List> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);

……………………

//构造发送消息请求

sendProduceRequests(batches, now);

return pollTimeout;

}

我们需要重点关心上面方法中的sendProduceRequests()方法,追溯其内部方法sendProduceRequest,源码如下,发现最终是调用client.newClientRequest()方法进行构造ClientRequest请求,该构造方法很简单,追踪下去只是简单的属性赋值,所以这里不再深究该构造方法。

private void sendProduceRequest(long now, int destination, short acks, int timeout, List batches) {

………………

//得到要发送的brokerId

String nodeId = Integer.toString(destination);

//构造ClientRequest请求

ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,

requestTimeoutMs, callback);

//将ClientRequest请求写进KafkaChannel的send属性,并且为KafkaChannel注册写入事件

client.send(clientRequest, now);

………………

}

ClientRequest请求构造完成之后,我们还需要调用client.send()方法将ClientRequest请求写进KafkaChannel的send属性,并且为KafkaChannel注册写入事件,下面来看下这是怎么实现的。

private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {

//要发送的目标brokerID

String destination = clientRequest.destination();

//构造请求头

RequestHeader header = clientRequest.makeHeader(request.version());

………………

//将目标brokerID和请求头构成NetworkSend

Send send = request.toSend(destination, header);

//构造InFlightRequest

InFlightRequest inFlightRequest = new InFlightRequest(

clientRequest,

header,

isInternalRequest,

request,

send,

now);

//追加入inFlightRequests队列

this.inFlightRequests.add(inFlightRequest);

//覆盖KafkaChannel的send字段,并为KafkaChannel注册写入事件

selector.send(send);

}

首先我们利用之前构造好的ClientRequest请求得到目标brokerId和请求头,再由它们构成一个NetworkSend对象,这个类是对ByteBuffer的封装,接着构造了InFlightRequest并放入了inFlightRequests队列中,inFlightRequests队列的作用是缓存已经发出去但没有收到响应的ClientRequest,但是到目前为止仍然没有看到我们想要看到的内容,不急,继续看 selector.send()方法。

Selector属于网络I/O层,它使用NIO异步非阻塞模式实现网络I/O操作,到这一步已经完成将ClientRequest请求写进KafkaChannel的send属性,并且为KafkaChannel注册写入事件,只有注册了写入事件,Selector才会把消息发送出去,sendProducerData方法的流程也就到此为止了。

二、client.poll()

在此之前,sendProducerData()已经完成了ClientRequest请求的构造以及请求的准备工作,那么接下来就要真正的将这些请求通过网络发送出去了。下面来看client.poll()方法源码。

public List poll(long timeout, long now) {

…………………

try {

//网络I/O

this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));

} catch (IOException e) {

log.error(“Unexpected error during I/O”, e);

}

……………………

}

果不其然,最终还是看到了Selector的出现!这应该是意料中的事情,因为刚刚我们就说了Selector属于网络I/O层,那么我们要将消息通过网络发送出去那一定会用到它。进入到selector.poll()方法内部会看到一句代码如下:

pollSelectionKeys(readyKeys, false, endSelect)

它是执行网络I/O操作的核心,它需要传入一个SelectionKey集合,用于后面获取KafkaChannel,下面是该方法的源码。

void pollSelectionKeys(Set selectionKeys,

boolean isImmediatelyConnected,

long currentTimeNanos) {

for (SelectionKey key : determineHandlingOrder(selectionKeys)) {

//通过SelectionKey获取到KafkaChannel

KafkaChannel channel = channel(key);

long channelStartTimeNanos = recordTimePerConnection ? time.nanoseconds() : 0;

boolean sendFailed = false;

//一次性注册所有的连接

sensors.maybeRegisterConnectionMetrics(channel.id());

if (idleExpiryManager != null)

idleExpiryManager.update(channel.id(), currentTimeNanos);

try {

//如果connect返回true或OP_CONNECT

if (isImmediatelyConnected || key.isConnectable()) {

//finishConnect()先检测socketChannel是否建立成功,然后会注册读事件

if (channel.finishConnect()) {

//加入已连接的集合

this.connected.add(channel.id());

this.sensors.connectionCreated.record();

SocketChannel socketChannel = (SocketChannel) key.channel();

log.debug(“Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}”,

socketChannel.socket().getReceiveBufferSize(),

socketChannel.socket().getSendBufferSize(),

socketChannel.socket().getSoTimeout(),

channel.id());

} else {

continue;

}

}

……………………

//处理读事件

attemptRead(key, channel);

//如果还有数据未读取完,把selectionKeys缓存在keysWithBufferedRead中

if (channel.hasBytesBuffered()) {

keysWithBufferedRead.add(key);

}

if (channel.ready() && key.isWritable() && !channel.maybeBeginClientReauthentication(

() -> channelStartTimeNanos != 0 ? channelStartTimeNanos : currentTimeNanos)) {

Send send;

try {

//处理写事件

send = channel.write();

} catch (Exception e) {

sendFailed = true;

throw e;

}

if (send != null) {

this.completedSends.add(send);

this.sensors.recordBytesSent(channel.id(), send.size());

总结

虽然面试套路众多,但对于技术面试来说,主要还是考察一个人的技术能力和沟通能力。不同类型的面试官根据自身的理解问的问题也不尽相同,没有规律可循。

上面提到的关于这些JAVA基础、三大框架、项目经验、并发编程、JVM及调优、网络、设计模式、spring+mybatis源码解读、Mysql调优、分布式监控、消息队列、分布式存储等等面试题笔记及资料

有些面试官喜欢问自己擅长的问题,比如在实际编程中遇到的或者他自己一直在琢磨的这方面的问题,还有些面试官,尤其是大厂的比如 BAT 的面试官喜欢问面试者认为自己擅长的,然后通过提问的方式深挖细节,刨根到底。

要还是考察一个人的技术能力和沟通能力。不同类型的面试官根据自身的理解问的问题也不尽相同,没有规律可循。

[外链图片转存中…(img-728Dxx7p-1714411028795)]

[外链图片转存中…(img-ON5aWA8X-1714411028796)]

上面提到的关于这些JAVA基础、三大框架、项目经验、并发编程、JVM及调优、网络、设计模式、spring+mybatis源码解读、Mysql调优、分布式监控、消息队列、分布式存储等等面试题笔记及资料

有些面试官喜欢问自己擅长的问题,比如在实际编程中遇到的或者他自己一直在琢磨的这方面的问题,还有些面试官,尤其是大厂的比如 BAT 的面试官喜欢问面试者认为自己擅长的,然后通过提问的方式深挖细节,刨根到底。

本文已被CODING开源项目:【一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码】收录

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/618344
推荐阅读
相关标签
  

闽ICP备14008679号