当前位置:   article > 正文

kafka 集群的安装以及配置_kafka的安装与配置实验的总结

kafka的安装与配置实验的总结

1、安装

1.1 下载与安装

kafka下载地址:Apache Kafka

需要说明的是,kafka的安装依赖于zk,zk的部署可直接参考《Zookeeper介绍与基本部署》。当然,kafka默认也内置了zk的启动脚本,在kafka安装路径的bin目录下,名称为zookeeper-server-start.sh,如果不想独立安装zk,可直接使用该脚本。

1.2 配置kafka

kafka 的配置文件在/app/www/kafka/config/server.properties中

  1. broker.id=0
  2. listeners=PLAINTEXT://10.0.2.5:9092
  3. num.network.threads=3
  4. num.io.threads=8
  5. socket.send.buffer.bytes=102400
  6. socket.receive.buffer.bytes=102400
  7. socket.request.max.bytes=104857600
  8. log.dirs=/tmp/kafka-logs
  9. num.partitions=1
  10. num.recovery.threads.per.data.dir=1
  11. offsets.topic.replication.factor=1
  12. transaction.state.log.replication.factor=1
  13. transaction.state.log.min.isr=1
  14. log.retention.hours=168
  15. log.segment.bytes=1073741824
  16. log.retention.check.interval.ms=300000
  17. zookeeper.connect=10.0.2.4:2181,10.0.2.5:2181,10.0.2.6:2181
  18. zookeeper.connection.timeout.ms=18000
  19. group.initial.rebalance.delay.ms=0
  20. auto.create.topics.enable=true
  21. delete.topics.enable=true

配置说明:

  • broker.id:每个broker在集群中的唯一标识,正整数。当该服务器的ip地址发生变更,但broker.id未变,则不会影响consumers的消费情况
  • listeners:kafka的监听地址与端口,在实际测试中如果写0.0.0.0会报错。
  • num.network.threads:kafka用于处理网络请求的线程数
  • num.io.threads:kafka用于处理磁盘io的线程数
  • socket.send.buffer.bytes:发送数据的缓冲区
  • socket.receive.buffer.bytes:接收数据的缓冲区
  • socket.request.max.bytes:允许接收的最大数据包的大小(防止数据包过大导致OOM)
  • log.dirs:kakfa用于保存数据的目录,所有的消息都会存储在该目录当中。可以通过逗号来指定多个路径,kafka会根据最少被使用的原则选择目录分配新的partition。需要说明的是,kafka在分配partition的时候选择的原则不是按照磁盘空间大小来定的,而是根据分配的partition的个数多少而定
  • num.partitions:设置新创建的topic的默认分区数
  • number.recovery.threads.per.data.dir:用于恢复每个数据目录时启动的线程数
  • log.retention.hours:配置kafka中消息保存的时间,还支持log.retention.minutes和log.retention.ms。如果多个同时设置会选择时间最短的配置,默认为7天。
  • log.retention.check.interval.ms:用于检测数据过期的周期
  • log.segment.bytes:配置partition中每个segment数据文件的大小。默认为1GB。超出该大小后,会自动创建一个新的segment文件。
  • zookeeper.connect:指定连接的zk的地址,zk中存储了broker的元数据信息。可以通过逗号来设置多个值。格式为:hostname:port/path。hostname为zk的主机名或ip,port为zk监听的端口。/path表示kafka的元数据存储到zk上的目录,如果不设置,默认为根目录
  • zookeeper.connection.timeout:kafka连接zk的超时时间
  • group.initial.rebalance.delay.ms:在实际环境当中,当将多个consumer加入到一个空的consumer group中时,每加入一个consumer就会触发一次对partition消费的重平衡,如果加入100个,就得重平衡100次,这个过程就会变得非常耗时。通过设置该参数,可以延迟重平衡的时间,比如有100个consumer会在10s内全部加入到一个consumer group中,就可以将该值设置为10s,10s之后,只需要做一次重平衡即可。默认为0则代表不开启该特性。
  • auto.create.topics.enable:当有producer向一个不存在的topic中写入消息时,是否自动创建该topic
  • delete.topics.enable:kafka提供了删除topic的功能,但默认并不会直接将topic数据物理删除。如果要从物理上删除(删除topic后,数据文件也一并删除),则需要将此项设置为true

1.3 zookeeper安装配置

使用二进制包安装

  1. vim zoo.cfg
  2. tickTime=2000
  3. initLimit=10
  4. syncLimit=5
  5. dataDir=/apps/www/zookeeper/data
  6. clientPort=2181
  7. 4lw.commands.whitelist=* #开启之后可以查看 使用 echo status | nc localhost 2181
  8. server.1=10.0.2.4:2888:3888
  9. server.2=10.0.2.5:2888:3888
  10. server.3=10.0.2.6:2888:3888

zookeeper 配置说明

tickTime 这个时间是作为zookeeper服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是说每个tickTime时间就会发送一个心跳。

initLimit 这个配置项是用来配置zookeeper接受客户端(这里所说的客户端不是用户连接zookeeper服务器的客户端,而是zookeeper服务器集群中连接到leader的follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。

当已经超过10个心跳的时间(也就是tickTime)长度后 zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 10x2000=20秒。

syncLimit 这个配置项标识leader与follower之间发送消息,请求和应答时间长度,最长不能超过多少个tickTime的时间长度,总的时间长度就是52000=10秒。

*dataDir** 顾名思义就是zookeeper保存数据的目录,默认情况下zookeeper将写数据的日志文件也保存在这个目录里;

clientPort 这个端口就是客户端连接Zookeeper服务器的端口,Zookeeper会监听这个端口接受客户端的访问请求;

server.A=B:C:D中的A是一个数字,表示这个是第几号服务器,B是这个服务器的IP地址,C第一个端口用来集群成员的信息交换,表示这个服务器与集群中的leader服务器交换信息的端口,D是在leader挂掉时专门用来进行选举leader所用的端口。

创建myid文件

  1. vim /apps/www/zookeeper/data/myid
  2. 1 #server.1=10.0.2.4:2888:3888 id 为server 后面的数字

1.4 启停操作

首先启动/停止zookeeper

  1. sh ./bin/zkServer.sh --config /apps/www/zookeeper/conf start
  2. sh ./bin/zkServer.sh --config /apps/www/zookeeper/conf stop

启动/停止kafka

  1. sh /apps/www/kafka/bin/kafka-server-start.sh -daemon /apps/www/kafka/config/server.properties
  2. sh kafka-server-stop.sh

1.5 查看集群状态

1.5.1 查看zookeeper 集群状态

  1. [root@master-2-4 zookeeper]# ./bin/zkServer.sh status
  2. /bin/java
  3. ZooKeeper JMX enabled by default
  4. Using config: /apps/www/zookeeper/bin/../conf/zoo.cfg
  5. Client port found: 2181. Client address: localhost. Client SSL: false.
  6. Mode: follower
  1. [root@node1-2-5 zookeeper]# ./bin/zkServer.sh status
  2. /bin/java
  3. ZooKeeper JMX enabled by default
  4. Using config: /apps/www/zookeeper/bin/../conf/zoo.cfg
  5. Client port found: 2181. Client address: localhost. Client SSL: false.
  6. Mode: leader
  1. [root@node2-2-6 zookeeper]# ./bin/zkServer.sh status
  2. /usr/bin/java
  3. ZooKeeper JMX enabled by default
  4. Using config: /apps/www/zookeeper/bin/../conf/zoo.cfg
  5. Client port found: 2181. Client address: localhost. Client SSL: false.
  6. Mode: follower

1.5.2 查看kafka 集群状态

  1. [root@master-2-4 kafka]# echo status | nc localhost 2181
  2. Zookeeper version: 3.7.1-a2fb57c55f8e59cdd76c34b357ad5181df1258d5, built on 2022-05-07 06:45 UTC
  3. Clients:
  4. /10.0.2.4:45318[1](queued=0,recved=66,sent=70)
  5. /10.0.2.5:41290[1](queued=0,recved=36,sent=36)
  6. /0:0:0:0:0:0:0:1:35494[0](queued=0,recved=1,sent=0)
  7. Latency min/avg/max: 0/1.7941/17
  8. Received: 104
  9. Sent: 107
  10. Connections: 3
  11. Outstanding: 0
  12. Zxid: 0x10000004d
  13. Mode: follower
  14. Node count: 30
  1. [root@node1-2-5 kafka]# echo status | nc localhost 2181
  2. Zookeeper version: 3.7.1-a2fb57c55f8e59cdd76c34b357ad5181df1258d5, built on 2022-05-07 06:45 UTC
  3. Clients:
  4. /0:0:0:0:0:0:0:1:39736[0](queued=0,recved=1,sent=0)
  5. Latency min/avg/max: 0/0.0/0
  6. Received: 2
  7. Sent: 1
  8. Connections: 1
  9. Outstanding: 0
  10. Zxid: 0x10000004d
  11. Mode: leader
  12. Node count: 30
  13. Proposal sizes last/min/max: 344/36/344
  1. [root@node2-2-6 kafka]# echo status | nc localhost 2181
  2. Zookeeper version: 3.7.1-a2fb57c55f8e59cdd76c34b357ad5181df1258d5, built on 2022-05-07 06:45 UTC
  3. Clients:
  4. /10.0.2.6:32934[1](queued=0,recved=180,sent=180)
  5. /0:0:0:0:0:0:0:1:44018[0](queued=0,recved=1,sent=0)
  6. Latency min/avg/max: 0/0.8398/11
  7. Received: 211
  8. Sent: 210
  9. Connections: 2
  10. Outstanding: 0
  11. Zxid: 0x10000004d
  12. Mode: follower
  13. Node count: 30

2、基本操作

2.1 创建topic

上面完成了kafka的部署,通过验证部署我们发现当前没有topic,所以创建一个topic如下:

3.0 以上使用 --bootstrap-server

  1. ./bin/kafka-topics.sh --create --zookeeper 10.0.2.4:2181,10.0.2.5:2181,10.0.2.6:2181 --replication-factor 2 --partitions 3 --topic myfirsttopic
  2. # ./bin/kafka-topics.sh --create --bootstrap-server 10.0.2.4:9092,10.0.2.5:9092,10.0.2.6:9092 --replication-factor 2 --partitions 3 --topic myfirsttopic
  3. Created topic myfirsttopic.

参数说明:

  • –create:创建一个topic
  • –zookeeper: 指定zookeeper集群的主机列表,多个server可使用逗号分隔,这里因为kafka和zk是在同一个server,所以直接连接了本机的2181端口
  • –replication-factor:指定创建这个topic的副本数
  • –partitions:指定该topic的分区数
  • –topic:指定topic的名称

2.2 列出现有的topic

上面通过操作zk就可以看到topic相关信息,接下来我们直接通过kafka命令行来进行相关操作:

  1. # ./bin/kafka-topics.sh --zookeeper localhost:2181 --list
  2. # ./bin/kafka-topics.sh --bootstrap-server 10.0.2.4:9092 --list
  3. myfirsttopic

2.3 查看topic的详细信息

  1. #查看myfirsttopic的详细信息
  2. # ./bin/kafka-topics.sh --bootstrap-server 10.0.2.4:9092 --describe --topic myfirsttopic
  3. # ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic myfirsttopic
  4. Topic:myfirsttopic PartitionCount:3 ReplicationFactor:2 Configs:
  5. Topic: myfirsttopic Partition: 0 Leader: 0 Replicas: 0,2 Isr: 0
  6. Topic: myfirsttopic Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0
  7. Topic: myfirsttopic Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1

参数说明:

  • –describe:查看topic的详细信息

输出说明:

  • leader:当前负责读写的leader broker
  • replicas:当前分区所有的副本对应的broker列表
  • isr:处于活动状态的broker

2.4 增加topic的partition数量

  1. # ./bin/kafka-topics.sh --bootstrap-server 10.0.2.4:9092 --alter --partitions 6 --topic myfirsttopic
  2. # ./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --partitions 6 --topic myfirsttopic
  3. WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
  4. Adding partitions succeeded!
  5. # ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic myfirsttopic
  6. Topic:myfirsttopic PartitionCount:6 ReplicationFactor:2 Configs:
  7. Topic: myfirsttopic Partition: 0 Leader: 0 Replicas: 0,2 Isr: 0
  8. Topic: myfirsttopic Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0
  9. Topic: myfirsttopic Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1
  10. Topic: myfirsttopic Partition: 3 Leader: 0 Replicas: 0,2 Isr: 0,2
  11. Topic: myfirsttopic Partition: 4 Leader: 1 Replicas: 1,0 Isr: 1,0
  12. Topic: myfirsttopic Partition: 5 Leader: 2 Replicas: 2,1 Isr: 2,1

2.5 修改一个topic的副本数

  1. #创建一个topic名为mysecondtopic,指定分区为2,副本为1
  2. # ./bin/kafka-topics.sh --bootstrap-server 10.0.2.5:9092 --create --replication-factor 1 --partitions 2 --topic mysecondtopic
  3. # ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 2 --topic mysecondtopic
  4. Created topic mysecondtopic.
  5. #查看新创建的topic详细信息
  6. # ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic mysecondtopic
  7. Topic:mysecondtopic PartitionCount:2 ReplicationFactor:1 Configs:
  8. Topic: mysecondtopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
  9. Topic: mysecondtopic Partition: 1 Leader: 1 Replicas: 1 Isr: 1
  10. #将broker.id为0上的partition的副本由原来的[0]扩充为[0,2],将broker.id为1上的partition的副本由原来的[1]扩充为[1,2]。
  11. #需要先创建一个json文件如下:
  12. # cat partitions-to-move.json
  13. {
  14. "partitions":
  15. [
  16. {
  17. "topic":"mysecondtopic",
  18. "partition": 0,
  19. "replicas": [0,2]
  20. },
  21. {
  22. "topic": "mysecondtopic",
  23. "partition": 1,
  24. "replicas": [1,2]
  25. }
  26. ],
  27. "version": 1
  28. }
  29. #执行副本修改
  30. # ./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ./partitions-to-move.json --execute
  31. Current partition replica assignment
  32. {"version":1,"partitions":[{"topic":"mysecondtopic","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"mysecondtopic","partition":0,"replicas":[0],"log_dirs":["any"]}]}
  33. Save this to use as the --reassignment-json-file option during rollback
  34. Successfully started reassignment of partitions.
  35. #再次查看topic状态,发现副本数由按照预期发生变更
  36. # ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic mysecondtopic
  37. Topic:mysecondtopic PartitionCount:2 ReplicationFactor:2 Configs:
  38. Topic: mysecondtopic Partition: 0 Leader: 0 Replicas: 0,2 Isr: 0
  39. Topic: mysecondtopic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1

2.6 删除一个topic

  1. #执行删除操作
  2. # ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myfirsttopic
  3. Topic myfirsttopic is marked for deletion.
  4. Note: This will have no impact if delete.topic.enable is not set to true.
  5. #查看topic,可以看到myfirsttopic已被删除
  6. # ./bin/kafka-topics.sh --zookeeper localhost:2181 --list
  7. __consumer_offsets
  8. mysecondtopic

2.7 通过producer生产消息

  1. # ./bin/kafka-console-producer.sh --broker-list 10.1.60.29:9092 --topic mysecondtopic
  2. >hello kafka!
  3. >hello world!
  4. >just a test!
  5. >
  6. >hi world!
  7. >hahahaha!
  8. >

2.8 通过consumer消费消息

  1. # ./bin/kafka-console-consumer.sh --bootstrap-server 10.1.60.29:9092 --topic mysecondtopic --from-beginning
  2. hello kafka!
  3. just a test!
  4. hi world!
  5. hello world!
  6. hahahaha!

2.9 查看Kafka中指定topic的数据

./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --zookeeper 127.0.0.1:2181 --topic wechat_label_new --from-beginning  --max-messages 300 > result.txt

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/441959
推荐阅读
相关标签
  

闽ICP备14008679号