赞
踩
1)在由于 Producer 和 Consumer 都只会与 Leader 角色的分区副本相连,所以 kafka 需要以集群的组织形式提供主题下的消息高可用。kafka 支持主备复制,所以消息具备高可用和持久性。
2)kafka 一个分区可以有多个副本,这些副本保存在不同的 broker 上。每个分区的副本中都会有一个作为 Leader。当一个 broker 失败时,Leader 在这台 broker 上的分区都会变得不可用,kafka 会自动移除 Leader,再其他副本中选一个作为新的 Leader。
3)在通常情况下,增加分区可以提供 kafka 集群的吞吐量。然而,也应该意识到集群的总分区数或是单台服务器上的分区数过多,会增加不可用及延迟的风险。
# 切换目录
cd /usr/local/kafka/
# 拷贝三份 kafka_2.12-2.8.0 分别命名为:kafka-01, kafka-02, kafka-03
cp -rf kafka_2.12-2.8.0 ./kafka-01
cp -rf kafka_2.12-2.8.0 ./kafka-02
cp -rf kafka_2.12-2.8.0 ./kafka-03
broker.id=0, log.dirs=/usr/local/kafka/kafka-01/logs, port=9092
# 切换目录 cd /usr/local/kafka/ # 修改 kafka-01 的配置文件 kafka-01/config/server.properties vim kafka-01/config/server.properties # 修改以下几个配置: broker.id=0 log.dirs=/usr/local/kafka/kafka-01/logs # listeners=PLAINTEXT://localhost:9092 # host.name=localhost # port=9092 # 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110): listeners=PLAINTEXT://172.18.30.110:9092
broker.id=1, log.dirs=/usr/local/kafka/kafka-02/logs, port=9093
# 切换目录 cd /usr/local/kafka/ # 修改 kafka-02 的配置文件 kafka-02/config/server.properties vim kafka-02/config/server.properties # 修改以下几个配置: broker.id=1 log.dirs=/usr/local/kafka/kafka-02/logs # listeners=PLAINTEXT://localhost:9093 # host.name=localhost # port=9093 # 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110): listeners=PLAINTEXT://172.18.30.110:9093
broker.id=2, log.dirs=/usr/local/kafka/kafka-03/logs, port=9094
# 切换目录 cd /usr/local/kafka/ # 修改 kafka-03 的配置文件 kafka-03/config/server.properties vim kafka-03/config/server.properties # 修改以下几个配置: broker.id=2 log.dirs=/usr/local/kafka/kafka-03/logs # listeners=PLAINTEXT://localhost:9094 # host.name=localhost # port=9094 # 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110): listeners=PLAINTEXT://172.18.30.110:9094
删除命令:rm -rf logs/*
# 切换目录
cd /usr/local/kafka/
# 删除 节点1:kafka-01 以前的日志文件
rm -rf kafka-01/logs/*
# 删除 节点2:kafka-02 以前的日志文件
rm -rf kafka-02/logs/*
# 删除 节点3:kafka-03 以前的日志文件
rm -rf kafka-03/logs/*
启动命令:bin/kafka-server-start.sh config/server.properties
# 切换目录
cd /usr/local/kafka/
# 启动 节点1:kafka-01
kafka-01/bin/kafka-server-start.sh kafka-01/config/server.properties
# 启动 节点2:kafka-02
kafka-02/bin/kafka-server-start.sh kafka-02/config/server.properties
# 启动 节点3:kafka-03
kafka-03/bin/kafka-server-start.sh kafka-03/config/server.properties
创建命令:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic heima-par --partitions 3 --replication-factor 3
# 切换目录
cd /usr/local/kafka/
# 创建集群
kafka-01/bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic heima-par --partitions 3 --replication-factor 3
# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
kafka-01/bin/kafka-topics.sh --create --zookeeper 172.18.30.110:2181 --topic heima-par --partitions 3 --replication-factor 3
# 查看新创建的主题 heima-par 的详细信息
kafka-01/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic heima-par
添加分区命令:
bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic heima-par --partitions 4
# 切换目录
cd /usr/local/kafka/
# 创建集群
kafka-01/bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic heima-par --partitions 4
# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
kafka-01/bin/kafka-topics.sh --alter --zookeeper 172.18.30.110:2181 --topic heima-par --partitions 4
# 查看新创建的主题 heima-par 的详细信息
kafka-01/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic heima-par
# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
kafka-01/bin/kafka-topics.sh --describe --zookeeper 172.18.30.110:2181 --topic heima-par
添加 brokder 节点 命令:
cp -rf kafka-01 ./kafka-04
# 切换目录
cd /usr/local/kafka/
# 添加 brokder 节点
cp -rf kafka-01 ./kafka-04
broker.id=3, log.dirs=/usr/local/kafka/kafka-04/logs, port=9095
# 切换目录 cd /usr/local/kafka/ # 修改 kafka-04 的配置文件 kafka-04/config/server.properties vim kafka-04/config/server.properties # 修改以下几个配置: broker.id=3 log.dirs=/usr/local/kafka/kafka-04/logs # listeners=PLAINTEXT://localhost:9095 # host.name=localhost # port=9095 # 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110): listeners=PLAINTEXT://172.18.30.110:9095
删除命令:rm -rf logs/*
# 切换目录 cd /usr/local/kafka/ # 删除 节点1:kafka-01 以前的日志文件 rm -rf kafka-01/logs/* # 删除 节点2:kafka-02 以前的日志文件 rm -rf kafka-02/logs/* # 删除 节点3:kafka-03 以前的日志文件 rm -rf kafka-03/logs/* # 删除 节点4:kafka-04 以前的日志文件 rm -rf kafka-04/logs/*
启动命令:bin/kafka-server-start.sh config/server.properties
# 切换目录 cd /usr/local/kafka/ # 启动 节点1:kafka-01 kafka-01/bin/kafka-server-start.sh kafka-01/config/server.properties # 启动 节点2:kafka-02 kafka-02/bin/kafka-server-start.sh kafka-02/config/server.properties # 启动 节点3:kafka-03 kafka-03/bin/kafka-server-start.sh kafka-03/config/server.properties # 启动 节点4:kafka-04 kafka-04/bin/kafka-server-start.sh kafka-04/config/server.properties
# 切换目录
cd /usr/local/kafka/
# 查看新创建的主题 heima-par 的详细信息
kafka-01/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic heima-par
# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
kafka-01/bin/kafka-topics.sh --describe --zookeeper 172.18.30.110:2181 --topic heima-par
将原先分布在 broker 1-3 节点上的分区重新分布到 broker 1-4 节点上,需要借助 kafka-reassign-partitions.sh 工具生成 reassign plan ,
# 切换目录
cd /usr/local/kafka/
# 定义一个 reassign.json 文件
vim kafka-01/reassign.json
# 文件内容如下
{"topics":[{"topic":"heima-par"}],
"version":1
}
命令:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file reassign.json --broker-list “0,1,2,3” --generate
命令参数说明:
-generate :表示指定类型参数。
–topics-to-move-json-file : 指定分区重份配对应的主题清单路径。
# 切换目录 cd /usr/local/kafka/ # 使用 kafka-reassign-partitions.sh 工具生成 reassign plan kafka-01/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file kafka-01/reassign.json --broker-list "0,1,2,3" --generate # 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110): kafka-01/bin/kafka-reassign-partitions.sh --zookeeper 172.18.30.110:2181 --topics-to-move-json-file kafka-01/reassign.json --broker-list "0,1,2,3" --generate # 执行完命令会生成2个字符串。 Current partition replica assignment {"version":1,"partitions":[{"topic":"heima-par","partition":0,"replicas":[0,2,1],"log_dirs":["any","any","any"]},{"topic":"heima-par","partition":1,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"heima-par","partition":2,"replicas":[2,1,0],"log_dirs":["any","any","any"]},{"topic":"heima-par","partition":3,"replicas":[0,2,1],"log_dirs":["any","any","any"]}]} Proposed partition reassignment configuration {"version":1,"partitions":[{"topic":"heima-par","partition":0,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"heima-par","partition":1,"replicas":[0,2,3],"log_dirs":["any","any","any"]},{"topic":"heima-par","partition":2,"replicas":[1,3,0],"log_dirs":["any","any","any"]},{"topic":"heima-par","partition":3,"replicas":[2,0,1],"log_dirs":["any","any","any"]}]}
注意:第二步命令输出两个 json 字符串,第一个 JSON 内容为当前的分区副本分配情况,第二个为重新分配的候选方案,注意这里只是生成一份可行性的方案,并没有真正执行重分配的动作。
我们将第二个 JSON 内容保存到名为 result.json 文件里面(文件名不重要,文件格式也不一定要以json为结尾,只要保证内容是 json 即可),然后执行这些 reassign plan:
# 切换目录
cd /usr/local/kafka/
# 创建并编辑 result.json 文件。
vim kafka-01/result.json
## 粘贴以下内容:
{"version":1,"partitions":[{"topic":"heima-par","partition":0,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"heima-par","partition":1,"replicas":[0,2,3],"log_dirs":["any","any","any"]},{"topic":"heima-par","partition":2,"replicas":[1,3,0],"log_dirs":["any","any","any"]},{"topic":"heima-par","partition":3,"replicas":[2,0,1],"log_dirs":["any","any","any"]}]}
命令:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file result.json --execute
命令参数说明:
–execute :执行命令。
–reassignment-json-file : 指定分区重分配 json 文件。
# 切换目录 cd /usr/local/kafka/ # 使用 kafka-reassign-partitions.sh 工具生成 reassign plan kafka-01/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file kafka-01/result.json --execute # 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110): kafka-01/bin/kafka-reassign-partitions.sh --zookeeper 172.18.30.110:2181 --reassignment-json-file kafka-01/result.json --execute # 查看执行进度 kafka-01/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file kafka-01/result.json --verify # 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110): kafka-01/bin/kafka-reassign-partitions.sh --zookeeper 172.18.30.110:2181 --reassignment-json-file kafka-01/result.json --verify # 查看新创建的主题 heima-par 的详细信息 kafka-01/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic heima-par # 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110): kafka-01/bin/kafka-topics.sh --describe --zookeeper 172.18.30.110:2181 --topic heima-par
RangeAssignor 策略的原理是按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能均匀地分配给所有的消费者。对于每一个 topic,RangeAssignor 策略会将消费组内所有订阅这个 topic 的消费者按照名称的字典序排序,然后为每个消费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会被多分配一个分区。
RoundRobinAssignor 策略的原理是将消费组内所有消费者以及消费者所订阅的所有 topic 的 partition 按照字典序排序,然后通过轮询方式逐个将分区以此分配给每个消费者。RoundRobinAssignor 策略对应的 partition.assignment.strategy 参数值为: org.apache.kafka.clients.consumer.RoundRobinAssignor。
上一节关联链接请点击
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。