
Kafka getting-started pitfall: connection timeout when accessing a Kafka broker node


Getting-started example

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Map<String, Object> configs = new HashMap<String, Object>();
        // Broker address used for the initial connection
        configs.put("bootstrap.servers", "192.168.0.103:9092");
        // Serializer class for the key
        configs.put("key.serializer", IntegerSerializer.class);
        // Serializer class for the value
        configs.put("value.serializer", StringSerializer.class);
        // configs.put("acks", "all");
        // configs.put("retries", "3");
        KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);
        // Custom message header fields
        List<Header> headers = new ArrayList<Header>();
        headers.add(new RecordHeader("biz.name", "producer.demo".getBytes()));
        ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(
                "topic_1",
                0,
                0,
                "hello hqk 0",
                headers
        );
        // Synchronous confirmation of the send
        final Future<RecordMetadata> future = producer.send(record);
        final RecordMetadata metadata = future.get();
        System.out.println("Topic: " + metadata.topic());
        System.out.println("Partition: " + metadata.partition());
        System.out.println("Offset: " + metadata.offset());
        // Close the producer
        producer.close();
    }
}
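The example above blocks on future.get() to confirm the send synchronously. As an alternative sketch (assuming the same producer and record setup as above, and the same kafka-clients dependency), the send can be confirmed asynchronously with a callback, so the calling thread is not blocked while waiting on the broker:

```java
// Asynchronous confirmation: a sketch reusing the producer/record above.
// The callback runs on the producer's I/O thread once the broker responds.
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // Send failed, e.g. the broker was unreachable
        exception.printStackTrace();
    } else {
        System.out.println("Sent to " + metadata.topic()
                + "-" + metadata.partition() + " @ offset " + metadata.offset());
    }
});
// Make sure buffered sends are pushed out before shutting down
producer.flush();
producer.close();
```

Note this sketch cannot run on its own; it assumes a reachable broker and the `producer`/`record` variables from the example above.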
WARN Client session timed out, have not heard from server in 30000ms for sessionid 0x0 (org.apache.zookeeper.ClientCnxn)
Exception in thread "main" kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING
    at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:262)
    at kafka.zookeeper.ZooKeeperClient.<init>(ZooKeeperClient.scala:119)
    at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1881)
    at kafka.admin.TopicCommand$ZookeeperTopicService$.apply(TopicCommand.scala:376)
    at kafka.admin.TopicCommand$.main(TopicCommand.scala:57)
    at kafka.admin.TopicCommand.main(TopicCommand.scala)

Some research turned up the following possible causes:
1. The ZooKeeper address configured in server.properties under Kafka's config directory is wrong.
2. Port 2181 or 9092 is already occupied by another process.
3. The firewall needs to be turned off (or those ports opened).
4. The ZooKeeper connection port should be 2181.
5. On newer Kafka versions, the topic-creation parameter is --bootstrap-server, not --zookeeper.
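For cause 5, the two command forms differ as follows; this is a sketch using this article's host and topic name rather than defaults (--bootstrap-server is available from Kafka 2.2, and --zookeeper was removed in Kafka 3.0):

```shell
# Older versions: the tool talks to ZooKeeper on port 2181
bin/kafka-topics.sh --zookeeper 192.168.0.103:2181 --create \
    --topic topic_1 --partitions 1 --replication-factor 1

# Newer versions: the tool talks to the broker itself on port 9092
bin/kafka-topics.sh --bootstrap-server 192.168.0.103:9092 --create \
    --topic topic_1 --partitions 1 --replication-factor 1
```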

In the end, none of these turned out to be the problem.

The fix was to append host.name=<your server's IP> to the end of Kafka's config/server.properties:

############################# Group Coordinator Settings #############################
# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
host.name=192.168.0.103
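host.name works on the Kafka 1.0.x broker used in this article, but it has long been deprecated and was removed in Kafka 2.0. On newer brokers, the equivalent fix (sketched here with this article's IP) is to set the listener addresses instead:

```properties
# Address the broker binds to
listeners=PLAINTEXT://0.0.0.0:9092
# Address the broker advertises to clients in metadata responses;
# this must be reachable from the client machine
advertised.listeners=PLAINTEXT://192.168.0.103:9092
```

Either way, the underlying issue is the same: the client's metadata request must return a broker address the client can actually reach.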

Then restart Kafka and connect again:

[main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
    acks = 1
    batch.size = 16384
    bootstrap.servers = [192.168.0.103:9092]
    buffer.memory = 33554432
    client.id =
    compression.type = none
    connections.max.idle.ms = 540000
    enable.idempotence = false
    interceptor.classes = null
    key.serializer = class org.apache.kafka.common.serialization.IntegerSerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 1.0.2
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 2a121f7b1d402825
Topic: topic_1
Partition: 0
Offset: 8
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
Process finished with exit code 0

The message was sent successfully.
