赞
踩
private long sendProducerData(long now) {
//获取元数据集群信息
Cluster cluster = metadata.fetch();
//筛选出可以向哪些Node节点发送消息
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
//如果存在某些主题对应的节点找不到的情况,则标记需要更新Kafka集群信息
if (!result.unknownLeaderTopics.isEmpty()) {
……………………
this.metadata.requestUpdate();
}
//遍历之前筛选好的Node节点
Iterator iter = result.readyNodes.iterator();
while (iter.hasNext()) {
Node node = iter.next();
//调用client.ready()方法检查每个节点的网络I/O是否符合发送消息的条件,将不符合的节点移除
if (!this.client.ready(node, now)) {
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
}
}
//把<分区, 消息队列>的映射关系转换成<节点, 消息队列>的映射关系
Map<Integer, List> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
……………………
//构造发送消息请求
sendProduceRequests(batches, now);
return pollTimeout;
}
我们需要重点关心上面方法中的sendProduceRequests()方法,追溯其内部方法sendProduceRequest,源码如下,发现最终是调用client.newClientRequest()方法进行构造ClientRequest请求,该构造方法很简单,追踪下去只是简单的属性赋值,所以这里不再深究该构造方法。
private void sendProduceRequest(long now, int destination, short acks, int timeout, List batches) {
………………
//得到要发送的brokerId
String nodeId = Integer.toString(destination);
//构造ClientRequest请求
ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
requestTimeoutMs, callback);
//将ClientRequest请求写进KafkaChannel的send属性,并且为KafkaChannel注册写入事件
client.send(clientRequest, now);
………………
}
ClientRequest请求构造完成之后,我们还需要调用client.send()方法将ClientRequest请求写进KafkaChannel的send属性,并且为KafkaChannel注册写入事件,下面来看下这是怎么实现的。
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
//要发送的目标brokerID
String destination = clientRequest.destination();
//构造请求头
RequestHeader header = clientRequest.makeHeader(request.version());
………………
//将目标brokerID和请求头构成NetworkSend
Send send = request.toSend(destination, header);
//构造InFlightRequest
InFlightRequest inFlightRequest = new InFlightRequest(
clientRequest,
header,
isInternalRequest,
request,
send,
now);
//追加入inFlightRequests队列
this.inFlightRequests.add(inFlightRequest);
//覆盖KafkaChannel的send字段,并为KafkaChannel注册写入事件
selector.send(send);
}
首先我们利用之前构造好的ClientRequest请求得到目标brokerId和请求头,再由它们构成一个NetworkSend对象,这个类是对ByteBuffer的封装,接着构造了InFlightRequest并放入了inFlightRequests队列中,inFlightRequests队列的作用是缓存已经发出去但没有收到响应的ClientRequest,但是到目前为止仍然没有看到我们想要看到的内容,不急,继续看 selector.send()方法。
Selector属于网络I/O层,它使用NIO异步非阻塞模式实现网络I/O操作,到这一步已经完成将ClientRequest请求写进KafkaChannel的send属性,并且为KafkaChannel注册写入事件,只有注册了写入事件,Selector才会把消息发送出去,sendProducerData方法的流程也就到此为止了。
在此之前,sendProducerData()已经完成了ClientRequest请求的构造以及请求的准备工作,那么接下来就要真正的将这些请求通过网络发送出去了。下面来看client.poll()方法源码。
public List poll(long timeout, long now) {
…………………
try {
//网络I/O
this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
} catch (IOException e) {
log.error(“Unexpected error during I/O”, e);
}
……………………
}
果不其然,最终还是看到了Selector的出现!这应该是意料中的事情,因为刚刚我们就说了Selector属于网络I/O层,那么我们要将消息通过网络发送出去那一定会用到它。进入到selector.poll()方法内部会看到一句代码如下:
pollSelectionKeys(readyKeys, false, endSelect)
它是执行网络I/O操作的核心,它需要传入一个SelectionKey集合,用于后面获取KafkaChannel,下面是该方法的源码。
void pollSelectionKeys(Set selectionKeys,
boolean isImmediatelyConnected,
long currentTimeNanos) {
for (SelectionKey key : determineHandlingOrder(selectionKeys)) {
//通过SelectionKey获取到KafkaChannel
KafkaChannel channel = channel(key);
long channelStartTimeNanos = recordTimePerConnection ? time.nanoseconds() : 0;
boolean sendFailed = false;
//一次性注册所有的连接
sensors.maybeRegisterConnectionMetrics(channel.id());
if (idleExpiryManager != null)
idleExpiryManager.update(channel.id(), currentTimeNanos);
try {
//如果connect返回true或OP_CONNECT
if (isImmediatelyConnected || key.isConnectable()) {
//finishConnect()先检测socketChannel是否建立成功,然后会注册读事件
if (channel.finishConnect()) {
//加入已连接的集合
this.connected.add(channel.id());
this.sensors.connectionCreated.record();
SocketChannel socketChannel = (SocketChannel) key.channel();
log.debug(“Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}”,
socketChannel.socket().getReceiveBufferSize(),
socketChannel.socket().getSendBufferSize(),
socketChannel.socket().getSoTimeout(),
channel.id());
} else {
continue;
}
}
……………………
//处理读事件
attemptRead(key, channel);
//如果还有数据未读取完,把selectionKeys缓存在keysWithBufferedRead中
if (channel.hasBytesBuffered()) {
keysWithBufferedRead.add(key);
}
if (channel.ready() && key.isWritable() && !channel.maybeBeginClientReauthentication(
() -> channelStartTimeNanos != 0 ? channelStartTimeNanos : currentTimeNanos)) {
Send send;
try {
//处理写事件
send = channel.write();
} catch (Exception e) {
sendFailed = true;
throw e;
}
if (send != null) {
this.completedSends.add(send);
this.sensors.recordBytesSent(channel.id(), send.size());
虽然面试套路众多,但对于技术面试来说,主要还是考察一个人的技术能力和沟通能力。不同类型的面试官根据自身的理解问的问题也不尽相同,没有规律可循。
上面提到的关于这些JAVA基础、三大框架、项目经验、并发编程、JVM及调优、网络、设计模式、spring+mybatis源码解读、Mysql调优、分布式监控、消息队列、分布式存储等等面试题笔记及资料
有些面试官喜欢问自己擅长的问题,比如在实际编程中遇到的或者他自己一直在琢磨的这方面的问题,还有些面试官,尤其是大厂的比如 BAT 的面试官喜欢问面试者认为自己擅长的,然后通过提问的方式深挖细节,刨根到底。
要还是考察一个人的技术能力和沟通能力。不同类型的面试官根据自身的理解问的问题也不尽相同,没有规律可循。
[外链图片转存中…(img-728Dxx7p-1714411028795)]
[外链图片转存中…(img-ON5aWA8X-1714411028796)]
上面提到的关于这些JAVA基础、三大框架、项目经验、并发编程、JVM及调优、网络、设计模式、spring+mybatis源码解读、Mysql调优、分布式监控、消息队列、分布式存储等等面试题笔记及资料
有些面试官喜欢问自己擅长的问题,比如在实际编程中遇到的或者他自己一直在琢磨的这方面的问题,还有些面试官,尤其是大厂的比如 BAT 的面试官喜欢问面试者认为自己擅长的,然后通过提问的方式深挖细节,刨根到底。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。