赞
踩
kafka下载地址:Apache Kafka
需要说明的是,kafka的安装依赖于zk,zk的部署可直接参考《Zookeeper介绍与基本部署》。当然,kafka默认也内置了zk的启动脚本,在kafka安装路径的bin目录下,名称为zookeeper-server-start.sh,如果不想独立安装zk,可直接使用该脚本。
kafka 的配置文件在/app/www/kafka/config/server.properties中
- broker.id=0
- listeners=PLAINTEXT://10.0.2.5:9092
- num.network.threads=3
- num.io.threads=8
- socket.send.buffer.bytes=102400
- socket.receive.buffer.bytes=102400
- socket.request.max.bytes=104857600
- log.dirs=/tmp/kafka-logs
- num.partitions=1
- num.recovery.threads.per.data.dir=1
- offsets.topic.replication.factor=1
- transaction.state.log.replication.factor=1
- transaction.state.log.min.isr=1
- log.retention.hours=168
- log.segment.bytes=1073741824
- log.retention.check.interval.ms=300000
- zookeeper.connect=10.0.2.4:2181,10.0.2.5:2181,10.0.2.6:2181
- zookeeper.connection.timeout.ms=18000
- group.initial.rebalance.delay.ms=0
- auto.create.topics.enable=true
- delete.topics.enable=true
配置说明:
使用二进制包安装
- vim zoo.cfg
- tickTime=2000
- initLimit=10
- syncLimit=5
- dataDir=/apps/www/zookeeper/data
- clientPort=2181
- 4lw.commands.whitelist=* #开启之后可以查看 使用 echo status | nc localhost 2181
- server.1=10.0.2.4:2888:3888
- server.2=10.0.2.5:2888:3888
- server.3=10.0.2.6:2888:3888
zookeeper 配置说明
tickTime 这个时间是作为zookeeper服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是说每个tickTime时间就会发送一个心跳。
initLimit 这个配置项是用来配置zookeeper接受客户端(这里所说的客户端不是用户连接zookeeper服务器的客户端,而是zookeeper服务器集群中连接到leader的follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。
当已经超过10个心跳的时间(也就是tickTime)长度后 zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 10x2000=20秒。
syncLimit 这个配置项标识leader与follower之间发送消息,请求和应答时间长度,最长不能超过多少个tickTime的时间长度,总的时间长度就是52000=10秒。
*dataDir** 顾名思义就是zookeeper保存数据的目录,默认情况下zookeeper将写数据的日志文件也保存在这个目录里;
clientPort 这个端口就是客户端连接Zookeeper服务器的端口,Zookeeper会监听这个端口接受客户端的访问请求;
server.A=B:C:D中的A是一个数字,表示这个是第几号服务器,B是这个服务器的IP地址,C第一个端口用来集群成员的信息交换,表示这个服务器与集群中的leader服务器交换信息的端口,D是在leader挂掉时专门用来进行选举leader所用的端口。
创建myid文件
- vim /apps/www/zookeeper/data/myid
- 1 #server.1=10.0.2.4:2888:3888 id 为server 后面的数字
首先启动/停止zookeeper
- sh ./bin/zkServer.sh --config /apps/www/zookeeper/conf start
- sh ./bin/zkServer.sh --config /apps/www/zookeeper/conf stop
启动/停止kafka
- sh /apps/www/kafka/bin/kafka-server-start.sh -daemon /apps/www/kafka/config/server.properties
- sh kafka-server-stop.sh
- [root@master-2-4 zookeeper]# ./bin/zkServer.sh status
- /bin/java
- ZooKeeper JMX enabled by default
- Using config: /apps/www/zookeeper/bin/../conf/zoo.cfg
- Client port found: 2181. Client address: localhost. Client SSL: false.
- Mode: follower
- [root@node1-2-5 zookeeper]# ./bin/zkServer.sh status
- /bin/java
- ZooKeeper JMX enabled by default
- Using config: /apps/www/zookeeper/bin/../conf/zoo.cfg
- Client port found: 2181. Client address: localhost. Client SSL: false.
- Mode: leader
- [root@node2-2-6 zookeeper]# ./bin/zkServer.sh status
- /usr/bin/java
- ZooKeeper JMX enabled by default
- Using config: /apps/www/zookeeper/bin/../conf/zoo.cfg
- Client port found: 2181. Client address: localhost. Client SSL: false.
- Mode: follower
- [root@master-2-4 kafka]# echo status | nc localhost 2181
- Zookeeper version: 3.7.1-a2fb57c55f8e59cdd76c34b357ad5181df1258d5, built on 2022-05-07 06:45 UTC
- Clients:
- /10.0.2.4:45318[1](queued=0,recved=66,sent=70)
- /10.0.2.5:41290[1](queued=0,recved=36,sent=36)
- /0:0:0:0:0:0:0:1:35494[0](queued=0,recved=1,sent=0)
-
- Latency min/avg/max: 0/1.7941/17
- Received: 104
- Sent: 107
- Connections: 3
- Outstanding: 0
- Zxid: 0x10000004d
- Mode: follower
- Node count: 30
- [root@node1-2-5 kafka]# echo status | nc localhost 2181
- Zookeeper version: 3.7.1-a2fb57c55f8e59cdd76c34b357ad5181df1258d5, built on 2022-05-07 06:45 UTC
- Clients:
- /0:0:0:0:0:0:0:1:39736[0](queued=0,recved=1,sent=0)
-
- Latency min/avg/max: 0/0.0/0
- Received: 2
- Sent: 1
- Connections: 1
- Outstanding: 0
- Zxid: 0x10000004d
- Mode: leader
- Node count: 30
- Proposal sizes last/min/max: 344/36/344
- [root@node2-2-6 kafka]# echo status | nc localhost 2181
- Zookeeper version: 3.7.1-a2fb57c55f8e59cdd76c34b357ad5181df1258d5, built on 2022-05-07 06:45 UTC
- Clients:
- /10.0.2.6:32934[1](queued=0,recved=180,sent=180)
- /0:0:0:0:0:0:0:1:44018[0](queued=0,recved=1,sent=0)
-
- Latency min/avg/max: 0/0.8398/11
- Received: 211
- Sent: 210
- Connections: 2
- Outstanding: 0
- Zxid: 0x10000004d
- Mode: follower
- Node count: 30
上面完成了kafka的部署,通过验证部署我们发现当前没有topic,所以创建一个topic如下:
3.0 以上使用 --bootstrap-server
- ./bin/kafka-topics.sh --create --zookeeper 10.0.2.4:2181,10.0.2.5:2181,10.0.2.6:2181 --replication-factor 2 --partitions 3 --topic myfirsttopic
- # ./bin/kafka-topics.sh --create --bootstrap-server 10.0.2.4:9092,10.0.2.5:9092,10.0.2.6:9092 --replication-factor 2 --partitions 3 --topic myfirsttopic
- Created topic myfirsttopic.
参数说明:
上面通过操作zk就可以看到topic相关信息,接下来我们直接通过kafka命令行来进行相关操作:
- # ./bin/kafka-topics.sh --zookeeper localhost:2181 --list
- # ./bin/kafka-topics.sh --bootstrap-server 10.0.2.4:9092 --list
- myfirsttopic
- #查看myfirsttopic的详细信息
- # ./bin/kafka-topics.sh --bootstrap-server 10.0.2.4:9092 --describe --topic myfirsttopic
- # ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic myfirsttopic
- Topic:myfirsttopic PartitionCount:3 ReplicationFactor:2 Configs:
- Topic: myfirsttopic Partition: 0 Leader: 0 Replicas: 0,2 Isr: 0
- Topic: myfirsttopic Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0
- Topic: myfirsttopic Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1
参数说明:
输出说明:
- # ./bin/kafka-topics.sh --bootstrap-server 10.0.2.4:9092 --alter --partitions 6 --topic myfirsttopic
- # ./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --partitions 6 --topic myfirsttopic
- WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
- Adding partitions succeeded!
-
- # ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic myfirsttopic
- Topic:myfirsttopic PartitionCount:6 ReplicationFactor:2 Configs:
- Topic: myfirsttopic Partition: 0 Leader: 0 Replicas: 0,2 Isr: 0
- Topic: myfirsttopic Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0
- Topic: myfirsttopic Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1
- Topic: myfirsttopic Partition: 3 Leader: 0 Replicas: 0,2 Isr: 0,2
- Topic: myfirsttopic Partition: 4 Leader: 1 Replicas: 1,0 Isr: 1,0
- Topic: myfirsttopic Partition: 5 Leader: 2 Replicas: 2,1 Isr: 2,1
- #创建一个topic名为mysecondtopic,指定分区为2,副本为1
- # ./bin/kafka-topics.sh --bootstrap-server 10.0.2.5:9092 --create --replication-factor 1 --partitions 2 --topic mysecondtopic
- # ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 2 --topic mysecondtopic
- Created topic mysecondtopic.
-
- #查看新创建的topic详细信息
- # ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic mysecondtopic
- Topic:mysecondtopic PartitionCount:2 ReplicationFactor:1 Configs:
- Topic: mysecondtopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
- Topic: mysecondtopic Partition: 1 Leader: 1 Replicas: 1 Isr: 1
-
- #将broker.id为0上的partition的副本由原来的[0]扩充为[0,2],将broker.id为1上的partition的副本由原来的[1]扩充为[1,2]。
- #需要先创建一个json文件如下:
- # cat partitions-to-move.json
- {
- "partitions":
- [
- {
- "topic":"mysecondtopic",
- "partition": 0,
- "replicas": [0,2]
- },
- {
- "topic": "mysecondtopic",
- "partition": 1,
- "replicas": [1,2]
- }
- ],
- "version": 1
- }
-
- #执行副本修改
- # ./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ./partitions-to-move.json --execute
- Current partition replica assignment
-
- {"version":1,"partitions":[{"topic":"mysecondtopic","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"mysecondtopic","partition":0,"replicas":[0],"log_dirs":["any"]}]}
-
- Save this to use as the --reassignment-json-file option during rollback
- Successfully started reassignment of partitions.
-
- #再次查看topic状态,发现副本数由按照预期发生变更
- # ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic mysecondtopic
- Topic:mysecondtopic PartitionCount:2 ReplicationFactor:2 Configs:
- Topic: mysecondtopic Partition: 0 Leader: 0 Replicas: 0,2 Isr: 0
- Topic: mysecondtopic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1
- #执行删除操作
- # ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myfirsttopic
- Topic myfirsttopic is marked for deletion.
- Note: This will have no impact if delete.topic.enable is not set to true.
-
- #查看topic,可以看到myfirsttopic已被删除
- # ./bin/kafka-topics.sh --zookeeper localhost:2181 --list
- __consumer_offsets
- mysecondtopic
- # ./bin/kafka-console-producer.sh --broker-list 10.1.60.29:9092 --topic mysecondtopic
- >hello kafka!
- >hello world!
- >just a test!
- >
- >hi world!
- >hahahaha!
- >
- # ./bin/kafka-console-consumer.sh --bootstrap-server 10.1.60.29:9092 --topic mysecondtopic --from-beginning
- hello kafka!
- just a test!
- hi world!
- hello world!
-
- hahahaha!
./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --zookeeper 127.0.0.1:2181 --topic wechat_label_new --from-beginning --max-messages 300 > result.txt
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。