赞
踩
1.Kafka数据重复的解决方案:
对每个生产者生成的每条数据,都添加由生产者id,分区号,随机的序列号组成的标识符: (producerid,partition,SequenceId),通过标识符对数据进行去重。
2.Kafka数据丢失的解决方案:
Kafka自动提交和手动提交说明:
使用原始apache-kafka依赖的API来消费数据:
使用spring-kafka的@Listener注解来消费数据:
AckMode模式可以在工厂类配置:
- @Bean
- public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> manualListenerContainerFactory(
- ConsumerFactory<String, String> consumerFactory) {
-
- ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
- factory.setConsumerFactory(consumerFactory);
- factory.getContainerProperties().setPollTimeout(1500);
- factory.setBatchListener(true);
- //配置手动提交offset,默认BATCH
- factory.getContainerProperties().setAckMode(AckMode.BATCH);
- return factory;
- }
AckMode模式 | 作用 |
---|---|
BATCH | 默认的提交模式。当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交,由Spring帮我们提交。 |
RECORD | 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交,由Spring帮我们提交。 |
TIME | 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交,由Spring帮我们提交。 |
COUNT | 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交,由Spring帮我们提交。 |
COUNT_TIME | TIME和COUNT有一个条件满足时提交,由Spring帮我们提交。 |
MANUAL | 需要对监听消息的方法中引入 Acknowledgment参数,并在代码中调用acknowledge()方法进行手动提交。实际上,对于每一批poll()的数据,每次调用acknowledge()方法之后仅仅是将offset存放到本地map缓存,在下一次poll的时候,在poll新数据之前从缓存中拿出来批量提交,也就是说与BATCH有相同的语义。 |
MANUAL_IMMEDIATE | 需要对监听消息的方法中引入 Acknowledgment参数,并在代码中调用acknowledge()方法进行手动提交。实际上,对于每一批poll()的数据,每次调用acknowledge()方法之后立即进行偏移量的提交。 |
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。