
Thoughts on problems encountered in Kafka practice: not present in metadata after




1. The Producer fails when sending a message: Topic {{topic_name}} not present in metadata after 60000 ms


    ListenableFuture<SendResult<String, String>> future =
            kafkaTemplate.send(TOPIC, String.valueOf(msg), msgValue);
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
        @Override
        public void onSuccess(SendResult<String, String> sendResultMap) {
            log.info("send message success,res is {}", sendResultMap);
        }

        @Override
        public void onFailure(Throwable throwable) {
            log.error(" send message fail", throwable);
        }
    });

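Sometimes this code fails with the error in the title, but not for every message. From what I found, with a Kafka instance deployed across multiple availability zones, a Kafka client may report "Topic {{topic_name}} not present in metadata after 60000 ms" when producing or consuming messages after one of the availability zones fails.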



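The suggested fixes are:

  • Upgrade the Kafka client to version 2.7 or later.
  • Set the Kafka client's "request.timeout.ms" to more than 127 s.
  • Set the Linux network parameter "net.ipv4.tcp_syn_retries" to "3" on the machine running the Kafka client. On this third point, I think specifying the number of retries in the Kafka client configuration should probably work as well.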
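The second fix can be sketched as a set of producer properties. This is only a minimal illustration, not a tested production configuration: the broker address localhost:9092, the 130 s request timeout and the 140 s delivery timeout are assumptions chosen for the example. delivery.timeout.ms is raised too because the Kafka documentation asks for it to be at least request.timeout.ms plus linger.ms. Plain string keys are used so the snippet compiles without the Kafka client on the classpath.

```java
import java.util.Properties;

/** Sketch: producer settings for the "request.timeout.ms > 127 s" suggestion. */
final class ProducerTimeoutConfigSketch {

    /** Builds the properties that would be passed to a KafkaProducer. */
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Anything above 127 s satisfies the suggestion; 130 s is an arbitrary choice.
        props.setProperty("request.timeout.ms", "130000");
        // Assumed value: kept >= linger.ms + request.timeout.ms, as the docs require.
        props.setProperty("delivery.timeout.ms", "140000");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder broker address.
        build("localhost:9092").forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting Properties object would be handed to the KafkaProducer constructor (or to the equivalent Spring Kafka producer factory settings).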



To see where the error comes from, look at the source of KafkaProducer's send(ProducerRecord<K, V> record, Callback callback) method:




    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }

    /**
     * Implementation of asynchronously send a record to a topic.
     */
    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
        try {
            throwIfProducerClosed();
            // first make sure the metadata for the topic is available
            ClusterAndWaitTime clusterAndWaitTime;
            try {
                // this is where the problem occurs
                clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
            } catch (KafkaException e) {
                if (metadata.isClosed())
                    throw new KafkaException("Producer closed while send in progress", e);
                throw e;
            }
            ...
        } catch (ApiException e) {
            ...
        }
    }

    // this is where the call can block
    private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
        // add topic to metadata topic list if it is not there already and reset expiry
        Cluster cluster = metadata.fetch();
        if (cluster.invalidTopics().contains(topic))
            throw new InvalidTopicException(topic);
        metadata.add(topic);
        Integer partitionsCount = cluster.partitionCountForTopic(topic);
        // Return cached metadata if we have it, and if the record's partition is either undefined
        // or within the known partition range
        if (partitionsCount != null && (partition == null || partition < partitionsCount))
            return new ClusterAndWaitTime(cluster, 0);
        long begin = time.milliseconds();
        long remainingWaitMs = maxWaitMs;
        long elapsed;
        // keep requesting the topic's metadata until it is obtained;
        // throw an exception if this takes longer than maxWaitMs
        do {
            if (partition != null) {
                log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);
            } else {
                log.trace("Requesting metadata update for topic {}.", topic);
            }
            metadata.add(topic);
            int version = metadata.requestUpdate();
            sender.wakeup();
            try {
                metadata.awaitUpdate(version, remainingWaitMs);
            } catch (TimeoutException ex) {
                // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
                throw new TimeoutException(
                        String.format("Topic %s not present in metadata after %d ms.",
                                topic, maxWaitMs));
            }
            cluster = metadata.fetch();
            elapsed = time.milliseconds() - begin;
            if (elapsed >= maxWaitMs) { // check whether the elapsed time has reached maxWaitMs
                throw new TimeoutException(partitionsCount == null ?
                        String.format("Topic %s not present in metadata after %d ms.",
                                topic, maxWaitMs) :
                        String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
                                partition, topic, partitionsCount, maxWaitMs));
            }
            metadata.maybeThrowException();
            remainingWaitMs = maxWaitMs - elapsed;
            partitionsCount = cluster.partitionCountForTopic(topic);
        } while (partitionsCount == null || (partition != null && partition >= partitionsCount));
        return new ClusterAndWaitTime(cluster, elapsed);
    }


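When KafkaProducer executes send, it first has to obtain the topic's metadata. This is a loop that keeps running until the metadata is fetched successfully or an exception is thrown. The wait is bounded by maxBlockTimeMs, which comes from the producer setting max.block.ms; its default of 60000 ms is the figure that appears in the error message.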
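The shape of that loop can be reproduced in a few lines without Kafka. The following is a simplified sketch, not the client's real code: MetadataWaitSketch, waitForPartitionCount and MetadataTimeoutException are hypothetical names, the metadata lookup is replaced by a Supplier, and the clock is injected so the timeout can be exercised without actually waiting.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Simplified model of a "poll until metadata arrives or the deadline passes" loop. */
final class MetadataWaitSketch {

    /** Hypothetical stand-in for Kafka's (also unchecked) TimeoutException. */
    static final class MetadataTimeoutException extends RuntimeException {
        MetadataTimeoutException(String message) { super(message); }
    }

    /**
     * Calls fetchPartitionCount until it returns a non-null value.
     * Fails once maxWaitMs has elapsed according to clockMs.
     */
    static int waitForPartitionCount(String topic, Supplier<Integer> fetchPartitionCount,
                                     long maxWaitMs, LongSupplier clockMs) {
        long begin = clockMs.getAsLong();
        Integer partitions = fetchPartitionCount.get();
        while (partitions == null) {
            long elapsed = clockMs.getAsLong() - begin;
            if (elapsed >= maxWaitMs) {
                throw new MetadataTimeoutException(String.format(
                        "Topic %s not present in metadata after %d ms.", topic, maxWaitMs));
            }
            partitions = fetchPartitionCount.get(); // retry the lookup
        }
        return partitions;
    }

    public static void main(String[] args) {
        long[] now = {0};
        try {
            // The metadata never arrives and the fake clock jumps 20 s per reading.
            waitForPartitionCount("demo", () -> null, 60_000, () -> now[0] += 20_000);
        } catch (MetadataTimeoutException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The point of the sketch is that the caller's thread is the one doing the waiting, so a send call can stall for the full maxWaitMs before the exception surfaces.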





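The new Kafka Producer is designed around a message buffer pool: every message the client sends is first stored in the pool, and once the Producer starts it also launches a Sender thread that continuously takes messages from the pool and sends them to the Broker.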
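A toy model of this design, using only the JDK, might look like the following. It is an illustration under stated assumptions rather than Kafka's implementation: BufferedSenderSketch and Pending are made-up names, the buffer is an unbounded queue, and "sending to the Broker" is replaced by completing a future with a fake acknowledgement.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Toy model of "buffer + background sender thread"; not Kafka's actual classes. */
final class BufferedSenderSketch {

    /** A buffered message plus the future completed once it has been "sent". */
    private static final class Pending {
        final String message;
        final CompletableFuture<String> result = new CompletableFuture<>();
        Pending(String message) { this.message = message; }
    }

    private final BlockingQueue<Pending> buffer = new LinkedBlockingQueue<>();
    private volatile boolean running = true;
    private final Thread sender = new Thread(this::drainLoop, "sender-sketch");

    BufferedSenderSketch() {
        sender.setDaemon(true);
        sender.start();
    }

    /** Caller side: put the message in the buffer and return immediately. */
    CompletableFuture<String> send(String message) {
        Pending pending = new Pending(message);
        buffer.add(pending);
        return pending.result;
    }

    /** Sender side: keep taking messages off the buffer and "sending" them. */
    private void drainLoop() {
        while (running || !buffer.isEmpty()) {
            try {
                Pending next = buffer.poll(10, TimeUnit.MILLISECONDS);
                if (next != null) {
                    next.result.complete("acked:" + next.message); // stand-in for a broker ack
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Lets the sender thread exit after draining what is left. */
    void close() { running = false; }

    public static void main(String[] args) throws Exception {
        BufferedSenderSketch producer = new BufferedSenderSketch();
        CompletableFuture<String> future = producer.send("hello"); // does not wait for the ack
        System.out.println(future.get(1, TimeUnit.SECONDS));       // blocks until "sent"
        producer.close();
    }
}
```

CompletableFuture implements Future, so a caller that wants a synchronous result simply calls get on the returned object.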

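As a result, the new Kafka Producer no longer has a separate method for asynchronous sending. Only a single send method remains, and it returns a Future object; to wait synchronously for the result, call Future#get, which blocks until the result is available. So why did blocking still occur when my project only called send directly?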

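When constructing a Kafka Producer there is a parameter, buffer.memory, that sets the size of the buffer pool. It defaults to 32 MB, so the pool is bounded. What happens, then, when the pool's memory is used up?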

The Kafka source code is commented in great detail. RecordAccumulator is the core class of the Kafka Producer's buffer pool, and its class comment includes this passage:

The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless this behavior is explicitly disabled.



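The performance monitoring project has to send several million messages per minute. Whenever the Kafka cluster is under heavy load or the network fluctuates even slightly, the Sender thread drains the buffer pool more slowly than the client fills it, and the client's send calls end up blocked.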
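The blocking behaviour of a bounded buffer can be demonstrated with a semaphore that counts free bytes. This is a minimal sketch under assumptions, not how RecordAccumulator is written: BoundedBufferSketch, tryAppend and release are hypothetical names, and the sketch returns false on timeout where the real producer, whose wait is bounded by max.block.ms, raises an exception.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Sketch of a fixed-size buffer whose append blocks while memory is exhausted. */
final class BoundedBufferSketch {
    private final Semaphore freeBytes; // permits = bytes still available
    private final long maxBlockMs;     // how long an append may wait for space

    BoundedBufferSketch(int totalBytes, long maxBlockMs) {
        this.freeBytes = new Semaphore(totalBytes, true);
        this.maxBlockMs = maxBlockMs;
    }

    /** Reserves space for one record; waits up to maxBlockMs and returns false on timeout. */
    boolean tryAppend(int recordBytes) {
        try {
            return freeBytes.tryAcquire(recordBytes, maxBlockMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Sender side: called once a record has left the buffer. */
    void release(int recordBytes) { freeBytes.release(recordBytes); }

    int availableBytes() { return freeBytes.availablePermits(); }

    public static void main(String[] args) {
        BoundedBufferSketch buffer = new BoundedBufferSketch(32, 50);
        System.out.println(buffer.tryAppend(32)); // fills the buffer
        System.out.println(buffer.tryAppend(1));  // no space left: waits, then gives up
        buffer.release(16);                       // the "sender" frees some space
        System.out.println(buffer.tryAppend(1));  // fits again
    }
}
```

If the sender side releases space more slowly than callers reserve it, every tryAppend spends its full waiting budget, which is the stall described above.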






4. "Topic {{topic_name}} not present in metadata after 60000 ms" error when producing or consuming messages, Distributed Message Service for Kafka, Troubleshooting, Huawei Cloud

