
Kafka Source Code (6): Replica Synchronization and ISR List Updates


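1. Key classes

1.1 Concepts

AR: Assigned Replicas, all replica nodes of a partition.

ISR: In-Sync Replicas, the replicas that stay sufficiently caught up with the leader.

OSR: Out-of-Sync Replicas, the replicas that have fallen too far behind the leader.

HW: short for High Watermark. It marks a specific message offset; consumers can only fetch messages before this offset.

LEO: Log End Offset, the position at which the next message will be written.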
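The relationship between these terms can be illustrated with a small model. This is only a sketch: `ReplicaState` and `ReplicaConcepts` are hypothetical names, not Kafka classes, and it assumes the commonly described rule that the leader's HW is the smallest LEO among the in-sync replicas, which the definitions above do not spell out.

```scala
// Simplified model of one partition's replicas. These types are illustrative
// only; they are not Kafka's own classes.
final case class ReplicaState(brokerId: Int, logEndOffset: Long, inSync: Boolean)

object ReplicaConcepts {
  // ISR: the assigned replicas currently considered in sync.
  def isr(ar: Seq[ReplicaState]): Seq[ReplicaState] = ar.filter(_.inSync)

  // OSR: the remaining assigned replicas, so AR = ISR + OSR.
  def osr(ar: Seq[ReplicaState]): Seq[ReplicaState] = ar.filterNot(_.inSync)

  // Assumption: HW is the smallest LEO among in-sync replicas.
  // Returns None when the ISR is empty.
  def highWatermark(ar: Seq[ReplicaState]): Option[Long] = {
    val leos = isr(ar).map(_.logEndOffset)
    if (leos.isEmpty) None else Some(leos.min)
  }
}
```

With three replicas whose LEOs are 100, 95 and 40, where only the first two are in sync, this model puts the HW at 95: the lagging replica does not hold it back because it is no longer in the ISR.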

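1.2 KafkaServer

Represents the lifecycle of a single Kafka broker. It handles everything needed to start up and shut down a single Kafka node.

1.3 ReplicaManager

Manages replica operations, such as making a replica the leader or a follower, stopping replicas, and fetching data from the leader.

1.4 ReplicaFetcherManager

Extends AbstractFetcherManager. Responsible for creating and stopping ReplicaFetcherThread instances.

1.5 ReplicaFetcherThread

Extends AbstractFetcherThread. Responsible for fetching data from the leader and appending it to the local log.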
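To make the "fetch from the leader, append locally" idea concrete, here is a toy in-memory model of a single fetch round. It is a sketch under heavy simplification: `ToyLog` and `fetchOnce` are hypothetical names, there is no network, no epochs and no truncation, and it is not how ReplicaFetcherThread is actually implemented.

```scala
import scala.collection.mutable.ArrayBuffer

object FetchSketch {
  // Toy in-memory log: the index of a record is its offset.
  final class ToyLog {
    private val records = ArrayBuffer.empty[String]

    // LEO: the offset the next appended record will get.
    def logEndOffset: Long = records.size.toLong

    def append(batch: Seq[String]): Unit = records ++= batch

    // Read at most maxRecords records starting at fromOffset.
    def read(fromOffset: Long, maxRecords: Int): Seq[String] =
      records.slice(fromOffset.toInt, fromOffset.toInt + maxRecords).toList
  }

  // One fetch round: ask the leader for records starting at the follower's
  // LEO and append them to the follower's log. Returns the number copied.
  def fetchOnce(leader: ToyLog, follower: ToyLog, maxRecords: Int): Int = {
    val batch = leader.read(follower.logEndOffset, maxRecords)
    follower.append(batch)
    batch.size
  }
}
```

Calling `fetchOnce` repeatedly moves the follower's LEO toward the leader's; once a round returns 0, the two logs have the same LEO.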

2. Source code

2.1 Creating and starting ReplicaManager

KafkaServer creates and starts the ReplicaManager:

    /* start replica manager */
    replicaManager = createReplicaManager(isShuttingDown)
    replicaManager.startup()

    protected def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager =
      new ReplicaManager(config, metrics, time, zkClient, kafkaScheduler, logManager, isShuttingDown, quotaManagers,
        brokerTopicStats, metadataCache, logDirFailureChannel)

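What startup() actually does is schedule several periodic tasks. One of them periodically checks whether any replica has fallen behind and, if so, shrinks the ISR list: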

    // start ISR expiration thread
    // A follower can lag behind leader for up to config.replicaLagTimeMaxMs x 1.5 before it is removed from ISR
    scheduler.schedule("isr-expiration", maybeShrinkIsr _, period = config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS)
    scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges _, period = 2500L, unit = TimeUnit.MILLISECONDS)
    scheduler.schedule("shutdown-idle-replica-alter-log-dirs-thread", shutdownIdleReplicaAlterLogDirsThread _, period = 10000L, unit = TimeUnit.MILLISECONDS)
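The "x 1.5" in the code comment follows from the check period. A follower that crosses the lag threshold just after a check has run is only noticed one full period later, so the worst case is replicaLagTimeMaxMs plus replicaLagTimeMaxMs / 2. The arithmetic can be sketched as follows; `IsrExpirationTiming` is a hypothetical helper and the 10000 ms value used with it is only an illustrative input, not a statement about any default.

```scala
object IsrExpirationTiming {
  // The expiration check runs every replicaLagTimeMaxMs / 2, matching the
  // period passed to the "isr-expiration" task.
  def checkPeriodMs(replicaLagTimeMaxMs: Long): Long = replicaLagTimeMaxMs / 2

  // Worst case: the follower exceeds the lag threshold right after a check,
  // so removal only happens at the next check, one full period later.
  def worstCaseRemovalMs(replicaLagTimeMaxMs: Long): Long =
    replicaLagTimeMaxMs + checkPeriodMs(replicaLagTimeMaxMs)
}
```

For an input of 10000 ms the check period is 5000 ms and the worst-case removal time is 15000 ms, which is 1.5 times the configured lag limit.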

ReplicaManager's maybeShrinkIsr iterates over all partitions and, for each one, checks whether its ISR should shrink:

    private def maybeShrinkIsr(): Unit = {
      trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")
      // Shrink ISRs for non offline partitions
      allPartitions.keys.foreach { topicPartition =>
        nonOfflinePartition(topicPartition).foreach(_.maybeShrinkIsr())
      }
    }

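The partition-level maybeShrinkIsr first checks whether the leader replica is on the local broker. If it is, the partition is processed; otherwise nothing is done.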

    private def needsShrinkIsr(): Boolean = {
      if (isLeader) {
        val outOfSyncReplicaIds = getOutOfSyncReplicas(replicaLagTimeMaxMs)
        outOfSyncReplicaIds.nonEmpty
      } else {
        false
      }
    }

    def maybeShrinkIsr(): Unit = {
      val needsIsrUpdate = inReadLock(leaderIsrUpdateLock) {
        ne
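The body of getOutOfSyncReplicas is not shown in this excerpt. As a rough illustration of the kind of check such a method could perform, the sketch below assumes a time-based rule: a follower counts as out of sync when it is behind the leader's LEO and has not caught up within maxLagMs. `FollowerState`, its fields and `OutOfSyncSketch` are hypothetical names chosen for this example and should not be read as Kafka's actual implementation.

```scala
object OutOfSyncSketch {
  // Hypothetical per-follower bookkeeping; field names are illustrative.
  final case class FollowerState(brokerId: Int, logEndOffset: Long, lastCaughtUpTimeMs: Long)

  // Assumed rule: out of sync = behind the leader's LEO AND last caught up
  // more than maxLagMs ago.
  def outOfSyncReplicaIds(
      followers: Seq[FollowerState],
      leaderLeo: Long,
      nowMs: Long,
      maxLagMs: Long): Set[Int] =
    followers
      .filter(f => f.logEndOffset != leaderLeo && nowMs - f.lastCaughtUpTimeMs > maxLagMs)
      .map(_.brokerId)
      .toSet

  // Same shape as needsShrinkIsr above: only the leader evaluates followers.
  def needsShrinkIsr(
      isLeader: Boolean,
      followers: Seq[FollowerState],
      leaderLeo: Long,
      nowMs: Long,
      maxLagMs: Long): Boolean =
    isLeader && outOfSyncReplicaIds(followers, leaderLeo, nowMs, maxLagMs).nonEmpty
}
```

Under this rule, a follower whose LEO already equals the leader's is never reported, no matter how long ago it last fetched, and a broker that is not the leader always answers false.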