赞
踩
目录
Kafka的消息存储在磁盘中,为了控制磁盘占用空间,Kafka需要不断地对过去的一些消息进行清理工作。Kafka的每个分区都有很多的日志文件,这样也是为了方便进行日志的清理。在Kafka中,提供两种日志清理方式:
在Kafka的broker或topic配置中:
配置项 | 配置值 | 说明 |
---|---|---|
log.cleaner.enable | true(默认) | 开启自动清理日志功能 |
log.cleanup.policy | delete(默认) | 删除日志 |
log.cleanup.policy | compaction | 压缩日志 |
log.cleanup.policy | delete,compact | 同时支持删除、压缩 |
日志删除是以段(segment日志)为单位来进行定期清理的。
Kafka日志管理器中会有一个专门的日志删除任务来定期检测和删除不符合保留条件的日志分段文件,这个周期可以通过broker端参数log.retention.check.interval.ms来配置,默认值为300,000,即5分钟。当前日志分段的保留策略有3种:
以下三种配置可以指定如果Kafka中的消息超过指定的阈值,就会将日志进行自动清理:
其中,优先级为 log.retention.ms > log.retention.minutes > log.retention.hours。默认情况,在broker中,配置如下:
log.retention.hours=168
也就是,默认日志的保留时间为168小时,相当于保留7天。
删除日志分段时:
设置topic的删除策略
日志删除任务会检查当前日志的大小是否超过设定的阈值来寻找可删除的日志分段的文件集合。可以通过broker端参数 log.retention.bytes 来配置,默认值为-1,表示无穷大。如果超过该大小,会自动将超出部分删除。
注意:
log.retention.bytes 配置的是日志文件的总大小,而不是单个的日志分段的大小,一个日志文件包含多个日志分段。
每个segment日志都有它的起始偏移量,如果起始偏移量小于 logStartOffset,那么这些日志文件将会标记为删除。
Log Compaction是默认的日志删除之外的清理过时数据的方式。它会将相同的key对应的数据只保留一个版本。
生产者和消费者以极高的速度生产/消费大量数据或产生请求,从而占用broker上的全部资源,造成网络IO饱和。有了配额(Quotas)就可以避免这些问题。Kafka支持配额管理,从而可以对Producer和Consumer的produce&fetch操作进行流量限制,防止个别业务压爆服务器。
为所有client id设置默认值,以下为所有producer程序设置其TPS不超过1MB/s,即1048576/s,命令如下:
bin/kafka-configs.sh --zookeeper node1.angyan.cn:2181 --alter --add-config 'producer_byte_rate=1048576' --entity-type clients --entity-default
运行基准测试,观察生产消息的速率
bin/kafka-producer-perf-test.sh --topic test --num-records 500000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1.angyan.cn:9092,node2.angyan.cn:9092,node3.angyan.cn:9092 acks=1
结果:
50000 records sent, 1108.156028 records/sec (1.06 MB/sec)
对consumer限速与producer类似,只不过参数名不一样。
为指定的topic进行限速,以下为所有consumer程序设置topic速率不超过1MB/s,即1048576/s。命令如下:
bin/kafka-configs.sh --zookeeper node1.angyan.cn:2181 --alter --add-config 'consumer_byte_rate=1048576' --entity-type clients --entity-default
运行基准测试:
bin/kafka-consumer-perf-test.sh --broker-list node1.angyan.cn:9092,node2.angyan.cn:9092,node3.angyan.cn:9092 --topic test --fetch-size 1048576 --messages 500000
结果为:
MB.sec:1.0743
使用以下命令,删除Kafka的Quota配置
- bin/kafka-configs.sh --zookeeper node1.angyan.cn:2181 --alter --delete-config 'producer_byte_rate' --entity-type clients --entity-default
- bin/kafka-configs.sh --zookeeper node1.angyan.cn:2181 --alter --delete-config 'consumer_byte_rate' --entity-type clients --entity-default
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- </dependency>
- spring.kafka.angyan.bootstrap-servers=112.126.74.249:9092,112.126.74.249:9093
- spring.kafka.angyan.clientId=TEST_DEMO_MESSAGE
- spring.kafka.angyan.producer.compressionType=gzip
- spring.kafka.angyan.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
- spring.kafka.angyan.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer
- # 提交延时
- spring.kafka.angyan.linger.ms=1000
- spring.kafka.angyan.template.defaultTopic=TEST_DEMO_TOPIC
- @Service
- public class KafkaServiceImpl implements KafkaService {
-
- @Qualifier("kafkaTemplate")
- @Autowired
- private KafkaTemplate kafkaRecordTemplate;
-
-
- @Override
- public String sendMessage(String key, byte[] bytes) {
- Map header = new HashMap();
- header.put(KafkaHeaders.KEY,key);
- MessageHeaders messageHeaders = new MessageHeaders(header);
- Message message = MessageBuilder.createMessage(bytes, messageHeaders);
- kafkaRecordTemplate.send(message);
-
- return null;
- }
-
- }
- pring.kafka.angyan.bootstrap-servers=112.126.74.249:9092,112.126.74.249:9093
- spring.kafka.angyan.consumer.group.id=TEST_DEMO_MESSAGE
- spring.kafka.angyan.consumer.clientId=TEST_DEMO_MESSAGE
- spring.kafka.angyan.defaultTopic=TEST_DEMO_TOPIC
- spring.kafka.angyan.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
- spring.kafka.angyan.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
- @Component
- public class KafkaCustomer {
-
- @KafkaListener(topics = "${spring.kafka.angyan.defaultTopic}",containerFactory = "kafkaTemplateConsumer")
- public void testKafka(ConsumerRecord<String,byte[]> record){
- //处理业务逻辑
-
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。