赞
踩
这里用kakfa 3.5版本做源码演示
首先看一下kafka-server-start.sh
中的最后一行
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
我们知道了执行的是core/src/main/scala/kafka/Kafka.scala
下的main方法
def main(args: Array[String]): Unit = { try { //获得配置文件中的参数 val serverProps = getPropsFromArgs(args) //build server,非常重要,因为你要确定到底执行的是哪种server val server = buildServer(serverProps) //省略干扰代码。。。。 //执行server的 startup try server.startup() //省略干扰代码。。。。 } //生成server private def buildServer(props: Properties): Server = { val config = KafkaConfig.fromProps(props, false) if (config.requiresZookeeper) { //是否在properties中有process.roles=broker,controller配置 //没有则构造KafkaServer new KafkaServer( config, Time.SYSTEM, threadNamePrefix = None, enableForwarding = false ) } else { //存在则构造KafkaRaftServer new KafkaRaftServer( config, Time.SYSTEM, threadNamePrefix = None ) } }
其中config.requiresZookeeper
就是判断/config/kraft/server.properties
中是否有下面的配置
参考Kafka2.8无Zookeeper模式下集群部署
# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller
# The node id associated with this instance's roles
node.id=1
# The connect string for the controller quorum
controller.quorum.voters=1@master:9093,2@slave1:9093,3@slave2:9093
每一种server都要实现下面这三个接口
trait Server {
def startup(): Unit
def shutdown(): Unit
def awaitShutdown(): Unit
}
初始化调用的是kafkaServer.scala
这个类中的startup
方法
/**
* Start up API for bringing up a single instance of the Kafka server.
* Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
* 启动用于启动 Kafka 服务器的单个实例的 API。实例化 LogManager、SocketServer 和请求处理程序 - KafkaRequestHandlers
*/
override def startup(): Unit = {
//省略代码。。。。。
//初始化逻辑
}
初始化调用的是KafkaRaftServer.scala
//raft启动函数
override def startup(): Unit = {
Mx4jLoader.maybeLoad()
//这行代码使用foreach方法对controller进行迭代,如果controller不为None,则调用其startup方法。这里使用了占位符语法_.startup(),表示对每个元素执行startup方法。
//ControllerServer 对象,当节点的配置 process.roles 中指定了 controller 角色时才会创建,处理元数据类请求,包括 topic 创建删除等
controller.foreach(_.startup())
//broker.foreach(_.startup()):与前一行代码类似,这行代码对broker进行迭代,并调用其startup方法。
//BrokerServer 对象,当节点的配置 process.roles 中指定了 broker 角色时才会创建,处理消息数据类请求,例如消息的生产消费等
broker.foreach(_.startup())
AppInfoParser.registerAppInfo(Server.MetricsPrefix, config.brokerId.toString, metrics, time.milliseconds())
info(KafkaBroker.STARTED_MESSAGE)
}
controller.foreach(_.startup())
遍历调用的是ControllerServer.scala
类中的startup方法,主要是初始化控制器
broker.foreach(_.startup())
遍历调用的是BrokerServer.scala
类中的startup方法,主要是初始化每一个broker
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。