
Kafka Replica Mechanism (Part 1): Replicas and Partitions

Kafka uses the Replica class to represent a single replica of a partition:

    class Replica(val brokerId: Int,          // ID of the broker that hosts this replica
                  val partition: Partition,   // the partition this replica belongs to
                  time: Time = SystemTime,
                  initialHighWatermarkValue: Long = 0L,
                  // the local Log object; None for remote replicas
                  val log: Option[Log] = None) extends Logging {
      // the high watermark offset value, in non-leader replicas only its message offsets are kept
      // Consumers can only fetch messages up to the HW. The leader replica maintains this
      // field; the other replicas only keep a copy of it.
      @volatile private[this] var highWatermarkMetadata: LogOffsetMetadata = new LogOffsetMetadata(initialHighWatermarkValue)
      // the log end offset value, kept in all replicas;
      // for local replica it is the log's end offset, for remote replicas its value is only updated by follower fetch
      // Offset of the latest message appended to the log. The local replica reads it directly from
      // Log.nextOffsetMetadata; for remote replicas it is learned from follower fetch requests.
      @volatile private[this] var logEndOffsetMetadata: LogOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata
      val topic = partition.topic
      val partitionId = partition.partitionId
      // Records the last time this follower replica caught up with the leader
      private[this] val lastCaughtUpTimeMsUnderlying = new AtomicLong(time.milliseconds)
    }
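
To make the relationship between LEO and HW concrete, here is a minimal self-contained sketch (ReplicaState and consumerVisible are illustrative names, not Kafka's API): every replica carries both offsets, the invariant HW <= LEO always holds, and a consumer may only fetch offsets below the HW.

    // Toy model of a replica's two offsets; hypothetical names only.
    object ReplicaOffsetsDemo extends App {
      case class ReplicaState(logEndOffset: Long, highWatermark: Long) {
        require(highWatermark <= logEndOffset, "HW can never pass the LEO")
        // Offsets a consumer is allowed to fetch when this replica is the leader.
        def consumerVisible: Seq[Long] = 0L until highWatermark
      }

      val leader = ReplicaState(logEndOffset = 10L, highWatermark = 8L)
      println(leader.consumerVisible) // 0..7: offsets 8 and 9 exist but are not yet committed
    }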

The Partition class:

    class Partition(val topic: String,        // topic name and partition id
                    val partitionId: Int,
                    time: Time,
                    replicaManager: ReplicaManager) extends Logging with KafkaMetricsGroup {
      private val localBrokerId = replicaManager.config.brokerId
      // The LogManager object on this broker
      private val logManager = replicaManager.logManager
      // Helper class for ZooKeeper operations
      private val zkUtils = replicaManager.zkUtils
      // The set of all replicas assigned to this partition (the AR set)
      private val assignedReplicaMap = new Pool[Int, Replica]
      // The read lock is only required when multiple reads are executed and needs to be in a consistent manner
      private val leaderIsrUpdateLock = new ReentrantReadWriteLock()
      private var zkVersion: Int = LeaderAndIsr.initialZKVersion
      // Epoch information of the leader replica
      @volatile private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1
      // ID of this partition's leader replica
      @volatile var leaderReplicaIdOpt: Option[Int] = None
      // The partition's ISR set
      @volatile var inSyncReplicas: Set[Replica] = Set.empty[Replica]
      /* Epoch of the controller that last changed the leader. This needs to be initialized correctly upon broker startup.
       * One way of doing that is through the controller's start replica state change command. When a new broker starts up
       * the controller sends it a start replica command containing the leader for each partition that the broker hosts.
       * In addition to the leader, the controller can also send the epoch of the controller that elected the leader for
       * each partition. */
      private var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
      this.logIdent = "Partition [%s,%d] on broker %d: ".format(topic, partitionId, localBrokerId)
      private def isReplicaLocal(replicaId: Int): Boolean = (replicaId == localBrokerId)
      val tags = Map("topic" -> topic, "partition" -> partitionId.toString)
    }
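
The methods below wrap their bodies in inReadLock(leaderIsrUpdateLock) { ... } and inWriteLock(leaderIsrUpdateLock) { ... }. Kafka ships such helpers in CoreUtils; a minimal sketch of how they can be written over ReentrantReadWriteLock looks like this (illustrative, not the exact Kafka source):

    import java.util.concurrent.locks.{Lock, ReentrantReadWriteLock}

    object LockUtilsSketch {
      // Run `fun` while holding `lock`, always releasing it afterwards.
      def inLock[T](lock: Lock)(fun: => T): T = {
        lock.lock()
        try fun
        finally lock.unlock()
      }

      def inReadLock[T](lock: ReentrantReadWriteLock)(fun: => T): T =
        inLock(lock.readLock())(fun)

      def inWriteLock[T](lock: ReentrantReadWriteLock)(fun: => T): T =
        inLock(lock.writeLock())(fun)
    }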

Partition has five main groups of methods:
1. Creating replica objects: getOrCreateReplica
2. Switching a replica between the leader and follower roles: makeLeader and makeFollower
3. Managing the ISR set: maybeExpandIsr() and maybeShrinkIsr()
4. Appending messages: appendMessagesToLeader
5. Checking the HW position: checkEnoughReplicasReachOffset

Creating a replica:

    // Look up the Replica object for the given replica id in the AR set (assignedReplicaMap);
    // create it and add it to the AR set if it is not found.
    def getOrCreateReplica(replicaId: Int = localBrokerId): Replica = {
      // Look up the replica in the AR set (assignedReplicaMap)
      val replicaOpt = getReplica(replicaId)
      replicaOpt match {
        // Found the Replica object: return it directly
        case Some(replica) => replica
        case None =>
          // Check whether this is the local replica
          if (isReplicaLocal(replicaId)) {
            // Fetch the configuration; settings stored in ZooKeeper override the defaults
            val config = LogConfig.fromProps(logManager.defaultConfig.originals,
              AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic))
            // Create the Log for the local replica, including its directory; if the log
            // already exists it is returned as-is.
            val log = logManager.createLog(TopicAndPartition(topic, partitionId), config)
            // Get the OffsetCheckpoint object for the log directory; it manages the
            // replication-offset-checkpoint file under that directory.
            val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
            // Read the HWs recorded in the replication-offset-checkpoint file
            val offsetMap = checkpoint.read
            if (!offsetMap.contains(TopicAndPartition(topic, partitionId)))
              info("No checkpointed highwatermark is found for partition [%s,%d]".format(topic, partitionId))
            // Take the smaller of the checkpointed offset and the LEO as this replica's HW
            val offset = offsetMap.getOrElse(TopicAndPartition(topic, partitionId), 0L).min(log.logEndOffset)
            // Create the Replica object and add it to the assignedReplicaMap set
            val localReplica = new Replica(replicaId, this, time, offset, Some(log))
            addReplicaIfNotExists(localReplica)
          } else {
            // Not a local replica: just create the Replica object and add it to the AR set
            val remoteReplica = new Replica(replicaId, this, time)
            addReplicaIfNotExists(remoteReplica)
          }
          // Return the newly created replica
          getReplica(replicaId).get
      }
    }
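
The interesting detail above is how the local replica's initial HW is chosen: the checkpointed value is capped by the LEO, because the checkpoint file may be ahead of the actual log (for example after log segments were lost). A sketch of just this decision, with hypothetical names:

    // Illustrative only: choose the initial HW for a local replica.
    object InitialHwSketch extends App {
      // `checkpointedHw` is the value read from replication-offset-checkpoint (None if absent),
      // `logEndOffset` is the LEO of the freshly opened Log.
      def initialHighWatermark(checkpointedHw: Option[Long], logEndOffset: Long): Long =
        checkpointedHw.getOrElse(0L).min(logEndOffset)

      assert(initialHighWatermark(Some(120L), 100L) == 100L) // checkpoint ahead of log: cap at LEO
      assert(initialHighWatermark(None, 100L) == 0L)         // no checkpoint: start from 0
      println("initial HW is always capped by the LEO")
    }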

Replica role switching
Brokers switch their replicas between the leader and follower roles according to the LeaderAndIsrRequest sent by the KafkaController. Partition.makeLeader() is one of the more important steps in handling a LeaderAndIsrRequest: it turns the local replica into the leader replica.

    // PartitionState is the per-partition state carried by a LeaderAndIsrRequest:
    public class PartitionState {
        public final int controllerEpoch;
        // ID of the leader replica
        public final int leader;
        public final int leaderEpoch;
        // The ISR set, stored as replica IDs
        public final List<Integer> isr;
        public final int zkVersion;
        // The AR set
        public final Set<Integer> replicas;
    }

    /*
     * Make the local replica the leader by resetting LogEndOffset for remote replicas (there could be old LogEndOffset
     * from the time when this broker was the leader last time) and setting the new leader and ISR.
     * If the leader replica id does not change, return false to indicate the replica manager.
     */
    def makeLeader(controllerId: Int, partitionStateInfo: PartitionState, correlationId: Int): Boolean = {
      val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
        // Get the AR set to be assigned
        val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt)
        // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
        // to maintain the decision maker controller's epoch in the zookeeper path
        controllerEpoch = partitionStateInfo.controllerEpoch
        // add replicas that are new
        // Create the Replica objects for every replica in the AR set
        allReplicas.foreach(replica => getOrCreateReplica(replica))
        // Build the ISR set
        val newInSyncReplicas = partitionStateInfo.isr.asScala.map(r => getOrCreateReplica(r)).toSet
        // remove assigned replicas that have been removed by the controller
        // Update the assignedReplicas set according to allReplicas
        (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))
        // Update the Partition fields: ISR set, leaderEpoch and zkVersion
        inSyncReplicas = newInSyncReplicas
        leaderEpoch = partitionStateInfo.leaderEpoch
        zkVersion = partitionStateInfo.zkVersion
        // Check whether the leader has changed
        val isNewLeader =
          if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == localBrokerId) {
            // The broker hosting the leader has not changed
            false
          } else {
            // The leader was previously on another broker, so update leaderReplicaIdOpt
            leaderReplicaIdOpt = Some(localBrokerId)
            true
          }
        // Get the local replica
        val leaderReplica = getReplica().get
        // we may need to increment high watermark since ISR could be down to 1
        if (isNewLeader) {
          // construct the high watermark metadata for the new leader replica
          // Initialize the new leader's HW metadata
          leaderReplica.convertHWToLocalOffsetMetadata()
          // reset log end offset for remote replicas
          // Reset the LEO of all remote replicas to -1 (unknown)
          assignedReplicas.filter(_.brokerId != localBrokerId).foreach(_.updateLogReadResult(LogReadResult.UnknownLogReadResult))
        }
        // Try to advance the HW
        (maybeIncrementLeaderHW(leaderReplica), isNewLeader)
      }
      // some delayed operations may be unblocked after HW changed
      // If the HW advanced, some DelayedFetch operations may now be satisfiable; check here
      if (leaderHWIncremented)
        tryCompleteDelayedRequests()
      isNewLeader
    }
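
Note the reset of remote LEOs when the broker becomes a new leader: values remembered from an earlier leadership term cannot be trusted, so they are set back to unknown (-1) and only re-learned from follower fetch requests. A toy illustration of that step (hypothetical names):

    // Illustrative only: a new leader forgets the remote LEOs from its previous term.
    object ResetRemoteLeoSketch extends App {
      val UnknownOffset = -1L
      final case class Remote(brokerId: Int, var leo: Long)

      val remotes = Seq(Remote(1, 42L), Remote(2, 40L)) // stale values from the last term
      remotes.foreach(_.leo = UnknownOffset)            // reset on leadership change
      println(remotes)                                  // both unknown until the followers fetch again
    }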

maybeIncrementLeaderHW tries to advance the leader replica's HW. Whenever the ISR set grows or shrinks, or the LEO of a replica in the ISR changes, the smallest LEO across the ISR may increase, so maybeIncrementLeaderHW has to be called to re-check the HW.

    private def maybeIncrementLeaderHW(leaderReplica: Replica): Boolean = {
      // Collect the LEOs of all replicas in the ISR
      val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset)
      // The smallest LEO in the ISR is the candidate for the new HW
      val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)
      // Current HW
      val oldHighWatermark = leaderReplica.highWatermark
      // Compare the two HWs and update if the new one is further ahead
      if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset || oldHighWatermark.onOlderSegment(newHighWatermark)) {
        leaderReplica.highWatermark = newHighWatermark
        debug("High watermark for partition [%s,%d] updated to %s".format(topic, partitionId, newHighWatermark))
        true
      } else {
        debug("Skipping update high watermark since Old hw %s is larger than new hw %s for partition [%s,%d]. All leo's are %s"
          .format(oldHighWatermark, newHighWatermark, topic, partitionId, allLogEndOffsets.mkString(",")))
        false
      }
    }
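
For example, with ISR LEOs {10, 8, 9} the candidate HW is 8; the method returns true only when the HW actually moves forward, and that boolean is what drives the delayed-operation checks in the callers. A simplified sketch on plain offsets (ignoring the segment metadata that LogOffsetMetadata also compares; names are hypothetical):

    object MaybeIncrementHwSketch extends App {
      def maybeIncrementHw(currentHw: Long, isrLeos: Set[Long]): (Long, Boolean) = {
        val candidate = isrLeos.min                  // smallest LEO across the ISR
        if (candidate > currentHw) (candidate, true) // HW advanced
        else (currentHw, false)                      // never move the HW backwards
      }

      println(maybeIncrementHw(currentHw = 7L, isrLeos = Set(10L, 8L, 9L))) // (8,true)
      println(maybeIncrementHw(currentHw = 8L, isrLeos = Set(10L, 8L, 9L))) // (8,false)
    }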

makeFollower turns the local replica into a follower replica:

    /**
     * Make the local replica the follower by setting the new leader and ISR to empty
     * If the leader replica id does not change, return false to indicate the replica manager
     */
    def makeFollower(controllerId: Int, partitionStateInfo: PartitionState, correlationId: Int): Boolean = {
      // Acquire the write lock
      inWriteLock(leaderIsrUpdateLock) {
        // Get the AR set to be assigned
        val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt)
        // Get the broker ID of the new leader
        val newLeaderBrokerId: Int = partitionStateInfo.leader
        // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
        // to maintain the decision maker controller's epoch in the zookeeper path
        controllerEpoch = partitionStateInfo.controllerEpoch
        // add replicas that are new
        // Create the corresponding Replica objects
        allReplicas.foreach(r => getOrCreateReplica(r))
        // remove assigned replicas that have been removed by the controller
        // Update the AR set according to partitionStateInfo
        (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))
        // Empty set: the ISR is maintained on the leader replica; follower replicas do not track it
        inSyncReplicas = Set.empty[Replica]
        leaderEpoch = partitionStateInfo.leaderEpoch
        zkVersion = partitionStateInfo.zkVersion
        // Check whether the leader has changed
        if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == newLeaderBrokerId) {
          false
        }
        else {
          // Update the leaderReplicaIdOpt field
          leaderReplicaIdOpt = Some(newLeaderBrokerId)
          true
        }
      }
    }

Managing the ISR set
Besides switching replica roles, Partition also has to manage the ISR set. The two methods involved are maybeExpandIsr() and maybeShrinkIsr().

    /**
     * Check and maybe expand the ISR of the partition.
     *
     * This function can be triggered when a replica's LEO has incremented
     */
    def maybeExpandIsr(replicaId: Int) {
      // Only the leader replica manages the ISR, so first get the Replica object of the leader
      val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
        // check if this replica needs to be added to the ISR
        leaderReplicaIfLocal() match {
          case Some(leaderReplica) =>
            val replica = getReplica(replicaId).get
            // Current HW
            val leaderHW = leaderReplica.highWatermark
            // Three conditions: the follower is not yet in the ISR, it can be found in
            // the AR set, and its LEO has caught up with the HW
            if (!inSyncReplicas.contains(replica) &&
                assignedReplicas.map(_.brokerId).contains(replicaId) &&
                replica.logEndOffset.offsetDiff(leaderHW) >= 0) {
              // Add the follower replica to the ISR, forming the new ISR set
              val newInSyncReplicas = inSyncReplicas + replica
              info("Expanding ISR for partition [%s,%d] from %s to %s"
                .format(topic, partitionId, inSyncReplicas.map(_.brokerId).mkString(","),
                        newInSyncReplicas.map(_.brokerId).mkString(",")))
              // update ISR in ZK and cache
              // Persist the new ISR to ZooKeeper and update Partition.inSyncReplicas
              updateIsr(newInSyncReplicas)
              replicaManager.isrExpandRate.mark()
            }
            // check if the HW of the partition can now be incremented
            // since the replica may now be in the ISR and its LEO has just incremented
            // Try to advance the HW
            maybeIncrementLeaderHW(leaderReplica)
          case None => false // nothing to do if no longer leader
        }
      }
      // Try to complete delayed operations
      if (leaderHWIncremented)
        tryCompleteDelayedRequests()
    }
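
The three-part condition is the heart of the method. A compact restatement of it as a pure predicate, with hypothetical names and plain offsets in place of LogOffsetMetadata:

    object ShouldExpandIsrSketch extends App {
      def shouldExpandIsr(replicaId: Int, followerLeo: Long, leaderHw: Long,
                          isr: Set[Int], assigned: Set[Int]): Boolean =
        !isr.contains(replicaId) && assigned.contains(replicaId) && followerLeo >= leaderHw

      // Follower 2 has caught up to the leader's HW (5 >= 5) and may rejoin the ISR.
      println(shouldExpandIsr(replicaId = 2, followerLeo = 5L, leaderHw = 5L,
                              isr = Set(0, 1), assigned = Set(0, 1, 2))) // true
    }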

In a distributed system, network interaction between nodes is subject to blocking and delay, which can leave some followers in the ISR unable to keep up with the leader. If a ProducerRequest with acks=-1 arrives at that point, it would have to wait a long time.
To avoid this, Partition shrinks the ISR set when needed. The logic lives in maybeShrinkIsr(), which ReplicaManager invokes from a periodic scheduled task; it checks how far the follower replicas in the ISR lag behind the leader and removes the stragglers from the ISR.

    def maybeShrinkIsr(replicaMaxLagTimeMs: Long) {
      val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
        // Get the Replica object of the leader replica
        leaderReplicaIfLocal() match {
          case Some(leaderReplica) =>
            // Inspect each follower's lastCaughtUpTimeMsUnderlying field to find the replicas
            // that have fallen behind; they are removed from the ISR. Whether a follower has
            // not synchronized with the leader for a long time, or its LEO lags far behind
            // the HW, both show up in this field.
            val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs)
            if (outOfSyncReplicas.size > 0) {
              val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
              assert(newInSyncReplicas.size > 0)
              info("Shrinking ISR for partition [%s,%d] from %s to %s".format(topic, partitionId,
                inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(",")))
              // Form the new ISR set and persist it to ZooKeeper
              updateIsr(newInSyncReplicas)
              // we may need to increment high watermark since ISR could be down to 1
              replicaManager.isrShrinkRate.mark()
              // Try to advance the HW
              maybeIncrementLeaderHW(leaderReplica)
            } else {
              false
            }
          case None => false // do nothing if no longer leader
        }
      }
      // some delayed operations may be unblocked after HW changed
      // Try to complete delayed operations
      if (leaderHWIncremented)
        tryCompleteDelayedRequests()
    }
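
getOutOfSyncReplicas itself is not shown above; in this version it judges a follower purely by how long ago it last caught up with the leader (the replica.lag.time.max.ms setting). A hedged sketch of that test, with hypothetical names:

    object OutOfSyncSketch extends App {
      // Simplified model: a follower is out of sync when it has not fully caught up
      // with the leader for more than maxLagMs milliseconds.
      case class FollowerState(brokerId: Int, lastCaughtUpTimeMs: Long)

      def outOfSyncReplicas(followers: Set[FollowerState], nowMs: Long, maxLagMs: Long): Set[FollowerState] =
        followers.filter(f => nowMs - f.lastCaughtUpTimeMs > maxLagMs)

      val now = 100000L
      val followers = Set(FollowerState(1, now - 500L), FollowerState(2, now - 20000L))
      // With replica.lag.time.max.ms = 10000, broker 2 would be evicted from the ISR.
      println(outOfSyncReplicas(followers, now, maxLagMs = 10000L).map(_.brokerId)) // Set(2)
    }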

Appending messages
Within a partition, only the leader replica handles read and write requests; appendMessagesToLeader appends messages to the log of the leader replica.

    def appendMessagesToLeader(messages: ByteBufferMessageSet, requiredAcks: Int = 0) = {
      val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
        // Get the Replica object of the leader replica
        val leaderReplicaOpt = leaderReplicaIfLocal()
        leaderReplicaOpt match {
          case Some(leaderReplica) =>
            val log = leaderReplica.log.get
            // Minimum ISR size required by the configuration (min.insync.replicas)
            val minIsr = log.config.minInSyncReplicas
            val inSyncSize = inSyncReplicas.size
            // Avoid writing to leader if there are not enough insync replicas to make it safe
            // If the ISR is smaller than required, throw NotEnoughReplicasException back to the producer
            if (inSyncSize < minIsr && requiredAcks == -1) {
              throw new NotEnoughReplicasException("Number of insync replicas for partition [%s,%d] is [%d], below required minimum [%d]"
                .format(topic, partitionId, inSyncSize, minIsr))
            }
            // Append to the log of the leader replica
            val info = log.append(messages, assignOffsets = true)
            // probably unblock some follower fetch requests since log end offset has been updated
            // Try to complete DelayedFetch operations
            replicaManager.tryCompleteDelayedFetch(new TopicPartitionOperationKey(this.topic, this.partitionId))
            // we may need to increment high watermark since ISR could be down to 1
            // Try to advance the HW
            (info, maybeIncrementLeaderHW(leaderReplica))
          case None =>
            throw new NotLeaderForPartitionException("Leader not local for partition [%s,%d] on broker %d"
              .format(topic, partitionId, localBrokerId))
        }
      }
      // some delayed operations may be unblocked after HW changed
      // Try to complete delayed operations
      if (leaderHWIncremented)
        tryCompleteDelayedRequests()
      info
    }
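
The guard at the top only applies when requiredAcks is -1: with fewer in-sync replicas than min.insync.replicas, accepting such a write could acknowledge data that is not safely replicated, while acks=0/1 writes are still accepted. A sketch of the decision (hypothetical names):

    object MinIsrCheckSketch extends App {
      // Mirrors the guard in appendMessagesToLeader, names are illustrative.
      def mayAppend(inSyncSize: Int, minIsr: Int, requiredAcks: Int): Either[String, Unit] =
        if (inSyncSize < minIsr && requiredAcks == -1)
          Left(s"NotEnoughReplicas: isr=$inSyncSize < min.insync.replicas=$minIsr")
        else
          Right(())

      println(mayAppend(inSyncSize = 1, minIsr = 2, requiredAcks = -1)) // rejected
      println(mayAppend(inSyncSize = 1, minIsr = 2, requiredAcks = 1))  // accepted: acks=1 skips the check
    }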

Checking the HW position
The discussion of DelayedProduce's completion condition mentioned checkEnoughReplicasReachOffset, which checks whether the given message has been synchronized by all follower replicas in the ISR.

    /*
     * Note that this method will only be called if requiredAcks = -1
     * and we are waiting for all replicas in ISR to be fully caught up to
     * the (local) leader's offset corresponding to this produce request
     * before we acknowledge the produce request.
     */
    def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Short) = {
      leaderReplicaIfLocal() match {
        // Get the Replica object of the leader replica
        case Some(leaderReplica) =>
          // keep the current immutable replica list reference
          // Take the current ISR set
          val curInSyncReplicas = inSyncReplicas
          val numAcks = curInSyncReplicas.count(r => {
            if (!r.isLocal)
              if (r.logEndOffset.messageOffset >= requiredOffset) {
                trace("Replica %d of %s-%d received offset %d".format(r.brokerId, topic, partitionId, requiredOffset))
                true
              }
              else
                false
            else
              true /* also count the local (leader) replica */
          })
          trace("%d acks satisfied for %s-%d with acks = -1".format(numAcks, topic, partitionId))
          val minIsr = leaderReplica.log.get.config.minInSyncReplicas
          // Compare the HW with the message's offset
          if (leaderReplica.highWatermark.messageOffset >= requiredOffset) {
            /*
             * The topic may be configured not to accept messages if there are not enough replicas in ISR
             * in this scenario the request was already appended locally and then added to the purgatory before the ISR was shrunk
             */
            // Check that the ISR is still large enough; if it is too small, return an error code
            if (minIsr <= curInSyncReplicas.size) {
              (true, Errors.NONE.code)
            } else {
              (true, Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND.code)
            }
          } else
            (false, Errors.NONE.code)
        case None =>
          (false, Errors.NOT_LEADER_FOR_PARTITION.code)
      }
    }
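
The method therefore returns a pair: whether the produce request can complete, and which error code to attach. A simplified decision table, with hypothetical names and plain strings in place of the Errors codes:

    object ReachOffsetSketch extends App {
      // The request completes once the leader's HW passes requiredOffset, but reports
      // NOT_ENOUGH_REPLICAS_AFTER_APPEND if the ISR shrank below min.insync.replicas
      // after the write was already accepted.
      def check(leaderHw: Long, requiredOffset: Long, isrSize: Int, minIsr: Int): (Boolean, String) =
        if (leaderHw >= requiredOffset) {
          if (isrSize >= minIsr) (true, "NONE")
          else (true, "NOT_ENOUGH_REPLICAS_AFTER_APPEND")
        } else (false, "NONE")

      println(check(leaderHw = 100L, requiredOffset = 90L, isrSize = 3, minIsr = 2)) // (true,NONE)
      println(check(leaderHw = 100L, requiredOffset = 90L, isrSize = 1, minIsr = 2)) // (true,NOT_ENOUGH_...)
      println(check(leaderHw = 80L,  requiredOffset = 90L, isrSize = 3, minIsr = 2)) // (false,NONE): keep waiting
    }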

 
