赞
踩
Kafka 避免消息重复消费通常依赖于以下策略和机制:
Kafka使用Consumer Group ID来跟踪每个消费者所读取的消息。确保每个消费者都具有唯一的Group ID。如果多个消费者属于同一个Group ID,那么它们将共享消息,但每个分区的消息只能由一个消费者处理。
- // 创建一个消费者并设置Group ID
- Properties props = new Properties();
- props.put("bootstrap.servers", "your-kafka-server:9092");
- props.put("group.id", "unique-consumer-group-id");
-
- // 创建 Kafka 消费者
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
Kafka会记录每个消费者组消费的偏移量(Offset)。一旦消费者成功处理了消息,就会将偏移量提交给Kafka。当消费者重新启动时,它会从最后提交的偏移量处继续消费消息。
- // 手动提交偏移量
- consumer.commitSync();
Kafka支持自动和手动提交偏移量。自动提交会定期提交偏移量,而手动提交需要在适当的时候手动调用提交方法。手动提交能够更好地控制偏移量的提交时机,避免重复消费。
- // 开启自动提交位移
- props.put("enable.auto.commit", "true");
- // 设置自动提交的时间间隔
- props.put("auto.commit.interval.ms", "1000");
应用程序层面可以保证消息的处理是幂等的,即使消息被重复处理也不会产生副作用。这可以通过唯一标识符或其他手段来识别和避免重复消息的影响。
在分布式消息系统中,保证消息处理的幂等性是至关重要的。幂等性是指无论对同一条消息进行多少次处理,最终结果都是相同的。以下是一些保证消息处理幂等性的方法:
为每条消息分配唯一的标识符(例如消息 ID),并在处理消息时检查该标识符是否已经处理过。可以利用数据库的唯一索引或分布式缓存(如Redis)来记录已经处理过的消息 ID。
- // 假设 msgId 是消息的唯一标识符
- if (!processedMessages.contains(msgId)) {
- // 处理消息的逻辑
- processedMessages.add(msgId);
- }
在处理消息时,使用数据库事务来确保消息的处理操作是原子性的,并且如果相同消息被处理多次,只会产生一次结果变更。
在更新数据库或状态时,使用乐观锁机制确保只有第一个到达的处理请求会成功,后续重复的请求会被拒绝或忽略。
对于每条消息,使用版本号来追踪状态的变化,确保相同的消息不会再次触发相同的状态变更。
实现重试机制来处理消息处理失败的情况。当消息处理失败时,确保能够安全地重试,而不会产生重复的影响。
设计接口时,考虑使其具有幂等性。例如,针对相同的请求多次调用接口不会对系统产生额外的影响,或者对相同请求的多次调用只会产生一次效果。
以上方法中,结合使用适合自身业务场景的机制,可以有效确保消息处理的幂等性。
Kafka本身并不提供内置的消息去重机制,因此需要在消费者端实现消息去重的逻辑。下面是几种常见的去重方法:
在消费消息时,将消费记录存储在数据库或缓存中,并在消费前检查记录,如果已经消费过相同的消息,则不再进行处理。
- // 假设 messageId 是消息的唯一标识符
- if (!consumedMessages.contains(messageId)) {
- // 处理消息的逻辑
- consumedMessages.add(messageId);
- }
对于每条消息,可以利用消息的唯一标识符(例如消息 ID)进行去重,类似于上述的处理方式。
如果消息包含业务键,可以根据业务键来进行去重。将业务键作为索引或键值存储在数据库或缓存中,在处理消息前检查是否存在相同的业务键。
可以设置一个时间窗口,在此时间内的相同消息将被视为重复消息并被丢弃。
Kafka Streams 或 KSQL 可以处理 Kafka 中的消息并进行去重、聚合等操作,可以针对数据流进行去重操作。
以上方法都是在消费者端进行消息去重的常见方式,需要根据业务场景和需求选择合适的方法。
1、消费端程序跟不上/或者上游生产者数据量突增,导致下游kakfa数据堆积,消费不过来
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
解决方法:
- @Bean
- KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
- ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
- factory.setConsumerFactory(consumerFactory);
- factory.setConcurrency(4);
- factory.setBatchListener(true);
- factory.getContainerProperties().setPollTimeout(3000);
- //当使用手动提交时必须设置ackMode为MANUAL,否则会报错No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.
- factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
- return factory;
- }
- spring:
- kafka: # kafka相关配置
- bootstrap-servers: 192.168.101.34:9092,192.168.101.35:9092,192.168.101.36:9092
- consumer:
- auto-offset-reset: latest
- group-id: refiner-tjw
- max-poll-records: 200 #单次拉取消息条数
- properties :
- max:
- poll:
- interval:
- ms: 18000 #单次消息最大处理时间
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。