当前位置:   article > 正文

kafka如何保证消息不丢失和不重复消费_kafka怎么保证消息不丢失和不重复消费

kafka怎么保证消息不丢失和不重复消费

\Kafka 是一个高吞吐量、分布式的消息系统,它提供了多种机制来保证消息的可靠性,包括消息不丢失和不重复消费。下面详细介绍 Kafka 如何实现这些目标。

1. 消息不丢失

Kafka 提供了多种机制来确保消息不丢失:

1.1. 副本机制

Kafka 的每个分区都有一个 Leader 副本和多个 Follower 副本,通过副本机制来确保数据的高可用性和可靠性。

  • ISR(In-Sync Replicas):只有 ISR 集合中的副本才会被认为是同步的。ISR 集合中的副本与 Leader 副本保持同步,这意味着即使 Leader 副本故障,系统仍然可以从 ISR 集合中选出新的 Leader,保证数据不会丢失。
1.2. 生产者配置

生产者可以通过以下配置来确保消息成功发送到 Kafka 服务器:

  • acks:该参数控制生产者在接收来自服务器的确认之前,必须要有多少个副本收到这条消息。常见配置有:
    • acks=0:生产者不等待任何确认。
    • acks=1:Leader 副本收到消息后发送确认。
    • acks=all(或 acks=-1):所有 ISR 副本收到消息后发送确认。这是最可靠的设置。
Properties props = new Properties();
props.put("acks", "all");
  • 1
  • 2
  • 重试机制:配置生产者的重试次数和重试间隔,确保在临时故障时消息不会丢失。
props.put("retries", Integer.MAX_VALUE);
props.put("retry.backoff.ms", 100);
  • 1
  • 2
1.3. Broker 配置

Kafka Broker 也有一些配置项来确保消息的持久化:

  • min.insync.replicas:配置需要的最小同步副本数,确保在生产者设置 acks=all 时,消息必须被写入至少 min.insync.replicas 个副本。
min.insync.replicas=2
  • 1
  • 日志配置:通过配置 log.flush.interval.messageslog.flush.interval.ms 来控制日志的刷新频率,确保数据被及时写入磁盘。
log.flush.interval.messages=10000
log.flush.interval.ms=1000
  • 1
  • 2

2. 消息不重复消费

Kafka 提供了多种机制来避免消息的重复消费:

2.1. 消费者配置
  • enable.auto.commit:关闭自动提交,改为手动提交偏移量。这样,消费者在处理完消息后才提交偏移量,避免因处理失败而重复消费消息。
Properties props = new Properties();
props.put("enable.auto.commit", "false");
  • 1
  • 2
  • 手动提交偏移量:消费者在成功处理消息后,手动提交偏移量。
consumer.commitSync();
  • 1
2.2. 消费者组协调

Kafka 使用消费者组协调机制来确保同一个分区的消息不会被多个消费者重复消费。通过组协调器(Group Coordinator)来管理消费者组的成员关系和分区分配。

2.3. 幂等性和事务
  • 幂等生产者:使用幂等生产者来确保每条消息只会被写入一次。生产者配置 enable.idempotence=true 可以开启幂等性。
props.put("enable.idempotence", "true");
  • 1
  • 事务性生产者:使用事务性生产者来确保一组消息要么全部写入,要么全部失败。适用于需要原子性操作的场景。
props.put("transactional.id", "my-transactional-id");
  • 1
producer.initTransactions();
try {
    producer.beginTransaction();
    // 发送消息
    producer.send(new ProducerRecord<>("topic", "key", "value"));
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // 这些异常是致命的,不能继续
    producer.close();
} catch (KafkaException e) {
    // 其他异常可以尝试重试
    producer.abortTransaction();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

总结

通过以上机制,Kafka 能够有效地保证消息不丢失和不重复消费:

  • 消息不丢失:通过副本机制、生产者和 Broker 的配置来确保消息成功写入并持久
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小惠珠哦/article/detail/871414
推荐阅读
相关标签
  

闽ICP备14008679号