The foundation of Kafka's high reliability is its replica mechanism: multiple replicas of the same partition are spread across different broker machines and hold the same message data. This post walks through how Kafka's replica mechanism works.
Broker: a service proxy node. For Kafka, a broker can simply be seen as an independent Kafka service node or Kafka service instance. In most cases a broker can also be thought of as a Kafka server, provided only one Kafka instance is deployed on that machine. One or more brokers form a Kafka cluster. By convention, the lowercase form "broker" is used to refer to a service proxy node.
Controller: the management role among the brokers. When a broker starts, it tries to create the ephemeral controller node in ZooKeeper; the broker that succeeds takes on the Controller role. It then reads the cluster metadata from ZooKeeper into a local cache and watches the brokers, topics, partitions and other ZooKeeper nodes. When it observes a change, it updates its local cache and pushes the change to the other (non-controller) brokers.
Topic: messages in Kafka are organized by topic. Producers send messages to a specific topic (every message sent to the Kafka cluster must specify a topic), and consumers subscribe to topics and consume from them.
Partition: a partition belongs to a single topic and is often called a topic partition. Different partitions of the same topic hold different messages. At the storage level a partition can be viewed as an appendable log file; every message appended to the partition's log is assigned a specific offset. The offset is the unique identifier of a message within its partition, and Kafka uses it to guarantee ordering within a partition. Offsets do not span partitions, so Kafka guarantees per-partition ordering, not per-topic ordering.
Replica: Kafka improves fault tolerance by increasing the number of replicas. Different replicas of the same partition hold the same messages (although at any given moment they are not necessarily identical). Replicas follow a "one leader, many followers" model: the leader replica handles read and write requests, while follower replicas only synchronize messages from the leader. Replicas live on different brokers, and when the leader replica fails, a new leader is elected from the followers to continue serving requests. Through this multi-replica mechanism Kafka achieves automatic failover, so the service remains available when a broker in the cluster fails.
LEO: the log end offset, which records the offset of the next message to be written to the replica's underlying log. Note that it is the next message: if LEO = 11, the replica holds 11 messages, with offsets in the range [0, 10].
HW: the high watermark mentioned above. For any given replica, its HW never exceeds its LEO. All messages at offsets below the HW are considered "replicated"; for example, with hw = 6, consumers can only consume messages before offset 6.
ISR: the leader of a partition tracks the lag of every follower in the ISR. If a follower falls too far behind (measured both by message count and by time, configurable via replica.lag.time.max.ms and replica.lag.max.messages), the leader removes that replica from the ISR; a removed replica keeps trying to catch up with the leader. The leader does not consider a write committed immediately after writing it: it is committed only once all followers in the ISR have synchronized it, so removing badly lagging followers from the ISR avoids increasing write latency. The main purpose of the ISR is leader election: when a broker goes down, the new leader of a partition is chosen from the ISR list. The sketch below ties LEO, HW and the ISR together.
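To make the relationship between LEO, HW and ISR concrete, here is a minimal, self-contained sketch (not Kafka source; the ReplicaView type is invented for illustration) that computes the leader's high watermark as the smallest LEO across the ISR:

// Toy model (not Kafka source): illustrates HW = min(LEO of all ISR replicas).
object HwSketch {
  // Hypothetical replica view: a broker id and its log end offset (LEO).
  case class ReplicaView(brokerId: Int, logEndOffset: Long)

  // The leader can only advance the high watermark up to the smallest LEO among
  // the in-sync replicas, so every message below HW exists on all ISR members.
  def highWatermark(isr: Seq[ReplicaView]): Long =
    isr.map(_.logEndOffset).min

  def main(args: Array[String]): Unit = {
    val isr = Seq(ReplicaView(0, 11), ReplicaView(1, 9), ReplicaView(2, 7))
    // LEO = 11 means offsets [0, 10] are stored locally on broker 0,
    // but HW = 7, so consumers can only read offsets [0, 6].
    println(s"HW = ${highWatermark(isr)}")
  }
}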
ReplicaManager is arguably a core component of the server side: many types of requests the broker receives (LeaderAndIsr, StopReplica, UpdateMetadata, Produce, Fetch, and so on) are handled by calling into ReplicaManager.
We will walk through these request flows in detail below.
For a broker, managing its partitions and replicas mainly means knowing which of its replicas are leaders and which are followers. This information is not static (when a broker restarts or crashes, the partitions it hosts elect new leaders), and such changes are delivered to brokers by the Controller via LeaderAndIsr requests.
The LeaderAndIsr request payload:
public class LeaderAndIsrRequestData implements ApiMessage {
    int controllerId;
    int controllerEpoch;
    long brokerEpoch;
    byte type;
    List<LeaderAndIsrPartitionState> ungroupedPartitionStates;
    List<LeaderAndIsrTopicState> topicStates;
    List<LeaderAndIsrLiveLeader> liveLeaders;
    ...
}

public static class LeaderAndIsrPartitionState implements Message {
    String topicName;
    int partitionIndex;
    int controllerEpoch;
    int leader;
    int leaderEpoch;
    List<Integer> isr;
    int zkVersion;
    List<Integer> replicas;
    List<Integer> addingReplicas;
    List<Integer> removingReplicas;
}
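As a rough illustration of what the broker does with these partition states (a simplified sketch, not the actual ReplicaManager.becomeLeaderOrFollower code; PartitionState below is an invented, stripped-down stand-in for LeaderAndIsrPartitionState), the core decision is a comparison of each partition's leader field against the local broker id:

// Simplified sketch (not Kafka source): how a broker splits the partition states in a
// LeaderAndIsr request into "become leader" and "become follower" sets.
object LeaderAndIsrSketch {
  // Stripped-down stand-in for LeaderAndIsrPartitionState above.
  case class PartitionState(topic: String, partition: Int, leader: Int, leaderEpoch: Int, isr: List[Int])

  def becomeLeaderOrFollower(localBrokerId: Int, states: Seq[PartitionState]): Unit = {
    val (toLeader, toFollower) = states.partition(_.leader == localBrokerId)
    // Partitions we now lead: stop fetching and start serving produce/fetch requests.
    toLeader.foreach(s => println(s"become leader for ${s.topic}-${s.partition}"))
    // Partitions we now follow: (re)start a replica fetcher pointed at the new leader.
    toFollower.foreach(s => println(s"become follower of broker ${s.leader} for ${s.topic}-${s.partition}"))
  }

  def main(args: Array[String]): Unit =
    becomeLeaderOrFollower(localBrokerId = 1, Seq(
      PartitionState("demo", 0, leader = 1, leaderEpoch = 5, isr = List(1, 2)),
      PartitionState("demo", 1, leader = 2, leaderEpoch = 3, isr = List(2, 1))))
}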
The StopReplica request is also sent by the Controller, for example when a replica is reassigned away from a broker or its topic is deleted. ReplicaManager.stopReplicas checks the controller epoch, stops the fetchers and, when requested, deletes the local logs:

def stopReplicas(correlationId: Int,
                 controllerId: Int,
                 controllerEpoch: Int,
                 brokerEpoch: Long,
                 partitionStates: Map[TopicPartition, StopReplicaPartitionState]
                ): (mutable.Map[TopicPartition, Errors], Errors) = {
  replicaStateChangeLock synchronized {
    val responseMap = new collection.mutable.HashMap[TopicPartition, Errors]
    if (controllerEpoch < this.controllerEpoch) {
      stateChangeLogger.warn(s"Ignoring StopReplica request from " +
        s"controller $controllerId with correlation id $correlationId " +
        s"since its controller epoch $controllerEpoch is old. " +
        s"Latest known controller epoch is ${this.controllerEpoch}")
      (responseMap, Errors.STALE_CONTROLLER_EPOCH)
    } else {
      this.controllerEpoch = controllerEpoch
      ...
      stopPartitions(stoppedPartitions).foreach { case (topicPartition, e) =>
        if (e.isInstanceOf[KafkaStorageException]) {
          stateChangeLogger.error(s"Ignoring StopReplica request (delete=true) from " +
            s"controller $controllerId with correlation id $correlationId " +
            s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
            "partition is in an offline log directory")
        } else {
          stateChangeLogger.error(s"Ignoring StopReplica request (delete=true) from " +
            s"controller $controllerId with correlation id $correlationId " +
            s"epoch $controllerEpoch for partition $topicPartition due to an unexpected " +
            s"${e.getClass.getName} exception: ${e.getMessage}")
        }
        responseMap.put(topicPartition, Errors.forException(e))
      }
      (responseMap, Errors.NONE)
    }
  }
}

protected def stopPartitions(partitionsToStop: Map[TopicPartition, Boolean]): Map[TopicPartition, Throwable] = {
  // First stop fetchers for all partitions.
  val partitions = partitionsToStop.keySet
  replicaFetcherManager.removeFetcherForPartitions(partitions)
  replicaAlterLogDirsManager.removeFetcherForPartitions(partitions)

  // Second remove deleted partitions from the partition map. Fetchers rely on the
  // ReplicaManager to get Partition's information so they must be stopped first.
  val partitionsToDelete = mutable.Set.empty[TopicPartition]
  partitionsToStop.forKeyValue { (topicPartition, shouldDelete) =>
    if (shouldDelete) {
      getPartition(topicPartition) match {
        case hostedPartition: NonOffline =>
          if (allPartitions.remove(topicPartition, hostedPartition)) {
            maybeRemoveTopicMetrics(topicPartition.topic)
            // Logs are not deleted here. They are deleted in a single batch later on.
            // This is done to avoid having to checkpoint for every deletions.
            hostedPartition.partition.delete()
          }
        case _ =>
      }
      partitionsToDelete += topicPartition
    }
    completeDelayedFetchOrProduceRequests(topicPartition)
  }

  // Third delete the logs and checkpoint.
  val errorMap = new mutable.HashMap[TopicPartition, Throwable]()
  if (partitionsToDelete.nonEmpty) {
    // Delete the logs and checkpoint.
    logManager.asyncDelete(partitionsToDelete, (tp, e) => errorMap.put(tp, e))
  }
  errorMap
}
The UpdateMetadata request is how the Controller, after observing a metadata change, notifies the other (non-controller) brokers to update their cached metadata so that the whole cluster's view stays consistent. A rough sketch of that idea follows.
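The real broker keeps this view in its MetadataCache; the sketch below is a toy stand-in (the type and method names are invented), showing only the core idea: ignore requests from a stale controller epoch, otherwise overwrite the cached partition metadata.

// Toy sketch (not Kafka source) of the UpdateMetadata idea: the controller pushes
// its cluster view together with its epoch, and each broker keeps only the newest view.
import scala.collection.concurrent.TrieMap

object MetadataCacheSketch {
  case class PartitionMeta(leader: Int, leaderEpoch: Int, isr: List[Int])

  private val cache = TrieMap.empty[(String, Int), PartitionMeta]
  @volatile private var controllerEpoch: Int = -1

  // Reject updates coming from an old controller; otherwise merge them into the local cache.
  def update(requestControllerEpoch: Int,
             partitions: Map[(String, Int), PartitionMeta]): Boolean = synchronized {
    if (requestControllerEpoch < controllerEpoch) false
    else {
      controllerEpoch = requestControllerEpoch
      partitions.foreach { case (tp, meta) => cache.put(tp, meta) }
      true
    }
  }

  def leaderFor(topic: String, partition: Int): Option[Int] =
    cache.get((topic, partition)).map(_.leader)
}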
When the broker receives a ProduceRequest, it is handed to ReplicaManager.appendRecords, which writes the records to the local leader logs and, for acks=-1, may create a DelayedProduce operation that waits for the ISR to catch up:

def appendRecords(timeout: Long,
                  requiredAcks: Short,
                  internalTopicsAllowed: Boolean,
                  origin: AppendOrigin,
                  entriesPerPartition: Map[TopicPartition, MemoryRecords],
                  responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                  delayedProduceLock: Option[Lock] = None,
                  recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => ()): Unit = {
  if (isValidRequiredAcks(requiredAcks)) {
    val sTime = time.milliseconds
    val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
      origin, entriesPerPartition, requiredAcks)
    debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
    ...
    if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
      // create delayed produce operation
      val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
      val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)
      val producerRequestKeys = entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
      delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
    } else {
      // we can respond immediately
      val produceResponseStatus = produceStatus.map { case (k, status) => k -> status.responseStatus }
      responseCallback(produceResponseStatus)
    }
    ...
  }
}

private def isValidRequiredAcks(requiredAcks: Short): Boolean = {
  requiredAcks == -1 || requiredAcks == 1 || requiredAcks == 0
}

def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
  if (operation.safeTryCompleteOrElse {
    watchKeys.foreach(key => watchForOperation(key, operation))
    if (watchKeys.nonEmpty) estimatedTotalOperations.incrementAndGet()
  }) return true

  // if it cannot be completed by now and hence is watched, add to the expire queue also
  if (!operation.isCompleted) {
    if (timerEnabled)
      timeoutTimer.add(operation) // add the operation to the timing wheel
    if (operation.isCompleted) {
      // cancel the timer task
      operation.cancel()
    }
  }

  false
}
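The purgatory pattern behind DelayedProduce can be summarized with a small sketch (purely illustrative; the real DelayedOperationPurgatory also uses watch keys and a hierarchical timing wheel, which this toy replaces with a scheduled executor): an acks=-1 produce completes either when enough followers have caught up or when the timeout fires.

// Minimal sketch of the "delayed produce" idea, not the real DelayedOperationPurgatory.
import java.util.concurrent.{Executors, TimeUnit}

class DelayedProduceSketch(requiredAcks: Int, timeoutMs: Long)(onComplete: Boolean => Unit) {
  private val lock = new Object
  private var acks = 0
  private var done = false

  // Called whenever a follower's fetch advances past the produced offset
  // (the real code re-checks watched operations on every follower fetch).
  def tryComplete(): Boolean = lock.synchronized {
    if (!done) {
      acks += 1
      if (acks >= requiredAcks) { done = true; onComplete(true) }
    }
    done
  }

  // Expiration path: the real code expires operations via a timing wheel.
  DelayedProduceSketch.timer.schedule(new Runnable {
    def run(): Unit = lock.synchronized { if (!done) { done = true; onComplete(false) } }
  }, timeoutMs, TimeUnit.MILLISECONDS)
}

object DelayedProduceSketch {
  private val timer = Executors.newSingleThreadScheduledExecutor()

  def main(args: Array[String]): Unit = {
    val op = new DelayedProduceSketch(requiredAcks = 2, timeoutMs = 500)(ok =>
      println(if (ok) "produce committed" else "produce timed out"))
    op.tryComplete() // first follower caught up
    op.tryComplete() // second follower caught up -> completes successfully
    timer.shutdown()
  }
}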
appendToLocalLog looks up the Partition for each topic-partition in the produce request and calls that partition's appendRecordsToLeader method.
In appendRecordsToLeader, when acks is -1 and the current ISR is smaller than the required minimum number of in-sync replicas, a NotEnoughReplicasException is thrown. Otherwise it calls the log's appendAsLeader to append the records to the xxx.log segment file and, if the write succeeds, returns a LogAppendInfo. A producer-side example of hitting this check follows the code below.
def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int): LogAppendInfo = {
  val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
    leaderLogIfLocal match {
      case Some(leaderLog) =>
        val minIsr = leaderLog.config.minInSyncReplicas
        val inSyncSize = isrState.isr.size

        if (inSyncSize < minIsr && requiredAcks == -1) {
          throw new NotEnoughReplicasException(s"The size of the current ISR ${isrState.isr} " +
            s"is insufficient to satisfy the min.isr requirement of $minIsr for partition $topicPartition")
        }

        val info = leaderLog.appendAsLeader(records, leaderEpoch = this.leaderEpoch, origin,
          interBrokerProtocolVersion)

        (info, maybeIncrementLeaderHW(leaderLog))

      case None =>
        throw new NotLeaderOrFollowerException("Leader not local for partition %s on broker %d"
          .format(topicPartition, localBrokerId))
    }
  }

  info.copy(leaderHwChange = if (leaderHWIncremented) LeaderHwChange.Increased else LeaderHwChange.Same)
}
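For context, this check is the broker-side counterpart of the producer's acks setting. A minimal producer sketch (the broker address localhost:9092 and the topic name demo-topic are placeholders) that would exercise this path:

// Producer-side view of the check above: with acks=all (-1), a write to a partition whose
// ISR has shrunk below min.insync.replicas fails with NotEnoughReplicasException.
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

object AcksAllProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ProducerConfig.ACKS_CONFIG, "all") // acks=-1: wait for the full ISR
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    try {
      // If the topic is configured with min.insync.replicas=2 and only the leader is left
      // in the ISR, this send completes exceptionally with NotEnoughReplicasException.
      producer.send(new ProducerRecord[String, String]("demo-topic", "key", "value")).get()
    } finally {
      producer.close()
    }
  }
}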
appendRecordsToLeader then tries to advance the leader's high watermark via maybeIncrementLeaderHW:

private def maybeIncrementLeaderHW(leaderLog: Log, curTime: Long = time.milliseconds): Boolean = {
  // maybeIncrementLeaderHW is in the hot path, the following code is written to
  // avoid unnecessary collection generation
  var newHighWatermark = leaderLog.logEndOffsetMetadata
  remoteReplicasMap.values.foreach { replica =>
    // Note here we are using the "maximal", see explanation above
    if (replica.logEndOffsetMetadata.messageOffset < newHighWatermark.messageOffset &&
        (curTime - replica.lastCaughtUpTimeMs <= replicaLagTimeMaxMs || isrState.maximalIsr.contains(replica.brokerId))) {
      newHighWatermark = replica.logEndOffsetMetadata
    }
  }

  leaderLog.maybeIncrementHighWatermark(newHighWatermark) match {
    case Some(oldHighWatermark) =>
      debug(s"High watermark updated from $oldHighWatermark to $newHighWatermark")
      true

    case None =>
      def logEndOffsetString: ((Int, LogOffsetMetadata)) => String = {
        case (brokerId, logEndOffsetMetadata) => s"replica $brokerId: $logEndOffsetMetadata"
      }

      if (isTraceEnabled) {
        val replicaInfo = remoteReplicas.map(replica => (replica.brokerId, replica.logEndOffsetMetadata)).toSet
        val localLogInfo = (localBrokerId, localLogOrException.logEndOffsetMetadata)
        trace(s"Skipping update high watermark since new hw $newHighWatermark is not larger than old value. " +
          s"All current LEOs are ${(replicaInfo + localLogInfo).map(logEndOffsetString)}")
      }
      false
  }
}
As mentioned for the LeaderAndIsr request above, when ReplicaManager handles the request and finds that a partition's leader brokerId differs from the local broker id, the local broker is a follower replica for that partition and starts a ReplicaFetcherThread that continuously sends fetch requests to synchronize data from the leader replica.
The class hierarchy of ReplicaFetcherThread is: ReplicaFetcherThread extends AbstractFetcherThread, which in turn extends ShutdownableThread. ShutdownableThread overrides Thread's run method as a loop that keeps calling the abstract doWork(), and AbstractFetcherThread provides the doWork() implementation:
----------ShutdownableThread----------------------------------
def doWork(): Unit

override def run(): Unit = {
  isStarted = true
  info("Starting")
  try {
    while (isRunning)
      doWork() // keeps calling doWork() in a loop; doWork is abstract here and implemented in AbstractFetcherThread
  } catch {
    case e: FatalExitError =>
      shutdownInitiated.countDown()
      shutdownComplete.countDown()
      info("Stopped")
      Exit.exit(e.statusCode())
    case e: Throwable =>
      if (isRunning)
        error("Error due to", e)
  } finally {
    shutdownComplete.countDown()
  }
  info("Stopped")
}

-----------AbstractFetcherThread--------------------------
override def doWork(): Unit = {
  maybeTruncate() // truncate the local log if needed
  maybeFetch()    // fetch log data from the remote leader partition
}
In short, a ReplicaFetcherThread keeps repeating two steps: truncating its local log when needed, and fetching the remote leader partition's log data.
To keep the data of different replicas consistent, Kafka performs log truncation. Consider a scenario where broker1 is the leader of p1 and broker2 is its follower: messages 1, 2 and 3 have been written successfully and replicated to broker2. The producer then sends message 4; broker1 writes it to its log file but crashes before broker2 has synchronized it, and broker2 is elected as the new leader of p1. When broker1 comes back as a follower of p1, message 4 exists on broker1 but not on broker2, so the replicas have diverged. To guarantee consistency, Kafka therefore truncates the log before fetching, as sketched below.
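A highly simplified sketch of the truncation idea (ignoring leader epochs, which the real maybeTruncate code below uses to find the precise truncation point): the follower drops any local suffix beyond the offset it can verify against the leader.

// Toy sketch of follower-side truncation (offsets only, no leader epochs):
// broker1's log [m1, m2, m3, m4] is cut back to the new leader's end offset 3,
// so the un-replicated m4 is discarded before fetching resumes.
object TruncationSketch {
  def truncateTo(localLog: Vector[String], truncationOffset: Long): Vector[String] =
    localLog.take(truncationOffset.toInt)

  def main(args: Array[String]): Unit = {
    val broker1Log = Vector("m1", "m2", "m3", "m4") // m4 was never replicated
    val leaderEndOffset = 3L                        // broker2 (new leader) only has m1..m3
    println(truncateTo(broker1Log, leaderEndOffset)) // Vector(m1, m2, m3)
  }
}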
private def maybeTruncate(): Unit = {
  val (partitionsWithEpochs, partitionsWithoutEpochs) = fetchTruncatingPartitions()
  if (partitionsWithEpochs.nonEmpty) {
    truncateToEpochEndOffsets(partitionsWithEpochs)
  }
  if (partitionsWithoutEpochs.nonEmpty) {
    truncateToHighWatermark(partitionsWithoutEpochs)
  }
}

private def truncateToEpochEndOffsets(latestEpochsForPartitions: Map[TopicPartition, EpochData]): Unit = {
  // fetch the latest epoch end offsets from the remote leader partition
  val endOffsets = fetchEpochEndOffsets(latestEpochsForPartitions)
  //Ensure we hold a lock during truncation.
  inLock(partitionMapLock) {
    //Check no leadership and no leader epoch changes happened whilst we were unlocked, fetching epochs
    val epochEndOffsets = endOffsets.filter { case (tp, _) =>
      val curPartitionState = partitionStates.stateValue(tp)
      val partitionEpochRequest = latestEpochsForPartitions.getOrElse(tp, {
        throw new IllegalStateException(
          s"Leader replied with partition $tp not requested in OffsetsForLeaderEpoch request")
      })
      val leaderEpochInRequest = partitionEpochRequest.currentLeaderEpoch
      curPartitionState != null && leaderEpochInRequest == curPartitionState.currentLeaderEpoch
    }

    val ResultWithPartitions(fetchOffsets, partitionsWithError) = maybeTruncateToEpochEndOffsets(epochEndOffsets, latestEpochsForPartitions)
    handlePartitionsWithErrors(partitionsWithError, "truncateToEpochEndOffsets")
    updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)
  }
}
For synchronizing the leader partition's log data, we look at the client side (the follower partition's handling) and the server side (the leader partition's handling) separately.
On the follower (client) side, the fetch request is built as follows:

public FetchRequest build(short version) {
    if (version < 3) {
        maxBytes = DEFAULT_RESPONSE_MAX_BYTES;
    }

    FetchRequestData fetchRequestData = new FetchRequestData();
    fetchRequestData.setReplicaId(replicaId);
    // maximum wait time: if the leader has no new data, it parks the request for a while before responding
    fetchRequestData.setMaxWaitMs(maxWait);
    // minimum bytes to return, so fetches are batched for better throughput
    fetchRequestData.setMinBytes(minBytes);
    // upper bound per response; a badly lagging follower catches up over several fetches
    fetchRequestData.setMaxBytes(maxBytes);
    ......
        FetchRequestData.FetchPartition fetchPartition = new FetchRequestData.FetchPartition()
                .setPartition(topicPartition.partition())
                // the current leader epoch known by this follower
                .setCurrentLeaderEpoch(partitionData.currentLeaderEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
                .setLastFetchedEpoch(partitionData.lastFetchedEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
                // the offset to start fetching from
                .setFetchOffset(partitionData.fetchOffset)
                // the follower's log start offset
                .setLogStartOffset(partitionData.logStartOffset)
                .setPartitionMaxBytes(partitionData.maxBytes);
        fetchTopic.partitions().add(fetchPartition);
    }
    .....
    return new FetchRequest(fetchRequestData, version);
}
--------------processFetchRequest------------------------------------
if (responseData.nonEmpty) {
  // process fetched data
  inLock(partitionMapLock) {
    responseData.forKeyValue { (topicPartition, partitionData) =>
      Option(partitionStates.stateValue(topicPartition)).foreach { currentFetchState =>
        // It's possible that a partition is removed and re-added or truncated when there is a pending fetch request.
        // In this case, we only want to process the fetch response if the partition state is ready for fetch and
        // the current offset is the same as the offset requested.
        val fetchPartitionData = sessionPartitions.get(topicPartition)
        if (fetchPartitionData != null && fetchPartitionData.fetchOffset == currentFetchState.fetchOffset && currentFetchState.isReadyForFetch) {
          partitionData.error match {
            case Errors.NONE =>
              try {
                // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
                // if the response carries records, append them to the local (follower) log
                val logAppendInfoOpt = processPartitionData(topicPartition, currentFetchState.fetchOffset,
                  partitionData)

                logAppendInfoOpt.foreach { logAppendInfo =>
                  val validBytes = logAppendInfo.validBytes
                  val nextOffset = if (validBytes > 0) logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset
                  val lag = Math.max(0L, partitionData.highWatermark - nextOffset)
                  fetcherLagStats.getAndMaybePut(topicPartition).lag = lag
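After appending the fetched records, the follower also maintains its own high watermark: it cannot move beyond the leader's HW carried in the fetch response, nor beyond the follower's own LEO. A toy sketch of that rule (not Kafka source):

// Toy sketch: after a fetch, the follower's high watermark is bounded both by the
// leader's HW from the fetch response and by the follower's own log end offset.
object FollowerHwSketch {
  def followerHighWatermark(leaderHighWatermark: Long, localLogEndOffset: Long): Long =
    math.min(leaderHighWatermark, localLogEndOffset)

  def main(args: Array[String]): Unit = {
    // The leader already exposed HW = 10, but we have only replicated up to LEO = 7,
    // so our local HW stays at 7 until more data is fetched.
    println(followerHighWatermark(leaderHighWatermark = 10L, localLogEndOffset = 7L))
  }
}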
On the leader (server) side, every fetch from a follower ends up in updateFollowerFetchState, which records the follower's new LEO, may expand the ISR, and may advance the leader's high watermark:

def updateFollowerFetchState(followerId: Int,
                             followerFetchOffsetMetadata: LogOffsetMetadata,
                             followerStartOffset: Long,
                             followerFetchTimeMs: Long,
                             leaderEndOffset: Long): Boolean = {
  getReplica(followerId) match {
    case Some(followerReplica) =>
      val oldLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
      val prevFollowerEndOffset = followerReplica.logEndOffset
      followerReplica.updateFetchState(
        followerFetchOffsetMetadata,
        followerStartOffset,
        followerFetchTimeMs,
        leaderEndOffset)

      val newLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
      val leaderLWIncremented = newLeaderLW > oldLeaderLW

      maybeExpandIsr(followerReplica, followerFetchTimeMs)

      val leaderHWIncremented = if (prevFollowerEndOffset != followerReplica.logEndOffset) {
        inReadLock(leaderIsrUpdateLock) {
          leaderLogIfLocal.exists(leaderLog => maybeIncrementLeaderHW(leaderLog, followerFetchTimeMs))
        }
      } else {
        false
      }

      if (leaderLWIncremented || leaderHWIncremented)
        tryCompleteDelayedRequests()

      true

    case None =>
      false
  }
}

The leaderHWIncremented path calls the same maybeIncrementLeaderHW method already shown above in the produce flow.
Reference: 《深入理解Kafka:核心设计与实践原理》 (Understanding Kafka in Depth: Core Design and Practice)