赞
踩
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- Map<String, Object> configs = new HashMap<String, Object>();
- // 指定初始连接用到的broker地址
- configs.put("bootstrap.servers", "192.168.0.103:9092");
- // 指定key的序列化类
- configs.put("key.serializer", IntegerSerializer.class);
- // 指定value的序列化类
- configs.put("value.serializer", StringSerializer.class);
-
- // configs.put("acks", "all");
- // configs.put("reties", "3");
-
- KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);
-
- // 用于设置用户自定义的消息头字段
- List<Header> headers = new ArrayList<Header>();
- headers.add(new RecordHeader("biz.name", "producer.demo".getBytes()));
-
- ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(
- "topic_1",
- 0,
- 0,
- "hello hqk 0",
- headers
- );
-
- // 消息的同步确认
- final Future<RecordMetadata> future = producer.send(record);
- final RecordMetadata metadata = future.get();
- System.out.println("消息的主题:" + metadata.topic());
- System.out.println("消息的分区号:" + metadata.partition());
- System.out.println("消息的偏移量:" + metadata.offset());
-
-
- // 关闭生产者
- producer.close();
- }
- WARN Client session timed out, have not heard from server in 30000ms for sessionid 0x0 (org.apache.zookeeper.ClientCnxn)
- Exception in thread "main" kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING
- at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:262)
- at kafka.zookeeper.ZooKeeperClient.<init>(ZooKeeperClient.scala:119)
- at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1881)
- at kafka.admin.TopicCommand$ZookeeperTopicService$.apply(TopicCommand.scala:376)
- at kafka.admin.TopicCommand$.main(TopicCommand.scala:57)
- at kafka.admin.TopicCommand.main(TopicCommand.scala)
-
查了一些资料可能的原因有:
1.kafka中config目录下的server.properties配置的zookeeper是否错误
2.2181以及9092的端口是否被占用
3.防火墙需要关闭
4.连接zookeeper端口是2181
5.高版本kafka创建主题时的参数不是--zookeeper 而是--bootstrap-server
最后发现都不是
需要在kafka的 /config/service.properties中,最后添加上一句host.name=自己的服务器ip
- ############################# Group Coordinator Settings #############################
-
- # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
- # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
- # The default value for this is 3 seconds.
- # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
- # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
- group.initial.rebalance.delay.ms=0
-
- host.name=192.168.0.103
然后重启kafka 再次连接
- [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
- acks = 1
- batch.size = 16384
- bootstrap.servers = [192.168.0.103:9092]
- buffer.memory = 33554432
- client.id =
- compression.type = none
- connections.max.idle.ms = 540000
- enable.idempotence = false
- interceptor.classes = null
- key.serializer = class org.apache.kafka.common.serialization.IntegerSerializer
- linger.ms = 0
- max.block.ms = 60000
- max.in.flight.requests.per.connection = 5
- max.request.size = 1048576
- metadata.max.age.ms = 300000
- metric.reporters = []
- metrics.num.samples = 2
- metrics.recording.level = INFO
- metrics.sample.window.ms = 30000
- partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
- receive.buffer.bytes = 32768
- reconnect.backoff.max.ms = 1000
- reconnect.backoff.ms = 50
- request.timeout.ms = 30000
- retries = 0
- retry.backoff.ms = 100
- sasl.jaas.config = null
- sasl.kerberos.kinit.cmd = /usr/bin/kinit
- sasl.kerberos.min.time.before.relogin = 60000
- sasl.kerberos.service.name = null
- sasl.kerberos.ticket.renew.jitter = 0.05
- sasl.kerberos.ticket.renew.window.factor = 0.8
- sasl.mechanism = GSSAPI
- security.protocol = PLAINTEXT
- send.buffer.bytes = 131072
- ssl.cipher.suites = null
- ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
- ssl.endpoint.identification.algorithm = null
- ssl.key.password = null
- ssl.keymanager.algorithm = SunX509
- ssl.keystore.location = null
- ssl.keystore.password = null
- ssl.keystore.type = JKS
- ssl.protocol = TLS
- ssl.provider = null
- ssl.secure.random.implementation = null
- ssl.trustmanager.algorithm = PKIX
- ssl.truststore.location = null
- ssl.truststore.password = null
- ssl.truststore.type = JKS
- transaction.timeout.ms = 60000
- transactional.id = null
- value.serializer = class org.apache.kafka.common.serialization.StringSerializer
-
- [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 1.0.2
- [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 2a121f7b1d402825
- 消息的主题:topic_1
- 消息的分区号:0
- 消息的偏移量:8
- [main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
-
- Process finished with exit code 0
发送成功
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。