Kafka uses the Replica class to represent a single replica of a partition:
```scala
class Replica(val brokerId: Int,        // ID of the broker that hosts this replica
              val partition: Partition, // the partition this replica belongs to
              time: Time = SystemTime,
              initialHighWatermarkValue: Long = 0L,
              // the Log object backing a local replica (None for remote replicas)
              val log: Option[Log] = None) extends Logging {
  // the high watermark offset value; in non-leader replicas only its message offset is kept.
  // Consumers can only fetch messages before the HW. The leader replica is responsible
  // for updating and maintaining this field.
  @volatile private[this] var highWatermarkMetadata: LogOffsetMetadata = new LogOffsetMetadata(initialHighWatermarkValue)
  // the log end offset value, kept in all replicas;
  // for the local replica it is the log's end offset and can be read directly from
  // Log.nextOffsetMetadata; for remote replicas it is only updated by follower fetches.
  @volatile private[this] var logEndOffsetMetadata: LogOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata

  val topic = partition.topic
  val partitionId = partition.partitionId
  // records the last time this follower replica caught up with the leader replica
  private[this] val lastCaughtUpTimeMsUnderlying = new AtomicLong(time.milliseconds)
}
```
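For example, if the leader's log holds messages at offsets 0 through 99 (LEO = 100) while its two followers have replicated up to LEOs 95 and 98, the partition's HW is 95, so consumers can read only offsets 0 through 94.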

The Partition class:
```scala
class Partition(val topic: String,      // topic name and partition number
                val partitionId: Int,
                time: Time,
                replicaManager: ReplicaManager) extends Logging with KafkaMetricsGroup {
  private val localBrokerId = replicaManager.config.brokerId
  // the LogManager of this broker
  private val logManager = replicaManager.logManager
  // helper class for ZooKeeper operations
  private val zkUtils = replicaManager.zkUtils
  // the set of all replicas assigned to this partition (the AR set)
  private val assignedReplicaMap = new Pool[Int, Replica]
  // The read lock is only required when multiple reads are executed and need to be in a consistent manner
  private val leaderIsrUpdateLock = new ReentrantReadWriteLock()
  private var zkVersion: Int = LeaderAndIsr.initialZKVersion
  // epoch of the leader replica
  @volatile private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1
  // ID of this partition's leader replica
  @volatile var leaderReplicaIdOpt: Option[Int] = None
  // the partition's ISR set
  @volatile var inSyncReplicas: Set[Replica] = Set.empty[Replica]

  /* Epoch of the controller that last changed the leader. This needs to be initialized correctly upon broker startup.
   * One way of doing that is through the controller's start replica state change command. When a new broker starts up
   * the controller sends it a start replica command containing the leader for each partition that the broker hosts.
   * In addition to the leader, the controller can also send the epoch of the controller that elected the leader for
   * each partition. */
  private var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
  this.logIdent = "Partition [%s,%d] on broker %d: ".format(topic, partitionId, localBrokerId)

  private def isReplicaLocal(replicaId: Int): Boolean = (replicaId == localBrokerId)
  val tags = Map("topic" -> topic, "partition" -> partitionId.toString)
}
```

Partition has five main groups of methods, covered below:
1. Creating replica objects: getOrCreateReplica
2. Switching a replica between the leader and follower roles: makeLeader and makeFollower
3. Managing the ISR set: maybeExpandIsr() and maybeShrinkIsr()
4. Appending messages: appendMessagesToLeader
5. Checking the HW position: checkEnoughReplicasReachOffset
Creating replicas:
```scala
// Look up the Replica object for the given replica in the AR set (assignedReplicaMap);
// if it is not found, create it and add it to the AR set.
def getOrCreateReplica(replicaId: Int = localBrokerId): Replica = {
  // look up the Replica object in the AR set (assignedReplicaMap)
  val replicaOpt = getReplica(replicaId)
  replicaOpt match {
    // the Replica object already exists, return it directly
    case Some(replica) => replica
    case None =>
      // check whether this is the local replica
      if (isReplicaLocal(replicaId)) {
        // fetch the log config; topic-level overrides stored in ZooKeeper take precedence over the defaults
        val config = LogConfig.fromProps(logManager.defaultConfig.originals,
          AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic))
        // create the Log (including its directory) for the local replica;
        // if the log already exists it is returned directly
        val log = logManager.createLog(TopicAndPartition(topic, partitionId), config)
        // get the OffsetCheckpoint object for this log directory, which manages the
        // replication-offset-checkpoint file under it
        val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
        // read the HWs recorded in the replication-offset-checkpoint file
        val offsetMap = checkpoint.read
        if (!offsetMap.contains(TopicAndPartition(topic, partitionId)))
          info("No checkpointed highwatermark is found for partition [%s,%d]".format(topic, partitionId))
        // take the smaller of the checkpointed offset and the LEO as this replica's HW
        val offset = offsetMap.getOrElse(TopicAndPartition(topic, partitionId), 0L).min(log.logEndOffset)
        // create the Replica object and add it to assignedReplicaMap
        val localReplica = new Replica(replicaId, this, time, offset, Some(log))
        addReplicaIfNotExists(localReplica)
      } else {
        // not a local replica: create the Replica object and add it to the AR set directly
        val remoteReplica = new Replica(replicaId, this, time)
        addReplicaIfNotExists(remoteReplica)
      }
      // return the newly created replica
      getReplica(replicaId).get
  }
}
```
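The HW recovery step above reduces to clamping the checkpointed offset to the local LEO. A minimal standalone sketch of that rule (hypothetical code with tuple keys instead of TopicAndPartition, not Kafka's):

```scala
// Hypothetical sketch of the HW recovery rule used above: fall back to 0 when
// no checkpoint entry exists, and never let the recovered HW exceed the LEO.
def initialHighWatermark(checkpointed: Map[(String, Int), Long],
                         topic: String, partition: Int,
                         logEndOffset: Long): Long =
  checkpointed.getOrElse((topic, partition), 0L).min(logEndOffset)

// initialHighWatermark(Map(("t", 0) -> 42L), "t", 0, logEndOffset = 40L) == 40
// initialHighWatermark(Map.empty,            "t", 0, logEndOffset = 40L) == 0
```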

Replica role switching
A broker switches its replicas between the leader and follower roles according to the LeaderAndIsrRequest sent by the KafkaController. Partition.makeLeader() is one of the more important steps in handling a LeaderAndIsrRequest: it makes the local replica the leader replica of the partition. The PartitionState carried in the request is defined as:
```java
public class PartitionState {
    public final int controllerEpoch;
    // ID of the leader replica
    public final int leader;
    public final int leaderEpoch;
    // the ISR set, stored as broker IDs
    public final List<Integer> isr;
    public final int zkVersion;
    // the AR set
    public final Set<Integer> replicas;
}
```

The makeLeader() implementation:

```scala
/*
 * Make the local replica the leader by resetting LogEndOffset for remote replicas (there could be old LogEndOffset
 * from the time when this broker was the leader last time) and setting the new leader and ISR.
 * If the leader replica id does not change, return false to indicate to the replica manager.
 */
def makeLeader(controllerId: Int, partitionStateInfo: PartitionState, correlationId: Int): Boolean = {
  val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
    // the AR set to assign
    val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt)
    // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
    // to maintain the decision maker controller's epoch in the zookeeper path
    controllerEpoch = partitionStateInfo.controllerEpoch
    // add replicas that are new
    // create Replica objects for every replica in the AR set
    allReplicas.foreach(replica => getOrCreateReplica(replica))
    // build the new ISR set
    val newInSyncReplicas = partitionStateInfo.isr.asScala.map(r => getOrCreateReplica(r)).toSet
    // remove assigned replicas that have been removed by the controller
    // bring the assignedReplicas set in line with allReplicas
    (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))
    // update the Partition fields: ISR set, leaderEpoch, zkVersion
    inSyncReplicas = newInSyncReplicas
    leaderEpoch = partitionStateInfo.leaderEpoch
    zkVersion = partitionStateInfo.zkVersion
    // detect whether the leader has changed
    val isNewLeader =
      if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == localBrokerId) {
        // the broker hosting the leader has not changed
        false
      } else {
        // the leader was previously on another broker, so update leaderReplicaIdOpt
        leaderReplicaIdOpt = Some(localBrokerId)
        true
      }
    // get the local replica
    val leaderReplica = getReplica().get
    // we may need to increment high watermark since ISR could be down to 1
    if (isNewLeader) {
      // construct the high watermark metadata for the new leader replica
      // initialize the new leader's HW metadata
      leaderReplica.convertHWToLocalOffsetMetadata()
      // reset log end offset for remote replicas
      // reset the LEO of every remote replica to -1 (unknown)
      assignedReplicas.filter(_.brokerId != localBrokerId).foreach(_.updateLogReadResult(LogReadResult.UnknownLogReadResult))
    }
    // try to advance the HW
    (maybeIncrementLeaderHW(leaderReplica), isNewLeader)
  }
  // some delayed operations may be unblocked after HW changed
  // if the HW advanced, pending DelayedFetch operations may now be satisfiable, so check them here
  if (leaderHWIncremented)
    tryCompleteDelayedRequests()
  isNewLeader
}
```

maybeIncrementLeaderHW tries to advance the leader replica's HW. Whenever the ISR set grows or shrinks, or the LEO of a replica in the ISR set changes, the minimum LEO across the ISR set may increase, so maybeIncrementLeaderHW must be called to check in all of these cases:
```scala
private def maybeIncrementLeaderHW(leaderReplica: Replica): Boolean = {
  // collect the LEOs of all replicas in the ISR set
  val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset)
  // the smallest LEO in the ISR set becomes the candidate for the new HW
  val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)
  // get the current HW
  val oldHighWatermark = leaderReplica.highWatermark
  // compare the two HWs and update if the new one is larger
  if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset || oldHighWatermark.onOlderSegment(newHighWatermark)) {
    leaderReplica.highWatermark = newHighWatermark
    debug("High watermark for partition [%s,%d] updated to %s".format(topic, partitionId, newHighWatermark))
    true
  } else {
    debug("Skipping update high watermark since Old hw %s is larger than new hw %s for partition [%s,%d]. All leo's are %s"
      .format(oldHighWatermark, newHighWatermark, topic, partitionId, allLogEndOffsets.mkString(",")))
    false
  }
}
```
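Since the computation is simply the minimum LEO across the ISR, a tiny self-contained illustration (hypothetical code, not Kafka's) is:

```scala
// Hypothetical sketch: the leader's HW is the minimum LEO across the ISR.
object HighWatermarkDemo {
  def highWatermark(isrLogEndOffsets: Seq[Long]): Long = isrLogEndOffsets.min

  def main(args: Array[String]): Unit = {
    // leader LEO = 10, followers have replicated up to offsets 8 and 9
    val leos = Seq(10L, 8L, 9L)
    println(s"HW = ${highWatermark(leos)}") // prints "HW = 8"
  }
}
```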

makeFollower makes the local replica a follower:
```scala
/**
 * Make the local replica the follower by setting the new leader and setting ISR to empty.
 * If the leader replica id does not change, return false to indicate to the replica manager.
 */
def makeFollower(controllerId: Int, partitionStateInfo: PartitionState, correlationId: Int): Boolean = {
  // acquire the write lock
  inWriteLock(leaderIsrUpdateLock) {
    // the AR set to assign
    val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt)
    // ID of the broker hosting the new leader
    val newLeaderBrokerId: Int = partitionStateInfo.leader
    // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
    // to maintain the decision maker controller's epoch in the zookeeper path
    controllerEpoch = partitionStateInfo.controllerEpoch
    // add replicas that are new
    // create the corresponding Replica objects
    allReplicas.foreach(r => getOrCreateReplica(r))
    // remove assigned replicas that have been removed by the controller
    // bring the AR set in line with partitionStateInfo
    (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))
    // empty set: the ISR is maintained only on the leader replica; follower replicas do not track it
    inSyncReplicas = Set.empty[Replica]
    leaderEpoch = partitionStateInfo.leaderEpoch
    zkVersion = partitionStateInfo.zkVersion
    // check whether the leader has changed
    if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == newLeaderBrokerId) {
      false
    }
    else {
      // update the leaderReplicaIdOpt field
      leaderReplicaIdOpt = Some(newLeaderBrokerId)
      true
    }
  }
}
```

ISR set management
Besides switching the roles of its replicas, Partition also has to manage the ISR set, through maybeExpandIsr() and maybeShrinkIsr().
```scala
/**
 * Check and maybe expand the ISR of the partition.
 *
 * This function can be triggered when a replica's LEO has incremented.
 */
def maybeExpandIsr(replicaId: Int) {
  // only the leader replica manages the ISR, so first get the Replica object of the leader
  val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
    // check if this replica needs to be added to the ISR
    leaderReplicaIfLocal() match {
      case Some(leaderReplica) =>
        val replica = getReplica(replicaId).get
        // get the current HW
        val leaderHW = leaderReplica.highWatermark
        // three conditions: the follower is not yet in the ISR set, the follower is present
        // in the AR set, and the follower's LEO has caught up with the HW
        if (!inSyncReplicas.contains(replica) &&
            assignedReplicas.map(_.brokerId).contains(replicaId) &&
            replica.logEndOffset.offsetDiff(leaderHW) >= 0) {
          // add the follower to the ISR, forming the new ISR set
          val newInSyncReplicas = inSyncReplicas + replica
          info("Expanding ISR for partition [%s,%d] from %s to %s"
            .format(topic, partitionId, inSyncReplicas.map(_.brokerId).mkString(","),
              newInSyncReplicas.map(_.brokerId).mkString(",")))
          // update ISR in ZK and cache
          // persist the new ISR to ZooKeeper and update Partition.inSyncReplicas
          updateIsr(newInSyncReplicas)
          // record the ISR expansion in the metrics
          replicaManager.isrExpandRate.mark()
        }

        // check if the HW of the partition can now be incremented
        // since the replica may now be in the ISR and its LEO has just incremented
        // try to advance the HW
        maybeIncrementLeaderHW(leaderReplica)

      case None => false // nothing to do if no longer leader
    }
  }

  // try to complete delayed operations
  if (leaderHWIncremented)
    tryCompleteDelayedRequests()
}
```
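In this era of the code, maybeExpandIsr is driven from the fetch path: when the leader handles a follower's fetch request it first records the follower's new LEO (in Partition.updateReplicaLogReadResult) and then calls maybeExpandIsr to see whether that follower has caught up enough to rejoin the ISR.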

In a distributed system, network interaction between nodes can suffer blocking and delays, which may leave some followers in the ISR set unable to keep in sync with the leader. If a ProducerRequest arrives with acks = -1 at that point, it would have to wait a long time.
To avoid this situation, Partition shrinks the ISR set. This is implemented in maybeShrinkIsr: ReplicaManager invokes maybeShrinkIsr periodically from a scheduled task to check how far each follower in the ISR set lags behind the leader, and removes the laggards from the ISR.
```scala
def maybeShrinkIsr(replicaMaxLagTimeMs: Long) {
  val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
    // get the Replica object of the leader replica
    leaderReplicaIfLocal() match {
      case Some(leaderReplica) =>
        // inspect each follower's lastCaughtUpTimeMsUnderlying field to find the followers
        // that have fallen behind; they will be removed from the ISR set.
        // Both a follower that has not synced with the leader for a long time and one whose
        // LEO lags far behind the HW show up through this field.
        val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs)
        if (outOfSyncReplicas.size > 0) {
          val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
          assert(newInSyncReplicas.size > 0)
          info("Shrinking ISR for partition [%s,%d] from %s to %s".format(topic, partitionId,
            inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(",")))
          // build the new ISR set and persist it to ZooKeeper
          updateIsr(newInSyncReplicas)
          // we may need to increment high watermark since ISR could be down to 1

          replicaManager.isrShrinkRate.mark()
          // try to advance the HW
          maybeIncrementLeaderHW(leaderReplica)
        } else {
          false
        }

      case None => false // do nothing if no longer leader
    }
  }

  // some delayed operations may be unblocked after HW changed
  // try to complete delayed operations
  if (leaderHWIncremented)
    tryCompleteDelayedRequests()
}
```
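The periodic task that drives this lives in ReplicaManager. In the 0.10.x-era source it is registered roughly like this (a simplified excerpt; details may differ across versions):

```scala
// inside ReplicaManager (simplified):
def startup() {
  // periodically evaluate every partition's ISR and shrink it where followers lag too far
  scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs, unit = TimeUnit.MILLISECONDS)
}

private def maybeShrinkIsr(): Unit = {
  allPartitions.values.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs))
}
```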

Appending messages
Within a partition, only the leader replica handles read and write requests; appendMessagesToLeader appends a set of messages to the log of the leader replica.
```scala
def appendMessagesToLeader(messages: ByteBufferMessageSet, requiredAcks: Int = 0) = {
  val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
    // get the Replica object of the leader replica
    val leaderReplicaOpt = leaderReplicaIfLocal()
    leaderReplicaOpt match {
      case Some(leaderReplica) =>
        val log = leaderReplica.log.get
        // the configured lower bound on the ISR size (min.insync.replicas)
        val minIsr = log.config.minInSyncReplicas
        val inSyncSize = inSyncReplicas.size

        // Avoid writing to leader if there are not enough insync replicas to make it safe
        // if the ISR is smaller than required, throw NotEnoughReplicasException back to the producer
        if (inSyncSize < minIsr && requiredAcks == -1) {
          throw new NotEnoughReplicasException("Number of insync replicas for partition [%s,%d] is [%d], below required minimum [%d]"
            .format(topic, partitionId, inSyncSize, minIsr))
        }
        // append to the leader replica's log
        val info = log.append(messages, assignOffsets = true)
        // probably unblock some follower fetch requests since log end offset has been updated
        // try to complete pending DelayedFetch operations
        replicaManager.tryCompleteDelayedFetch(new TopicPartitionOperationKey(this.topic, this.partitionId))
        // we may need to increment high watermark since ISR could be down to 1
        // try to advance the HW
        (info, maybeIncrementLeaderHW(leaderReplica))

      case None =>
        throw new NotLeaderForPartitionException("Leader not local for partition [%s,%d] on broker %d"
          .format(topic, partitionId, localBrokerId))
    }
  }

  // some delayed operations may be unblocked after HW changed
  // try to complete delayed operations
  if (leaderHWIncremented)
    tryCompleteDelayedRequests()

  info
}
```
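The min-ISR guard deserves a concrete example: with min.insync.replicas = 2 and acks = -1, a produce request is rejected as soon as the ISR shrinks to a single replica. The check, isolated as a small hypothetical helper (not Kafka code):

```scala
// Hypothetical helper mirroring the guard in appendMessagesToLeader:
// with acks = -1, reject the write when the ISR is below min.insync.replicas.
def writeAllowed(inSyncSize: Int, minIsr: Int, requiredAcks: Int): Boolean =
  !(requiredAcks == -1 && inSyncSize < minIsr)

// writeAllowed(inSyncSize = 1, minIsr = 2, requiredAcks = -1) == false (rejected)
// writeAllowed(inSyncSize = 1, minIsr = 2, requiredAcks = 1)  == true  (allowed)
```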

Checking the HW position
When introducing the completion condition of DelayedProduce, we mentioned checkEnoughReplicasReachOffset, which checks whether the specified message offset has been replicated by all follower replicas in the ISR set.
```scala
/*
 * Note that this method will only be called if requiredAcks = -1
 * and we are waiting for all replicas in ISR to be fully caught up to
 * the (local) leader's offset corresponding to this produce request
 * before we acknowledge the produce request.
 */
def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Short) = {
  leaderReplicaIfLocal() match {
    // get the Replica object of the leader replica
    case Some(leaderReplica) =>
      // keep the current immutable replica list reference
      // take the current ISR set
      val curInSyncReplicas = inSyncReplicas
      val numAcks = curInSyncReplicas.count(r => {
        if (!r.isLocal)
          if (r.logEndOffset.messageOffset >= requiredOffset) {
            trace("Replica %d of %s-%d received offset %d".format(r.brokerId, topic, partitionId, requiredOffset))
            true
          }
          else
            false
        else
          true /* also count the local (leader) replica */
      })

      trace("%d acks satisfied for %s-%d with acks = -1".format(numAcks, topic, partitionId))

      val minIsr = leaderReplica.log.get.config.minInSyncReplicas
      // compare the HW with the message's offset
      if (leaderReplica.highWatermark.messageOffset >= requiredOffset) {
        /*
         * The topic may be configured not to accept messages if there are not enough replicas in ISR
         * in this scenario the request was already appended locally and then added to the purgatory before the ISR was shrunk
         */
        // check whether the ISR is still large enough; if it is too small, return an error code
        if (minIsr <= curInSyncReplicas.size) {
          (true, Errors.NONE.code)
        } else {
          (true, Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND.code)
        }
      } else
        (false, Errors.NONE.code)
    case None =>
      (false, Errors.NOT_LEADER_FOR_PARTITION.code)
  }
}
```
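Summing up the return values: (true, NONE) means the offset is fully replicated and the ISR still satisfies min.insync.replicas, so the DelayedProduce completes successfully; (true, NOT_ENOUGH_REPLICAS_AFTER_APPEND) means the offset is replicated but the ISR shrank below the minimum after the append, so the producer receives an error even though the write landed; (false, NONE) means the HW has not yet reached the offset and the DelayedProduce keeps waiting.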
