当前位置:   article > 正文

kafka消费失败多次重试_kafka 消费异常maxretrytimes

kafka 消费异常maxretrytimes
  // Background consumer thread: pulls messages from `consumer`, decodes each payload as UTF-8,
  // hands it to `messageHandler.onMsg`, retries failed messages, and commits offsets whenever
  // the handler reports that a commit is due.
  //
  // Retry policy (driven by messageHandler.getErrorMaxRetries()):
  //   -1                          -> no retry; the failure is only logged.
  //   1 .. DEFAULT_MAX_RETRIES    -> retry that many times.
  //   anything else               -> retry DEFAULT_MAX_RETRIES times.
  // Before retry attempt i the thread sleeps RETRY_INTERVAL_ARR[i] seconds.
  // NOTE(review): this assumes RETRY_INTERVAL_ARR has at least DEFAULT_MAX_RETRIES entries;
  // its declaration is not visible here — confirm.
  Thread consumerThread = new Thread(new Runnable() {
    @Override
    public void run() {
      // Kept outside the loop so the outer catch can log the message being processed on failure.
      MessageAndMetadata<byte[], byte[]> messageAndMetadata = null;
      try {
        while (consumer.hasNext() && !Thread.interrupted()) {
          messageAndMetadata = consumer.getData();
          try {
            messageHandler.onMsg(new String(messageAndMetadata.message(), "UTF-8"));
          } catch (Exception ee) {
            long retries = messageHandler.getErrorMaxRetries();
            if (retries != -1) {
              // Out-of-range values fall back to the default retry count.
              if (retries <= 0 || retries > DEFAULT_MAX_RETRIES) {
                retries = DEFAULT_MAX_RETRIES;
              }
              for (int i = 0; i < retries; i++) {
                LOG.info(String.format("[USING RETRY] %s times, for consuming topic:%s, sleeping %s seconds", (i+1), topic, RETRY_INTERVAL_ARR[i]));
                try {
                  Thread.sleep(RETRY_INTERVAL_ARR[i] * 1000);
                  messageHandler.onMsg(new String(messageAndMetadata.message(), "UTF-8"));
                  // The retry succeeded: stop here so the handler does not receive the
                  // same message again on the remaining attempts.
                  break;
                } catch (InterruptedException ie) {
                  // Thread.sleep cleared the interrupt status when it threw; restore it so
                  // the while-condition above observes the interruption, and stop retrying.
                  Thread.currentThread().interrupt();
                  LOG.error(ie.getMessage(), ie);
                  break;
                } catch (Exception e1) {
                  // This attempt failed; log it and move on to the next attempt, if any.
                  LOG.error(e1.getMessage(), e1);
                }
              }
            } else {
              LOG.error(String.format("[NO RETRY] Processing msg:[%s] met error, please check it.", new String(messageAndMetadata.message(), "UTF-8")), ee);
            }
          }
          // The message is counted whether or not processing eventually succeeded.
          messageHandler.increment();
          // should commit offset or not
          if (messageHandler.shouldCommit()) {
            consumer.commitOffsets();
            messageHandler.setLastTimeCommit(System.currentTimeMillis());
            if (LOG.isInfoEnabled()) {
              LOG.info("Successfully commitOffsets, topic:"
                  + messageAndMetadata.topic()
                  + " partition: "
                  + messageAndMetadata.partition()
                  + " offset: "
                  + messageAndMetadata.offset());
            }
          }
        }
      } catch (Exception e) {
        // Anything that escapes the loop ends the thread; log the last message seen, if any.
        try {
          if (messageAndMetadata != null) {
            LOG.error(Joiner.on("").join("topic:", messageAndMetadata.topic(),
                " partition:", messageAndMetadata.partition(),
                " msgContent:", new String(messageAndMetadata.message(), "UTF-8")), e);
          }
        } catch (UnsupportedEncodingException e1) {
          LOG.error(e1.getMessage(), e1);
        }
      } finally {
        // Runs on normal exit, interruption and failure alike: always shut the consumer down.
        LOG.info(String.format("DafkaConsumer thread for topic:%s will give up, try me later...", topic));
        if (consumer != null) {
          consumer.shutdown();
        }
      }
    }
  });
  consumerThread.setName("DafkaConsumerThread-"
      + properties.getProperty("topic"));
  consumerThread.start();

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/125489?site
推荐阅读
相关标签
  

闽ICP备14008679号