当前位置:   article > 正文

kafka常用命令_kafka-consumer-groups.sh

kafka-consumer-groups.sh

kafka常用命令

先查看所有消费组
./kafka-consumer-groups.sh --zookeeper kafka148:2181  --list
# ./kafka-consumer-groups.sh --zookeeper kafka148:2181  --list   
console-consumer-65556
console-consumer-23367
console-consumer-97039
test-consumer-group-001
console-consumer-79785

注意:--zookeeper 后面指定地址:端口,可以使用hostname也可以使用IP地址


查看指定消费组详情
./kafka-consumer-groups.sh --zookeeper kafka148:2181  --describe --group test-consumer-group-001

# ./kafka-consumer-groups.sh --zookeeper kafka148:2181  --describe --group test-consumer-group-001   
GROUP                          TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             OWNER
test-consumer-group-001        tmp02                          0          17              17              0               test-consumer-group-001_kafka148-1673419620936-521e018a-0
test-consumer-group-001        tmp03                          0          4               4               0               test-consumer-group-001_kafka148-1673419620936-521e018a-0


创建Topic主题
./kafka-topics.sh --zookeeper kafka148:2181 --create  --partitions 1  --replication-factor 1 --topic  tmp02
./kafka-topics.sh --zookeeper kafka148:2181 --create  --partitions 1  --replication-factor 1 --topic  tmp03

我这里创建时指定了Topic的分区数为1个、每个分区分配1个副本。如果要设置分区数为3个、每个分区分配3个副本则设置为:--partitions 3  --replication-factor 3。
注意:副本位于集群中不同的broker上,也就是说副本的数量不能超过broker的数量,否则创建主题时会失败。

查看Topic信息
./kafka-topics.sh --zookeeper kafka148:2181 --describe --topic  tmp02
./kafka-topics.sh --zookeeper kafka148:2181 --describe --topic  tmp03

# ./kafka-topics.sh --zookeeper kafka148:2181 --describe --topic  tmp02
Topic:tmp02     PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: tmp02    Partition: 0    Leader: 1       Replicas: 1     Isr: 1

# ./kafka-topics.sh --zookeeper kafka148:2181 --describe --topic  tmp02,tmp03
Topic:tmp02     PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: tmp02    Partition: 0    Leader: 1       Replicas: 1     Isr: 1
Topic:tmp03     PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: tmp03    Partition: 0    Leader: 1       Replicas: 1     Isr: 1

命令行终端消费Topic消息(数据):
./kafka-console-consumer.sh --zookeeper kafka148:2181 --topic tmp02 --from-beginning --property print.timestamp=true

--from-beginning 是为了指定从头(earlist)开始消费数据,否则如果不指定则默认从latest开始消费数据
--property print.timestamp=true 参数设置是为了打印时间戳,kafka生产者写入数据都会自动生成时间戳(精确到毫秒)

如果要指定消费者group.id等更多消费者参数,可以通过--consumer.config参数指定消费者配置文件进行消费:
./kafka-console-consumer.sh --zookeeper kafka148:2181 --topic tmp02 --from-beginning --property print.timestamp=true --consumer.config ../config/consumer.properties

consumer.properties配置文件中可指定相关参数:

# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

#consumer group id,如果不指定则会自动生成,自动生成的格式为consumer-group-xxxx
group.id=test-consumer-group-001

#consumer timeout
#consumer.timeout.ms=5000

#consumer client id,注意:如果并发场景下设置了client.id则需要设置成不同的id,如果不设置、会自动生成一个非空的字符串
#client.id=test-client-001

注意:消费时 --topic | --blacklist | --whitelist 只能选择三者中的一种方式


Topic白名单方式消费消息(数据)、一个消费者可以同时订阅多个Topics:
./kafka-console-consumer.sh --zookeeper kafka148:2181 --whitelist tmp02,tmp03 --from-beginning --property print.timestamp=true --consumer.config ../config/consumer.properties

注意:
如果kafka创建消费者报错 consumer zookeeper is not a recognized option,可能原因是:--zookeeper 是一个过时的方法,只能在低版本使用。在高版本中这种启动方式已经被删除了。
kafka版本高于0.90的启动消费者的方法: ./kafka-console-consumer.sh --bootstrap-server 192.168.100.148:9092 --topic tmp03 --from-beginning


tmp02写数据(输入JSON数据):
./kafka-console-producer.sh --broker-list kafka148:9092 --topic tmp02
{"id":1,"name":"test01"}
{"id":2,"name":"test02"}
{"id":3,"name":"test03"}
{"id":4,"name":"test04"}
{"id":5,"name":"test05"}
{"id":6,"name":"test06"}
{"id":7,"name":"test07"}

tmp03写数据(输入JSON数据):
./kafka-console-producer.sh --broker-list kafka148:9092 --topic tmp03
{"id":1,"name":"test01","info":"aaa"}
{"id":2,"name":"test02","info":"bbb"}


topic消费进度(offset)查看

消费进度(offset)查看方法1:

./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper kafka148:2181 --group test-consumer-group-001
kafka高版本可能ConsumerOffsetChecker废弃、被ConsumerGroupCommand替代

# ./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper kafka148:2181 --group test-consumer-group-001
[2023-01-11 15:35:44,148] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
Group           Topic                          Pid Offset          logSize         Lag             Owner
test-consumer-group-001 tmp02                          0   17              17              0               test-consumer-group-001_kafka148-1673419620936-521e018a-0
test-consumer-group-001 tmp03                          0   4               4               0               test-consumer-group-001_kafka148-1673419620936-521e018a-0

注意:
1) --group 参数必须指定
2) 如果订阅了多个topics时,可以通过 --topic tmp02,tmp03 指定topic,如果不指定 --topic 则默认为订阅的所有topics


消费进度(offset)查看方法2:

查看消费组的 offset 消费情况:
kafka-consumer-groups.sh 命令可以查看 offset 消费情况:
先查看所有消费组
./kafka-consumer-groups.sh --zookeeper kafka148:2181  --list

查看指定消费组信息、包括 offset 消费情况
./kafka-consumer-groups.sh --zookeeper kafka148:2181  --describe --group test-consumer-group-001
GROUP                          TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             OWNER
test-consumer-group-001        tmp02                          0          10              10              0               test-consumer-group-001_kafka148-1673416999855-1e20c37d-0

如果指定白名单方式消费多个topics,则查看消费组的信息如下:
# ./kafka-consumer-groups.sh --zookeeper kafka148:2181  --describe --group test-consumer-group-001
GROUP                          TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             OWNER
test-consumer-group-001        tmp02                          0          13              13              0               test-consumer-group-001_kafka148-1673418527275-878b7120-0
test-consumer-group-001        tmp03                          0          2               2               0               test-consumer-group-001_kafka148-1673418527275-878b7120-0


删除消费者(consumer):
根据group删除指定的一个消费组
./kafka-consumer-groups.sh --zookeeper kafka148:2181  --delete --group test-consumer-group-001
./kafka-consumer-groups.sh --zookeeper kafka148:2181  --list


增加Topic分区数(修改topic设置为3个分区)
./kafka-topics.sh --zookeeper kafka148:2181 --alter --topic tmp03 --partitions 3

kafka-console-consumer.sh 的参数列表:
--topic 被消费的topic
--whitelist 正则表达式,指定要包含以供使用的主题的白名单
--partition 指定分区。除非指定 --offset ,否则从分区结束(latest)开始消费
--offset 执行消费的起始offset位置。默认值:latest。支持:latest | earliest | <offset>
--consumer-property 将用户定义的属性以key=value的形式传递给使用者
--consumer.config 消费者配置属性文件,请注意:[consumer-property]优先于此配置
--formatter 用于格式化kafka消息以供显示的类的名称,默认值:kafka.tools.DefaultMessageFormatter。支持:
    kafka.tools.DefaultMessageFormatter
    kafka.tools.LoggingMessageFormatter
    kafka.tools.NoOpMessageFormatter
    kafka.tools.ChecksumMessageFormatter
--property 初始化消息格式化程序的属性。支持:
    print.timestamp=true|false
    print.key=true|false
    print.value=true|false
    key.separator=<key.separator>
    line.separator=<line.separator>
    key.deserializer=<key.deserializer>
    value.deserializer=<value.deserializer>
--from-beginning 从存在的最早消息开始,而不是从最新消息开始。
--max-messages 消费的最大数据量。若不指定,则持续消费下去
--timeout-ms 在指定时间间隔内没有消息可用时退出
--skip-message-on-error  如果处理消息时出错,请跳过它而不是暂停 
--bootstrap-server 必需的参数(使用高版本的消费者时)。要连接的服务器
--zookeeper 必需的参数(使用低版本的消费者时)。连接zookeeper的字符串。可以给出多个URL以允许故障转移
--key-deserializer 指定消费key的编码方式。org.apache.kafka.common.serialization.StringDeserializer
--value-deserializer 指定消息体的编码方式。org.apache.kafka.common.serialization.StringDeserializer
--enable-systest-events 除记录消费的消息外,还记录消费者的生命周期(用于系统测试)
--isolation-level  设置为:read_committed 则过滤掉未提交的事务性消息。设置为:read_uncommitted 则读取所有消息。默认值为:read_uncommitted
--group 指定消费者所属组的ID
--blacklist 要从消费中排除的主题黑名单
--csv-reporter-enabled 如果设置,将启用csv metrics报告器
--delete-consumer-offsets 如果指定,则启动时删除zookeeper中的消费者信息
--metrics-dir 输出csv度量值,需与[csv-reporter-enable]配合使用

========================

注意:kafka 部署的如果是高版本(例如:kafka_2.12-3.5.1)则不支持 --zookeeper 参数
高版本上指定 --zookeeper 参数执行会报错: zookeeper is not a recognized option
./kafka-topics.sh --zookeeper kafka148:2181 --list
Exception in thread "main" joptsimple.UnrecognizedOptionException: zookeeper is not a recognized option

kafka 高版本上需要用 --bootstrap-server 参数
./kafka-topics.sh --bootstrap-server kafka148:9092 --list
 

本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号