赞
踩
Kafka 提供了一系列脚本用于命令行来操作 kafka。
创建一个名为 oldersix-topic 的 topic,副本数设置为3,分区数设置为2:
- bin/kafka-topics.sh \
- --create \
- --zookeeper 192.168.31.162:2181 \
- --replication-factor 3 \
- --partitions 2 \
- --topic oldersix-topic
- bin/kafka-topics.sh \
- --describe \
- --zookeeper 192.168.31.162:2181 \
- --topic oldersix-topic
我们来看下输出内容的解释,第一行是所有分区的概要信息,之后的每一行表示每一个partition的信息。
Kafka 生产者将消息发送到 topic 中去,同时负责选择将 message 发送到 topic 的哪一个partition中。通过 round-robin 做简单的负载均衡。也可以根据消息中的某一个关键字来进行区分。通常第二种方式使用的更多。
我们向新建的 oldersix-topic 中发送一些 message,kafka集群可以加上所有kafka节点
- bin/kafka-console-producer.sh \
- --broker-list 192.168.31.162:9092,192.168.31.162:9093,192.168.31.162:9094 \
- --topic oldersix-topic
传统的消息传递模式有2种:队列( queue) 和(publish-subscribe)
Kafka基于这2种模式提供了一种 consumer 的抽象概念:consumer group。
上图说明:由2个broker组成的kafka集群,某个主题总共有4个partition(P0-P3),分别位于不同的broker上。这个集群由2个Consumer Group消费, Consumer Group A 有2个consumer instances ,Consumer Group B有4个。
通常一个 topic 会有几个 Consumer Group ,每个 Consumer Group 都是一个逻辑上的订阅者( logical subscriber )。每个 Consumer Group 由多个 Consumer Instance 组成,从而达到可扩展和容灾的功能。
同一 Partion 的一条消息只能被同一个 Consumer Group 内的一个 Consumer 消费,但多个Consumer Group可同时消费这一消息。
一个 Partition 同一个时刻在一个 Consumer Group 中只能有一个 Consumer Instance 在消费,从而保证消费顺序。
Consumer Group 中的 Consumer Instance 的数量不能比一个 Topic 中的 partition 的数量多,否则,多出来的consumer消费不到消息。
Kafka 只在 Partition 的范围内保证消息消费的局部顺序性,不能在同一个 topic 中的多个partition中保证总的消费顺序性。
如果有在总体上保证消费顺序的需求,那么我们可以通过将 topic 的 partition 数量设置为1,将consumer group中的consumer instance数量也设置为1,但是这样会影响性能,所以kafka的顺序消费很少用。
- bin/kafka-console-consumer.sh \
- --bootstrap-server 192.168.31.162:9092,192.168.31.162:9093,192.168.31.162:9094 \
- --from-beginning --topic oldersix-topic
可以看到,在消费端,我们已经消费到了 Producer 发送的消息。
Kafka 的 commit log 的 partitions 分布在 kafka 集群中不同的 broker 上,每个 broker 都可以请求备份其他 broker 上 partition 上的数据。kafka 集群支持配置一个 partition 备份的数量。
针对每个 partition,都有一个 broker 起到 “leader” 角色作用,0 个或多个其他的 broker 作为“follwers”角色的作用。leader 处理所有的针对这个 partition 的读写请求,而 followers 被动复制 leader 的结果,不提供读写(主要是为了保证多副本数据与消费的一致性)。如果这个 leader 失效了,其中的一个 follower 将会自动的变成新的leader。
现在我们来测试我们容错性,因为 broker1目前是 oldersix-topic 的分区 0 的leader,所以我们要将其kill。
- # 查看 broker 1 进程号
- ps -ef | grep server.properties
- # kill 进程
- kill 2346
可以看到 zookeeepr 的节点中已经没有 broker1 了。
现在再执行命令:
- bin/kafka-topics.sh \
- --describe --zookeeper 192.168.31.162:2181 \
- --topic oldersix-topic
我们可以看到,分区 0 的 leader 节点已经变成了broker 2。要注意的是,在 Isr 中,已经没有了 1 号 broker 节点。leader的选举也是从ISR(in-sync replica)中进行的。
此时,我们依然可以 消费新消息:
- bin/kafka-console-consumer.sh \
- --bootstrap-server 192.168.31.162:9092,192.168.31.162:9093,192.168.31.162:9094 \
- --from-beginning --topic oldersix-topic
查看主题分区对应的 leader 信息:
get /brokers/topics/oldersix-topic/partitions/1/state
kafka 将很多集群关键信息记录在 zookeeper 里,保证自己的无状态,从而在水平扩容时非常方便。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。