Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 consumer 如何达成一致,来分配订阅 Topic 的每个分区。
例如:某 Group 下有 20 个 consumer 实例,它订阅了一个具有 100 个 partition 的 Topic 。正常情况下,kafka 会为每个 Consumer 平均的分配 5 个分区。这个分配的过程就是 Rebalance
以下只针对 RangeAssignor 策略
class RangeAssignor() extends PartitionAssignor with Logging { def assign(ctx: AssignmentContext) = { val valueFactory = (topic: String) => new mutable.HashMap[TopicAndPartition, ConsumerThreadId] val partitionAssignment = new Pool[String, mutable.Map[TopicAndPartition, ConsumerThreadId]](Some(valueFactory)) for (topic <- ctx.myTopicThreadIds.keySet) { val curConsumers = ctx.consumersForTopic(topic) val curPartitions: Seq[Int] = ctx.partitionsForTopic(topic) val nPartsPerConsumer = curPartitions.size / curConsumers.size val nConsumersWithExtraPart = curPartitions.size % curConsumers.size info("Consumer " + ctx.consumerId + " rebalancing the following partitions: " + curPartitions + " for topic " + topic + " with consumers: " + curConsumers) for (consumerThreadId <- curConsumers) { val myConsumerPosition = curConsumers.indexOf(consumerThreadId) assert(myConsumerPosition >= 0) val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart) val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1) /** * Range-partition the sorted partitions to consumers for better locality. * The first few consumers pick up an extra partition, if any. */ if (nParts <= 0) warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic) else { for (i <- startPart until startPart + nParts) { val partition = curPartitions(i) info(consumerThreadId + " attempting to claim partition " + partition) // record the partition ownership decision val assignmentForConsumer = partitionAssignment.getAndMaybePut(consumerThreadId.consumer) assignmentForConsumer += (TopicAndPartition(topic, partition) -> consumerThreadId) } } } }
def getPartitionsForTopics(topics: Seq[String]): mutable.Map[String, Seq[Int]] = {
getPartitionAssignmentForTopics(topics).map { topicAndPartitionMap =>
val topic = topicAndPartitionMap._1
val partitionMap = topicAndPartitionMap._2
debug("partition assignment of /brokers/topics/%s is %s".format(topic, partitionMap))
(topic -> partitionMap.keys.toSeq.sortWith((s,t) => s < t))
def getConsumersPerTopic(group: String, excludeInternalTopics: Boolean) : mutable.Map[String, List[ConsumerThreadId]] = { val dirs = new ZKGroupDirs(group) val consumers = getChildrenParentMayNotExist(dirs.consumerRegistryDir) val consumersPerTopicMap = new mutable.HashMap[String, List[ConsumerThreadId]] for (consumer <- consumers) { val topicCount = TopicCount.constructTopicCount(group, consumer, this, excludeInternalTopics) for ((topic, consumerThreadIdSet) <- topicCount.getConsumerThreadIdsPerTopic) { for (consumerThreadId <- consumerThreadIdSet) consumersPerTopicMap.get(topic) match { case Some(curConsumers) => consumersPerTopicMap.put(topic, consumerThreadId :: curConsumers) case _ => consumersPerTopicMap.put(topic, List(consumerThreadId)) } } } for ( (topic, consumerList) <- consumersPerTopicMap ) consumersPerTopicMap.put(topic, consumerList.sortWith((s,t) => s < t)) consumersPerTopicMap }
nPartsPerConsumer=10/3 =3
nConsumersWithExtraPart=10%3 =1
myConsumerPosition= curConsumers.indexof(aaa) =0
startPart= 3*0+0.min(1) = 0
nParts = 3+(if (0 + 1 > 1) 0 else 1)=3+1=4
public static void main(String[] args) { int partitionNums = 8; int consumerNums = 5; cal_1(partitionNums, consumerNums); } private static void cal_1(int partitionNums, int consumerNums) { int nPartsPerConsumer = partitionNums / consumerNums; int nConsumersWithExtraPart = partitionNums % consumerNums; for (int i = 0; i < consumerNums; i++) { int startPart = nPartsPerConsumer * i + Math.min(i, nConsumersWithExtraPart); int nParts = nPartsPerConsumer + ((i + 1 > nConsumersWithExtraPart) ? 0 : 1); System.out.println("consumer : " + i + " -> [" + startPart + ", " + (startPart + nParts - 1) + "]"); } } ``` 参考: https://www.cnblogs.com/dongxiao-yang/p/6238029.html https://blog.csdn.net/wingofeagle/article/details/60966125 [《Kafka对reblance的优化,你了解嘛》](http://www.pianshen.com/article/6904702793/) [《关于kafka中ISR、AR、HW、LEO、LSO、LW的含义详解》](https://blog.csdn.net/lukabruce/article/details/101012815)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。