当前位置:   article > 正文

4.Kafka常用操作命令_kafka-topics.sh create

kafka-topics.sh create

操作命令

以下所有操作命令都kafka安装目录下操作。

  1. 启动kafka
bin/kafka-server-start.sh -daemon config/server.properties
  • 1
  • -daemon:表示以非阻塞式后台启动kafka
  1. 关闭kafka
bin/kafka-server-stop.sh
  • 1
  1. 创建topic
bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic test_topic
  • 1
  • –zookeeper:连接zookeeper服务地址(2.8++版本使用–bootstrap-server <kafka连接地址>)
  • –create:创建topic行为
  • –replication-factor:副本数
  • –partitions:分区数
  • –topic:指定要创建topic名称
    注:副本数不能大于kafka集群数,否则报错:Replication factor: 3 larger than available brokers: 1.
  1. 查看所有topic
bin/kafka-topics.sh --zookeeper localhost:2181 --list
  • 1
  • –zookeeper:连接zookeeper服务地址(2.8++版本使用–bootstrap-server <kafka连接地址>)
  • –list:列举所有topic
  1. 删除topic
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test_topic
  • 1
  • –zookeeper:连接zookeeper服务地址(2.8++版本使用–bootstrap-server <kafka连接地址>)
  • –delete:删除topic行为
  • –topic:指定要删除topic名称
    注:需要server.properties 中设置 delete.topic.enable=true 否则只是标记删除
  1. 查看指定topic详情
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic first
  • 1
  • –zookeeper:连接zookeeper服务地址(2.8++版本使用–bootstrap-server <kafka连接地址>)
  • –describe:描述topic行为
  • –topic:指定要描述topic名称
  1. 生产消息
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first
  • 1
  • –bootstrap-server:连接kafka服务地址,旧版本用–broker-list
  • –topic:指定生产的消息将发送到哪个topic
  1. 消费消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic first
  • 1
  • –bootstrap-server:连接kafka服务地址,旧版本用–broker-list
  • –from-beginning:可选,从最开始的消息开始消费,为空则默认只消费最新的消息
  • –topic:指定将消费哪个topic的消息
  1. 查看所有消费者组
kafka9092/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
  • 1
  • –bootstrap-server:连接kafka服务地址,旧版本用–broker-list
  • –list:列出所有消费者组名称
  1. 查看指定消费者组详情
kafka9092/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group --describe
  • 1
  • –bootstrap-server:连接kafka服务地址,旧版本用–broker-list
  • –group:指定消费者组名称
  • –describe:对指定消费者组进行描述
    在这里插入图片描述
    表头描述:
    GROUP:消费者组名称
    TOPIC:主题名称
    PARTITION:分区号
    CURRENT-OFFSET:当前消费者组在指定主题分区的偏移量
    LOG-END-OFFSET:当前消费者组在指定主题分区的最大偏移量,如果CURRENT-OFFSET = LOG-END-OFFSET则表示当前分区的偏移量为最新的,等价于latest
    LAG:当前偏移量,LAG = LOG-END-OFFSET - CURRENT-OFFSET
  1. 重置指定消费者组偏移量
kafka9092/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group --topic test_topic --reset-offsets --to-offset 100 --execute
  • 1
  • –bootstrap-server:连接kafka服务地址,旧版本用–broker-list
  • –group:指定消费者组名称
  • –topic:指定主题名称
  • –reset-offsets:指定重置策略
  • –execute:重置方案

重置策略:
–to-earliest:把位移调整到分区当前最小位移
–to-latest:把位移调整到分区当前最新位移
–to-current:把位移调整到分区当前位移
–to-offset <offset>: 把位移调整到指定位移处
–shift-by N: 把位移调整到当前位移 + N处,注意N可以是负数,表示向前移动
–to-datetime <datetime>:把位移调整到大于给定时间的最早位移处,datetime格式是yyyy-MM-ddTHH:mm:ss.xxx,比如2017-08-04T00:00:00.000
–by-duration <duration>:把位移调整到距离当前时间指定间隔的位移处,duration格式是PnDTnHnMnS,比如PT0H5M0S
–from-file <file>:从CSV文件中读取调整策略

执行方案:
–execute:执行真正的位移调整
–export:把位移调整方案按照CSV格式打印,方便用户成csv文件,供后续直接使用

注:consumer group状态必须是inactive的(否则报错:Error: Assignments can only be reset if the group 'test_group' is inactive, but the current state is Stable.),即不能是处于正在工作中的状态;不加执行方案,默认是只做打印操作

  1. Kafka Producer 压力测试
bin/kafka-producer-perf-test.sh --topic topic_test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=10.211.55.5:8082,10.211.55.5:8083,10.211.55.5:8084 batch.size=16384 linger.ms=0
  • 1
  • topic:指定压力测试的主题。
  • record-size:一条信息有多大,单位是字节。
  • num-records:总共发送多少条信息。
  • throughput:每秒多少条信息,设成-1,表示不限流,尽可能快的生产数据,可测出生产者最大吞吐量。
  • producer-props:配置生产者的信息,比如batch.size、linger.ms,配置格式:键=值。
  1. Kafka Consumer压力测试
bin/kafka-consumer-perf-test.sh --bootstrap-server 10.211.55.5:8082,10.211.55.5:8083,10.211.55.5:8084 --topic topic_test --messages 1000000 --consumer.config config/consumer.properties 
  • 1
  • –bootstrap-server:指定kafka集群地址。
  • –topic:指定 topic 的名称。
  • –messages:总共要消费的消息条数,和生产者压力测试–num-records参数对应。
  • –consumer.config:指定消费者配置文件。
  • –group:指定消费者组ID,默认:perf-consumer-19668。
  1. 查看Kafka堆使用情况
#参数pid的值通过jps来查询kafka对应的pid值
jhsdb jmap --heap --pid 29495
  • 1
  • 2

查看kafka集群是否成功

  1. 创建集群主题
    创建主题的时候指定集群和副本数,不报一下类似的错误并且创建主题成功,则表示集群成功。
    Error while executing topic command : Replication factor: 3 larger than available brokers: 1.
  2. 查看zookeeper集群信息
    查看zookeeper的brokers/ids信息,显示所有kafka的broker.id则表示集群成功。
    在这里插入图片描述
声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号