赞
踩
不会,kafka中数据的删除跟有没有消费者消费完全无关。
数据的删除,只跟kafka broker上面上面的这两个配置有关:
- # 数据文件保留多长时间, 存储的最大时间超过这个时间会根据log.cleanup.policy设置数据清除策略,log.retention.bytes和log.retention.minutes或log.retention.hours任意一个达到要求,都会执行删除
- # 有2删除数据文件方式: 按照文件大小删除:log.retention.bytes 按照2中不同时间粒度删除:分别为分钟,小时
- log.retention.hours=168
-
- # topic每个分区的最大文件大小,一个topic的大小限制 = 分区数*log.retention.bytes。-1没有大小限log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖
- # log.retention.bytes=-1
- log.retention.bytes=1073741824 #数据最多1G
Kafka日志管理器中会有一个专门的日志删除任务来周期性检测和删除不符合保留条件的日志分段文件,这个周期可以通过broker端参数log.retention.check.interval.ms来配置,默认值为300,000,即5分钟。
当前日志分段的保留策略有3种:基于时间的保留策略、基于日志大小的保留策略以及基于日志起始偏移量的保留策略。
2.1、 基于时间
日志删除任务会检查当前日志文件中是否有保留时间超过设定的阈值retentionMs来寻找可删除的的日志分段文件集合deletableSegments。retentionMs可以通过broker端参数log.retention.hours、log.retention.minutes以及log.retention.ms来配置,其中log.retention.ms的优先级最高,log.retention.minutes次之,log.retention.hours最低。默认情况下只配置了log.retention.hours参数,其值为168,故默认情况下日志分段文件的保留时间为7天。
2.2、 基于日志大小
日志删除任务会检查当前日志的大小是否超过设定的阈值retentionSize来寻找可删除的日志分段的文件集合deletableSegments。retentionSize可以通过broker端参数log.retention.bytes来配置,默认值为-1,表示无穷大。注意log.retention.bytes配置的是日志文件的总大小,而不是单个的日志分段的大小,一个日志文件包含多个日志分段。
2.3、 基于日志起始偏移量
参考文档:https://blog.csdn.net/u013256816/article/details/80418297
3.1、日志删除(Log Deletion):按照一定的保留策略来直接删除不符合条件的日志分段。
3.2、日志压缩(Log Compaction):针对每个消息的key进行整合,对于有相同key的的不同value值,只保留最后一个版本。
可以通过broker端参数log.cleanup.policy来设置日志清理策略,此参数默认值为“delete”,即采用日志删除的清理策略。如果要采用日志压缩的清理策略的话,就需要将log.cleanup.policy设置为“compact”,并且还需要将log.cleaner.enable(默认值为true)设定为true。通过将log.cleanup.policy参数设置为“delete,compact”还可以同时支持日志删除和日志压缩两种策略。
- Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
- at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:775)
该Consumer不能提交offset了,因为它已经出局了,是因为你的处理小时时间长于你要报告给server的时间。同时还告诉我们怎么处理:要么增加超时时间,要么减少每次poll回来的消息个数。
异常解决:
# vim server.properties
listeners=PLAINTEXT://192.168.20.112:9092 #打开注释
- //1、通过上面的配置文件生成 Producer 对象
- Producer producer = new KafkaProducer(kafkaProperties);
- //2、生成 ProducerRecord 对象,并制定 Topic,key 以及 value
- //创建名为testTopic的队列,键为testkey,值为testValue的ProducerRecord对象
- ProducerRecord<String,String> record = new ProducerRecord<>("testTopic","testkey","testValue");
- //3、发送消息
- producer.send(record);
通过配置文件构造一个生产者对象 producer,然后指定主题名称,键值对,构造一个 ProducerRecord 对象,最后使用生产者Producer 的 send() 方法发送 ProducerRecord 对象,send() 方法会返回一个包含 RecordMetadata 的 Future 对象,不过通常我们会忽略返回值。
和上面的名字一样——发送就忘记,生产者只管发送,并不管发送的结果是成功或失败。通常如果我们不关心发送结果,那么就可以使用此种方式。
- //1、通过上面的配置文件生成 Producer 对象
- Producer producer = new KafkaProducer(kafkaProperties);
-
- //2、生成 ProducerRecord 对象,并制定 Topic,key 以及 value
- //创建名为testTopic的队列,键为testkey,值为testValue的ProducerRecord对象
- ProducerRecord<String,String> record = new ProducerRecord<>("testTopic","testkey","testValue");
- //3、同步发送消息
- try {
- //通过send()发送完消息后返回一个Future对象,然后调用Future对象的get()方法等待kafka响应
- //如果kafka正常响应,返回一个RecordMetadata对象,该对象存储消息的偏移量
- //如果kafka发生错误,无法正常响应,就会抛出异常,我们便可以进行异常处理
- producer.send(record).get();
- } catch (Exception e) {
- //4、异常处理
- e.printStackTrace();
- }
和上面普通发送消息一样,只不过这里我们调用了 Future 对象的 get() 方法来等待 kafka 服务器的响应,程序运行到这里会产生阻塞,直到获取kafka集群的响应。而这个响应有两种情况:
1、正常响应:返回一个 RecordMetadata 对象,通过该对象我们能够获取消息的偏移量、分区等信息。
2、异常响应:基本上来说会发生两种异常,
一类是可重试异常,该错误可以通过重发消息来解决。比如连接错误,可以通过再次连接后继续发送上一条未发送的消息;再比如集群没有首领(no leader),因为我们知道集群首领宕机之后,会有一个时间来进行首领的选举,如果这时候发送消息,肯定是无法发送的。
二类是无法重试异常,比如消息太大异常,对于这类异常,KafkaProducer 不会进行任何重试,直接抛出异常。
同步发送消息适合需要保证每条消息的发送结果,优点是能够精确的知道什么消息发送成功,什么消息发送失败,而对于失败的消息我们也可以采取措施进行重新发送。缺点则是增加了每条消息发送的时间,当发送消息频率很高时,此种方式便不适合了。
有同步发送,基本上就会有异步发送了。同步发送每发送一条消息都得等待kafka服务器的响应,之后才能发送下一条消息,那么我们不是在错误产生时马上处理,而是记录异常日志,然后马上发送下一条消息,而这个异常再通过回调函数去处理,这就是异步发送。
1、首先我们要实现一个继承 org.apache.kafka.clients.producer.Callback 接口,然后实现其唯一的 onCompletion 方法。
- import org.apache.kafka.clients.producer.Callback;
- import org.apache.kafka.clients.producer.RecordMetadata;
-
-
- public class KafkaCallback implements Callback{
- @Override
- public void onCompletion(RecordMetadata recordMetadata, Exception e) {
- if(e != null){
- //异常处理
- e.printStackTrace();
- }
- }
- }
2、发送消息时,传入这个回调类。
- //异步发送消息
- producer.send(record,new KafkaCallback());
或者通过内部类实现:
- public static void main(String[] args) {
- // 获取生产者
- Producer<String, String> producer = MQDict.getKafkaProducer();
- for (int i = 0; i < 100; i++) {
- ProducerRecord<String, String> record = new ProducerRecord<String, String>(MQDict.PRODUCER_TOPIC, "key" + Integer.toString(i), "value" + Integer.toString(i));
- // 异步发送
- producer.send(record, new Callback() {
-
- @Override
- public void onCompletion(RecordMetadata metadata, Exception exception) {
- if (exception == null) {
- // exception == null代表消息发送成功
- System.out.println("消息发送成功......");
- } else {
- // 消息发送失败,执行错误的逻辑
- System.out.println("消息发送失败......");
- if (exception instanceof RetriableException) {
- // 处理可重试瞬时异常
- // ...
- } else {
- // 处理不可重试异常
- // ...
- }
-
- }
- }
- });
- }
- System.out.println("消息生产结束......");
- // 关闭kafkaProduce对象
- producer.close();
- System.out.println("关闭生产者......");
- }
异常解决:
vim kafka-run-class.sh
去掉这个配置
-XX:+UseCompressedOops
https://blog.csdn.net/Dongguabai/article/details/86543698
- 1.订阅消息可以订阅多个主题
- 2.ConsumerConfig.GROUP_ID_CONFIG表示消费者的分组,kafka根据分组名称判断是不是同一组消费者,同一组消费者去消费一个主题的数据的时候,数据将在这一组消费者上面轮询。
- 3.主题涉及到分区的概念,同一组消费者的个数不能大于分区数。因为:一个分区只能被同一群组的一个消费者消费。出现分区小于消费者个数的时候,可以动态增加分区。
- 4.注意和生产者的对比,Properties中的key和value是反序列化,而生产者是序列化。
- 1:查看当前服务器中的所有topic。--zookeeper master:2181指定zookeeper。
- bin/kafka-topics.sh --list --zookeeper master:2181
-
- 2:创建topic。--partitions 3,指定三个分区。--replication-factor 1指定备份的副本数量。--topic topicTest,指定topic的名称。
- ./kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 3 --topic topicTest
-
- 3:删除topic
- bin/kafka-topics.sh --delete --zookeeper master:2181 --topic topicTest
- 注意:需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。
-
- 4:通过shell命令发送消息。生产者。--broker-list master:9092
- bin/kafka-console-producer.sh --broker-list master:9092 --topic topicTest
-
- 可能会报错(This message has failed its CRC checksum, exceeds the valid size, has a null key for a compacted topic, or is otherwise corrupt),这个版本不支持key为null:
-
- ./kafka-console-producer.sh --broker-list 192.168.20.112:9092 --topic testTopic1 \--property parse.key=true \--property key.separator=,
-
- 发送消息格式(testkey,testvalue)
-
- 5:通过shell消费消息。消费者。--from-beginning从最开始消费。
- bin/kafka-console-consumer.sh --zookeeper master:2181 --from-beginning --topic topicTest
-
- 收到消息:testvalue
-
- 获取全量的topic上的数据:
- ./kafka-console-consumer.sh --zookeeper 192.168.20.112:2181 --topic topicTest --from-beginning
-
- 6:查看消费位置
- bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper master:2181 --group testGroup
-
- 7:查看某个Topic的详情
- bin/kafka-topics.sh --topic SimpleDemo3 --describe --zookeeper 192.168.20.112:2181
-
- #下面是显示信息
- Topic:ssports PartitionCount:1 ReplicationFactor:2 Configs:
- Topic: SimpleDemo3 Partition: 0 Leader: 1 Replicas: 0,1 Isr: 1
- #分区数为1 复制因子为2 当前SimpleDemo3的分区为0
- #Replicas: 0,1 复制的为0,1
-
- 8:kafka topic增加partition,修改topic分区数
- ./kafka-topics.sh --alter --topic SimpleDemo3 --zookeeper 192.168.20.112:2181 --partitions 6
生产者(Producer):
- public class ProducerSimpleDemo {
-
- public static void main(String[] args) {
- // 获取生产者
- Producer<String, String> producer = MQDict.getKafkaProducer();
-
- for (int i = 0; i < 500; i++) {
- // 方式1
- // 构造好kafkaProducer实例以后,下一步就是构造消息实例。生成 ProducerRecord 对象,并制定 Topic,key 以及 value
- // producer.send(new ProducerRecord<>(MQDict.PRODUCER_TOPIC, "key" + Integer.toString(i), "value" + Integer.toString(i)));
-
- // 方式2
- // 构造待发送的消息对象ProduceRecord的对象,指定消息要发送到的topic主题,分区以及对应的key和value键值对。 注意,分区和key信息可以不用指定,由kafka自行确定目标分区。
- ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(MQDict.PRODUCER_TOPIC, "key" + Integer.toString(i), "value" + Integer.toString(i));
- // 调用kafkaProduce的send方法发送消息
- producer.send(producerRecord);
- }
- System.out.println("消息生产结束......");
- // 关闭kafkaProduce对象
- producer.close();
- System.out.println("关闭生产者......");
- }
-
- }
消费者(KafkaConsumer):
- public class ConsumerSimpleDemo {
-
- public static void main(String[] args) {
- // 获取消费者
- KafkaConsumer<String, String> consumer = MQDict.getKafkaConsumer();
-
- while (true) {
- ConsumerRecords<String, String> records = consumer.poll(MQDict.CONSUMER_POLL_SECOND_OUT); // 拉取消息,阻塞时间5秒
-
- if (records.isEmpty())
- break;
- // 遍历消息并打印value
- records.forEach(rec -> System.out
- .println("主题topic:" + rec.topic() + "; topickey:" + rec.key() + "; topicval:" + rec.value()));
-
-
- /*for (ConsumerRecord<String, String> record : records) {
- // 简单的打印输出
- System.out.println(
- "offset = " + record.offset() + ",key = " + record.key() + ",value =" + record.value());
- }*/
- }
- // 关闭消费者
- consumer.close();
- }
-
- }
此项目是熟悉kafka工具以及用Java调用kafkaApi进行消息的发送与消费的测试案例。
项目特性
1、kafka生产者、消费者简单demo,Java调用。
2、kafka同步、异步调用。
3、kafka自定义分区的使用。
4、kafka自定义指定序列化生产的消息。
5、kafka拦截器链的使用。
6、kafka消费者消费消息之每个线程维护一个KafkaConsumer实例,用户创建多个线程消费topic数据,每个线程都会创建专属该线程的KafkaConsumer实例。
7、单独KafkaConsumer实例和多worker线程。使用全局的KafkaConsumer实例执行消息获取,然后把获取到的消息集合交给线程池中的worker线程执行工作,之后worker线程完成处理后上报位移状态,由全局consumer提交位移。
8、获取所有kafka所有的Topic。
9、获取当前topic每个分区内最新的30条消息(如果topic额分区内有没有30条,就获取实际消息)。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。