当前位置:   article > 正文

kafka_kafak消费者组的topic的lag值为-

kafak消费者组的topic的lag值为-

一,打开server.properties,查看kafka数据的存储文件

log.dirs=/tmp/kafka-logs

二,清空topic中的数据

1.对目标topic添加TTL/retention.ms配置项
bin/kafka-configs.sh --zookeeper 10.251.35.201:2181  --entity-type topics --alter --add-config retention.ms=10000 --entity-name bdcr_dws.dws_pro_sinv_ds

2.检查目标topic的配置项
bin/kafka-configs.sh --zookeeper 10.251.35.201:2181 --entity-type topics --describe --entity-name bdcr_dws.dws_pro_sinv_ds

3.移除目标topic的TTL/retention.ms配置项
bin/kafka-configs.sh --zookeeper 10.251.35.201:2181  --entity-type topics --alter --delete-config retention.ms --entity-name bdcr_dws.dws_pro_sinv_ds

4.在server.propertie中设置全局策略,log.retention.hours/minutes/ms是Kafka segment log的保存周期, 默认保存7天
log.retention.hours=72
log.cleanup.policy=delete

5.可通过以下命令查看目标topic在清空数据后其占用空间是否变小,清空需要等待一定时间,因为:log.retention.check.interval.ms 删除策略的检查周期
ls -l /tmp/kafka-logs/MyTopic-0/
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

三,

kafka常用命令1

1.显示时间戳相应的offset
sh bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.251.35.46:9092,10.251.35.64:9092,10.251.35.68:9092 -topic bdc_yg_order_ec_kudu_binlog -time 1616687915000

2.向kafka中写数据:
bin/kafka-console-producer.sh --broker-list 10.83.80.14:9092 --topic zhangkeketest

3.读取kafka中的数据:
bin/kafka-console-consumer.sh --bootstrap-server 10.83.80.14:9092 --topic zhangkeketest --from-beginning

4.创建topic,--replication-factor指定partition的replicas数
bin/kafka-topics.sh --create --topic zhangkeke1 --zookeeper 10.83.80.14:2181 --replication-factor 2 --partitions 3

5.删除topic(不是彻底删除,彻底删除需要进入kafka的配置文件和zookeeper的配置文件来进行彻底删除):
bin/kafka-topics.sh --delete --zookeeper 10.83.80.14:2181 --topic sentinel_metric_dev

6.查看kafka中有哪些topic:
bin/kafka-topics.sh --list --zookeeper localhost:2181

7.查看指定topic的详细信息
bin/kafka-topics.sh --topic zhangkeke1 --describe --zookeeper 10.83.80.5:2181

8.查看topic的分区情况、leader、副本数、ISR
bin/kafka-topics.sh --topic bdcr_dws.dws_base_gms_inv_data_dd --describe --zookeeper 10.251.35.201:2181

9.查看kafka的所有消费者组
bin/kafka-consumer-groups.sh --bootstrap-server 10.251.35.46:9092 --list

10.查看消费者组的偏移量、Lag值等信息
bin/kafka-consumer-groups.sh --bootstrap-server 10.251.35.46:9092 --describe --group sub20211202008

11.将数据追加到文本中:
bin/kafka-console-consumer.sh --bootstrap-server bjds-yg-bi-prd-10-240-20-71-belle.lan:9092 --topic zhangkeketest1 >> /home/bi_bdc_etl/zkk/zkk.txt

12.手动设置offset
bin/kafka-consumer-groups.sh --bootstrap-server kafka_ip:kafka_port --group 3 --topic  test --execute --reset-offsets --to-offset 1524510
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

kafka常用命令2

1.指定 JMX port 端口启动,指定 jmx,可以方便监控 Kafka 集群
JMX_PORT=9991 /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

2.增加 Topic 的 partition 数
/usr/local/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic test --partitions 5

3.查看 topic 指定分区 offset 的最大值或最小值,time 为 -1 时表示最大值,为 -2 时表示最小值(已调试成功)
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic bdcr_dws.dws_pro_sinv_ds --time -1 --broker-list 10.251.35.201:9092 --partitions 0

4.指定分区消费数据(已调试成功)
bin/kafka-console-consumer.sh --bootstrap-server 10.251.35.201:9092 --topic bdcr_dws.dws_pro_sinv_ds --offset latest --partition 0

5.指定分区只取一条(已调试成功)
bin/kafka-console-consumer.sh --bootstrap-server 10.251.35.201:9092 --topic bdcr_dws.dws_pro_sinv_ds --offset latest --partition 0 --max-messages 1

6.全数据只取一条(已调试成功)
bin/kafka-console-consumer.sh --bootstrap-server 10.251.35.201:9092 --topic bdcr_dws.dws_pro_sinv_ds --max-messages 1

7.指定消费者组消费数据(已调试成功)
bin/kafka-console-consumer.sh --bootstrap-server 10.251.35.201:9092 --topic bdcr_dws.dws_pro_sinv_ds -group test_group --from-beginning

8.删除消费者组
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group --delete

9.删除group中的topic
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group --topic test --delete

10.平衡leader
bin/kafka-preferred-replica-election.sh --bootstrap-server localhost:9092

11.自带压测工具
bin/kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100 --producer-props bootstrap.servers=localhost:9092

12.查看不可用的分区
bin/kafka-topics.sh --describe --unavailable-partitions --zookeeper 10.251.35.201:2181 --topic bdcr_dws.dws_pro_sinv_ds
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

四,使用java消费topic可知该topic每秒新增多少数据量

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class JavaConsumeTopicKnowsThroughputPerSecond {

    public static void main(String[] args){
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "10.9.251.30:9092");//xxx是服务器集群的ip
        properties.put("group.id", "id16");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.offset.reset", "latest");  //earliest  latest
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        kafkaConsumer.subscribe(Arrays.asList("kafka_11_test"));

        int i = 0;

        while (true) {
//            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1000));

            ConsumerRecords<String, String> records = kafkaConsumer.poll(10);

            for (ConsumerRecord<String, String> record : records) {
//                System.out.printf("offset = %d, value = %s", record.offset(), record.value());
//                System.out.println(record.partition());

                System.out.println(++i);

            }
        }
    }

}

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.1</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

五,对“bin/kafka-consumer-groups.sh -bootstrap-server 10.251.35.201:9092 --describe --group test_group001”命令的解释,该命令是动态显示,会显示目标消费者组当前Lag值、该消费者组对应有效的消费者等

$ /opt/kafka_2.13-2.7.0/bin/kafka-consumer-groups.sh -bootstrap-server 0.0.0.0:9092 --describe --group myConsumerGroup
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                     HOST            CLIENT-ID
myConsumerGroup userMsg         0          0               0               0               consumer-myConsumerGroup-1-48c4f84d-f40a-4d15-b0aa-071c39c3d4f5 /172.18.0.5     consumer-myConsumerGroup-1
myConsumerGroup userMsg         1          1               1               0               consumer-myConsumerGroup-1-48c4f84d-f40a-4d15-b0aa-071c39c3d4f5 /172.18.0.5     consumer-myConsumerGroup-1
myConsumerGroup userMsg         2          0               0               0               consumer-myConsumerGroup-1-48c4f84d-f40a-4d15-b0aa-071c39c3d4f5 /172.18.0.5     consumer-myConsumerGroup-1
myConsumerGroup userMsg         3          0               0               0               consumer-myConsumerGroup-1-48c4f84d-f40a-4d15-b0aa-071c39c3d4f5 /172.18.0.5     consumer-myConsumerGroup-1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • TOPIC: 该消费者组消费的是哪些topic
  • PARTITION: 表示该消费者消费的是哪些分区
  • CURRENT-OFFSET: 表示消费者组最新消费的位移值, 此值在消费过程中是变化的
  • LOG-END-OFFSET: 表示topic所有分区当前的日志终端位移值, 因为我们生产了1数据, 所以此处是1
  • LAG: 表示滞后进度, 此值为LOG-END-OFFSET 与 CURRENT-OFFSET的差值, 代表的是滞后情况, 此值越大表示滞后严重, 本例LAG为0 说明没有消费滞后。

1.生产者生产第二条消息msg2,查看myConsumerGroup消费者组:

bash-4.4# /opt/kafka_2.13-2.7.0/bin/kafka-consumer-groups.sh -bootstrap-server 0.0.0.0:9092 --describe --group myConsumerGroup

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                     HOST            CLIENT-ID
myConsumerGroup userMsg         0          0               0               0               consumer-myConsumerGroup-1-48c4f84d-f40a-4d15-b0aa-071c39c3d4f5 /172.18.0.5     consumer-myConsumerGroup-1
myConsumerGroup userMsg         1          1               1               0               consumer-myConsumerGroup-1-48c4f84d-f40a-4d15-b0aa-071c39c3d4f5 /172.18.0.5     consumer-myConsumerGroup-1
myConsumerGroup userMsg         2          0               0               0               consumer-myConsumerGroup-1-48c4f84d-f40a-4d15-b0aa-071c39c3d4f5 /172.18.0.5     consumer-myConsumerGroup-1
myConsumerGroup userMsg         3          1               1               0               consumer-myConsumerGroup-1-48c4f84d-f40a-4d15-b0aa-071c39c3d4f5 /172.18.0.5     consumer-myConsumerGroup-1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

生产更多的消息:

bash-4.4# /opt/kafka_2.13-2.7.0/bin/kafka-consumer-groups.sh -bootstrap-server 0.0.0.0:9092 --describe --group myConsumerGroup

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                     HOST            CLIENT-ID
myConsumerGroup userMsg         0          3               3               0               consumer-myConsumerGroup-1-48c4f84d-f40a-4d15-b0aa-071c39c3d4f5 /172.18.0.5     consumer-myConsumerGroup-1
myConsumerGroup userMsg         1          5               5               0               consumer-myConsumerGroup-1-48c4f84d-f40a-4d15-b0aa-071c39c3d4f5 /172.18.0.5     consumer-myConsumerGroup-1
myConsumerGroup userMsg         2          7               7               0               consumer-myConsumerGroup-1-48c4f84d-f40a-4d15-b0aa-071c39c3d4f5 /172.18.0.5     consumer-myConsumerGroup-1
myConsumerGroup userMsg         3          5               5               0               consumer-myConsumerGroup-1-48c4f84d-f40a-4d15-b0aa-071c39c3d4f5 /172.18.0.5     consumer-myConsumerGroup-1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

再开启一个消费者,同样属于 myConsumerGroup。那么消费者组中有2个消费者。查看myConsumerGroup,看到多了一个CONSUMER-ID. 说明这个消费者组中有2个消费者。

bash-4.4# /opt/kafka_2.13-2.7.0/bin/kafka-consumer-groups.sh -bootstrap-server 0.0.0.0:9092 --describe --group myConsumerGroup

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                     HOST            CLIENT-ID
myConsumerGroup userMsg         0          3               3               0               consumer-myConsumerGroup-1-033a09ab-9942-473d-9d54-386b27718725 /172.18.0.5     consumer-myConsumerGroup-1
myConsumerGroup userMsg         1          5               5               0               consumer-myConsumerGroup-1-033a09ab-9942-473d-9d54-386b27718725 /172.18.0.5     consumer-myConsumerGroup-1
myConsumerGroup userMsg         2          7               7               0               consumer-myConsumerGroup-1-48c4f84d-f40a-4d15-b0aa-071c39c3d4f5 /172.18.0.5     consumer-myConsumerGroup-1
myConsumerGroup userMsg         3          5               5               0               consumer-myConsumerGroup-1-48c4f84d-f40a-4d15-b0aa-071c39c3d4f5 /172.18.0.5     consumer-myConsumerGroup-1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

2.生产2条消息,消费者组里的消费者各收到一条。
查看消费者组信息。消费2条之前:

bash-4.4# /opt/kafka_2.13-2.7.0/bin/kafka-consumer-groups.sh -bootstrap-server 0.0.0.0:9092 --describe --group myConsumerGroup

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                     HOST            CLIENT-ID
myConsumerGroup userMsg         0          3               3               0               consumer-myConsumerGroup-1-48c4f84d-f40a-4d15-b0aa-071c39c3d4f5 /172.18.0.5     consumer-myConsumerGroup-1
myConsumerGroup userMsg         1          5               5               0               consumer-myConsumerGroup-1-48c4f84d-f40a-4d15-b0aa-071c39c3d4f5 /172.18.0.5     consumer-myConsumerGroup-1
myConsumerGroup userMsg         2          7               7               0               consumer-myConsumerGroup-1-48c4f84d-f40a-4d15-b0aa-071c39c3d4f5 /172.18.0.5     consumer-myConsumerGroup-1
myConsumerGroup userMsg         3          5               5               0               consumer-myConsumerGroup-1-48c4f84d-f40a-4d15-b0aa-071c39c3d4f5 /172.18.0.5     consumer-myConsumerGroup-1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

消费2条之后: 可以看到两个消费者各消费一条

bash-4.4# /opt/kafka_2.13-2.7.0/bin/kafka-consumer-groups.sh -bootstrap-server 0.0.0.0:9092 --describe --group myConsumerGroup

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                     HOST            CLIENT-ID
myConsumerGroup userMsg         0          4               4               0               consumer-myConsumerGroup-1-033a09ab-9942-473d-9d54-386b27718725 /172.18.0.5     consumer-myConsumerGroup-1
myConsumerGroup userMsg         1          5               5               0               consumer-myConsumerGroup-1-033a09ab-9942-473d-9d54-386b27718725 /172.18.0.5     consumer-myConsumerGroup-1
myConsumerGroup userMsg         2          8               8               0               consumer-myConsumerGroup-1-48c4f84d-f40a-4d15-b0aa-071c39c3d4f5 /172.18.0.5     consumer-myConsumerGroup-1
myConsumerGroup userMsg         3          5               5               0               consumer-myConsumerGroup-1-48c4f84d-f40a-4d15-b0aa-071c39c3d4f5 /172.18.0.5     consumer-myConsumerGroup-1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

3.如果使消费者组myConsumerGroup增加到5个消费者,那么使用该命令会发现之后四个消费者在消费!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/621322
推荐阅读
相关标签
  

闽ICP备14008679号