当前位置:   article > 正文

kafka手动提交无限重试_kafka重试多次不成功跳过

kafka重试多次不成功跳过

对于一些很强调分区内有序,并且0丢失的场景,比如增量同步数据,(先insert再delete,先delete再insert,结果是不一样的,又或者用了唯一索引,然而delete失败了,消息丢失了,后面的插入唯一性冲突了),解决方案是无限重试,失败了发送信息通知。

  1. @KafkaListener(topics = {"cgbOrderDetail"},groupId = "c2",concurrency ="5",containerFactory = "kafkaListenerContainerFactory")
  2. public void consum(ConsumerRecord<String, String> record, Acknowledgment ack) throws Exception {
  3. while (true){
  4. try {
  5. this.test(record,ack);
  6. break;
  7. }catch (Exception e){
  8. e.printStackTrace();
  9. Thread.sleep(5000);
  10. }
  11. }
  12. }
  13. public void test(ConsumerRecord<String, String> record, Acknowledgment ack) throws Exception {
  14. doSomething();
  15. ack.acknowledge();
  16. }

不过当你一个分区的消费者无限循环的时候,时间超过了max.poll.interval.ms,就会rebalance,然后分配到别的消费者,然后别的消费者也开始死循环,产生雪崩问题

隔了一段时间后

不过还可以更加暴力一些,抛出异常后直接stop,这样可以避免rebalance

后面可以弄个接口重新start

kafka提交时机?

conn.setAutoCommit(false);

.......

kafka.commit();

conn.commit();

上面这种方案可能会丢失消息,因为kafka有可能提交成功,但是响应超时,于是我本地事务回滚,因为该消息已经提交了,所以下次就没了。

应该是先conn.commit()在执行kafka.commit();不过这样有可能消费重复,不过宁可重复也不要丢失。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/125478
推荐阅读
相关标签
  

闽ICP备14008679号