kafka入门踩坑 连接超时问题_访问kafka broker节点超时

  1. public static void main(String[] args) throws ExecutionException, InterruptedException {
  2. Map<String, Object> configs = new HashMap<String, Object>();
  3. // 指定初始连接用到的broker地址
  4. configs.put("bootstrap.servers", "");
  5. // 指定key的序列化类
  6. configs.put("key.serializer", IntegerSerializer.class);
  7. // 指定value的序列化类
  8. configs.put("value.serializer", StringSerializer.class);
  9. // configs.put("acks", "all");
  10. // configs.put("reties", "3");
  11. KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);
  12. // 用于设置用户自定义的消息头字段
  13. List<Header> headers = new ArrayList<Header>();
  14. headers.add(new RecordHeader("biz.name", "producer.demo".getBytes()));
  15. ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(
  16. "topic_1",
  17. 0,
  18. 0,
  19. "hello hqk 0",
  20. headers
  21. );
  22. // 消息的同步确认
  23. final Future<RecordMetadata> future = producer.send(record);
  24. final RecordMetadata metadata = future.get();
  25. System.out.println("消息的主题:" + metadata.topic());
  26. System.out.println("消息的分区号:" + metadata.partition());
  27. System.out.println("消息的偏移量:" + metadata.offset());
  28. // 关闭生产者
  29. producer.close();
  30. }
  1. WARN Client session timed out, have not heard from server in 30000ms for sessionid 0x0 (org.apache.zookeeper.ClientCnxn)
  2. Exception in thread "main" kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING
  3. at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:262)
  4. at kafka.zookeeper.ZooKeeperClient.<init>(ZooKeeperClient.scala:119)
  5. at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1881)
  6. at kafka.admin.TopicCommand$ZookeeperTopicService$.apply(TopicCommand.scala:376)
  7. at kafka.admin.TopicCommand$.main(TopicCommand.scala:57)
  8. at kafka.admin.TopicCommand.main(TopicCommand.scala)

5.高版本kafka创建主题时的参数不是--zookeeper 而是--bootstrap-server


需要在kafka的  /config/service.properties中,最后添加上一句host.name=自己的服务器ip

  1. ############################# Group Coordinator Settings #############################
  2. # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
  3. # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
  4. # The default value for this is 3 seconds.
  5. # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
  6. # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
  7. group.initial.rebalance.delay.ms=0
  8. host.name=

然后重启kafka 再次连接

  61. Process finished with exit code 0


