赞
踩
1.1 概念
AR::所有副本几点
ISR:IN-SYNC REPLICATION 保持一定程度同步的副本
OSR:Out-Sync Relipcas,副本同步落后太多的节点
HW:HW是High Watermak的缩写, 俗称高水位,它表示了一个特定消息的偏移量(offset),消费之只能拉取到这个offset之前的消息。
LEO:Log End Offset当前要写入最新消息的位置
1.1 kafkaServer
代表一个kafka Broker的生命周期。除了所有的必要启动和停止一个kafka node的功能
1.2 ReplicaManager
管理副本的动作,比如,启动副本为leader或者Follower,停止副本,从leader同步数据等。
1.3 ReplicaFetcherManager
继承自AbstractFetcherManager。负责创建和停止ReplicaFetcherThread。
1.4 ReplicaFetcherThread
继承自AbstractFetcherThread。负责从leader同步数据,追加到本地日志。
2.1 ReplicaManager创建和启动
在kafkaServer中创建并且启动了ReplicaManager
- /* start replica manager */
- replicaManager = createReplicaManager(isShuttingDown)
- replicaManager.startup()
-
-
- protected def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager =
- new ReplicaManager(config, metrics, time, zkClient, kafkaScheduler, logManager, isShuttingDown, quotaManagers,
- brokerTopicStats, metadataCache, logDirFailureChannel)
启动的实际上是多个定时调度线程,周期性的检测是否有副本掉队,进而收缩isr列表
- // start ISR expiration thread
- // A follower can lag behind leader for up to config.replicaLagTimeMaxMs x 1.5 before it is removed from ISR
- scheduler.schedule("isr-expiration", maybeShrinkIsr _, period = config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS)
- scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges _, period = 2500L, unit = TimeUnit.MILLISECONDS)
- scheduler.schedule("shutdown-idle-replica-alter-log-dirs-thread", shutdownIdleReplicaAlterLogDirsThread _, period = 10000L, unit = TimeUnit.MILLISECONDS)
maybeShrinkIsr会遍历所有分区,判断是否收缩isr
-
- private def maybeShrinkIsr(): Unit = {
- trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")
-
- // Shrink ISRs for non offline partitions
- allPartitions.keys.foreach { topicPartition =>
- nonOfflinePartition(topicPartition).foreach(_.maybeShrinkIsr())
- }
- }
maybeShrinkIsr它会首先判断是不是leader在本地。是的话进行处理,否则什么都不做。
-
- private def needsShrinkIsr(): Boolean = {
- if (isLeader) {
- val outOfSyncReplicaIds = getOutOfSyncReplicas(replicaLagTimeMaxMs)
- outOfSyncReplicaIds.nonEmpty
- } else {
- false
- }
- }
-
-
- def maybeShrinkIsr(): Unit = {
- val needsIsrUpdate = inReadLock(leaderIsrUpdateLock) {
- ne
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。