赞
踩
Consumer消费数据流程
offset相关
Consumer从kafka的磁盘中消费数据,所以不用担心数据丢失问题。
但是,Consumer作为一个消费者,是有可能出现宕机等问题的,也就意味着会出现重启后,继续消费的问题,那么就必须要消费者偏移量,消费到哪条数据了。
结论:offset是用来记录Consumer的消费位置的,由Consumer自己负责维护(提交),保存在kafka的broker的内置topic中
# consumer重启offset机制,三个可选值,过早的offset记录会被删除。 auto.offset.reset=latest # 默认值,从最新的offset继续消费数据。 # 自动提交offset 默认情况下Consumer的offset自动提交。 # ------------------配置参数----------------------- # 自动提交开启 enable.auto.commit=true # 默认值 # 自动提交的时间间隔 auto.commit.interval.ms=5000 # 默认值5000 单位毫秒。``` // java配置 config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true"); # 手动提交offset 通过代码的方式手动明确offset提交的方式。 `config.put(ConsumerConfig.ENABLE_AUTO_CO``MMIT_CONFIG,"false"); // 异步提交: consumer只需要发出提交offset的指令之后,就可以继续消费数据,不需要等待本地offset是否提交成功。 //3. 消费数据 while (true){ //JDK1.8 的API 毫秒数, ConsumerRecords<String, String> crs = kafkaConsumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> cr : crs) { System.out.println("cr = " + cr); } kafkaConsumer.commitAsync(); } // 同步提交: consumer提交完毕offset之后,才会继续消费数据。 while (true){ //JDK1.8 的API 毫秒数, ConsumerRecords<String, String> crs = kafkaConsumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> cr : crs) { System.out.println("cr = " + cr); } kafkaConsumer.commitSync(); }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。