赞
踩
2、解压
解压命令:tar –zxvf kafka_2.13-2.4.0.tgz
移动: cp -R kafka_2.13-2.4.0 /zjl/program/
创建软链接: ln -s kafka_2.13-2.4.0/ /zjl/program/kafka 解压后可使用
配置zookeeper集群参见:
zookeeper安装配置 - pk.com.cn - 博客园压后即可使用
3、kafka配置
3.1、配置server.properties
- 位置: /kafka/config
-
-
- 默认配置 advertised.listeners=PLAINTEXT://your.host.name:9092
-
-
- 修改为 advertised.listeners=PLAINTEXT://ip:9092
-
-
- 例:advertised.listeners=PLAINTEXT://192.168.244.128:9092
-
-
- ip为服务器ip
- hostname和端口是用来建议给生产者和消费者使用的,如果没有设置,将会使用listeners的配置,如果listeners也没有配置,将使用java.net.InetAddress.getCanonicalHostName()来获取这个hostname和port,对于ipv4,基本就是localhost了。
- "PLAINTEXT"表示协议,可选的值有PLAINTEXT和SSL,hostname可以指定IP地址,也可以用"0.0.0.0"表示对所有的网络接口有效,如果hostname为空表示只对默认的网络接口有效。也就是说如果你没有配置advertised.listeners,就使用listeners的配置通告给消息的生产者和消费者,这个过程是在生产者和消费者获取源数据(metadata)。
- 主要注意三个配置:
-
-
- broker.id标识本机
- log.dirs是kafka接收消息存放路径
- zookeeper.connect指定连接的zookeeper集群地址

3.2、kafka集群计划
- 三台机器:192.168.244.128 192.168.244.130 192.168.244.131
-
- 修改server.properties文件,只修改三项,其他根据需要配置
-
- 192.168.244.128
-
- broker.id=1
- log.dirs=/zjl/program/kafka/datalogs/kafka-logs
- zookeeper.connect=192.168.244.128:2181,192.168.244.130:2181,192.168.244.131:2181
- 192.168.244.130
-
- broker.id=2
- log.dirs=/zjl/program/kafka/datalogs/kafka-logs
- zookeeper.connect=192.168.244.128:2181,192.168.244.130:2181,192.168.244.131:2181
- 192.168.244.131
-
- broker.id=3
- log.dirs=/zjl/program/kafka/datalogs/kafka-logs
- zookeeper.connect=192.168.244.128:2181,192.168.244.130:2181,192.168.244.131:2181

3.3、启动
- 三个节点分别执行:nohup ./kafka-server-start.sh -daemon ../config/server.properties &
-
- 启动时若报错java.net.UnknownHostException XXX
-
- 命令查看:hostname 打印出本机hostname 我的机器是 zjlLinux1
-
- 修改文件 :vim /etc/hosts
-
- 添加内容 192.168.244.128 zjlLinux1 #ip是本机IP 后边是hostname
3.4、较验启动是否成功
ps –ef|grep kafka
3.5、kafka-manager
- 下载地址:https://github.com/yahoo/kafka-manager/releases
-
- https://github.com/yahoo/CMAK/releases
-
- 编译
-
- tar -zxvf 1.3.3.17.tar.gz
-
- cd kafka-manager-1.3.3.17/
-
- ./sbt clean dist
-
- 配置
-
- 在解压后的conf目录中打开 application.conf文件,修改其中的配置信息,最主要的内容为:kafka-manager.zkhosts="192.168.1.22:2181,192.168.1.23:2181,192.168.1.24:2181" 配置为Kafka的 zookeeper 服务器。你还可以通过环境变量ZK_HOSTS配置这个参数值。
-
- 启动
-
- 在解压的目录中,使用下面的命令启动Kafka-manager : bin/kafka-manager
-
- 默认情况下端口为9000,你还可以通过下面的命令指定配置文件和端口: bin/kafka-manager -Dconfig.file=/path/to/application.conf -Dhttp.port=9000
-
- 启动后,从浏览器打开:http://[服务器IP]:9000/ 就可以看到配置页面了

4、kafka基本命令操作
1、创建TOPIC
./kafka-topics.sh --create --zookeeper 192.168.244.128:2181,192.168.244.130:2181,192.168.244.131:2181 --replication-factor 3 --partitions 6 --topic kfk_test
2、列出创建的topic
./kafka-topics.sh --list --zookeeper 192.168.244.128:2181,192.168.244.130:2181,192.168.244.131:2181
3、生成数据
注意这里的端口号是kafka的端口 9092
./kafka-console-producer.sh -broker-list 192.168.244.128:9092,192.168.244.130:9092,192.168.244.131:9092 --topic kfk_test
功连接后,可以看到如下输出:
5、zookeeper基础配置及指令
5.1、四字母命令
5.2、watcher通知机制
参考zookeeper 中 Watcher 通知机制的一点理解_拖垃圾的专栏-CSDN博客
关于watcher机制大体的理解可以为,当每个节点发生变化,都会触发watcher事件,类似于mysql的触发器。zk中 watcher是一次性的,触发后立即销毁。
- stat path [watch] 设置watch事件
- get path [watch]设置watch事件
- 子节点创建和删除时触发watch事件,子节点修改不会触发该事件
#添加watch 事件
- [zk: localhost:2181(CONNECTED) 18] stat /longfei watch
- Node does not exist: /longfei
- #创建longfei节点时触发watcher事件
- [zk: localhost:2181(CONNECTED) 19] create /longfei test
-
- WATCHER::
-
- WatchedEvent state:SyncConnected type:NodeCreated path:/longfei
- Created /longfei
#使用get命令添加watch事件
- [zk: localhost:2181(CONNECTED) 20] get /longfei watch
- test
- cZxid = 0x20000000e
- ctime = Sat Jun 02 14:43:15 UTC 2018
- mZxid = 0x20000000e
- mtime = Sat Jun 02 14:43:15 UTC 2018
- pZxid = 0x20000000e
- cversion = 0
- dataVersion = 0
- aclVersion = 0
- ephemeralOwner = 0x0
- dataLength = 4
- numChildren = 0
- #修改节点触发watcher事件
- [zk: localhost:2181(CONNECTED) 21] set /longfei new_test
-
- WATCHER::
-
- WatchedEvent state:SyncConnected type:NodeDataChanged path:/longfei
- cZxid = 0x20000000e
- ctime = Sat Jun 02 14:43:15 UTC 2018
- mZxid = 0x20000000f
- mtime = Sat Jun 02 14:45:06 UTC 2018
- pZxid = 0x20000000e
- cversion = 0
- dataVersion = 1
- aclVersion = 0
- ephemeralOwner = 0x0
- dataLength = 8
- numChildren = 0
- [zk: localhost:2181(CONNECTED) 22]
- #删除触发watcher事件
- [zk: localhost:2181(CONNECTED) 23] get /longfei watch
- new_test
- cZxid = 0x20000000e
- ctime = Sat Jun 02 14:43:15 UTC 2018
- mZxid = 0x20000000f
- mtime = Sat Jun 02 14:45:06 UTC 2018
- pZxid = 0x20000000e
- cversion = 0
- dataVersion = 1
- aclVersion = 0
- ephemeralOwner = 0x0
- dataLength = 8
- numChildren = 0
- [zk: localhost:2181(CONNECTED) 24] delete /longfei
-
- WATCHER::
-
- WatchedEvent state:SyncConnected type:NodeDeleted path:/longfei
- [zk: localhost:2181(CONNECTED) 25]

设置watch事件:stat path [watch]
设置watch事件:get path [watch]
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。