赞
踩
图片挂了,首发在这里 https://juejin.im/post/5e0f39945188253a941374a8,需要阅读的朋友可以跳转过去
我们上一篇的时候说了这篇会把Kafka的网络模型给梳理一下,这个和 NIO 的那篇关系就非常非常大了,所以如果对这块不了解的朋友可以跳转过去瞧瞧,起码对你理解起来会有一定的帮助
那我们就接着上一篇的流程继续,标题中的RecordAccumulator很快就讲到
上一讲我们虽然码了大概有7600多字,可是其实根本就没跳出第一步,所以这东西真的工程量挺大的
现在让我们用一个流程图去把那7000多字给浓缩一下吧
首先我们的 KafkaProducer 作为主线程,它要去发送消息给我们的 Kafka 集群,可是发送给集群需要什么?需要元数据呀!不知道元数据,那我就不知道leader partition是哪个了
此时主线程就会从 Metadata 那里去找看有没有元数据的缓存,也就是是否曾经拉取过了,但是我们假设现在我们是第一次启动,所以明显是没有的,那没有元数据的情况
此时我们要拉取一个version的值,并把一个 needUpdate 参数修改为true,然后去唤醒 Sender 线程去拉取元数据,而这需要通过一个网络组件 NetworkClient 和Broker通信。
在此同时,主线程会进行阻塞,等待元数据的到来。而当元数据拉取完成后,会通过 notifyAll 唤醒主线程,返回阻塞等待的时间。
好的,图中已经标好了逻辑顺序,其实大致的步骤就是这样,至于更多的细节,那就跳转到上一篇:Kafka源码篇 --- 小白也能看懂的Producer的初始化及元数据获取流程,在本地的idea导入源码去跟着走一遍吧
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。