赞
踩
将服务器添加到Kafka集群很容易,只需为其分配唯一的代理ID,然后在新服务器上启动Kafka。但是,不会为这些新服务器自动分配任何数据分区,因此,除非将分区移至它们,否则在创建新主题之前它们将不会做任何工作。因此,通常在将计算机添加到群集时,您将需要将一些现有数据迁移到这些计算机。
数据迁移过程是手动启动的,但完全自动化。在幕后,Kafka会将新服务器添加为要迁移的分区的跟随者,并允许其完全复制该分区中的现有数据。新服务器完全复制该分区的内容并加入同步副本后,现有副本之一将删除其分区的数据。
分区重新分配工具可用于在代理之间移动分区。理想的分区分配将确保所有代理之间的数据负载和分区大小均匀。分区重新分配工具没有能力自动研究Kafka群集中的数据分布,并四处移动分区以实现均匀的负载分布。因此,管理员必须弄清楚应该移动哪些主题或分区。
分区重新分配工具可以在3种互斥模式下运行:
创建一个分区为三,副本数为二的topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic topic_test
查看topic情况
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic_test
编写迁移文件
vi topics-to-move.json
topics-to-move.json:
{"topics": [{"topic": "topic_test"}], "version":1}
一旦json文件准备就绪,使用分区重新分配工具生成候选分配
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "1,2" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"topic_test","partition":2,"replicas":[2,0],"log_dirs":["any","any"]},{"topic":"topic_test","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"topic_test","partition":0,"replicas":[0,1],"log_dirs":["any","any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"topic_test","partition":0,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"topic_test","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"topic_test","partition":1,"replicas":[1,2],"log_dirs":["any","any"]}]}
该工具生成一个候选分配,将所有分区从主题topic_test移动到代理1,2。但请注意,此时分区移动尚未开始,它只是告诉您当前的分配和建议的新分配。应保存当前分配,以防您想要回滚它。新的赋值应保
存在json文件中(例如expand-cluster-reassignment.json),以使用–execute选项输入到工具
保存当前分配
vi old-expand-cluster-reassignment.json
old-expand-cluster-reassignment.json:
{"version":1,"partitions":[{"topic":"topic_test","partition":2,"replicas":[2,0],"log_dirs":["any","any"]},{"topic":"topic_test","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"topic_test","partition":0,"replicas":[0,1],"log_dirs":["any","any"]}]}
vi expand-cluster-reassignment.json
expand-cluster-reassignment.json:
{"version":1,"partitions":[{"topic":"topic_test","partition":0,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"topic_test","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"topic_test","partition":1,"replicas":[1,2],"log_dirs":["any","any"]}]}
执行迁移
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
查看迁移结果
可以看到分区已经迁移成功
也可以通过命令查看结果
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
首先我们需要还原原本分区
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file old-expand-cluster-reassignment.json --execute
然后将副本2迁移至分区0中
编写迁移文件
vi increase-replication-factor.json
increase-replication-factor.json:
{"version":1,"partitions":[{"topic":"topic_test","partition":0,"replicas":[0,1,2]}]}
执行迁移
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
可以看到副本2已经成功迁移到分区1中。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。