
Kafka consumer partition rebalance algorithm

The rebalance algorithm

Rebalance is essentially a protocol: it specifies how all the consumers in a Consumer Group reach agreement on how to divide up the partitions of the topics they subscribe to.

For example, suppose a group has 20 consumer instances subscribed to a topic with 100 partitions. Normally, Kafka assigns each consumer 5 partitions, evenly. This assignment process is the rebalance.

The following discussion covers only the RangeAssignor strategy.

1. Kafka rebalance source code

```scala
class RangeAssignor() extends PartitionAssignor with Logging {

  def assign(ctx: AssignmentContext) = {
    val valueFactory = (topic: String) => new mutable.HashMap[TopicAndPartition, ConsumerThreadId]
    val partitionAssignment =
      new Pool[String, mutable.Map[TopicAndPartition, ConsumerThreadId]](Some(valueFactory))
    for (topic <- ctx.myTopicThreadIds.keySet) {
      val curConsumers = ctx.consumersForTopic(topic)
      val curPartitions: Seq[Int] = ctx.partitionsForTopic(topic)

      // e.g. 10 partitions / 3 consumers -> 3 per consumer, with 1 consumer taking an extra one
      val nPartsPerConsumer = curPartitions.size / curConsumers.size
      val nConsumersWithExtraPart = curPartitions.size % curConsumers.size

      info("Consumer " + ctx.consumerId + " rebalancing the following partitions: " + curPartitions +
        " for topic " + topic + " with consumers: " + curConsumers)

      for (consumerThreadId <- curConsumers) {
        val myConsumerPosition = curConsumers.indexOf(consumerThreadId)
        assert(myConsumerPosition >= 0)
        val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
        val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)

        /**
         * Range-partition the sorted partitions to consumers for better locality.
         * The first few consumers pick up an extra partition, if any.
         */
        if (nParts <= 0)
          warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
        else {
          for (i <- startPart until startPart + nParts) {
            val partition = curPartitions(i)
            info(consumerThreadId + " attempting to claim partition " + partition)
            // record the partition ownership decision
            val assignmentForConsumer = partitionAssignment.getAndMaybePut(consumerThreadId.consumer)
            assignmentForConsumer += (TopicAndPartition(topic, partition) -> consumerThreadId)
          }
        }
      }
    }
    partitionAssignment
  }
}
```
```scala
def getPartitionsForTopics(topics: Seq[String]): mutable.Map[String, Seq[Int]] = {
  getPartitionAssignmentForTopics(topics).map { topicAndPartitionMap =>
    val topic = topicAndPartitionMap._1
    val partitionMap = topicAndPartitionMap._2
    debug("partition assignment of /brokers/topics/%s is %s".format(topic, partitionMap))
    // partition ids are returned in ascending order
    (topic -> partitionMap.keys.toSeq.sortWith((s, t) => s < t))
  }
}
```
```scala
def getConsumersPerTopic(group: String, excludeInternalTopics: Boolean): mutable.Map[String, List[ConsumerThreadId]] = {
  val dirs = new ZKGroupDirs(group)
  // all consumer ids registered under this group in ZooKeeper
  val consumers = getChildrenParentMayNotExist(dirs.consumerRegistryDir)
  val consumersPerTopicMap = new mutable.HashMap[String, List[ConsumerThreadId]]
  for (consumer <- consumers) {
    val topicCount = TopicCount.constructTopicCount(group, consumer, this, excludeInternalTopics)
    for ((topic, consumerThreadIdSet) <- topicCount.getConsumerThreadIdsPerTopic) {
      for (consumerThreadId <- consumerThreadIdSet)
        consumersPerTopicMap.get(topic) match {
          case Some(curConsumers) => consumersPerTopicMap.put(topic, consumerThreadId :: curConsumers)
          case _ => consumersPerTopicMap.put(topic, List(consumerThreadId))
        }
    }
  }
  // sort the consumer thread ids so that every consumer computes the same order
  for ((topic, consumerList) <- consumersPerTopicMap)
    consumersPerTopicMap.put(topic, consumerList.sortWith((s, t) => s < t))
  consumersPerTopicMap
}
```
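Both helpers end with a sort: `getPartitionsForTopics` sorts partition ids numerically, and `getConsumersPerTopic` sorts consumer thread ids lexicographically. This matters because every consumer computes the assignment independently, so all of them must see the same ordering. A minimal Java sketch of the idea (illustrative data only, not Kafka API calls):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SortBeforeAssign {
    public static void main(String[] args) {
        // consumer ids as registered in ZooKeeper, in arbitrary order
        List<String> consumers = new ArrayList<>(Arrays.asList("aaa", "ccc", "bbb"));
        Collections.sort(consumers); // lexicographic, like sortWith((s, t) => s < t)
        System.out.println(consumers); // [aaa, bbb, ccc]

        // partition ids as read from /brokers/topics, in arbitrary order
        List<Integer> partitions = new ArrayList<>(Arrays.asList(3, 0, 2, 1));
        Collections.sort(partitions); // ascending numeric order
        System.out.println(partitions); // [0, 1, 2, 3]
    }
}
```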

Worked example: a topic with ten partitions, and three consumers in the same group with consumer ids aaa, ccc, and bbb.

  1. From the last two code snippets above, both the consumer id list and the partition list arrive already sorted, so

    curConsumers = (aaa, bbb, ccc)

    curPartitions = (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)

  2. Integer division and remainder

    nPartsPerConsumer = 10 / 3 = 3

    nConsumersWithExtraPart = 10 % 3 = 1

  3. Suppose the current client id is aaa

    myConsumerPosition = curConsumers.indexOf(aaa) = 0

  4. Compute the partition range

    startPart = 3 * 0 + 0.min(1) = 0

    nParts = 3 + (if (0 + 1 > 1) 0 else 1) = 3 + 1 = 4

    So aaa is assigned the range [0, 4), i.e. the first four partitions 0, 1, 2, 3.

    Likewise, bbb has myConsumerPosition = 1 and gets the middle three partitions 4, 5, 6,

    and ccc has myConsumerPosition = 2 and gets the last three partitions 7, 8, 9.
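The four steps above can be put together in a short Java sketch that mirrors the Scala formula (the class and method names here are hypothetical, not Kafka's API) and reproduces the aaa/bbb/ccc assignment:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RangeAssignExample {
    // Mirrors RangeAssignor: sort consumers first, then hand out contiguous ranges.
    static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted);
        int nPartsPerConsumer = partitionCount / sorted.size();
        int nConsumersWithExtraPart = partitionCount % sorted.size();
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (int pos = 0; pos < sorted.size(); pos++) {
            int startPart = nPartsPerConsumer * pos + Math.min(pos, nConsumersWithExtraPart);
            int nParts = nPartsPerConsumer + ((pos + 1 > nConsumersWithExtraPart) ? 0 : 1);
            List<Integer> parts = new ArrayList<>();
            for (int p = startPart; p < startPart + nParts; p++) parts.add(p);
            assignment.put(sorted.get(pos), parts);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // registration order aaa, ccc, bbb -- sorting makes the result deterministic
        Map<String, List<Integer>> a = assign(Arrays.asList("aaa", "ccc", "bbb"), 10);
        System.out.println(a); // {aaa=[0, 1, 2, 3], bbb=[4, 5, 6], ccc=[7, 8, 9]}
    }
}
```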

2. Quick calculation

```java
public static void main(String[] args) {
    int partitionNums = 8;
    int consumerNums = 5;

    cal_1(partitionNums, consumerNums);
}

private static void cal_1(int partitionNums, int consumerNums) {
    int nPartsPerConsumer = partitionNums / consumerNums;
    int nConsumersWithExtraPart = partitionNums % consumerNums;

    for (int i = 0; i < consumerNums; i++) {
        int startPart = nPartsPerConsumer * i + Math.min(i, nConsumersWithExtraPart);
        int nParts = nPartsPerConsumer + ((i + 1 > nConsumersWithExtraPart) ? 0 : 1);

        System.out.println("consumer : " + i + " -> [" + startPart + ", " + (startPart + nParts - 1) + "]");
    }
}
```
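The quick calculation can also be written to return the ranges instead of printing them, which makes it easy to check. This sketch (a hypothetical helper, not part of the original post) verifies the 8-partition / 5-consumer case: the first 3 consumers each get 2 partitions and the remaining 2 consumers get 1 each.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RangeQuickCalc {
    // Returns one {startPart, endPart} pair (inclusive bounds) per consumer index.
    static List<int[]> ranges(int partitionNums, int consumerNums) {
        int nPartsPerConsumer = partitionNums / consumerNums;
        int nConsumersWithExtraPart = partitionNums % consumerNums;
        List<int[]> result = new ArrayList<>();
        for (int i = 0; i < consumerNums; i++) {
            int startPart = nPartsPerConsumer * i + Math.min(i, nConsumersWithExtraPart);
            int nParts = nPartsPerConsumer + ((i + 1 > nConsumersWithExtraPart) ? 0 : 1);
            result.add(new int[]{startPart, startPart + nParts - 1});
        }
        return result;
    }

    public static void main(String[] args) {
        for (int[] r : ranges(8, 5)) {
            System.out.println(Arrays.toString(r));
        }
        // prints [0, 1], [2, 3], [4, 5], [6, 6], [7, 7] on separate lines
    }
}
```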

References:
- https://www.cnblogs.com/dongxiao-yang/p/6238029.html
- https://blog.csdn.net/wingofeagle/article/details/60966125
- [How Kafka Optimizes Rebalance](http://www.pianshen.com/article/6904702793/)
- [A Detailed Explanation of ISR, AR, HW, LEO, LSO, and LW in Kafka](https://blog.csdn.net/lukabruce/article/details/101012815)