赞
踩
kafka常用命令
先查看所有消费组
./kafka-consumer-groups.sh --zookeeper kafka148:2181 --list
# ./kafka-consumer-groups.sh --zookeeper kafka148:2181 --list
console-consumer-65556
console-consumer-23367
console-consumer-97039
test-consumer-group-001
console-consumer-79785
注意:--zookeeper 后面指定地址:端口,可以使用hostname也可以使用IP地址
查看指定消费组详情
./kafka-consumer-groups.sh --zookeeper kafka148:2181 --describe --group test-consumer-group-001
# ./kafka-consumer-groups.sh --zookeeper kafka148:2181 --describe --group test-consumer-group-001
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
test-consumer-group-001 tmp02 0 17 17 0 test-consumer-group-001_kafka148-1673419620936-521e018a-0
test-consumer-group-001 tmp03 0 4 4 0 test-consumer-group-001_kafka148-1673419620936-521e018a-0
创建Topic主题
./kafka-topics.sh --zookeeper kafka148:2181 --create --partitions 1 --replication-factor 1 --topic tmp02
./kafka-topics.sh --zookeeper kafka148:2181 --create --partitions 1 --replication-factor 1 --topic tmp03
我这里创建时指定了Topic的分区数为1个、每个分区分配1个副本。如果要设置分区数为3个、每个分区分配3个副本则设置为:--partitions 3 --replication-factor 3。
注意:副本位于集群中不同的broker上,也就是说副本的数量不能超过broker的数量,否则创建主题时会失败。
查看Topic信息
./kafka-topics.sh --zookeeper kafka148:2181 --describe --topic tmp02
./kafka-topics.sh --zookeeper kafka148:2181 --describe --topic tmp03
# ./kafka-topics.sh --zookeeper kafka148:2181 --describe --topic tmp02
Topic:tmp02 PartitionCount:1 ReplicationFactor:1 Configs:
Topic: tmp02 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
# ./kafka-topics.sh --zookeeper kafka148:2181 --describe --topic tmp02,tmp03
Topic:tmp02 PartitionCount:1 ReplicationFactor:1 Configs:
Topic: tmp02 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic:tmp03 PartitionCount:1 ReplicationFactor:1 Configs:
Topic: tmp03 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
命令行终端消费Topic消息(数据):
./kafka-console-consumer.sh --zookeeper kafka148:2181 --topic tmp02 --from-beginning --property print.timestamp=true
--from-beginning 是为了指定从头(earlist)开始消费数据,否则如果不指定则默认从latest开始消费数据
--property print.timestamp=true 参数设置是为了打印时间戳,kafka生产者写入数据都会自动生成时间戳(精确到毫秒)
如果要指定消费者group.id等更多消费者参数,可以通过--consumer.config参数指定消费者配置文件进行消费:
./kafka-console-consumer.sh --zookeeper kafka148:2181 --topic tmp02 --from-beginning --property print.timestamp=true --consumer.config ../config/consumer.properties
consumer.properties配置文件中可指定相关参数:
# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
#consumer group id,如果不指定则会自动生成,自动生成的格式为consumer-group-xxxx
group.id=test-consumer-group-001
#consumer timeout
#consumer.timeout.ms=5000
#consumer client id,注意:如果并发场景下设置了client.id则需要设置成不同的id,如果不设置、会自动生成一个非空的字符串
#client.id=test-client-001
注意:消费时 --topic | --blacklist | --whitelist 只能选择三者中的一种方式
Topic白名单方式消费消息(数据)、一个消费者可以同时订阅多个Topics:
./kafka-console-consumer.sh --zookeeper kafka148:2181 --whitelist tmp02,tmp03 --from-beginning --property print.timestamp=true --consumer.config ../config/consumer.properties
注意:
如果kafka创建消费者报错 consumer zookeeper is not a recognized option,可能原因是:--zookeeper 是一个过时的方法,只能在低版本使用。在高版本中这种启动方式已经被删除了。
kafka版本高于0.90的启动消费者的方法: ./kafka-console-consumer.sh --bootstrap-server 192.168.100.148:9092 --topic tmp03 --from-beginning
tmp02写数据(输入JSON数据):
./kafka-console-producer.sh --broker-list kafka148:9092 --topic tmp02
{"id":1,"name":"test01"}
{"id":2,"name":"test02"}
{"id":3,"name":"test03"}
{"id":4,"name":"test04"}
{"id":5,"name":"test05"}
{"id":6,"name":"test06"}
{"id":7,"name":"test07"}
tmp03写数据(输入JSON数据):
./kafka-console-producer.sh --broker-list kafka148:9092 --topic tmp03
{"id":1,"name":"test01","info":"aaa"}
{"id":2,"name":"test02","info":"bbb"}
topic消费进度(offset)查看
消费进度(offset)查看方法1:
./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper kafka148:2181 --group test-consumer-group-001
kafka高版本可能ConsumerOffsetChecker废弃、被ConsumerGroupCommand替代
# ./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper kafka148:2181 --group test-consumer-group-001
[2023-01-11 15:35:44,148] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
Group Topic Pid Offset logSize Lag Owner
test-consumer-group-001 tmp02 0 17 17 0 test-consumer-group-001_kafka148-1673419620936-521e018a-0
test-consumer-group-001 tmp03 0 4 4 0 test-consumer-group-001_kafka148-1673419620936-521e018a-0
注意:
1) --group 参数必须指定
2) 如果订阅了多个topics时,可以通过 --topic tmp02,tmp03 指定topic,如果不指定 --topic 则默认为订阅的所有topics
消费进度(offset)查看方法2:
查看消费组的 offset 消费情况:
kafka-consumer-groups.sh 命令可以查看 offset 消费情况:
先查看所有消费组
./kafka-consumer-groups.sh --zookeeper kafka148:2181 --list
查看指定消费组信息、包括 offset 消费情况
./kafka-consumer-groups.sh --zookeeper kafka148:2181 --describe --group test-consumer-group-001
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
test-consumer-group-001 tmp02 0 10 10 0 test-consumer-group-001_kafka148-1673416999855-1e20c37d-0
如果指定白名单方式消费多个topics,则查看消费组的信息如下:
# ./kafka-consumer-groups.sh --zookeeper kafka148:2181 --describe --group test-consumer-group-001
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
test-consumer-group-001 tmp02 0 13 13 0 test-consumer-group-001_kafka148-1673418527275-878b7120-0
test-consumer-group-001 tmp03 0 2 2 0 test-consumer-group-001_kafka148-1673418527275-878b7120-0
删除消费者(consumer):
根据group删除指定的一个消费组
./kafka-consumer-groups.sh --zookeeper kafka148:2181 --delete --group test-consumer-group-001
./kafka-consumer-groups.sh --zookeeper kafka148:2181 --list
增加Topic分区数(修改topic设置为3个分区)
./kafka-topics.sh --zookeeper kafka148:2181 --alter --topic tmp03 --partitions 3
kafka-console-consumer.sh 的参数列表:
--topic 被消费的topic
--whitelist 正则表达式,指定要包含以供使用的主题的白名单
--partition 指定分区。除非指定 --offset ,否则从分区结束(latest)开始消费
--offset 执行消费的起始offset位置。默认值:latest。支持:latest | earliest | <offset>
--consumer-property 将用户定义的属性以key=value的形式传递给使用者
--consumer.config 消费者配置属性文件,请注意:[consumer-property]优先于此配置
--formatter 用于格式化kafka消息以供显示的类的名称,默认值:kafka.tools.DefaultMessageFormatter。支持:
kafka.tools.DefaultMessageFormatter
kafka.tools.LoggingMessageFormatter
kafka.tools.NoOpMessageFormatter
kafka.tools.ChecksumMessageFormatter
--property 初始化消息格式化程序的属性。支持:
print.timestamp=true|false
print.key=true|false
print.value=true|false
key.separator=<key.separator>
line.separator=<line.separator>
key.deserializer=<key.deserializer>
value.deserializer=<value.deserializer>
--from-beginning 从存在的最早消息开始,而不是从最新消息开始。
--max-messages 消费的最大数据量。若不指定,则持续消费下去
--timeout-ms 在指定时间间隔内没有消息可用时退出
--skip-message-on-error 如果处理消息时出错,请跳过它而不是暂停
--bootstrap-server 必需的参数(使用高版本的消费者时)。要连接的服务器
--zookeeper 必需的参数(使用低版本的消费者时)。连接zookeeper的字符串。可以给出多个URL以允许故障转移
--key-deserializer 指定消费key的编码方式。org.apache.kafka.common.serialization.StringDeserializer
--value-deserializer 指定消息体的编码方式。org.apache.kafka.common.serialization.StringDeserializer
--enable-systest-events 除记录消费的消息外,还记录消费者的生命周期(用于系统测试)
--isolation-level 设置为:read_committed 则过滤掉未提交的事务性消息。设置为:read_uncommitted 则读取所有消息。默认值为:read_uncommitted
--group 指定消费者所属组的ID
--blacklist 要从消费中排除的主题黑名单
--csv-reporter-enabled 如果设置,将启用csv metrics报告器
--delete-consumer-offsets 如果指定,则启动时删除zookeeper中的消费者信息
--metrics-dir 输出csv度量值,需与[csv-reporter-enable]配合使用
========================
注意:kafka 部署的如果是高版本(例如:kafka_2.12-3.5.1)则不支持 --zookeeper 参数
高版本上指定 --zookeeper 参数执行会报错: zookeeper is not a recognized option
./kafka-topics.sh --zookeeper kafka148:2181 --list
Exception in thread "main" joptsimple.UnrecognizedOptionException: zookeeper is not a recognized option
kafka 高版本上需要用 --bootstrap-server 参数
./kafka-topics.sh --bootstrap-server kafka148:9092 --list
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。