赞
踩
kafka消息丢失及消息的重复消费都需要从生产者, 消费者两个点出发
想要了解这个问题, 需要了解一些前提 :
Kafka消息发送有两种方式:同步(sync)和异步(async)
异步模式下个几个常见参数 :
queue.buffering.max.ms : producer缓存消息的时间。比如我们设置成1000时,它会缓存1s的数据再一次发送出去
queue.buffering.max.messages : producer缓存队列里最大缓存的消息数量,如果超过这个值,producer就会阻塞或者丢掉消息
queue.enqueue.timeout.ms : 当达到上面参数时producer会阻塞等待的时间。如果设置为0,buffer队列满时producer不会阻塞,消息直接被丢掉;若设置为-1,producer会被阻塞,不会丢消息
batch.num.messages : 启用异步模式时,一个batch缓存的消息数量。达到这个数值时,producer才会发送消息。(每次批量发送的数量)
1. 生产者消息丢失的几种情况
- 消息发送为异步模式, 当缓冲区满了, 且阻塞时间设置为0, 消息会立马丢弃掉
- ack设置为0或者1, leader中的数据没有同步到follower中, 且leader宕机, 数据丢失
- 数据发送到分区失败, 且重试次数设置为0
2. 生产者消息重复的情况
- 消息发送到leader中, 数据部分同步到follower中, 此时leader宕机, 其中的一个follower被选为leader,由于生产者没有没有接收到消息发送成功的信号, 尝试消息的重新发送, 导致数据重复
接下来讨论一下消费端数据的丢失及重复消费
首先需要知道 :
消费者向kafka集群中发送offset有两种方式, 自动提交与手动提交
自动提交 : 每隔一段时间, 自动提交依次offset
手动提交 : 通过代码调用, 然后再提交
3. 消费者丢失数据
- 消费者接收到批量数据, 数据处理完一部分, 此时offset自动提交了, 但是剩下的数据处理失败, 没有存入库中, 导致数据丢失
4. 消费者重复消费数据
- 消费者接收到批量数据, 数据处理完一部分, 此时offset还没有提交, 服务器就宕机了
以上的情况都是默认为offset自动提交的
接下来讨论一下如何解决这些问题 :
生产者消息丢失解决 :
1. queue.enqueue.timeout.ms设置为-1, 缓冲队列满了, 再有消息到来, 进行阻塞
2. ack设置为-1
生产者消息重复解决 :
消费者消息的重复消费和丢失从表面上看, 通过将offset从自动提交设置为手动提交可以解决问题, 但是如果出现一种极端的情况, 处理消息的代码全部执行完毕, 接下来要执行提交offset代码之前, 服务器宕机了, 此时由于offset没有提交, 可能会出现重复消费问题,
从上述说明中可以看出, 数据处理与offset提交是非原子性操作, 只有把他们变成原子性操作,利用事务才可以保证数据消费的正确性, 因此可以将offset存储到数据库中, 服务启动时, 从数据库中读取offset值, 并且接下来的数据消费与offset提交到数据库通过事务保证原子性操作即可
在ConsumerRebalanceListener的commitOffset方法中将offset提交到数据库, 在getOffset方法中获取到offset
存储offset值的表中, 至少需要有消费者组, topic, partition, offset这些字段, 才能确定一个offset
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。