当前位置:   article > 正文

Kafka集群部署和操作命令(超级详细)_kafka集群命令

kafka集群命令

N.1 环境准备

 N.1.1 集群规划

bigData111

bigData112

bigData113

zookeep

kafka

zookeep

kafka

zookeep

kafka

N.1.2 虚拟机准备

1)每台主机分别关闭防火墙

2)安装 zookeep分布式搭建.note 和jdk

N.2 Kafka集群部署

0)版本介绍

0)前提JDK环境准备

如果要自己指定jdk路径,吧使用环境变量的话,配置如下。

vi kafka-run-class.sh

export JAVA_HOME=/usr/local/src/jdk1.8

第三方工具指定JDK路径思路.note

1)解压安装包

2.11是scala版本,0.11是kafka版本

官网 下载地址 Index of /dist/kafka

[itstar@bigData111 software]$ tar -zxvf kafka_2.11-0.11.0.2.tgz -C /softWare/

注意scala版本是2.11版本,0.11表示kafka版本。

2)修改解压后的文件名称

[itstar@bigData111 module]$ mv kafka_2.11-0.11.0.2/ kafka_2.11

3)在/softWare/kafka目录下创建logs文件夹

[itstar@bigData111 kafka]$ mkdir logs

4)修改配置文件

[itstar@bigData111 kafka]$ cd config/

[itstar@bigData111 config]$ vi server.properties

#broker的全局唯一编号,不能重复 ( self # 这个id最好和zookeep的id一 一对应起来)

broker.id=1

#是否允许删除topic

delete.topic.enable=true

#处理网络请求的线程数量

num.network.threads=3

#用来处理磁盘IO的线程数量

num.io.threads=8

#发送套接字的缓冲区大小

socket.send.buffer.bytes=102400

#接收套接字的缓冲区大小

socket.receive.buffer.bytes=102400

#请求套接字的最大缓冲区大小

socket.request.max.bytes=104857600

#kafka运行日志存放的路径

log.dirs=/softWare/kafka_2.11/logs

#topic在当前broker上的分区个数

num.partitions=1

#用来恢复和清理data下数据的线程数量

num.recovery.threads.per.data.dir=1

#segment文件保留的最长时间,超时将被删除

log.retention.hours=168

#配置连接Zookeeper集群地址

zookeeper.connect=bigdata111:2181,bigdata112:2181,bigdata113:2181

5)把kafka分发给每一个节点

分别在bigData111和bigData112上修改配置文件/softWare/kafka_2.11/config/server.properties中的broker.id=2、broker.id=3

注:broker.id不得重复

6)配置环境变量

(每台节点都要配置)

7启动zookeeper后再启动kafka(确保先启动zookeeper, 后在启动kafka)

开启集群,注意启动的时候 不要ctrl +z 进行停止了。

[itstar@bigdata11 kafka]$ nohup bin/kafka-server-start.sh config/server.properties &

[itstar@bigdata12 kafka]$ nohup bin/kafka-server-start.sh config/server.properties &

[itstar@bigdata13 kafka]$ nohup bin/kafka-server-start.sh config/server.properties &

关闭集群

[itstar@bigdata11 kafka]$ bin/kafka-server-stop.sh stop

[itstar@bigdata12 kafka]$ bin/kafka-server-stop.sh stop

[itstar@bigdata13 kafka]$ bin/kafka-server-stop.sh stop

8)如果要重装kafka的话,只需要 kafka_2.11/logs 全部删除后,在进行启动kafka

N.3 Kafka命令行操作

0)相关命令

Kafka命令大全_kafka版本查看命令_day day day ...的博客-CSDN博客

# kafka0.9版本前用的是 --zookeeper 参数,0.10开始用--bootstrap-server参数。 但有些命令这个两个是存在交叉用的。

1)查看topic和删除topic

(1)查看Topic列表

[root@bigData111 kafka]$ bin/kafka-topics.sh --zookeeper ip:2181 --list

(2)查看某个Topic的详情

[root@bigData111 kafka]$ bin/kafka-topics.sh --zookeeper ip:2181 --describe --topic topic_1

(3)删除

bin/kafka-topics.sh --zookeeper ip:2181 --delete --topic topic_1

需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。

2)查看消费者组和删除消费者组

消费者组不存在创建命令的,只有消费数据的时候临时去创建的

(1)查看 bin/kafka-consumer-groups.sh --bootstrap-server ip:9092 --list

(2)删除 bin/kafka-consumer-groups.sh --zookeeper 134.224.115.40:2181 --delete --group id

注意 目前测试发现 只要消费者退出了(进程也要确保kill了),消费者组的id也会自动删除

3)创建topic

[root@bigData111 kafka]$

bin/kafka-topics.sh --zookeeper bigData111:2181 --create --replication-factor 3 --partitions 1 --topic test

选项说明:

--topic 定义topic名

--replication-factor  定义副本数

--partitions  定义分区数

4)生产者-发送数据

[root@bigData111 kafka]$

# 9092 会启动一个新端口

bin/kafka-console-producer.sh --broker-list bigData111:9092 --topic topic_1

>hello world

>okok

5)消费者-读取数据

(1)不使用消费者组的情况

[0] 命令:

bin/kafka-console-consumer.sh --bootstrap-server bigData111:9092 --from-beginning --topic topic_1

# 9092 是生产者的端口,不是zookeep端口

[2] --from-beginning"为可选参数,表示要从头消费消息

(2)使用消费者组的情况

每个ConsumerGroup消费组 只能对同一个分区最多消费一次, 而同一个分区可以被不同组消费;

[0] 命令:

bin/kafka-console-consumer.sh --bootstrap-server 134.224.115.40:9092

--topic topic_1 --from-beginning --consumer-property group=myid

[1] 不使用group的话,启动10个consumer消费一个topic,这10个consumer都能得到topic的所有数据,相当于这个topic中的任一条消息被消费10次。

[2] 使用group的话,连接时带上groupid,topic的消息会分发到10个consumer上,每条消息只被消费1次

[3] 注意消费者组的情况系

允许消费者组中多个消费者并行有序处理消息,组中的消费者数量最好不要大于 topic 的 partition(分区)数量。

    消费者数=分区数:每个消费者消费一个分区的消息;

    消费者数<分区数:某些消费者会处理多个分区的消息;

    消费者数>分区数:多余的消费者将空等,无法处理消息;

(3)消费者和生产者分区数设置比例

kafka消费者默认即为一个线程对象。为了达到最快的消费速度:

消费者服务器数 * 每台cpu核数 = kafkaTopicPartition数。所以消费者过多也是浪费的(但要注意消费者读取阶段是浪费的,数据的处理阶段是不浪费的)。

消费者服务器可以是spark集群,flik集群等都可以。(spark章节将会具体讲到)

N.4 Kafka案例如下

(0)前提注意

启动的时候 每次退出的时候 记得把生产者 和消费者给kill下 ,否则会积累太多

(1)不使用消费者组的情况

消费命令:bin/kafka-console-consumer.sh --bootstrap-server 134.224.115.40:9092 --from-beginning --topic topic_1

(2)使用消费者组的情况

消费命令:bin/kafka-console-consumer.sh --bootstrap-server 134.224.115.40:9092 --topic topic_1 --from-beginning --consumer-property group=11

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/秋刀鱼在做梦/article/detail/936743
推荐阅读
相关标签
  

闽ICP备14008679号