赞
踩
log.dirs=/tmp/kafka-logs
1.对目标topic添加TTL/retention.ms配置项
bin/kafka-configs.sh --zookeeper 10.251.35.201:2181 --entity-type topics --alter --add-config retention.ms=10000 --entity-name bdcr_dws.dws_pro_sinv_ds
2.检查目标topic的配置项
bin/kafka-configs.sh --zookeeper 10.251.35.201:2181 --entity-type topics --describe --entity-name bdcr_dws.dws_pro_sinv_ds
3.移除目标topic的TTL/retention.ms配置项
bin/kafka-configs.sh --zookeeper 10.251.35.201:2181 --entity-type topics --alter --delete-config retention.ms --entity-name bdcr_dws.dws_pro_sinv_ds
4.在server.propertie中设置全局策略,log.retention.hours/minutes/ms是Kafka segment log的保存周期, 默认保存7天
log.retention.hours=72
log.cleanup.policy=delete
5.可通过以下命令查看目标topic在清空数据后其占用空间是否变小,清空需要等待一定时间,因为:log.retention.check.interval.ms 删除策略的检查周期
ls -l /tmp/kafka-logs/MyTopic-0/
1.显示时间戳相应的offset sh bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.251.35.46:9092,10.251.35.64:9092,10.251.35.68:9092 -topic bdc_yg_order_ec_kudu_binlog -time 1616687915000 2.向kafka中写数据: bin/kafka-console-producer.sh --broker-list 10.83.80.14:9092 --topic zhangkeketest 3.读取kafka中的数据: bin/kafka-console-consumer.sh --bootstrap-server 10.83.80.14:9092 --topic zhangkeketest --from-beginning 4.创建topic,--replication-factor指定partition的replicas数 bin/kafka-topics.sh --create --topic zhangkeke1 --zookeeper 10.83.80.14:2181 --replication-factor 2 --partitions 3 5.删除topic(不是彻底删除,彻底删除需要进入kafka的配置文件和zookeeper的配置文件来进行彻底删除): bin/kafka-topics.sh --delete --zookeeper 10.83.80.14:2181 --topic sentinel_metric_dev 6.查看kafka中有哪些topic: bin/kafka-topics.sh --list --zookeeper localhost:2181 7.查看指定topic的详细信息 bin/kafka-topics.sh --topic zhangkeke1 --describe --zookeeper 10.83.80.5:2181 8.查看topic的分区情况、leader、副本数、ISR bin/kafka-topics.sh --topic bdcr_dws.dws_base_gms_inv_data_dd --describe --zookeeper 10.251.35.201:2181 9.查看kafka的所有消费者组 bin/kafka-consumer-groups.sh --bootstrap-server 10.251.35.46:9092 --list 10.查看消费者组的偏移量、Lag值等信息 bin/kafka-consumer-groups.sh --bootstrap-server 10.251.35.46:9092 --describe --group sub20211202008 11.将数据追加到文本中: bin/kafka-console-consumer.sh --bootstrap-server bjds-yg-bi-prd-10-240-20-71-belle.lan:9092 --topic zhangkeketest1 >> /home/bi_bdc_etl/zkk/zkk.txt 12.手动设置offset bin/kafka-consumer-groups.sh --bootstrap-server kafka_ip:kafka_port --group 3 --topic test --execute --reset-offsets --to-offset 1524510
1.指定 JMX port 端口启动,指定 jmx,可以方便监控 Kafka 集群 JMX_PORT=9991 /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties 2.增加 Topic 的 partition 数 /usr/local/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic test --partitions 5 3.查看 topic 指定分区 offset 的最大值或最小值,time 为 -1 时表示最大值,为 -2 时表示最小值(已调试成功) bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic bdcr_dws.dws_pro_sinv_ds --time -1 --broker-list 10.251.35.201:9092 --partitions 0 4.指定分区消费数据(已调试成功) bin/kafka-console-consumer.sh --bootstrap-server 10.251.35.201:9092 --topic bdcr_dws.dws_pro_sinv_ds --offset latest --partition 0 5.指定分区只取一条(已调试成功) bin/kafka-console-consumer.sh --bootstrap-server 10.251.35.201:9092 --topic bdcr_dws.dws_pro_sinv_ds --offset latest --partition 0 --max-messages 1 6.全数据只取一条(已调试成功) bin/kafka-console-consumer.sh --bootstrap-server 10.251.35.201:9092 --topic bdcr_dws.dws_pro_sinv_ds --max-messages 1 7.指定消费者组消费数据(已调试成功) bin/kafka-console-consumer.sh --bootstrap-server 10.251.35.201:9092 --topic bdcr_dws.dws_pro_sinv_ds -group test_group --from-beginning 8.删除消费者组 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group --delete 9.删除group中的topic bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group --topic test --delete 10.平衡leader bin/kafka-preferred-replica-election.sh --bootstrap-server localhost:9092 11.自带压测工具 bin/kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100 --producer-props bootstrap.servers=localhost:9092 12.查看不可用的分区 bin/kafka-topics.sh --describe --unavailable-partitions --zookeeper 10.251.35.201:2181 --topic bdcr_dws.dws_pro_sinv_ds
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; public class JavaConsumeTopicKnowsThroughputPerSecond { public static void main(String[] args){ Properties properties = new Properties(); properties.put("bootstrap.servers", "10.9.251.30:9092");//xxx是服务器集群的ip properties.put("group.id", "id16"); properties.put("enable.auto.commit", "true"); properties.put("auto.offset.reset", "latest"); //earliest latest properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties); kafkaConsumer.subscribe(Arrays.asList("kafka_11_test")); int i = 0; while (true) { // ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1000)); ConsumerRecords<String, String> records = kafkaConsumer.poll(10); for (ConsumerRecord<String, String> record : records) { // System.out.printf("offset = %d, value = %s", record.offset(), record.value()); // System.out.println(record.partition()); System.out.println(++i); } } } } <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.0.1</version> </dependency>
$ /opt/kafka_2.13-2.7.0/bin/kafka-consumer-groups.sh -bootstrap-server 0.0.0.0:9092 --describe --group myConsumerGroup
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
myConsumerGroup userMsg 0 0 0 0 consumer-myConsumerGroup-1-48c4f84d-f40a-4d15-b0aa-071c39c3d4f5 /172.18.0.5 consumer-myConsumerGroup-1
myConsumerGroup userMsg 1 1 1 0 consumer-myConsumerGroup-1-48c4f84d-f40a-4d15-b0aa-071c39c3d4f5 /172.18.0.5 consumer-myConsumerGroup-1
myConsumerGroup userMsg 2 0 0 0 consumer-myConsumerGroup-1-48c4f84d-f40a-4d15-b0aa-071c39c3d4f5 /172.18.0.5 consumer-myConsumerGroup-1
myConsumerGroup userMsg 3 0 0 0 consumer-myConsumerGroup-1-48c4f84d-f40a-4d15-b0aa-071c39c3d4f5 /172.18.0.5 consumer-myConsumerGroup-1
1.生产者生产第二条消息msg2,查看myConsumerGroup消费者组:
bash-4.4# /opt/kafka_2.13-2.7.0/bin/kafka-consumer-groups.sh -bootstrap-server 0.0.0.0:9092 --describe --group myConsumerGroup
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
myConsumerGroup userMsg 0 0 0 0 consumer-myConsumerGroup-1-48c4f84d-f40a-4d15-b0aa-071c39c3d4f5 /172.18.0.5 consumer-myConsumerGroup-1
myConsumerGroup userMsg 1 1 1 0 consumer-myConsumerGroup-1-48c4f84d-f40a-4d15-b0aa-071c39c3d4f5 /172.18.0.5 consumer-myConsumerGroup-1
myConsumerGroup userMsg 2 0 0 0 consumer-myConsumerGroup-1-48c4f84d-f40a-4d15-b0aa-071c39c3d4f5 /172.18.0.5 consumer-myConsumerGroup-1
myConsumerGroup userMsg 3 1 1 0 consumer-myConsumerGroup-1-48c4f84d-f40a-4d15-b0aa-071c39c3d4f5 /172.18.0.5 consumer-myConsumerGroup-1
生产更多的消息:
bash-4.4# /opt/kafka_2.13-2.7.0/bin/kafka-consumer-groups.sh -bootstrap-server 0.0.0.0:9092 --describe --group myConsumerGroup
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
myConsumerGroup userMsg 0 3 3 0 consumer-myConsumerGroup-1-48c4f84d-f40a-4d15-b0aa-071c39c3d4f5 /172.18.0.5 consumer-myConsumerGroup-1
myConsumerGroup userMsg 1 5 5 0 consumer-myConsumerGroup-1-48c4f84d-f40a-4d15-b0aa-071c39c3d4f5 /172.18.0.5 consumer-myConsumerGroup-1
myConsumerGroup userMsg 2 7 7 0 consumer-myConsumerGroup-1-48c4f84d-f40a-4d15-b0aa-071c39c3d4f5 /172.18.0.5 consumer-myConsumerGroup-1
myConsumerGroup userMsg 3 5 5 0 consumer-myConsumerGroup-1-48c4f84d-f40a-4d15-b0aa-071c39c3d4f5 /172.18.0.5 consumer-myConsumerGroup-1
再开启一个消费者,同样属于 myConsumerGroup。那么消费者组中有2个消费者。查看myConsumerGroup,看到多了一个CONSUMER-ID. 说明这个消费者组中有2个消费者。
bash-4.4# /opt/kafka_2.13-2.7.0/bin/kafka-consumer-groups.sh -bootstrap-server 0.0.0.0:9092 --describe --group myConsumerGroup
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
myConsumerGroup userMsg 0 3 3 0 consumer-myConsumerGroup-1-033a09ab-9942-473d-9d54-386b27718725 /172.18.0.5 consumer-myConsumerGroup-1
myConsumerGroup userMsg 1 5 5 0 consumer-myConsumerGroup-1-033a09ab-9942-473d-9d54-386b27718725 /172.18.0.5 consumer-myConsumerGroup-1
myConsumerGroup userMsg 2 7 7 0 consumer-myConsumerGroup-1-48c4f84d-f40a-4d15-b0aa-071c39c3d4f5 /172.18.0.5 consumer-myConsumerGroup-1
myConsumerGroup userMsg 3 5 5 0 consumer-myConsumerGroup-1-48c4f84d-f40a-4d15-b0aa-071c39c3d4f5 /172.18.0.5 consumer-myConsumerGroup-1
2.生产2条消息,消费者组里的消费者各收到一条。
查看消费者组信息。消费2条之前:
bash-4.4# /opt/kafka_2.13-2.7.0/bin/kafka-consumer-groups.sh -bootstrap-server 0.0.0.0:9092 --describe --group myConsumerGroup
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
myConsumerGroup userMsg 0 3 3 0 consumer-myConsumerGroup-1-48c4f84d-f40a-4d15-b0aa-071c39c3d4f5 /172.18.0.5 consumer-myConsumerGroup-1
myConsumerGroup userMsg 1 5 5 0 consumer-myConsumerGroup-1-48c4f84d-f40a-4d15-b0aa-071c39c3d4f5 /172.18.0.5 consumer-myConsumerGroup-1
myConsumerGroup userMsg 2 7 7 0 consumer-myConsumerGroup-1-48c4f84d-f40a-4d15-b0aa-071c39c3d4f5 /172.18.0.5 consumer-myConsumerGroup-1
myConsumerGroup userMsg 3 5 5 0 consumer-myConsumerGroup-1-48c4f84d-f40a-4d15-b0aa-071c39c3d4f5 /172.18.0.5 consumer-myConsumerGroup-1
消费2条之后: 可以看到两个消费者各消费一条
bash-4.4# /opt/kafka_2.13-2.7.0/bin/kafka-consumer-groups.sh -bootstrap-server 0.0.0.0:9092 --describe --group myConsumerGroup
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
myConsumerGroup userMsg 0 4 4 0 consumer-myConsumerGroup-1-033a09ab-9942-473d-9d54-386b27718725 /172.18.0.5 consumer-myConsumerGroup-1
myConsumerGroup userMsg 1 5 5 0 consumer-myConsumerGroup-1-033a09ab-9942-473d-9d54-386b27718725 /172.18.0.5 consumer-myConsumerGroup-1
myConsumerGroup userMsg 2 8 8 0 consumer-myConsumerGroup-1-48c4f84d-f40a-4d15-b0aa-071c39c3d4f5 /172.18.0.5 consumer-myConsumerGroup-1
myConsumerGroup userMsg 3 5 5 0 consumer-myConsumerGroup-1-48c4f84d-f40a-4d15-b0aa-071c39c3d4f5 /172.18.0.5 consumer-myConsumerGroup-1
3.如果使消费者组myConsumerGroup增加到5个消费者,那么使用该命令会发现之后四个消费者在消费!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。