赞
踩
1、 下载安装包
http://kafka.apache.org/downloads
注意要下载二进制版本
2、 解压并进入Kafka目录,笔者:D:\Kafka\kafka_2.13-2.7.1
3、 进入config目录找到文件server.properties并打开
4、 找到并编辑log.dirs=D:\Kafka\kafka_2.13-2.7.1\kafka-logs
5、 找到并编辑zookeeper.connect=localhost:2181
6、 Kafka会按照默认,在9092端口上运行,并连接zookeeper的默认端口:2181
7、 进入Kafka安装目录D:\Kafka\kafka_2.13-2.7.1,按下Shift+右键,选择“打开命令窗口”选项,打开命令行,输入:
.\bin\windows\kafka-server-start.bat .\config\server.properties
注意:不要关了这个窗口,启用Kafka前请确保ZooKeeper实例已经准备好并开始运行
在这里插入代码片
创建主题现在我们来创建一个名字为“test”的Topic,这个topic只有一个partition,并且备份因子也设置为1:
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
现在我们可以通过以下命令来查看kafka中目前存在的topic
.\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
除了我们通过手工的方式创建Topic,当producer发布一个消息到某个指定的Topic,这个Topic如果不存在,就自动创建。
.\bin\windows\kafka-topics.bat --delete --topic test --zookeeper localhost:2181
kafka自带了一个producer命令客户端,可以从本地文件中读取内容,或者我们也可以以命令行中直接输入内容,并将这些内容以消息的形式发送到kafka集群中。在默认情况下,每一个行会被当做成一个独立的消息。
首先我们要运行发布消息的脚本,然后在命令中输入要发送的消息的内容:
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
>this is a msg
>this is a another msg
对于consumer,kafka同样也携带了一个命令行客户端,会将获取到内容在命令中进行输出,默认是消费最新的消息:
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test
如果想要消费之前的消息可以通过–from-beginning参数指定,如下命令:
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --from-beginning --topic test
如果你是通过不同的终端窗口来运行以上的命令,你将会看到在producer终端输入的内容,很快就会在consumer的终端窗口上显示出来。
以上所有的命令都有一些附加的选项;当我们不携带任何参数运行命令的时候,将会显示出这个命令的详细用法。
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --whitelist "test|test-2"
一条消息只能被某一个消费者消费的模式,类似queue模式,只需让所有消费者在同一个消费组里即可
分别在两个客户端执行如下消费命令,然后往主题里发送消息,结果只有一个客户端能收到消息
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --consumer-property group.id=testGroup --topic test
一条消息能被多个消费者消费的模式,类似publish-subscribe模式费,针对Kafka同一条消息只能被同一个消费组下的某一个消费者消费的特性,要实现多播只要保证这些消费者属于不同的消费组即可。我们再增加一个消费者,该消费者属于testGroup-2消费组,结果两个客户端都能收到消息
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --consumer-property group.id=testGroup-2 --topic test
.\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic my-replicated-topic --from-beginning
.\bin\windows\kafka-consumer-groups.bat --bootstrap-server localhost:9092 --list
.\bin\windows\kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group testGroup
current-offset:当前消费组的已消费偏移量
log-end-offset:主题对应分区消息的结束偏移量(HW)
lag:当前消费组未消费的消息数
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test1
.\bin\windows\kafka-topics.bat --describe --zookeeper localhost:2181 --topic test1
以下是输出内容的解释,第一行是所有分区的概要信息,之后的每一行表示每一个partition的信息。
leader节点负责给定partition的所有读写请求。
replicas 表示某个partition在哪几个broker上存在备份。不管这个几点是不是”leader“,甚至这个节点挂了,也会列出。
isr 是replicas的一个子集,它只列出当前还存活着的,并且已同步备份了该partition的节点。
我们可以运行相同的命令查看之前创建的名称为”test“的topic
.\bin\windows\kafka-topics.bat --describe --zookeeper localhost:2181 --topic test
之前设置了topic的partition数量为1,备份因子为1,因此显示就如上所示了。
可以进入kafka的数据文件存储目录查看test和test1主题的消息日志文件:
消息日志文件主要存放在分区文件夹里的以log结尾的日志文件里,如下是test1主题对应的分区0的消息日志:
当然我们也可以通过如下命令增加topic的分区数量(目前kafka不支持减少分区):
.\bin\windows\kafka-topics.bat -alter --partitions 3 --zookeeper localhost:2181 --topic test
可以这么来理解Topic,Partition和Broker
一个topic,代表逻辑上的一个业务数据集,比如按数据库里不同表的数据操作消息区分放入不同topic,订单相关操作消息放入订单topic,用户相关操作消息放入用户topic,对于大型网站来说,后端数据都是海量的,订单消息很可能是非常巨量的,比如有几百个G甚至达到TB级别,如果把这么多数据都放在一台机器上可定会有容量限制问题,那么就可以在topic内部划分多个partition来分片存储数据,不同的partition可以位于不同的机器上,每台机器上都运行一个Kafka的进程Broker。
为什么要对Topic下数据进行分区存储?
1、commit log文件会受到所在机器的文件系统大小的限制,分区之后可以将不同的分区放在不同的机器上,相当于对数据做了分布式存储,理论上一个topic可以处理任意数量的数据。
2、为了提高并行度。
对于kafka来说,一个单独的broker意味着kafka集群中只有一个节点。要想增加kafka集群中的节点数量,只需要多启动几个broker实例即可。为了有更好的理解,现在我们在一台机器上同时启动三个broker实例。
首先,我们需要建立好其他2个broker的配置文件:
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
配置文件的需要修改的内容分别如下:
config/server-1.properties:
#broker.id属性在kafka集群中必须要是唯一
broker.id=1
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://localhost:9093
log.dir=D:/work/Kafaka/kafka_2.13-2.7.1/kafka-logs-1
#kafka连接zookeeper的地址,要把多个kafka实例组成集群,对应连接的zookeeper必须相同
zookeeper.connect=localhost:2181
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://localhost:9094
log.dir=D:/work/Kafaka/kafka_2.13-2.7.1/kafka-logs-2
zookeeper.connect=localhost:2181
目前我们已经有一个zookeeper实例和一个broker实例在运行了,现在我们只需要在启动2个broker实例即可:
.\bin\windows\kafka-server-start.bat config/server-1.properties
.\bin\windows\kafka-server-start.bat config/server-2.properties
查看zookeeper确认集群节点是否都注册成功:
现在我们创建一个新的topic,副本数设置为3,分区数设置为2:
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 3 --partitions 2 --topic my-replicated-topic
查看下topic的情况
.\bin\windows\kafka-topics.bat --describe --zookeeper localhost:2181 --topic my-replicated-topic
以下是输出内容的解释,第一行是所有分区的概要信息,之后的每一行表示每一个partition的信息。
leader节点负责给定partition的所有读写请求,同一个主题不同分区leader副本一般不一样(为了容灾)
replicas 表示某个partition在哪几个broker上存在备份。不管这个几点是不是”leader“,甚至这个节点挂了,也会列出。
isr 是replicas的一个子集,它只列出当前还存活着的,并且已同步备份了该partition的节点。
现在我们向新建的 my-replicated-topic 中发送一些message,kafka集群可以加上所有kafka节点:
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092,localhost:9093,localhost:9094 --topic my-replicated-topic
>my test msg 1
>my test msg 2
现在开始消费:
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --from-beginning --topic my-replicated-topic
my test msg 1
my test msg 2
现在我们来测试我们容错性,因为broker0目前是my-replicated-topic的分区0的leader,所以我们要将其kill
ps -ef | grep server.properties
kill 14776
现在再执行命令:
.\bin\windows\kafka-topics.bat --describe --zookeeper localhost:9092 --topic my-replicated-topic
我们可以看到,分区0的leader节点已经变成了broker 1。要注意的是,在Isr中,已经没有了0号节点。leader的选举也是从ISR(in-sync replica)中进行的。
此时,我们依然可以 消费新消息:
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --from-beginning --topic my-replicated-topic
my test msg 1
my test msg 2
查看主题分区对应的leader信息:
kafka将很多集群关键信息记录在zookeeper里,保证自己的无状态,从而在水平扩容时非常方便。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。