The Kafka source tree is made up of multiple modules, each responsible for a different piece of functionality. The core modules and their roles are summarized below:
Server-side source: implements the core functionality of the Kafka broker, including log storage, the controller, the coordinators, metadata and state-machine management, delayed-operation mechanisms, consumer-group management, and the high-concurrency network model.
Java client source: implements how the Producer and Consumer interact with brokers, plus the shared supporting components.
Connect source: used to build bidirectional streaming synchronization between heterogeneous data systems.
Streams source: implements real-time stream processing.
Raft source: implements the Raft consensus protocol.
Admin module: Kafka's administration module; it operates on topics and partitions, covering topic creation and deletion, partition expansion, and so on.
Api module: handles data exchange, i.e. the encoding and decoding of the data passed between clients and the server.
Client module: contains the classes a producer uses to read broker metadata, such as topics, partitions, and their leaders.
Cluster module: contains entity classes such as Broker, Cluster, Partition, and Replica.
Common module: contains the various exception classes and error validation.
Consumer module: consumer-side processing; handles the client consumer's data and logic.
Controller module: responsible for electing the central controller, electing partition leaders, assigning or reassigning replicas, and expanding partitions and replicas.
Coordinator module: manages consumer groups and their offsets.
Javaapi module: provides the Java Producer and Consumer API interfaces.
Log module: responsible for Kafka's file storage; reads and writes the message data of all topics.
Message module: wraps multiple records into record sets or compressed record sets.
Metrics module: internal state monitoring.
Network module: handles client connections and network events.
Producer module: the producer implementation details, including synchronous and asynchronous message sending.
Security module: Kafka's security validation and management.
Serializer module: serialization and deserialization of message content.
Server module: covers leader and offset checkpoints, dynamic configuration, delayed topic creation and deletion, leader election, and Admin and Replica management.
Tools module: a collection of tools, such as exporting consumer offsets, LogSegments information, a topic's log locations, and the offsets stored in ZooKeeper.
Utils module: various utility classes, such as Json, ZkUtils, thread-pool helpers, and the KafkaScheduler shared scheduler.
Together these modules make up Kafka's overall architecture and allow it to provide a high-throughput, highly available messaging service.
Kafka's Controller is a core component of the cluster, responsible for managing and coordinating cluster-wide state. The following is a walkthrough of the Controller source code.
The controller component plays a key role in a Kafka cluster. It manages and coordinates the whole cluster by interacting with Apache ZooKeeper. Any broker in the cluster can take on the controller role, but only one broker can act as the controller at any given time.
During initialization the controller reads the cluster metadata from ZooKeeper and fills its own cache with it. That cache is what allows the controller to serve metadata, primarily to the other brokers.
Kafka provides controller failover. When the current controller crashes or terminates unexpectedly, Kafka detects this quickly and promotes another broker to replace the failed controller. The process is automatic and requires no manual intervention.
Before Kafka 0.11 the controller design was fairly complex and the code rather messy. In 0.11 the community reworked the controller internals, replacing the multithreaded design with a single thread plus an event queue. This design removes most of the thread-synchronization machinery and improves processing speed.
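The core of that redesign is simple to illustrate: every trigger (a ZooKeeper watch firing, a broker request, a scheduled task) becomes an event object appended to a queue, and one dedicated thread drains the queue and calls each event's process() method, so only that thread ever touches controller state. The following is a minimal stand-alone sketch of the pattern, not Kafka's actual ControllerEventManager; the ControllerEvent/process() names simply mirror the ones that appear in the source later in this article.

import java.util.concurrent.LinkedBlockingQueue

// Minimal sketch of "single thread + event queue"; not Kafka's ControllerEventManager.
trait ControllerEvent { def process(): Unit }

class SimpleEventManager {
  private val queue = new LinkedBlockingQueue[ControllerEvent]()

  private val thread = new Thread(new Runnable {
    override def run(): Unit = {
      try {
        while (!Thread.currentThread().isInterrupted)
          queue.take().process()            // events are handled strictly one at a time
      } catch {
        case _: InterruptedException => ()  // shutdown() interrupts a take() blocked on an empty queue
      }
    }
  }, "controller-event-thread")

  def put(event: ControllerEvent): Unit = queue.put(event)  // producers only ever enqueue
  def start(): Unit = thread.start()
  def shutdown(): Unit = thread.interrupt()
}

Because every watcher callback and request handler merely calls put(), and only the single event thread mutates controller state, the fine-grained locking of the pre-0.11 controller becomes unnecessary.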
KafkaController is the control and management module of the Kafka cluster. It manages cluster membership, partition leader election, rebalancing and similar concerns mainly by registering listeners for various events in ZooKeeper. Its main building blocks are the two state machines (the partition state machine and the replica state machine), the ControllerChannelManager that talks to the brokers, the ControllerContext metadata cache, and several partition leader election strategies.
When KafkaController is initialized it puts a Startup event on the queue and starts the event manager. The event manager thread then handles the Startup event by calling its process() method, which performs the controller election and the rest of the initialization.
The controller's main responsibilities include topic management (creating and deleting topics, adding partitions), partition reassignment, preferred leader election, cluster membership management (reacting to brokers joining, leaving, or failing), and serving cluster metadata to the other brokers.
Kafka's metadata is persisted in ZooKeeper, but for performance the controller keeps an in-process cache of that metadata.
Kafka exposes a JMX gauge, activeController, for monitoring in real time whether a live controller exists. This metric is very important in day-to-day operations.
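As a reference, a small client such as the one below can read that gauge over JMX. The MBean name shown (kafka.controller:type=KafkaController,name=ActiveControllerCount) and the broker-host:9999 endpoint are assumptions about the deployment, so verify them against your broker version and JMX settings.

import javax.management.ObjectName
import javax.management.remote.{JMXConnectorFactory, JMXServiceURL}

object ActiveControllerCheck {
  def main(args: Array[String]): Unit = {
    // Assumes the broker was started with JMX enabled (e.g. JMX_PORT=9999); host and port are placeholders.
    val url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://broker-host:9999/jmxrmi")
    val connector = JMXConnectorFactory.connect(url)
    try {
      val connection = connector.getMBeanServerConnection
      // Assumed gauge name for the active-controller metric; check it against your Kafka version.
      val name = new ObjectName("kafka.controller:type=KafkaController,name=ActiveControllerCount")
      val value = connection.getAttribute(name, "Value")
      // The broker currently holding the controller role reports 1; every other broker reports 0.
      println(s"ActiveControllerCount = $value")
    } finally connector.close()
  }
}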
Kafka uses an epoch number to handle split brain. Each newly elected controller obtains a new, strictly larger epoch number through a conditional increment in ZooKeeper. Brokers remember the largest epoch they have seen and treat only the controller with that epoch as current, ignoring requests that carry a smaller epoch, which prevents a stale controller from causing split brain.
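A minimal sketch of that fencing idea, assuming each controller request carries the sender's epoch (the class and method names here are illustrative, not Kafka's broker-side code):

// Illustrative broker-side fence: track the highest controller epoch seen so far and
// reject requests that carry a smaller (stale) epoch.
class ControllerEpochFence {
  private var highestSeenEpoch = 0

  def shouldAccept(requestEpoch: Int): Boolean = synchronized {
    if (requestEpoch < highestSeenEpoch) {
      false                           // request comes from a stale ("zombie") controller
    } else {
      highestSeenEpoch = requestEpoch // remember the newest epoch observed
      true
    }
  }
}

In the real protocol the broker answers such stale requests with an error (Kafka defines a STALE_CONTROLLER_EPOCH error code for this) rather than silently dropping them, but the comparison is the same.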
When KafkaServer.startup() runs, it calls kafkaController.startup() to try to start the controller:
//The controller; only one broker in the cluster wins the controller election
/* start kafka controller */
kafkaController = new KafkaController(config, zkUtils, time, metrics, threadNamePrefix)
kafkaController.startup()
kafkaController.startup(), the controller events it leads to, and the election logic:
/**
 * Invoked when the controller module of a Kafka server is started up. This does not assume that the current broker
 * is the controller. It merely registers the session expiration listener and starts the controller leader elector.
 */
def startup() = {
  // Put the Startup event on the event queue
  eventManager.put(Startup)
  // Start the event-processing thread, which will handle the Startup event
  eventManager.start()
}

// The controller's Startup event
case object Startup extends ControllerEvent {
  def state = ControllerState.ControllerChange

  override def process(): Unit = {
    // Register a listener whose callback fires when the ZooKeeper session expires
    registerSessionExpirationListener()
    // Register the controller-change listener so this broker can react when the controller role changes
    // (for example, when the current controller fails and a new one must be elected)
    registerControllerChangeListener()
    // Try to get elected as the cluster controller
    elect()
  }
}

class ControllerChangeListener(controller: KafkaController, eventManager: ControllerEventManager) extends IZkDataListener {
  // The data of the ZooKeeper /controller node changed
  override def handleDataChange(dataPath: String, data: Any): Unit = {
    eventManager.put(controller.ControllerChange(KafkaController.parseControllerId(data.toString)))
  }

  // The ZooKeeper /controller node was deleted: trigger re-election
  override def handleDataDeleted(dataPath: String): Unit = {
    eventManager.put(controller.Reelect)
  }
}

// Event fired when the data of the ZooKeeper /controller node changes
case class ControllerChange(newControllerId: Int) extends ControllerEvent {
  def state = ControllerState.ControllerChange

  override def process(): Unit = {
    // wasActiveBeforeChange is true if this broker was the controller before the change
    val wasActiveBeforeChange = isActive
    activeControllerId = newControllerId
    // This broker used to be the controller but no longer is
    if (wasActiveBeforeChange && !isActive) {
      // Invoke the controller "resignation" (shutdown) callback
      onControllerResignation()
    }
  }
}

// Controller re-election event
case object Reelect extends ControllerEvent {
  def state = ControllerState.ControllerChange

  override def process(): Unit = {
    // Whether this broker was previously the controller
    val wasActiveBeforeChange = isActive
    // Refresh the controller id from the ZooKeeper /controller node
    activeControllerId = getControllerID()
    // This broker used to be the controller but, after the refresh, no longer is
    if (wasActiveBeforeChange && !isActive) {
      // Invoke the controller "resignation" (shutdown) callback
      onControllerResignation()
    }
    // Try to run for controller again
    elect()
  }
}

// Attempt to get elected as the controller
def elect(): Unit = {
  val timestamp = time.milliseconds
  val electString = ZkUtils.controllerZkData(config.brokerId, timestamp)

  // Read the ZooKeeper /controller node and parse out the controller's broker id
  activeControllerId = getControllerID()
  /*
   * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition,
   * it's possible that the controller has already been elected when we get here. This check will prevent the following
   * createEphemeralPath method from getting into an infinite loop if this broker is already the controller.
   */
  // A controller already exists in the cluster, so return immediately
  if (activeControllerId != -1) {
    debug("Broker %d has been elected as the controller, so stopping the election process.".format(activeControllerId))
    return
  }

  // Otherwise, race to create the ephemeral /controller node in ZooKeeper
  try {
    val zkCheckedEphemeral = new ZKCheckedEphemeral(ZkUtils.ControllerPath,
                                                    electString,
                                                    controllerContext.zkUtils.zkConnection.getZookeeper,
                                                    controllerContext.zkUtils.isSecure)
    zkCheckedEphemeral.create()
    // No exception was thrown: the election succeeded
    info(config.brokerId + " successfully elected as the controller")
    // Record this broker's id as the active controller
    activeControllerId = config.brokerId
    // Controller initialization callback
    onControllerFailover()
  } catch {
    case _: ZkNodeExistsException =>
      // If someone else has written the path, then
      activeControllerId = getControllerID

      if (activeControllerId != -1)
        debug("Broker %d was elected as controller instead of broker %d".format(activeControllerId, config.brokerId))
      else
        warn("A controller has been elected but just resigned, this will result in another round of election")

    case e2: Throwable =>
      error("Error while electing or becoming controller on broker %d".format(config.brokerId), e2)
      triggerControllerMove()
  }
}

// onControllerResignation cleans up controller-related resources and state so that the
// controller role can be handed over smoothly.
/**
 * This callback is invoked by the zookeeper leader elector when the current broker resigns as the controller. This is
 * required to clean up internal controller data structures
 */
def onControllerResignation() {
  debug("Resigning")
  // de-register listeners
  // Unregister the ISR change notification listener
  deregisterIsrChangeNotificationListener()
  // Unregister the partition reassignment listener
  deregisterPartitionReassignmentListener()
  // Unregister the preferred replica election listener
  deregisterPreferredReplicaElectionListener()
  // Unregister the log dir event notification listener
  deregisterLogDirEventNotificationListener()

  // reset topic deletion manager
  topicDeletionManager.reset()

  // shutdown leader rebalance scheduler
  kafkaScheduler.shutdown()
  offlinePartitionCount = 0
  preferredReplicaImbalanceCount = 0
  globalTopicCount = 0
  globalPartitionCount = 0

  // de-register partition ISR listener for on-going partition reassignment task
  deregisterPartitionReassignmentIsrChangeListeners()
  // shutdown partition state machine
  partitionStateMachine.shutdown()
  deregisterTopicChangeListener()
  partitionModificationsListeners.keys.foreach(deregisterPartitionModificationsListener)
  deregisterTopicDeletionListener()
  // shutdown replica state machine
  replicaStateMachine.shutdown()
  deregisterBrokerChangeListener()

  // Reset the controller context
  resetControllerContext()
  info("Resigned")
}

/**
 * This callback is invoked by the zookeeper leader elector on electing the current broker as the new controller.
 * It does the following things on the become-controller state change -
 * 1. Register controller epoch changed listener
 * 2. Increments the controller epoch
 * 3. Initializes the controller's context object that holds cache objects for current topics, live brokers and
 *    leaders for all existing partitions.
 * 4. Starts the controller's channel manager
 * 5. Starts the replica state machine
 * 6. Starts the partition state machine
 * If it encounters any unexpected exception/error while becoming controller, it resigns as the current controller.
 * This ensures another controller election will be triggered and there will always be an actively serving controller
 */
// Called after this broker wins the controller election to perform the initialization work
def onControllerFailover() {
  info("Starting become controller state transition")
  // Read the ZooKeeper /controller_epoch node and assign it to the epoch and epochZkVersion fields
  readControllerEpochFromZookeeper()
  // Increment the epoch by 1, write it back to /controller_epoch, and refresh epoch and epochZkVersion
  incrementControllerEpoch()
  LogDirUtils.deleteLogDirEvents(zkUtils)

  // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
  // Watch /admin/reassign_partitions, the node that stores partition reassignment plans and their state.
  /* Example of the node's JSON payload:
     {
       "version": 1,
       "partitions": [
         {
           "topic": "my-topic",
           "partition": 0,
           "replicas": [1, 2],
           "log_dirs": ["/kafka-logs-1", "/kafka-logs-2"]
         }
       ]
     }
  */
  registerPartitionReassignmentListener()
  // Watch /isr_change_notification for ISR changes
  registerIsrChangeNotificationListener()
  // Watch /admin/preferred_replica_election for preferred leader election requests
  registerPreferredReplicaElectionListener()
  // Watch /brokers/topics for topic changes
  registerTopicChangeListener()
  // Watch /admin/delete_topics for topic deletions
  registerTopicDeletionListener()
  // Watch /brokers/ids for broker changes
  registerBrokerChangeListener()
  registerLogDirEventNotificationListener()

  // Initialize the controller context: refresh the controller's caches and start the ControllerChannelManager, etc.
  initializeControllerContext()
  val (topicsToBeDeleted, topicsIneligibleForDeletion) = fetchTopicDeletionsInProgress()
  topicDeletionManager.init(topicsToBeDeleted, topicsIneligibleForDeletion)

  // We need to send UpdateMetadataRequest after the controller context is initialized and before the state machines
  // are started. This is because brokers need to receive the list of live brokers from UpdateMetadataRequest before
  // they can process the LeaderAndIsrRequests that are generated by replicaStateMachine.startup() and
  // partitionStateMachine.startup().
  // Send an UpdateMetadata request to all alive brokers; brokers learn the current list of live brokers from it
  sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)

  // Inside KafkaController there are:
  //  - two state machines: the partition state machine and the replica state machine;
  //  - one manager: the channel manager, responsible for communication with all brokers;
  //  - caches: partition info, topic info, broker ids, and so on;
  //  - four leader election triggers: leader offline, broker going down, partition reassignment, and preferred leader election.
  // Start the replica state machine and initialize the state of every replica: if the replica's broker is alive
  // the state becomes OnlineReplica, otherwise ReplicaDeletionIneligible.
  replicaStateMachine.startup()
  // Start the partition state machine and initialize the state of every partition: if the leader's broker is alive
  // the state becomes OnlinePartition, otherwise OfflinePartition.
  partitionStateMachine.startup()

  // register the partition change listeners for all existing topics on failover
  // Register a PartitionModificationsListener for every existing topic to watch for partition-count changes
  controllerContext.allTopics.foreach(topic => registerPartitionModificationsListener(topic))
  info(s"Ready to serve as the new controller with epoch $epoch")

  // Trigger a round of partition reassignment
  maybeTriggerPartitionReassignment()
  topicDeletionManager.tryTopicDeletion()
  val pendingPreferredReplicaElections = fetchPendingPreferredReplicaElections()
  // Trigger a round of preferred leader election
  onPreferredReplicaElection(pendingPreferredReplicaElections)
  info("Starting the controller scheduler")
  // Start the scheduler
  kafkaScheduler.startup()
  // If automatic leader rebalancing is enabled, start the auto-leader-rebalance task, which runs periodically
  // based on the configured interval.
  if (config.autoLeaderRebalanceEnable) {
    scheduleAutoLeaderRebalanceTask(delay = 5, unit = TimeUnit.SECONDS)
  }
}
The ControllerContext class stores the controller's context information (the cluster metadata cache). Its source is shown below:
class ControllerContext(val zkUtils: ZkUtils) {

  /**
   * ControllerContext is the core data structure of the Controller component: it stores and manages the
   * cluster's metadata. It holds information about all brokers, topics, partitions and replicas, and it
   * obtains and refreshes this metadata by interacting with ZooKeeper.
   *
   * Main fields (the names follow newer versions of the class; some differ from the older code below):
   *  - stats: controller statistics, such as the number of unclean leader elections.
   *  - offlinePartitionCount: the number of topic partitions that are offline or unavailable.
   *  - shuttingDownBrokerIds: the ids of all brokers that are currently shutting down.
   *  - liveBrokers: the brokers that are currently running.
   *  - liveBrokerEpochs: the epochs of the running brokers.
   *  - epoch: the controller's current epoch.
   *  - epochZkVersion: the ZooKeeper version of the controller-epoch node.
   *  - allTopics: the set of topics in the cluster.
   *  - partitionAssignments: the replica assignment of each topic partition.
   *  - partitionLeadershipInfo: the leader/ISR information of each topic partition.
   *  - partitionsBeingReassigned: partitions that are in the middle of a replica reassignment.
   *  - partitionStates: the state of each topic partition.
   *  - replicaStates: the state of each partition replica.
   *  - replicasOnOfflineDirs: replicas that live on unavailable log directories.
   *  - topicsToBeDeleted: topics waiting to be deleted.
   *  - topicsWithDeletionStarted: topics whose deletion has started.
   *  - topicsIneligibleForDeletion: topics that cannot be deleted for the moment.
   *
   * Initialization: ControllerContext is initialized when KafkaController starts. It reads the cluster's
   * metadata (brokers, topics, partitions, ...) from ZooKeeper and maintains it for the lifetime of the
   * controller.
   *
   * Update mechanism: ControllerContext relies on ZooKeeper watchers. When brokers come and go, topics are
   * created or deleted, or partitions are assigned or reassigned, the corresponding watcher fires and the
   * cached metadata is updated accordingly.
   *
   * Role: it manages the cluster metadata, supports the coordination of cluster operations (broker
   * online/offline, topic create/delete, partition assignment and reassignment), and backs the partition
   * and replica state machines. It also feeds monitoring metrics such as activeController, which is key
   * for spotting and handling cluster problems in operations.
   */

  // Controller statistics, e.g. the number of unclean leader elections
  val stats = new ControllerStats

  // Controller channel manager, responsible for communication with the brokers
  var controllerChannelManager: ControllerChannelManager = null

  // Ids of all brokers that are currently shutting down
  var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty

  // Controller epoch (the data of the /controller_epoch node), identifying the controller's state version
  var epoch: Int = KafkaController.InitialControllerEpoch - 1

  // ZooKeeper version of the /controller_epoch node
  var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion - 1

  // All topics in the cluster
  var allTopics: Set[String] = Set.empty

  // Replica assignment of every partition in the cluster
  var partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map.empty

  // Leader replica and ISR information of every partition
  var partitionLeadershipInfo: mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty

  val partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = new mutable.HashMap
  val replicasOnOfflineDirs: mutable.Map[Int, Set[TopicAndPartition]] = mutable.HashMap.empty

  // All potential brokers, including running and shutting-down ones
  private var liveBrokersUnderlying: Set[Broker] = Set.empty
  private var liveBrokerIdsUnderlying: Set[Int] = Set.empty

  // setter
  def liveBrokers_=(brokers: Set[Broker]) {
    liveBrokersUnderlying = brokers
    liveBrokerIdsUnderlying = liveBrokersUnderlying.map(_.id)
  }

  // getter
  // Brokers that are currently running (excluding the ones that are shutting down)
  def liveBrokers = liveBrokersUnderlying.filter(broker => !shuttingDownBrokerIds.contains(broker.id))
  def liveBrokerIds = liveBrokerIdsUnderlying -- shuttingDownBrokerIds

  def liveOrShuttingDownBrokerIds = liveBrokerIdsUnderlying
  def liveOrShuttingDownBrokers = liveBrokersUnderlying

  def partitionsOnBroker(brokerId: Int): Set[TopicAndPartition] = {
    partitionReplicaAssignment.collect {
      case (topicAndPartition, replicas) if replicas.contains(brokerId) => topicAndPartition
    }.toSet
  }

  def isReplicaOnline(brokerId: Int, topicAndPartition: TopicAndPartition, includeShuttingDownBrokers: Boolean = false): Boolean = {
    val brokerOnline = {
      if (includeShuttingDownBrokers) liveOrShuttingDownBrokerIds.contains(brokerId)
      else liveBrokerIds.contains(brokerId)
    }
    brokerOnline && !replicasOnOfflineDirs.getOrElse(brokerId, Set.empty).contains(topicAndPartition)
  }

  def replicasOnBrokers(brokerIds: Set[Int]): Set[PartitionAndReplica] = {
    brokerIds.flatMap { brokerId =>
      partitionReplicaAssignment.collect {
        case (topicAndPartition, replicas) if replicas.contains(brokerId) =>
          PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, brokerId)
      }
    }.toSet
  }

  def replicasForTopic(topic: String): Set[PartitionAndReplica] = {
    partitionReplicaAssignment
      .filter { case (topicAndPartition, _) => topicAndPartition.topic == topic }
      .flatMap { case (topicAndPartition, replicas) =>
        replicas.map { r =>
          PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, r)
        }
      }.toSet
  }

  def partitionsForTopic(topic: String): collection.Set[TopicAndPartition] =
    partitionReplicaAssignment.keySet.filter(topicAndPartition => topicAndPartition.topic == topic)

  def allLiveReplicas(): Set[PartitionAndReplica] = {
    replicasOnBrokers(liveBrokerIds).filter { partitionAndReplica =>
      isReplicaOnline(partitionAndReplica.replica, TopicAndPartition(partitionAndReplica.topic, partitionAndReplica.partition))
    }
  }

  def replicasForPartition(partitions: collection.Set[TopicAndPartition]): collection.Set[PartitionAndReplica] = {
    partitions.flatMap { p =>
      val replicas = partitionReplicaAssignment(p)
      replicas.map(r => PartitionAndReplica(p.topic, p.partition, r))
    }
  }

  def removeTopic(topic: String) = {
    partitionLeadershipInfo = partitionLeadershipInfo.filter { case (topicAndPartition, _) => topicAndPartition.topic != topic }
    partitionReplicaAssignment = partitionReplicaAssignment.filter { case (topicAndPartition, _) => topicAndPartition.topic != topic }
    allTopics -= topic
  }
}

/**
 * Returns true if this broker is the current controller.
 */
def isActive: Boolean = activeControllerId == config.brokerId
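To make the collect-based lookups above concrete, here is a tiny stand-alone example that mimics partitionsOnBroker on a hand-built assignment map. TopicAndPartition is redefined locally as a simplified stand-in so the snippet compiles on its own; the topic names and broker ids are made up.

import scala.collection.mutable

// Simplified stand-in for the TopicAndPartition type used in the source above.
case class TopicAndPartition(topic: String, partition: Int)

object AssignmentDemo {
  def main(args: Array[String]): Unit = {
    // A toy replica assignment: partition -> replica broker ids
    val assignment: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map(
      TopicAndPartition("orders", 0) -> Seq(1, 2),
      TopicAndPartition("orders", 1) -> Seq(2, 3),
      TopicAndPartition("clicks", 0) -> Seq(1, 3)
    )

    // The same filtering that ControllerContext.partitionsOnBroker(1) performs
    val onBroker1 = assignment.collect {
      case (tp, replicas) if replicas.contains(1) => tp
    }.toSet

    println(onBroker1) // the partitions hosted on broker 1: orders-0 and clicks-0
  }
}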
The ControllerChannelManager source:
// In Kafka, ControllerChannelManager is the key component that manages communication between the controller
// and the other brokers. The controller is the special role in the cluster that manages metadata and
// coordinates cluster operations such as partition assignment and replica synchronization.
//
// The main responsibilities of ControllerChannelManager are:
//
//  - Managing the channels: maintaining the network connections between the controller and each broker.
//  - Sending control requests: sending control commands to brokers, e.g. for partition reassignment or replica synchronization.
//  - Receiving responses: receiving the brokers' responses and status updates.
//  - Failure handling and retries: handling failures in the communication with a broker and retrying.
//  - Monitoring and statistics: monitoring the state and performance of the channels and collecting metrics.
//
// When ControllerChannelManager is initialized, it creates a ControllerBrokerStateInfo object for every node
// in the cluster. That object has four parts:
//
//  - NetworkClient: the network connection object;
//  - Node: the node information;
//  - BlockingQueue: the request queue;
//  - RequestSendThread: the thread that sends the requests.
//
// The implementation is shown below:
class ControllerChannelManager(controllerContext: ControllerContext, config: KafkaConfig, time: Time, metrics: Metrics,
                               stateChangeLogger: StateChangeLogger, threadNamePrefix: Option[String] = None)
  extends Logging with KafkaMetricsGroup {
  import ControllerChannelManager._

  protected val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
  private val brokerLock = new Object
  this.logIdent = "[Channel manager on controller " + config.brokerId + "]: "

  newGauge(
    "TotalQueueSize",
    new Gauge[Int] {
      def value: Int = brokerLock synchronized {
        brokerStateInfo.values.iterator.map(_.messageQueue.size).sum
      }
    }
  )

  controllerContext.liveBrokers.foreach(addNewBroker)

  def startup() = {
    brokerLock synchronized {
      brokerStateInfo.foreach(brokerState => startRequestSendThread(brokerState._1))
    }
  }

  def shutdown() = {
    brokerLock synchronized {
      brokerStateInfo.values.foreach(removeExistingBroker)
    }
  }

  // "Send" a request to a broker. Nothing is transmitted here; the request is only added to that broker's
  // queue, and the actual sending happens in the broker's RequestSendThread.
  def sendRequest(brokerId: Int, apiKey: ApiKeys, request: AbstractRequest.Builder[_ <: AbstractRequest],
                  callback: AbstractResponse => Unit = null) {
    brokerLock synchronized {
      val stateInfoOpt = brokerStateInfo.get(brokerId)
      stateInfoOpt match {
        case Some(stateInfo) =>
          stateInfo.messageQueue.put(QueueItem(apiKey, request, callback))
        case None =>
          warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
      }
    }
  }

  def addBroker(broker: Broker) {
    // be careful here. Maybe the startup() API has already started the request send thread
    brokerLock synchronized {
      if (!brokerStateInfo.contains(broker.id)) {
        addNewBroker(broker)
        startRequestSendThread(broker.id)
      }
    }
  }

  def removeBroker(brokerId: Int) {
    brokerLock synchronized {
      removeExistingBroker(brokerStateInfo(brokerId))
    }
  }

  private def addNewBroker(broker: Broker) {
    val messageQueue = new LinkedBlockingQueue[QueueItem]
    debug("Controller %d trying to connect to broker %d".format(config.brokerId, broker.id))
    val brokerNode = broker.getNode(config.interBrokerListenerName)
    val logContext = new LogContext(s"[Controller id=${config.brokerId}, targetBrokerId=${brokerNode.idString}] ")
    val networkClient = {
      val channelBuilder = ChannelBuilders.clientChannelBuilder(
        config.interBrokerSecurityProtocol,
        JaasContext.Type.SERVER,
        config,
        config.interBrokerListenerName,
        config.saslMechanismInterBrokerProtocol,
        config.saslInterBrokerHandshakeRequestEnable
      )
      val selector = new Selector(
        NetworkReceive.UNLIMITED,
        Selector.NO_IDLE_TIMEOUT_MS,
        metrics,
        time,
        "controller-channel",
        Map("broker-id" -> brokerNode.idString).asJava,
        false,
        channelBuilder,
        logContext
      )
      new NetworkClient(
        selector,
        new ManualMetadataUpdater(Seq(brokerNode).asJava),
        config.brokerId.toString,
        1,
        0,
        0,
        Selectable.USE_DEFAULT_BUFFER_SIZE,
        Selectable.USE_DEFAULT_BUFFER_SIZE,
        config.requestTimeoutMs,
        time,
        false,
        new ApiVersions,
        logContext
      )
    }
    val threadName = threadNamePrefix match {
      case None => "Controller-%d-to-broker-%d-send-thread".format(config.brokerId, broker.id)
      case Some(name) => "%s:Controller-%d-to-broker-%d-send-thread".format(name, config.brokerId, broker.id)
    }

    val requestThread = new RequestSendThread(config.brokerId, controllerContext, messageQueue, networkClient,
      brokerNode, config, time, stateChangeLogger, threadName)
    requestThread.setDaemon(false)

    val queueSizeGauge = newGauge(
      QueueSizeMetricName,
      new Gauge[Int] {
        def value: Int = messageQueue.size
      },
      queueSizeTags(broker.id)
    )

    brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue,
      requestThread, queueSizeGauge))
  }

  private def queueSizeTags(brokerId: Int) = Map("broker-id" -> brokerId.toString)

  private def removeExistingBroker(brokerState: ControllerBrokerStateInfo) {
    try {
      // Shutdown the RequestSendThread before closing the NetworkClient to avoid the concurrent use of the
      // non-threadsafe classes as described in KAFKA-4959.
      // The call to shutdownLatch.await() in ShutdownableThread.shutdown() serves as a synchronization barrier that
      // hands off the NetworkClient from the RequestSendThread to the ZkEventThread.
      brokerState.requestSendThread.shutdown()
      brokerState.networkClient.close()
      brokerState.messageQueue.clear()
      removeMetric(QueueSizeMetricName, queueSizeTags(brokerState.brokerNode.id))
      brokerStateInfo.remove(brokerState.brokerNode.id)
    } catch {
      case e: Throwable => error("Error while removing broker by the controller", e)
    }
  }

  protected def startRequestSendThread(brokerId: Int) {
    val requestThread = brokerStateInfo(brokerId).requestSendThread
    if (requestThread.getState == Thread.State.NEW)
      requestThread.start()
  }
}
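The per-broker RequestSendThread created above is not included in this excerpt. The following is a simplified sketch of what such a thread does with its queue: block on take(), perform a blocking round trip, invoke the callback. The Req/Resp types and sendAndReceive function are illustrative stand-ins rather than Kafka's QueueItem/NetworkClient types, and the real thread additionally waits for the connection to become ready and retries failed sends.

import java.util.concurrent.BlockingQueue

// Simplified stand-in for the per-broker send thread; not Kafka's RequestSendThread.
class SketchSendThread[Req, Resp](queue: BlockingQueue[(Req, Resp => Unit)],
                                  sendAndReceive: Req => Resp) extends Thread {
  @volatile private var running = true

  override def run(): Unit = {
    try {
      while (running) {
        val (request, callback) = queue.take()    // block until the controller enqueues a request
        val response = sendAndReceive(request)    // blocking round trip to the target broker
        if (callback != null) callback(response)  // hand the response back to the controller
      }
    } catch {
      case _: InterruptedException => () // shutdown() interrupts a take() blocked on an empty queue
    }
  }

  def shutdown(): Unit = {
    running = false
    interrupt()
  }
}

This one-queue-one-thread-per-broker layout is what lets sendRequest() return immediately while still guaranteeing that requests to a given broker are delivered in the order the controller produced them.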
The partition leader election strategies in KafkaController:
(Detailed source walkthrough still to be added.)
OfflinePartitionLeaderSelector: used when a partition's leader goes offline. It prefers a live replica from the ISR; if none is available it falls back to any live assigned replica only when unclean.leader.election.enable is true (accepting possible data loss), and otherwise it throws a NoReplicaOnlineException. A sketch of this decision follows after this list.
ReassignedPartitionLeaderSelector: used during partition reassignment; it picks the new leader from the newly assigned replicas that are alive and in the ISR.
PreferredReplicaPartitionLeaderSelector: used for preferred leader election; it elects the preferred replica (the first replica in the assignment list) as leader, provided it is alive and in the ISR.
ControlledShutdownLeaderSelector: used during a controlled broker shutdown; it moves leadership to a live ISR replica that is not on the broker being shut down.
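A minimal sketch of the offline-partition decision described above, using simplified inputs (this is illustrative code, not the actual OfflinePartitionLeaderSelector):

// Prefer a live ISR replica; fall back to any live assigned replica only when unclean
// leader election is enabled; otherwise report that no replica is available, in which
// case the caller raises NoReplicaOnlineException.
object OfflineLeaderChoice {
  def selectLeader(assignedReplicas: Seq[Int],
                   isr: Seq[Int],
                   liveBrokers: Set[Int],
                   uncleanElectionEnabled: Boolean): Option[Int] = {
    val liveIsr = isr.filter(liveBrokers.contains)
    if (liveIsr.nonEmpty)
      Some(liveIsr.head)                          // clean election from the ISR
    else if (uncleanElectionEnabled)
      assignedReplicas.find(liveBrokers.contains) // unclean election: possible data loss
    else
      None                                        // no eligible replica online
  }
}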