当前位置:   article > 正文

Kafka分区副本分配规则_kafka副本指定

kafka副本指定

Kafka分区副本分配规则

1、前言

 我们在创建topic或者是新增分区时,如果不指定分区副本的分配方式,Kafka会自动帮我们分配,那Kafka是如何帮我们分配的呢?我们如果指定分区副本的分配方式,Kafka会做哪些事情呢?今天我们就来介绍一下。

2、自动分配

Kafka分区副本自动分配在三个地方用到,它们分别是:

  1. 创建topic时
  2. topic新增分区时
  3. 使用脚本自动重分配副本时

我们可以通过配置启动类参数生成分配策略

--zookeeper xxxx:2181 --topics-to-move-json-file config/move-json-file.json --broker-list "0,1,2,3" --generate  
move-json-file.json文件中内容为:
{
  "topics":[
    {"topic":"topicTest"}
  ],
 "version":1
}
以上参数表示为对topicTest重分配,希望分配到0123这几个broker上

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

重分配的源码是在ReassignPartitionsCommand.generateAssignment中

def generateAssignment(zkClient: KafkaZkClient, brokerListToReassign: Seq[Int], topicsToMoveJsonString: String, disableRackAware: Boolean): (Map[TopicPartition, Seq[Int]], Map[TopicPartition, Seq[Int]]) = {
    // 解析文件内容,获取要重分配的topic
    val topicsToReassign = parseTopicsData(topicsToMoveJsonString)
    // topic去重,如果存在重复,则抛出异常
    val duplicateTopicsToReassign = CoreUtils.duplicates(topicsToReassign)
    if (duplicateTopicsToReassign.nonEmpty)
      throw new AdminCommandFailedException("List of topics to reassign contains duplicate entries: %s".format(duplicateTopicsToReassign.mkString(",")))
    // 从zk节点/brokers/topics/{topicName}获取topic分区副本当前的分配方式
    val currentAssignment = zkClient.getReplicaAssignmentForTopics(topicsToReassign.toSet)

    // 按topic将当前副本分配情况分组
    val groupedByTopic = currentAssignment.groupBy { case (tp, _) => tp.topic }
    // 判断是否启用机架感知模式
    val rackAwareMode = if (disableRackAware) RackAwareMode.Disabled else RackAwareMode.Enforced
    val adminZkClient = new AdminZkClient(zkClient)
    // 从zk节点brokers/ids中获取所有在线节点,并和传入的brokers集合取交集,如果没有交集则取所有在线节点
    val brokerMetadatas = adminZkClient.getBrokerMetadatas(rackAwareMode, Some(brokerListToReassign))

    val partitionsToBeReassigned = mutable.Map[TopicPartition, Seq[Int]]()
    groupedByTopic.foreach { case (topic, assignment) =>
      // 副本数量是第一个分区的副本数量,需要注意的是每个分区副本的分配数量一般相同,但也可设置为不同
      val (_, replicas) = assignment.head
      // 具体的分配算法,包括有机架感知方式和无机架感知方式
      val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerMetadatas, assignment.size, replicas.size)
      partitionsToBeReassigned ++= assignedReplicas.map { case (partition, replicas) =>
        new TopicPartition(topic, partition) -> replicas
      }
    }
    (partitionsToBeReassigned, currentAssignment)
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

分区副本具体的分配算法有两种:无机架方式和有机架方式,
这两个算法的分配原则为:

  1. 尽量将副本平均分配在所有的broker上
  2. 每个broker上分配到的leader副本尽可能一样多
  3. 分区的多个副本被分配在不同的broker上
  4. 有机架分配方式要求集群中每个broker都要有机架信息否则抛出异常

下面将分别介绍这两种方式

a. 无机架方式分配

算法比较复杂,具体的分配方式可以看https://www.szzdzhp.com/kafka/Source_code/source-fenpei-rule.html

b. 有机架方式分配

(1)机架介绍
  • 机架(rack)相当于一个组,该组里面可能有多个broker,单个broker只属于一个组
  • 通过修改server.properties来指定broker属于哪个特定的组(机架)
broker.rack=rackName
  • 1
(2)有机架方式分配的目的

 这相当于broker的分组能力,可以将不同组的brokers分配到不同的区域中,以提高单个区域发生故障时整个集群的可用性,即容灾

(3)分配规则

 类似无机架分配方式,算法尽量将单个分区的每个副本分配到不同的组(机架)内,最多会分配到min(rackSize, replication-factor) 个不同的组中
源码在AdminUtils.assignReplicasToBrokersRackAware中,感兴趣的同学可以研究一下

c. 问题

 上面说过,创建topic时也会进行分区副本的分配,但是传进的brokerList是无序的。新增分区分配副本时传进的brokerList是有序的,而且保留了原来的分区副本分配方式,重分配副本只是在新的分区上进行,但是为了保证新分区副本近可能均匀分到各个broker上,新分区副本分配是在创建topic基础之上继续进行,问题在于创建topic时的副本分配传入的brokerList是无序的,而新增分区时的副本分配传入的brokerList是有序的,这就导致新增分区的副本在各个broker上的分配可能是不均匀的
这个问题详细说明可以移步https://www.szzdzhp.com/kafka/other/maybe-bug1.html

3、指定分配规则分配

可参考我的另一篇文章Kafka分区副本重分配源码分析

参考文献

  1. https://blog.csdn.net/zuodaoyong/article/details/105069133
  2. http://kane-xie.github.io/2016/08/05/2016-08-05_Kafka%E6%9C%BA%E6%9E%B6%E6%84%9F%E7%9F%A5/
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/爱喝兽奶帝天荒/article/detail/742104?site
推荐阅读
相关标签
  

闽ICP备14008679号