赞
踩
文章中提供了kafka单机模式和集群模式两种部署方式。在案例中采用的kafka版本是2.11-1.0.0,部署在一台linux服务器中,各个版本中略有差异,同学们在使用过程中注意甄别。另外文中部署方式主要参考kafka中文官网内容。有兴趣的同学可以单独去研究一番。
另外:文中的大部分内容都是从官网照搬过来的,之所以这样做是为方便自己添加一些自己的理解,同时也是因为中文翻译的不是很完善,不利于新手理解。
下载 2.11-1.0.0版本并解压缩。.
注意:
kafak需要java运行环境,所以安装之前需要先检查自己的机器是否已经正确安装java。同时建议java版本在1.8以上。
> tar -xzf kafka_2.11-1.0.0.tgz
> cd kafka_2.11-1.0.0
Kafka 使用 ZooKeeper 如果你还没有ZooKeeper服务器,你需要先启动一个ZooKeeper服务器。 您可以通过与kafka打包在一起的便捷脚本来快速简单地创建一个单节点ZooKeeper实例。
单地创建一个单节点ZooKeeper实例。
> bin/zookeeper-server-start.sh config/zookeeper.properties
[2022-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
现在启动Kafka服务器:
> bin/kafka-server-start.sh config/server.properties
[2022-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2022-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...
现在我们的kafka单机版就已经启动完毕了。总结起来先启动zookeeper,再启动kafka服务即可。
让我们创建一个名为“test”的topic,它有一个分区和一个副本:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
现在我们可以运行list(列表)命令来查看这个topic:
> bin/kafka-topics.sh --list --zookeeper localhost:2181
test
或者,您也可将broker配置为:在发布的topic不存在时,自动创建topic,而不是手动创建。
Kafka自带一个命令行客户端,它从文件或标准输入中获取输入,并将其作为message(消息)发送到Kafka集群。默认情况下,每行将作为单独的message发送。
运行 producer,然后在控制台输入一些消息以发送到服务器。
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
Kafka 还有一个命令行consumer(消费者),将消息转储到标准输出。
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
如果您将上述命令在不同的终端中运行,那么现在就可以将消息输入到生产者终端中,并将它们在消费终端中显示出来。
所有的命令行工具都有其他选项;运行不带任何参数的命令将显示更加详细的使用信息。
到此,一个简单的单机版kafka就已经安装完毕了。
对 Kafka来说,单个broker只是一个大小为一的集群,除了启动更多的broker实例外,没有什么变化。 为了深入了解它,让我们把集群扩展到三个节点(仍然在本地机器上)。
首先,为每个boker创建一个配置文件
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
现在编辑这些新文件并设置如下属性:
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
broker.id属性是集群中每个节点的名称,这一名称是整数且唯一且永久的。我们必须修改端口和日志目录,因为我们在同一台机器上运行这些,我们不希望所有的broker在同一个端口注册,或者覆盖彼此的数据。
我们已经建立Zookeeper和一个单节点了,现在我们只需要启动两个新的节点。
> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...
注意 :
启动命令后面加了符号 ‘&’ 作用为了时程序放在后台运行。
如果是多节点的zookeeper来说,还需要在kafka配置文件中的zookeeper.connect属性中添加其他zookeeper节点地址,例如:
zookeeper.connect = 127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002
现在创建一个副本为3的新topic:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
现在我们有一个集群,但是我们怎么才能知道那些代理在做什么呢?运行"describe topics"命令来查看:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
以下是对输出信息的解释。第一行给出了所有分区的摘要,下面的每行都给出了一个分区的信息。因为我们只有一个分区,所以只有一行。
请注意,在示例中,节点1是该主题中唯一分区的领导者。Partition,Leader,Replicas后面的数字代表broker.id对应的属性。
我们可以在已创建的原始主题上运行相同的命令来查看它的位置:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
原来的主题没有副本且在服务器0上。我们创建集群时,这是唯一的服务器。
让我们发表一些信息给我们的新topic:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C
现在我们来消费这些消息:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
让我们来测试一下容错性。 Broker 1 现在是 leader,让我们来停掉这个服务:
> ps aux | grep server-1.properties
7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
> kill -9 7564
领导权已经切换到一个从属节点,而且节点1也不在同步副本集中了:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
注意:
如果这时我们重新启动一个新的broker(broker.id属性值与前面几个不同),对于原有的topic(my-replicated-topic)来说,这个新的broker也不是承担任何读写功能。
不过,即便原先写入消息的leader已经不在,这些消息仍可用于消费:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
对于kafka集群部署,一般把kafak服务部署在三台独立的服务器中,topic一般设置为3个分区2个副本。这样即使其中一台服务器宕机也不影响kafak的正常使用。如果有可能还可以通过服务监控工具(如:supervisord)对服务运行状态进行监控。
如果你觉得文章对你有帮助请点赞收藏加关注。博客内容会持续更新中。谢谢!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。