当前位置:   article > 正文

kafka(4) 消息丢失及重复消费_queue.buffering.max.ms

queue.buffering.max.ms

kafka消息丢失及消息的重复消费都需要从生产者, 消费者两个点出发

想要了解这个问题, 需要了解一些前提 : 

Kafka消息发送有两种方式:同步(sync)和异步(async)

异步模式下个几个常见参数 : 

queue.buffering.max.ms : producer缓存消息的时间。比如我们设置成1000时,它会缓存1s的数据再一次发送出去

queue.buffering.max.messages : producer缓存队列里最大缓存的消息数量,如果超过这个值,producer就会阻塞或者丢掉消息

queue.enqueue.timeout.ms : 当达到上面参数时producer会阻塞等待的时间。如果设置为0,buffer队列满时producer不会阻塞,消息直接被丢掉;若设置为-1,producer会被阻塞,不会丢消息

batch.num.messages : 启用异步模式时,一个batch缓存的消息数量。达到这个数值时,producer才会发送消息。(每次批量发送的数量)

1. 生产者消息丢失的几种情况

  1. 消息发送为异步模式, 当缓冲区满了, 且阻塞时间设置为0, 消息会立马丢弃掉
  2. ack设置为0或者1, leader中的数据没有同步到follower中, 且leader宕机, 数据丢失
  3. 数据发送到分区失败, 且重试次数设置为0

2. 生产者消息重复的情况

  1. 消息发送到leader中, 数据部分同步到follower中, 此时leader宕机, 其中的一个follower被选为leader,由于生产者没有没有接收到消息发送成功的信号, 尝试消息的重新发送, 导致数据重复

 接下来讨论一下消费端数据的丢失及重复消费

首先需要知道 : 

消费者向kafka集群中发送offset有两种方式, 自动提交与手动提交

自动提交 : 每隔一段时间, 自动提交依次offset

手动提交 : 通过代码调用, 然后再提交

3. 消费者丢失数据

  1. 消费者接收到批量数据, 数据处理完一部分, 此时offset自动提交了, 但是剩下的数据处理失败, 没有存入库中, 导致数据丢失

4. 消费者重复消费数据

  1.  消费者接收到批量数据, 数据处理完一部分, 此时offset还没有提交, 服务器就宕机了

以上的情况都是默认为offset自动提交的

接下来讨论一下如何解决这些问题 : 

生产者消息丢失解决 : 

1. queue.enqueue.timeout.ms设置为-1, 缓冲队列满了, 再有消息到来, 进行阻塞

2. ack设置为-1

生产者消息重复解决 : 

消费者消息的重复消费和丢失从表面上看, 通过将offset从自动提交设置为手动提交可以解决问题, 但是如果出现一种极端的情况, 处理消息的代码全部执行完毕, 接下来要执行提交offset代码之前, 服务器宕机了, 此时由于offset没有提交, 可能会出现重复消费问题,

从上述说明中可以看出, 数据处理与offset提交是非原子性操作, 只有把他们变成原子性操作,利用事务才可以保证数据消费的正确性, 因此可以将offset存储到数据库中, 服务启动时, 从数据库中读取offset值, 并且接下来的数据消费与offset提交到数据库通过事务保证原子性操作即可

在ConsumerRebalanceListener的commitOffset方法中将offset提交到数据库, 在getOffset方法中获取到offset

存储offset值的表中, 至少需要有消费者组, topic, partition, offset这些字段, 才能确定一个offset

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/451971
推荐阅读
相关标签
  

闽ICP备14008679号