赞
踩
kafka下载地址:http://kafka.apache.org/downloads
下载.tgz,win、linux都是这个安装包。
说明
#与单独的zk相比,启动脚本、配置文件的名称都不相同
bin/zookeeper-server-start.sh config/zookeeper.properties &
#需要显式指定配置文件的位置,win下不能直接双击bat启动,注意win、linux文件路径中的斜杠不同
bin\zookeeper-server-start.sh config\zookeeper.properties &
kafka的安装部署
#不能加参数-v,win可直接用解压软件解压 tar -zxf kafka_2.13-2.8.0.tgz mv kafka_2.13-2.8.0 kafka cd kafka #修改kafka的配置文件 vim config/server.properties #监听地址使用内网ip listeners=PLAINTEXT://192.168.1.1:9092 #广播地址使用公网ip advertised.listeners=PLAINTEXT://172.168.1.1:9092 #zk地址 zookeeper.connect=localhost:2181 #如果只是本地使用,则不用配置以上三项,使用默认的即可 #线程数,日志保存路径,默认分区数、副本数等配置,根据需要自行修改 #开放kafka的9092端口(以及zk的2181端口) firewall-cmd --add-port=9092/tcp --zone=public --permanent firewall-cmd --reload
重要的数据文件、日志文件一般不建议保存在临时目录/tmp下,以防不小心被清理了。
linux下启动、停止
#启动kafka
# &只是Ctrl+C不会终止应用运行,但关闭会话(eg.关闭xshell的这个标签页),会终止应用运行
#这种方式会在控制台打印日志,方便在安装部署时查看错误、定位问题
bin/kafka-server-start.sh config/server.properties &
#kafka部署没有问题后,后续都以守护进程的形式启动,关闭终端也不会终止kafka运行
bin/kafka-server-start.sh -daemon config/server.properties &
#可以根据日志或以下任意命令查看启动是否成功
#kafka主要使用java编写,所以可以使用jps查看
jps
ps -ef|grep kafka
#停止,也可直接kill -9
bin/kafka-server-stop.sh
win下启动、停止
#不能直接双击bat启动,需要显式指定配置文件的位置
bin\windows\kafka-server-start.bat config/server.properties
#可以双击bat停止
bin\windows\kafka-server-stop.bat
.bat后缀可以缺省,注意win、linux路径中斜杠的写法不同。
linux执行bin下的sh,win执行bin\windows下的bat,注意win、linux文件路径中斜杠的写法不同
#创建topic。topic信息是保存在zk上的,需要指定zk地址、分区数量、副本数量、topic名称 bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --topic xxx-topic #删除topic bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --delete --topic xxx-topic #查看已存在的topic bin/kafka-topics.sh --list --zookeeper localhost:2181 #查看指定topic的描述信息,包含topicId、分区、副本、leader、isr等信息 bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic xxx-topic #发送消息,需要指定broker地址、使用的topic,出现 > 就可以发送消息了 bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic xxx-topic #接收|消费消息 #--from-beginning可选,会消费此topic中的所有消息,往往会发生重复消费,不加则从当前位置开始消费 bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic xxx-topic --from-beginning #可以指定配置文件,同理producer也可以指定配置文件 bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic xxx-topic --from-beginning --consumer.config config/consumer.properties
删除topic应该用命令删除,或者调用api进行删除,不要直接删除zk上的topic节点或者直接删除kafka的本地日志文件。
如果只删除了zk上的topic节点,或者只删除了kafka的本地日志文件,往往会造成kafka本地数据、zk远程数据不一致,导致kafka启动失败,此时应该根据报错信息确定数据不一致的topic,同时删除zk上对应的topic节点、kakfa对应的本地日志文件。
kafka都是以集群形式运行的,即使只有一个节点,也是以集群方式运行的。
kafka集群依赖于zk进行协调。
修改配置文件的broker.id即可,从0开始,递增,唯一标识一个broker。一个broker即一个kafka节点
kafka副本集(replicas):日志文件的副本,可以给每个topic设置副本集。
一个topic的多个part会分散在多个broker上,part在其它broker上会有备份。
kafka节点故障
kafka集群会自动移除故障的节点。
kafka会尽量将消息均匀地分散到各个节点上,避免部分节点过载。
最常用的是雅虎开源的CMAK,可在github上下载,3.x要求jdk11,jdk8可以使用2.x的版本。
下载压缩包,需要自己配置sbi的环境,自行进行编译,编译会得到一个用于部署的压缩包Kafka Manager。
#编译
./sbt clean dist
修改conf/application.conf
#指定zk地址,有多个节点时逗号分隔
kafka-manager.zkhosts="localhost:2181"
#启动,默认使用9000端口
bin/kafka-manager
#也可以指定配置文件的位置、使用的端口
bin/kafka-manager -Dconfig.file=/path/to/application.conf -Dhttp.port=8080
通过ip:port访问,添加一个集群,设置集群名称、zk地址、kafka版本。
JMX可以反映kafka集群的metrics(指标),但比较耗性能,根据情况勾选。
如果要使用JMX,还需要在kafka的bin/kafka-server-start.sh中的顶部加一行
export JMX_PORT=9999
如果kafka已经启动,需要重启才会生效。
skewed是part分布是否均匀地指标。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。