赞
踩
现在在用kafka集群有3个节点,即zk1、zk2、zk3,现需要将broker节点扩容至4个,以提供更高的数据处理能力。
1.1 新扩容1个broker节点zk4 ,并部署kafka应用程序;我是虚拟机,全部在一台上跑的测试,只启用了一个zookeeper服务没启用zookeeper集群,但是和实际应用差不多。
cat ../config/server1.properties
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka_2.10-0.10.1.0/kafka-logs #因为是在一台上面部署了4个实例,所以修改了不同的数据路径
num.partitions=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.cleanup.policy=delete
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
broker.id=1
listeners=PLAINTEXT://zk1:9092
advertised.listeners=PLAINTEXT://zk1:9092
default.replication.factor=2
delete.topic.enable=true
message.max.byte=5242880
replica.fetch.max.bytes=5242880
zookeeper.connect=zk1:2181
#auto.leader.rebalance.enable=true #自动平衡各个broker的leader分布
##ZK2、ZK3、ZK4配置,基本一样,修改broker.id、listeners、advertised.listeners
1.2 查看集群中当前所有可用的topic和指定topic的详细信息,同步前的分区和副本状态。
./kafka-topics.sh --list --zookeeper zk1:2181
__consumer_offsets
test6test7
./kafka-topics.sh --describe --zookeeper zk1:2181 --topic test6
Topic:test6 PartitionCount:12 ReplicationFactor:3 Configs:
Topic: test6 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: test6 Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,1,3
Topic: test6 Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 2,1,3
Topic: test6 Partition: 3 Leader: 1 Replicas: 1,3,2 Isr: 1,2,3
Topic: test6 Partition: 4 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
Topic: test6 Partition: 5 Leader: 3 Replicas: 3,2,1 Isr: 2,1,3
Topic: test6 Partition: 6 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: test6 Partition: 7 Leader: 2 Replicas: 2,3,1 Isr: 2,1,3
Topic: test6 Partition: 8 Leader: 3 Replicas: 3,1,2 Isr: 2,1,3
Topic: test6 Partition: 9 Leader: 1 Replicas: 1,3,2 Isr: 1,2,3
Topic: test6 Partition: 10 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
Topic: test6 Partition: 11 Leader: 3 Replicas: 3,2,1 Isr: 2,1,3
1.3 kafka集群分区日志迁移
扩容迁移的目的就是把原来topic为test6分布在3个节点上的topic(12partitions, 3replicas),将全部partitions重新分布到全部的6个节点上去。
topics-to-move-test6.json文件:
cat << EOF > topic-to-move.json
{"topics": [{"topic": "test6"}],
"version":1}
EOF
使用–generate生成迁移计划:
./kafka-reassign-partitions.sh --zookeeper zk1:2181 --topics-to-move-json-file ./topic-to-move.json --broker-list "1,2,3,4" --generate
Current partition replica assignment{"version":1,"partitions":[{"topic":"test6","partition":11,"replicas":[3,2,1]},{"topic":"test6","partition":9,"replicas":[1,3,2]},{"topic":"test6","partition":3,"replicas":[1,3,2]},{"topic":"test6","partition":7,"replicas":[2,3,1]},{"topic":"test6","partition":6,"replicas":[1,2,3]},{"topic":"test6","partition":10,"replicas":[2,1,3]},{"topic":"test6","partition":4,"replicas":[2,1,3]},{"topic":"test6","partition":2,"replicas":[3,1,2]},{"topic":"test6","partition":8,"replicas":[3,1,2]},{"topic":"test6","partition":0,"replicas":[1,2,3]},{"topic":"test6","partition":1,"replicas":[2,3,1]},{"topic":"test6","partition":5,"replicas":[3,2,1]}]}
Proposed partition reassignment configuration{"version":1,"partitions":[{"topic":"test6","partition":11,"replicas":[2,1,3]},{"topic":"test6","partition":9,"replicas":[4,3,1]},{"topic":"test6","partition":3,"replicas":[2,3,4]},{"topic":"test6","partition":6,"replicas":[1,3,4]},{"topic":"test6","partition":7,"replicas":[2,4,1]},{"topic":"test6","partition":10,"replicas":[1,4,2]},{"topic":"test6","partition":4,"replicas":[3,1,2]},{"topic":"test6","partition":8,"replicas":[3,2,4]},{"topic":"test6","partition":2,"replicas":[1,2,3]},{"topic":"test6","partition":0,"replicas":[3,4,1]},{"topic":"test6","partition":1,"replicas":[4,1,2]},{"topic":"test6","partition":5,"replicas":[4,2,3]}]}
生成一个向broker 1,2,3,4迁移指定topic为test6的迁移计划。输出内容中包括当前的分布配置和即将改变后的分布配置。将以上命令的输出内容保存为json文件。其中当前分布配置备份为backup-test6.json,改变后的分布配置保存为expand-cluster-reassignment-test6.json。
使用–execute执行迁移计划:
./kafka-reassign-partitions.sh --zookeeper zk1:2181 --reassignment-json-file ./expand-cluster-reassignment.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test6","partition":11,"replicas":[3,2,1]},{"topic":"test6","partition":9,"replicas":[1,3,2]},{"topic":"test6","partition":3,"replicas":[1,3,2]},{"topic":"test6","partition":7,"replicas":[2,3,1]},{"topic":"test6","partition":6,"replicas":[1,2,3]},{"topic":"test6","partition":10,"replicas":[2,1,3]},{"topic":"test6","partition":4,"replicas":[2,1,3]},{"topic":"test6","partition":2,"replicas":[3,1,2]},{"topic":"test6","partition":8,"replicas":[3,1,2]},{"topic":"test6","partition":0,"replicas":[1,2,3]},{"topic":"test6","partition":1,"replicas":[2,3,1]},{"topic":"test6","partition":5,"replicas":[3,2,1]}]}Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.
使用–verify进行迁移结果的验证:
./kafka-reassign-partitions.sh --zookeeper 172.18.6.169:2181 --reassignment-json-file ./expand-cluster-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [test6,2] completed successfully
Reassignment of partition [test6,1] completed successfully
Reassignment of partition [test6,8] completed successfully
Reassignment of partition [test6,10] completed successfully
Reassignment of partition [test6,0] completed successfully
Reassignment of partition [test6,7] completed successfully
Reassignment of partition [test6,11] completed successfully
Reassignment of partition [test6,9] completed successfully
Reassignment of partition [test6,3] completed successfully
Reassignment of partition [test6,4] completed successfully
Reassignment of partition [test6,6] completed successfully
Reassignment of partition [test6,5] completed successfully
查看重分布结果:
./kafka-topics.sh --describe --zookeeper zk1:2181 --topic test6
Topic:test6 PartitionCount:12 ReplicationFactor:3 Configs:
Topic: test6 Partition: 0 Leader: 1 Replicas: 3,4,1 Isr: 1,3,4
Topic: test6 Partition: 1 Leader: 4 Replicas: 4,1,2 Isr: 2,1,4
Topic: test6 Partition: 2 Leader: 3 Replicas: 1,2,3 Isr: 2,1,3
Topic: test6 Partition: 3 Leader: 2 Replicas: 2,3,4 Isr: 2,3,4
Topic: test6 Partition: 4 Leader: 2 Replicas: 3,1,2 Isr: 2,1,3
Topic: test6 Partition: 5 Leader: 4 Replicas: 4,2,3 Isr: 2,3,4
Topic: test6 Partition: 6 Leader: 1 Replicas: 1,3,4 Isr: 1,3,4
Topic: test6 Partition: 7 Leader: 2 Replicas: 2,4,1 Isr: 2,1,4
Topic: test6 Partition: 8 Leader: 3 Replicas: 3,2,4 Isr: 2,3,4
Topic: test6 Partition: 9 Leader: 4 Replicas: 4,3,1 Isr: 1,3,4
Topic: test6 Partition: 10 Leader: 2 Replicas: 1,4,2 Isr: 2,1,4
Topic: test6 Partition: 11 Leader: 3 Replicas: 2,1,3 Isr: 2,1,3
可以看到ISR显示已经将扩容进来的ZK4的broker副本同步了,对应的Partition的Leader也已经均衡到了新的broker上了。(实际应用中可能没这么快完成同步状态和leader的均衡重分布,根据数据量大小等来决定)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。