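Recently, friends have kept asking me about my earlier article "Kafka SASL配置 & Demo测试" (Kafka SASL Configuration & Demo Test). Since I have some free time, I plan to package all of these components as Docker images. This post covers building the image for Kafka SASL/PLAIN authentication.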
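Prerequisite reading:
Kafka SASL配置 & Demo测试 (Kafka SASL Configuration & Demo Test)
Kafka常用命令(带SASL权限版) (Common Kafka Commands, SASL version)
Running Kafka in Docker has some known shortcomings and problems, so the goal of the first attempt was only to get the whole authentication flow working inside the container. I won't repeat the details here; follow the articles above: configure ZooKeeper, configure Kafka, create the topic, and set the ACLs.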
### Using the image
docker pull yanxml/kafka-acls:0.0.2
docker run -it yanxml/kafka-acls:0.0.2 /bin/bash
# Start ZooKeeper
source /etc/profile
/opt/apps/zookeeper/bin/start.sh
# Start one broker (work in progress, so only a single node for now)
/opt/apps/kafka/kafka1/app/bin/kafka-server-start.sh /opt/apps/kafka/kafka1/app/config/server.properties
### ACL operations (see the two articles above for details)
### Produce messages
[root@9f24cf035dd9 bin]# ./kafka-console-producer.sh --broker-list localhost:9093 --topic sean-security --producer.config ../config/consumer.properties
### Consume messages
[root@9f24cf035dd9 bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic sean-security --consumer.config ../config/consumer.properties --new-consumer
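Next, set a listener port that is mapped to the outside, update the image, and get the test cases passing.
The main settings for the mapping: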
listeners=SASL_PLAINTEXT://:29092
advertised.listeners=SASL_PLAINTEXT://192.168.1.16:29092
I have already packaged the setup above into a Docker image; you can get it from kafka-aclas.
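Clients outside the container connect to whatever address the broker advertises in its metadata, which is why the advertised address has to be the host IP rather than localhost. The following is a minimal sketch of that rule, not part of the image: `render_listeners` is a hypothetical helper that prints the two settings for a given host IP and port and refuses loopback values.

```shell
#!/usr/bin/env bash
# Sketch: print the two listener settings for a given host IP and port.
# render_listeners is a hypothetical helper, not something shipped in the image.
render_listeners() {
  local host="$1" port="${2:-29092}"
  case "$host" in
    ""|localhost|127.0.0.1)
      # A loopback address would send external clients back to themselves.
      echo "advertised host must be an IP that clients can reach, got '$host'" >&2
      return 1
      ;;
  esac
  printf 'listeners=SASL_PLAINTEXT://:%s\n' "$port"
  printf 'advertised.listeners=SASL_PLAINTEXT://%s:%s\n' "$host" "$port"
}

render_listeners 192.168.1.16 29092
```

The output can be compared against the two lines in server.properties when a client outside the container fails to connect.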
### Using the image
docker run -d -p 2181:2181 -p 2182:2182 -p 2183:2183 -p 29092:29092 -p 29093:29093 -p 29094:29094 --env advertised_host_name=192.168.1.16 yanxml/kafka-acls:0.0.8 /opt/apps/bin/start.sh
<Note: change advertised_host_name to your host's IP; do not use localhost or 127.0.0.1>

### Consumer & producer
docker exec -it <container-id> /bin/bash
source /etc/profile
cd /opt/apps/kafka/kafka1/app/bin

### Consumer
./kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic sean-security --consumer.config ../config/consumer.properties --from-beginning

### Producer
./kafka-console-producer.sh --broker-list localhost:29092 --topic sean-security --producer.config ../config/producer.properties

### Create topic & ACLs
./kafka-topics.sh --zookeeper 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 --describe --topic test
./kafka-topics.sh --create --zookeeper 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 --replication-factor 1 --partitions 1 --topic sean-security
./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181,localhost:2182,localhost:2183 --add --allow-principal User:alice --operation Read --operation Write --topic sean-security
./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181,localhost:2182,localhost:2183 --list --topic sean-security
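Besides entering the container and using the kafka-console-producer.sh script, you can also use the client API from outside the container. The image ships with a pre-created topic, sean-security, which can be used directly. The demos are still in the same git repository: https://github.com/SeanYanxml/bigdata.
The expected output after running them is as follows: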
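For an external client, the settings that matter are the ones visible in the client logs: `security.protocol = SASL_PLAINTEXT` and `sasl.mechanism = PLAIN`. As a rough sketch, the helper below writes a client properties file with those values. `write_client_properties` is a hypothetical name, the password is a placeholder, and the inline `sasl.jaas.config` property is an alternative to the JAAS file the demos appear to use (their logs show `sasl.jaas.config = null`).

```shell
#!/usr/bin/env bash
# Sketch: write a client properties file for SASL/PLAIN over SASL_PLAINTEXT.
# write_client_properties and the password value are placeholders for illustration.
write_client_properties() {
  local file="$1" servers="$2" user="$3" password="$4"
  cat > "$file" <<EOF
bootstrap.servers=${servers}
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${user}" password="${password}";
EOF
}

props="$(mktemp)"
# alice is the principal granted Read/Write in the ACL commands; the password is made up.
write_client_properties "$props" 192.168.1.16:29092 alice alice-password-placeholder
cat "$props"
```

A file like this could be passed to the console tools with `--producer.config` or `--consumer.config`, or loaded into a `Properties` object by an API client.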
# producer
16:32:16,663 INFO ProducerConfig:223 - ProducerConfig values:
	acks = all
	batch.size = 16384
	bootstrap.servers = [192.168.1.16:29092]
	buffer.memory = 33554432
	client.id =
	compression.type = none
	connections.max.idle.ms = 540000
	enable.idempotence = false
	interceptor.classes = null
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 1
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 0
	retry.backoff.ms = 100
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = PLAIN
	security.protocol = SASL_PLAINTEXT
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer
16:32:17,037 INFO AbstractLogin:53 - Successfully logged in.
16:32:17,094 INFO AppInfoParser:109 - Kafka version : 1.0.0
16:32:17,095 INFO AppInfoParser:110 - Kafka commitId : aaa7af6d4a11b29d
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:17 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:18 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:19 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:20 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:21 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:22 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:23 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:24 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:25 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:26 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:27 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:28 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:29 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:30 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:31 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:32 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:33 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:34 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:35 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:36 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:37 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:38 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:39 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:40 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:41 CST 2018 Data: Hello
ClusterInfo: 192.168.1.16:29092 Topic: sean-securityTime:Thu Nov 22 16:33:42 CST 2018 Data: Hello

#consumer
16:32:19,979 INFO ConsumerConfig:223 - ConsumerConfig values:
	auto.commit.interval.ms = 1000
	auto.offset.reset = latest
	bootstrap.servers = [192.168.1.16:29092]
	check.crcs = true
	client.id =
	connections.max.idle.ms = 540000
	enable.auto.commit = true
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = test
	heartbeat.interval.ms = 3000
	interceptor.classes = null
	internal.leave.group.on.close = true
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 305000
	retry.backoff.ms = 100
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = PLAIN
	security.protocol = SASL_PLAINTEXT
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
16:32:20,196 INFO AbstractLogin:53 - Successfully logged in.
16:32:20,285 INFO AppInfoParser:109 - Kafka version : 1.0.0
16:32:20,285 INFO AppInfoParser:110 - Kafka commitId : aaa7af6d4a11b29d
16:32:50,582 INFO AbstractCoordinator:341 - [Consumer clientId=consumer-1, groupId=test] Discovered coordinator 192.168.1.16:29092 (id: 2147483647 rack: null)
16:33:20,594 INFO ConsumerCoordinator:341 - [Consumer clientId=consumer-1, groupId=test] Revoking previously assigned partitions []
16:33:20,594 INFO AbstractCoordinator:336 - [Consumer clientId=consumer-1, groupId=test] (Re-)joining group
16:33:20,731 INFO AbstractCoordinator:341 - [Consumer clientId=consumer-1, groupId=test] Successfully joined group with generation 1
16:33:20,733 INFO ConsumerCoordinator:341 - [Consumer clientId=consumer-1, groupId=test] Setting newly assigned partitions [sean-security-0]
16:33:50,769 INFO AbstractCoordinator:341 - [Consumer clientId=consumer-1, groupId=test] Marking the coordinator 192.168.1.16:29092 (id: 2147483647 rack: null) dead
16:33:50,811 INFO AbstractCoordinator:341 - [Consumer clientId=consumer-1, groupId=test] Discovered coordinator 192.168.1.16:29092 (id: 2147483647 rack: null)
16:33:50,812 INFO AbstractCoordinator:341 - [Consumer clientId=consumer-1, groupId=test] Marking the coordinator 192.168.1.16:29092 (id: 2147483647 rack: null) dead
16:33:50,926 INFO AbstractCoordinator:341 - [Consumer clientId=consumer-1, groupId=test] Discovered coordinator 192.168.1.16:29092 (id: 2147483647 rack: null)
16:34:20,971 ERROR ConsumerCoordinator:301 - [Consumer clientId=consumer-1, groupId=test] Offset commit failed on partition sean-security-0 at offset 46: The coordinator is not aware of this member.
16:34:20,972 WARN ConsumerCoordinator:246 - [Consumer clientId=consumer-1, groupId=test] Asynchronous auto-commit of offsets {sean-security-0=OffsetAndMetadata{offset=46, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
16:34:20,973 WARN ConsumerCoordinator:246 - [Consumer clientId=consumer-1, groupId=test] Synchronous auto-commit of offsets {sean-security-0=OffsetAndMetadata{offset=46, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
16:34:20,975 INFO ConsumerCoordinator:341 - [Consumer clientId=consumer-1, groupId=test] Revoking previously assigned partitions [sean-security-0]
16:34:20,976 INFO AbstractCoordinator:336 - [Consumer clientId=consumer-1, groupId=test] (Re-)joining group
16:34:20,988 INFO AbstractCoordinator:341 - [Consumer clientId=consumer-1, groupId=test] Successfully joined group with generation 3
16:34:20,989 INFO ConsumerCoordinator:341 - [Consumer clientId=consumer-1, groupId=test] Setting newly assigned partitions [sean-security-0]
offset = 76, key = null, value = Hello
offset = 77, key = null, value = Hello
offset = 78, key = null, value = Hello
offset = 79, key = null, value = Hello

PS: At first there is a period during which the client cannot connect. I don't know whether that is caused by stalls or by something else. In any case, I would not recommend using Docker to handle this volume of data in production, because it may well hang; it would need further tuning, network optimization, and so on. For local debugging it is fine.
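The logs do not say why the first connections fail. One thing that can at least be ruled out is the broker port not being reachable yet when the client starts. The sketch below is a generic retry wrapper, under the assumption that bash is available on the client machine: `retry` and `port_open` are hypothetical helpers, and `port_open` relies on bash's `/dev/tcp` pseudo-device. It does not address the coordinator rebalance messages themselves.

```shell
#!/usr/bin/env bash
# Sketch: retry a command a fixed number of times with a pause in between.
# retry and port_open are hypothetical helpers for illustration only.
retry() {
  local attempts="$1" delay="$2"
  shift 2
  local i=1
  while true; do
    if "$@"; then return 0; fi
    if [ "$i" -ge "$attempts" ]; then return 1; fi
    sleep "$delay"
    i=$((i + 1))
  done
}

# Succeeds when a TCP connection to host:port can be opened.
# The subshell closes the socket again when it exits.
port_open() {
  (exec 3<>"/dev/tcp/$1/$2") 2>/dev/null
}

# Example (not executed here): wait up to 30 attempts, 2 seconds apart,
# before starting the producer or consumer.
# retry 30 2 port_open 192.168.1.16 29092 && echo "broker port reachable"
```

If the port is reachable and the delay remains, the cause is more likely on the Kafka side (for example the advertised address or group coordination) than in container startup timing.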
[root@899ffd853b39 bin]# ./kafka-console-producer.sh --broker-list localhost:29092 --topic sean-security --producer.config ../config/producer.properties
>ll
>pp
>oo
>[2018-11-22 08:18:37,328] ERROR Error when sending message to topic sean-security with key: null, value: 2 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for sean-security-0: 10020 ms has passed since batch creation plus linger time
[2018-11-22 08:18:37,351] ERROR Error when sending message to topic sean-security with key: null, value: 2 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for sean-security-0: 10020 ms has passed since batch creation plus linger time
[2018-11-22 08:18:37,355] ERROR Error when sending message to topic sean-security with key: null, value: 2 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for sean-security-0: 10020 ms has passed since batch creation plus linger time

[root@899ffd853b39 bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic sean-security --consumer.config ../config/consumer.properties --from-beginning
ddasd
dada
dasd
adda
da
adadasd
dasda
dasda
adasd
dasda
dasd
dasd
[2018-11-22 08:23:11,471] ERROR [Consumer clientId=consumer-1, groupId=console-consumer-80036] Offset commit failed on partition sean-security-0 at offset 0: The coordinator is not aware of this member. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-11-22 08:23:11,476] WARN [Consumer clientId=consumer-1, groupId=console-consumer-80036] Asynchronous auto-commit of offsets {sean-security-0=OffsetAndMetadata{offset=0, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-11-22 08:23:11,477] WARN [Consumer clientId=consumer-1, groupId=console-consumer-80036] Synchronous auto-commit of offsets {sean-security-0=OffsetAndMetadata{offset=12, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
ddasd
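Write kafka_server_jaas.conf and kafka_client_jaas.conf as separate files. For details, see kafka-topic-authorization-failed and "kafka常见报错及解决方法(topic类、生产消费类、启动类)" (Common Kafka Errors and Fixes: topic, produce/consume, and startup).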
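As a sketch of what "separate files" means for SASL/PLAIN, the helper below writes one JAAS file with only a `KafkaServer` section and one with only a `KafkaClient` section. `write_jaas_files` is a hypothetical name, and every username and password except the principal `alice` is a placeholder rather than a value taken from the image.

```shell
#!/usr/bin/env bash
# Sketch: keep broker and client JAAS sections in two separate files.
# write_jaas_files and all passwords here are placeholders, not values from the image.
write_jaas_files() {
  local dir="$1"
  # Broker side: its own login plus the user_<name> entries it will accept.
  cat > "$dir/kafka_server_jaas.conf" <<'EOF'
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-password-placeholder"
    user_admin="admin-password-placeholder"
    user_alice="alice-password-placeholder";
};
EOF
  # Client side: only the credentials this client logs in with.
  cat > "$dir/kafka_client_jaas.conf" <<'EOF'
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="alice"
    password="alice-password-placeholder";
};
EOF
}

jaas_dir="$(mktemp -d)"
write_jaas_files "$jaas_dir"
ls "$jaas_dir"
```

Each process is then pointed at its own file through the JVM option `-Djava.security.auth.login.config=<path>`, for example via `KAFKA_OPTS` before running the broker or the console tools.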
# Symptom: the broker cannot be reached
[2018-11-21 04:58:55,550] WARN [Controller-0-to-broker-0-send-thread]: Controller 0's connection to broker 192.168.1.16:29092 (id: 0 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
java.io.IOException: Connection to 192.168.1.16:29092 (id: 0 rack: null) failed.
	at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:68)
	at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:269)
	at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:223)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
[2018-11-21 04:58:55,550] WARN [Controller-0-to-broker-0-send-thread]: Controller 0's connection to broker 192.168.1.16:29092 (id: 0 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
java.io.IOException: Connection to 192.168.1.16:29092 (id: 0 rack: null) failed.
	at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:68)
	at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:269)
	at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:223)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
Same as above.
[2018-11-21 10:25:06,236] FATAL [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.KafkaException: Failed to acquire lock on file .lock in /opt/apps/kafka/kafka1/data. A Kafka instance in another process or thread is using this directory.
	at kafka.log.LogManager$$anonfun$lockLogDirs$1.apply(LogManager.scala:203)
	at kafka.log.LogManager$$anonfun$lockLogDirs$1.apply(LogManager.scala:199)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
	at kafka.log.LogManager.lockLogDirs(LogManager.scala:199)
	at kafka.log.LogManager.<init>(LogManager.scala:85)
	at kafka.log.LogManager$.apply(LogManager.scala:799)
	at kafka.server.KafkaServer.startup(KafkaServer.scala:222)
	at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)
	at kafka.Kafka$.main(Kafka.scala:92)
	at kafka.Kafka.main(Kafka.scala)
[2018-11-21 05:14:34,098] FATAL (kafka.Kafka$)
java.lang.IllegalArgumentException: requirement failed: inter.broker.listener.name must be a listener name defined in advertised.listeners. The valid options based on currently configured listeners are PLAINTEXT
at scala.Predef$.require(Predef.scala:224)
at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1194)
at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1170)
at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:881)
at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:878)
at kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)
at kafka.Kafka$.main(Kafka.scala:82)
at kafka.Kafka.main(Kafka.scala)
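The message above says the inter-broker listener is not among the advertised listeners, and that only PLAINTEXT was configured at the time. Assuming the broker sets `security.inter.broker.protocol=SASL_PLAINTEXT` (the log does not show this), a quick consistency check on server.properties might look like the sketch below. `check_inter_broker_listener` is a hypothetical helper, and it deliberately ignores `inter.broker.listener.name` and `listener.security.protocol.map`.

```shell
#!/usr/bin/env bash
# Sketch: does security.inter.broker.protocol name a listener that is advertised?
# check_inter_broker_listener is a hypothetical helper; it ignores
# inter.broker.listener.name and listener.security.protocol.map.
check_inter_broker_listener() {
  local file="$1" proto adv
  proto="$(sed -n 's/^security\.inter\.broker\.protocol=//p' "$file" | tail -n 1)"
  adv="$(sed -n 's/^advertised\.listeners=//p' "$file" | tail -n 1)"
  # Kafka falls back to listeners when advertised.listeners is not set.
  [ -n "$adv" ] || adv="$(sed -n 's/^listeners=//p' "$file" | tail -n 1)"
  # Broker defaults when neither key is present.
  adv="${adv:-PLAINTEXT://:9092}"
  proto="${proto:-PLAINTEXT}"
  # Compare the protocol against the listener names (the part before "://").
  printf '%s\n' "$adv" | tr ',' '\n' | sed 's#://.*##' | grep -qxF "$proto"
}

cfg="$(mktemp)"
cat > "$cfg" <<'EOF'
listeners=SASL_PLAINTEXT://:29092
advertised.listeners=SASL_PLAINTEXT://192.168.1.16:29092
security.inter.broker.protocol=SASL_PLAINTEXT
EOF
if check_inter_broker_listener "$cfg"; then
  echo "inter-broker protocol matches an advertised listener"
else
  echo "inter-broker protocol is missing from advertised.listeners"
fi
```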
[2018-11-22 05:56:40,693] FATAL (kafka.Kafka$)
java.lang.IllegalArgumentException: requirement failed: Each listener must have a different name, listeners: SASL_PLAINTEXT://:9092,SASL_PLAINTEXT://:29092
	at scala.Predef$.require(Predef.scala:224)
	at kafka.utils.CoreUtils$.validate$1(CoreUtils.scala:270)
	at kafka.utils.CoreUtils$.listenerListToEndPoints(CoreUtils.scala:280)
	at kafka.server.KafkaConfig$$anonfun$getListeners$1.apply(KafkaConfig.scala:1120)
	at kafka.server.KafkaConfig$$anonfun$getListeners$1.apply(KafkaConfig.scala:1119)
	at scala.Option.map(Option.scala:146)
	at kafka.server.KafkaConfig.getListeners(KafkaConfig.scala:1119)
	at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1089)
	at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:881)
	at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:878)
	at kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)
	at kafka.Kafka$.main(Kafka.scala:82)
	at kafka.Kafka.main(Kafka.scala)
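This error comes from declaring two listeners with the same name, here `SASL_PLAINTEXT` on both 9092 and 29092. A small sketch for spotting the duplicate before starting the broker follows; `duplicate_listener_names` is a hypothetical helper that only parses the `listeners` value as a string.

```shell
#!/usr/bin/env bash
# Sketch: print listener names that appear more than once in a listeners value.
# duplicate_listener_names is a hypothetical helper for illustration only.
duplicate_listener_names() {
  # Split on commas, keep the part before "://", then report repeated names.
  printf '%s\n' "$1" | tr ',' '\n' | sed 's#://.*##' | sort | uniq -d
}

duplicate_listener_names 'SASL_PLAINTEXT://:9092,SASL_PLAINTEXT://:29092'
```

If two ports with the same security protocol are really needed, one option is to give the listeners distinct names (for example the illustrative names `INTERNAL` and `EXTERNAL`) and map both to `SASL_PLAINTEXT` with the broker setting `listener.security.protocol.map`.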
15:02:21,358 INFO VerifiableProperties:72 - Verifying properties 15:02:21,403 INFO VerifiableProperties:72 - Property metadata.broker.list is overridden to 127.0.0.1:9092 15:02:21,406 INFO VerifiableProperties:72 - Property serializer.class is overridden to kafka.serializer.StringEncoder 15:02:21,406 WARN VerifiableProperties:87 - Property zookeeper.connect is not valid 15:02:21,629 INFO ClientUtils$:72 - Fetching metadata from broker BrokerEndPoint(0,127.0.0.1,9092) with correlation id 0 for 1 topic(s) Set(test3) 15:02:21,730 INFO SyncProducer:72 - Connected to 127.0.0.1:9092 for producing 15:02:21,805 INFO SyncProducer:72 - Disconnecting from 127.0.0.1:9092 15:02:21,812 WARN BrokerPartitionInfo:87 - Error while fetching metadata [{TopicMetadata for topic test3 -> No partition metadata for topic test3 due to org.apache.kafka.common.errors.LeaderNotAvailableException}] for topic [test3]: class org.apache.kafka.common.errors.LeaderNotAvailableException 15:02:21,816 INFO ClientUtils$:72 - Fetching metadata from broker BrokerEndPoint(0,127.0.0.1,9092) with correlation id 1 for 1 topic(s) Set(test3) 15:02:21,817 INFO SyncProducer:72 - Connected to 127.0.0.1:9092 for producing 15:02:21,853 INFO SyncProducer:72 - Disconnecting from 127.0.0.1:9092 15:02:21,854 WARN BrokerPartitionInfo:87 - Error while fetching metadata [{TopicMetadata for topic test3 -> No partition metadata for topic test3 due to org.apache.kafka.common.errors.LeaderNotAvailableException}] for topic [test3]: class org.apache.kafka.common.errors.LeaderNotAvailableException 15:02:21,855 ERROR DefaultEventHandler:101 - Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test3 15:02:21,856 INFO DefaultEventHandler:72 - Back off for 100 ms before retrying send. 
Remaining retries = 3 15:02:21,958 INFO ClientUtils$:72 - Fetching metadata from broker BrokerEndPoint(0,127.0.0.1,9092) with correlation id 2 for 1 topic(s) Set(test3) 15:02:21,959 INFO SyncProducer:72 - Connected to 127.0.0.1:9092 for producing 15:02:21,973 INFO SyncProducer:72 - Disconnecting from 127.0.0.1:9092 15:02:52,042 INFO SyncProducer:72 - Connected to c127a1b1a3cd:9092 for producing 15:02:52,043 INFO SyncProducer:72 - Disconnecting from c127a1b1a3cd:9092 15:02:52,049 WARN DefaultEventHandler:93 - Failed to send producer request with correlation id 4 to broker 0 with data for partitions test3-0 java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:112) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79) at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:110) at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110) at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31) at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:109) at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109) at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31) at kafka.producer.SyncProducer.send(SyncProducer.scala:108) at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:274) at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:115) at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:107) at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) at scala.collection.mutable.HashMap.foreach(HashMap.scala:99) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:107) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:80) at kafka.producer.Producer.send(Producer.scala:78) at kafka.javaapi.producer.Producer.send(Producer.scala:35) at com.yanxml.kafka.quickstart.KafkaOldProducer.run(KafkaOldProducer.java:31) 15:02:52,052 INFO DefaultEventHandler:72 - Back off for 100 ms before retrying send. Remaining retries = 2 15:02:52,156 INFO ClientUtils$:72 - Fetching metadata from broker BrokerEndPoint(0,127.0.0.1,9092) with correlation id 5 for 1 topic(s) Set(test3) 15:02:52,156 INFO SyncProducer:72 - Connected to 127.0.0.1:9092 for producing 15:02:52,166 INFO SyncProducer:72 - Disconnecting from 127.0.0.1:9092 15:02:52,167 INFO SyncProducer:72 - Disconnecting from c127a1b1a3cd:9092 15:02:52,169 INFO SyncProducer:72 - Connected to c127a1b1a3cd:9092 for producing 15:02:52,169 INFO SyncProducer:72 - Disconnecting from c127a1b1a3cd:9092 15:02:52,170 WARN DefaultEventHandler:93 - Failed to send producer request with correlation id 7 to broker 0 with data for partitions test3-0 java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:112) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79) at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:110) at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110) at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31) at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:109) at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109) at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31) at kafka.producer.SyncProducer.send(SyncProducer.scala:108) at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:274) at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:115) at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:107) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) at scala.collection.mutable.HashMap.foreach(HashMap.scala:99) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:107) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:80) at kafka.producer.Producer.send(Producer.scala:78) at kafka.javaapi.producer.Producer.send(Producer.scala:35) at com.yanxml.kafka.quickstart.KafkaOldProducer.run(KafkaOldProducer.java:31) 15:02:52,171 INFO 
DefaultEventHandler:72 - Back off for 100 ms before retrying send. Remaining retries = 1 15:02:52,273 INFO ClientUtils$:72 - Fetching metadata from broker BrokerEndPoint(0,127.0.0.1,9092) with correlation id 8 for 1 topic(s) Set(test3) 15:02:52,273 INFO SyncProducer:72 - Connected to 127.0.0.1:9092 for producing 15:02:52,287 INFO SyncProducer:72 - Disconnecting from 127.0.0.1:9092 15:02:52,287 INFO SyncProducer:72 - Disconnecting from c127a1b1a3cd:9092 15:02:52,288 INFO SyncProducer:72 - Connected to c127a1b1a3cd:9092 for producing 15:02:52,289 INFO SyncProducer:72 - Disconnecting from c127a1b1a3cd:9092 15:02:52,289 WARN DefaultEventHandler:93 - Failed to send producer request with correlation id 10 to broker 0 with data for partitions test3-0 java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:112) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79) at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:110) at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110) at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31) at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:109) at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109) at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31) at kafka.producer.SyncProducer.send(SyncProducer.scala:108) at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:274) at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:115) at 
kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:107) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) at scala.collection.mutable.HashMap.foreach(HashMap.scala:99) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:107) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:80) at kafka.producer.Producer.send(Producer.scala:78) at kafka.javaapi.producer.Producer.send(Producer.scala:35) at com.yanxml.kafka.quickstart.KafkaOldProducer.run(KafkaOldProducer.java:31) 15:02:52,290 INFO DefaultEventHandler:72 - Back off for 100 ms before retrying send. Remaining retries = 0 15:02:52,394 INFO ClientUtils$:72 - Fetching metadata from broker BrokerEndPoint(0,127.0.0.1,9092) with correlation id 11 for 1 topic(s) Set(test3) 15:02:52,395 INFO SyncProducer:72 - Connected to 127.0.0.1:9092 for producing 15:02:52,403 INFO SyncProducer:72 - Disconnecting from 127.0.0.1:9092 15:02:52,404 INFO SyncProducer:72 - Disconnecting from c127a1b1a3cd:9092 15:02:52,405 ERROR DefaultEventHandler:101 - Failed to send requests for topics test3 with correlation ids in [0,11] Exception in thread "Thread-0" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries. at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:98) at kafka.producer.Producer.send(Producer.scala:78) at kafka.javaapi.producer.Producer.send(Producer.scala:35) at com.yanxml.kafka.quickstart.KafkaOldProducer.run(KafkaOldProducer.java:31)
14:05:12,770 INFO VerifiableProperties:72 - Verifying properties
14:05:12,824 INFO VerifiableProperties:72 - Property metadata.broker.list is overridden to 192.168.1.16:9092,192.168.1.16:9093,192.168.1.16:9094
14:05:12,830 INFO VerifiableProperties:72 - Property serializer.class is overridden to kafka.serializer.StringEncoder
14:05:12,832 WARN VerifiableProperties:87 - Property zookeeper.connect is not valid
14:05:13,103 INFO ClientUtils$:72 - Fetching metadata from broker BrokerEndPoint(0,192.168.1.16,9092) with correlation id 0 for 1 topic(s) Set(test)
14:05:13,204 INFO SyncProducer:72 - Connected to 192.168.1.16:9092 for producing
14:05:13,251 INFO SyncProducer:72 - Disconnecting from 192.168.1.16:9092
14:05:43,343 INFO SyncProducer:72 - Connected to 54efcdb1689f:9093 for producing
14:05:43,344 INFO SyncProducer:72 - Disconnecting from 54efcdb1689f:9093
14:05:43,351 WARN DefaultEventHandler:93 - Failed to send producer request with correlation id 2 to broker 1 with data for partitions test-0
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:112)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:110)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:109)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:108)
    at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:274)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:115)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:107)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:107)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:80)
    at kafka.producer.Producer.send(Producer.scala:78)
    at kafka.javaapi.producer.Producer.send(Producer.scala:35)
    at com.yanxml.kafka.quickstart.KafkaOldProducer.run(KafkaOldProducer.java:31)
14:05:43,354 INFO DefaultEventHandler:72 - Back off for 100 ms before retrying send. Remaining retries = 3
14:05:43,463 INFO ClientUtils$:72 - Fetching metadata from broker BrokerEndPoint(2,192.168.1.16,9094) with correlation id 3 for 1 topic(s) Set(test)
14:05:43,464 INFO SyncProducer:72 - Connected to 192.168.1.16:9094 for producing
14:05:43,467 INFO SyncProducer:72 - Disconnecting from 192.168.1.16:9094
14:05:43,469 WARN ClientUtils$:93 - Fetching topic metadata with correlation id 3 for topics [Set(test)] from broker [BrokerEndPoint(2,192.168.1.16,9094)] failed
java.io.EOFException
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:124)
    at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:131)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:122)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:82)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:63)
    at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:83)
    at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:86)
    at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:82)
    at kafka.utils.Logging$class.swallowError(Logging.scala:110)
    at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:46)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:86)
    at kafka.producer.Producer.send(Producer.scala:78)
    at kafka.javaapi.producer.Producer.send(Producer.scala:35)
    at com.yanxml.kafka.quickstart.KafkaOldProducer.run(KafkaOldProducer.java:31)
14:05:43,470 INFO SyncProducer:72 - Disconnecting from 192.168.1.16:9094
14:05:43,474 INFO ClientUtils$:72 - Fetching metadata from broker BrokerEndPoint(1,192.168.1.16,9093) with correlation id 3 for 1 topic(s) Set(test)
14:05:43,474 INFO SyncProducer:72 - Connected to 192.168.1.16:9093 for producing
14:05:43,501 INFO SyncProducer:72 - Disconnecting from 192.168.1.16:9093
14:05:43,501 INFO SyncProducer:72 - Disconnecting from 54efcdb1689f:9093
14:05:43,503 INFO SyncProducer:72 - Connected to 54efcdb1689f:9093 for producing
14:05:43,503 INFO SyncProducer:72 - Disconnecting from 54efcdb1689f:9093
14:05:43,504 WARN DefaultEventHandler:93 - Failed to send producer request with correlation id 5 to broker 1 with data for partitions test-0
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:112)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:110)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:109)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:108)
    at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:274)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:115)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:107)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:107)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:80)
    at kafka.producer.Producer.send(Producer.scala:78)
    at kafka.javaapi.producer.Producer.send(Producer.scala:35)
    at com.yanxml.kafka.quickstart.KafkaOldProducer.run(KafkaOldProducer.java:31)
14:05:43,505 INFO DefaultEventHandler:72 - Back off for 100 ms before retrying send. Remaining retries = 2
14:05:43,609 INFO ClientUtils$:72 - Fetching metadata from broker BrokerEndPoint(2,192.168.1.16,9094) with correlation id 6 for 1 topic(s) Set(test)
14:05:43,611 INFO SyncProducer:72 - Connected to 192.168.1.16:9094 for producing
14:05:43,617 INFO SyncProducer:72 - Disconnecting from 192.168.1.16:9094
14:05:43,617 WARN ClientUtils$:93 - Fetching topic metadata with correlation id 6 for topics [Set(test)] from broker [BrokerEndPoint(2,192.168.1.16,9094)] failed
java.io.EOFException
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:124)
    at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:131)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:122)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:82)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:63)
    at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:83)
    at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:86)
    at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:82)
    at kafka.utils.Logging$class.swallowError(Logging.scala:110)
    at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:46)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:86)
    at kafka.producer.Producer.send(Producer.scala:78)
    at kafka.javaapi.producer.Producer.send(Producer.scala:35)
    at com.yanxml.kafka.quickstart.KafkaOldProducer.run(KafkaOldProducer.java:31)
14:05:43,618 INFO SyncProducer:72 - Disconnecting from 192.168.1.16:9094
14:05:43,618 INFO ClientUtils$:72 - Fetching metadata from broker BrokerEndPoint(0,192.168.1.16,9092) with correlation id 6 for 1 topic(s) Set(test)
14:05:43,619 INFO SyncProducer:72 - Connected to 192.168.1.16:9092 for producing
14:05:43,635 INFO SyncProducer:72 - Disconnecting from 192.168.1.16:9092
14:05:43,636 INFO SyncProducer:72 - Disconnecting from 54efcdb1689f:9093
14:05:43,638 INFO SyncProducer:72 - Connected to 54efcdb1689f:9093 for producing
14:05:43,638 INFO SyncProducer:72 - Disconnecting from 54efcdb1689f:9093
14:05:43,639 WARN DefaultEventHandler:93 - Failed to send producer request with correlation id 8 to broker 1 with data for partitions test-2
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:112)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:110)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:109)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:108)
    at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:274)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:115)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:107)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:107)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:80)
    at kafka.producer.Producer.send(Producer.scala:78)
    at kafka.javaapi.producer.Producer.send(Producer.scala:35)
    at com.yanxml.kafka.quickstart.KafkaOldProducer.run(KafkaOldProducer.java:31)
14:05:43,640 INFO DefaultEventHandler:72 - Back off for 100 ms before retrying send. Remaining retries = 1
14:05:43,743 INFO ClientUtils$:72 - Fetching metadata from broker BrokerEndPoint(0,192.168.1.16,9092) with correlation id 9 for 1 topic(s) Set(test)
14:05:43,744 INFO SyncProducer:72 - Connected to 192.168.1.16:9092 for producing
14:05:43,763 INFO SyncProducer:72 - Disconnecting from 192.168.1.16:9092
14:05:43,763 INFO SyncProducer:72 - Disconnecting from 54efcdb1689f:9093
14:05:43,764 INFO SyncProducer:72 - Connected to 54efcdb1689f:9093 for producing
14:05:43,765 INFO SyncProducer:72 - Disconnecting from 54efcdb1689f:9093
14:05:43,766 WARN DefaultEventHandler:93 - Failed to send producer request with correlation id 11 to broker 1 with data for partitions test-2
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:112)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:110)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:109)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:108)
    at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:274)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:115)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:107)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:107)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:80)
    at kafka.producer.Producer.send(Producer.scala:78)
    at kafka.javaapi.producer.Producer.send(Producer.scala:35)
    at com.yanxml.kafka.quickstart.KafkaOldProducer.run(KafkaOldProducer.java:31)
14:05:43,767 INFO DefaultEventHandler:72 - Back off for 100 ms before retrying send. Remaining retries = 0
14:05:43,870 INFO ClientUtils$:72 - Fetching metadata from broker BrokerEndPoint(1,192.168.1.16,9093) with correlation id 12 for 1 topic(s) Set(test)
14:05:43,870 INFO SyncProducer:72 - Connected to 192.168.1.16:9093 for producing
14:05:43,877 INFO SyncProducer:72 - Disconnecting from 192.168.1.16:9093
14:05:43,877 INFO SyncProducer:72 - Disconnecting from 54efcdb1689f:9093
14:05:43,878 ERROR DefaultEventHandler:101 - Failed to send requests for topics test with correlation ids in [0,12]
Exception in thread "Thread-0" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:98)
    at kafka.producer.Producer.send(Producer.scala:78)
    at kafka.javaapi.producer.Producer.send(Producer.scala:35)
    at com.yanxml.kafka.quickstart.KafkaOldProducer.run(KafkaOldProducer.java:31)
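In the log above, the producer fetches metadata from 192.168.1.16 but then tries to send to 54efcdb1689f:9093, a host name that looks like a Docker container ID and presumably only resolves inside the container. A quick way to spot this in a saved client log is to list the distinct endpoints it connected to. The following is a minimal sketch; the function name `connected_endpoints` and the file name `producer.log` are illustrative assumptions, not part of the image.

```shell
#!/usr/bin/env bash
# List the distinct host:port endpoints that a client log reports connecting to.
# Reads the log from stdin. A container-ID-like host among the results suggests
# the broker advertised an address that is only valid inside Docker.
connected_endpoints() {
  grep -o 'Connected to [^ ]*' | sed 's/^Connected to //' | sort -u
}
```

Usage: `connected_endpoints < producer.log`. Because `grep -o` prints every match on its own line, this also works on a log whose line breaks were lost.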
Completed basic configuration:
#advertised.host.name=192.168.1.16
#advertised.port=29092
advertised.listeners=SASL_PLAINTEXT://192.168.1.16:29092
listeners=SASL_PLAINTEXT://:29092
#listeners=SASL_PLAINTEXT://192.168.1.16:29092
#listeners=SASL_PLAINTEXT://ip(127.0.0.1):port(9092)
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# Defaults to false; when true, any user may access a resource that has no ACL.
allow.everyone.if.no.acl.found=true
super.users=User:admin;User:alice
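Since the advertised address must be an IP that clients can reach, and not localhost or 127.0.0.1, it can help to check the broker config before starting the container. The following is a minimal sketch under that assumption; the function name `check_advertised_listeners` is hypothetical, and it only inspects active (uncommented) `advertised.listeners` lines.

```shell
#!/usr/bin/env bash
# Read a server.properties file from stdin and check every active
# advertised.listeners entry. Returns non-zero if the property is missing
# or if any entry has an empty, localhost or 127.0.0.1 host.
check_advertised_listeners() {
  local entries entry host status=0
  entries="$(grep -E '^advertised\.listeners=' \
    | sed 's/^advertised\.listeners=//' \
    | tr ',' '\n')"
  if [ -z "$entries" ]; then
    echo "advertised.listeners is not set"
    return 1
  fi
  while IFS= read -r entry; do
    host="${entry#*://}"   # drop the protocol prefix, e.g. SASL_PLAINTEXT://
    host="${host%:*}"      # drop the trailing :port
    case "$host" in
      ""|localhost|127.0.0.1)
        echo "unreachable from other hosts: $entry"
        status=1 ;;
      *)
        echo "ok: $entry" ;;
    esac
  done <<EOF
$entries
EOF
  return "$status"
}
```

Usage: `check_advertised_listeners < /opt/apps/kafka/kafka1/app/config/server.properties`. The check is deliberately narrow: it does not verify that the address is actually routable or that the port is published by `docker run -p`.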