当前位置:   article > 正文

flink连接kafka报:org.apache.kafka.common.errors.TimeoutException_receiver class org.apache.flink.runtime.rpc.akka.a

receiver class org.apache.flink.runtime.rpc.akka.AkkaRpcSystemLoader does not define or inherit an implementation of the resolved method

测试flink1.12.7 连接kafka:

  1. package org.test.flink;
  2. import org.apache.flink.api.common.serialization.SimpleStringSchema;
  3. import org.apache.flink.streaming.api.datastream.DataStream;
  4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  5. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
  6. import java.util.Properties;
  7. public class FlinkKafka {
  8. public static void main(String[] args) throws Exception {
  9. // 1. 执行环境-env
  10. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  11. env.setParallelism(1);
  12. // 2. 数据源-source
  13. // 2-1. 创建消费Kafka数据时属性
  14. Properties props = new Properties();
  15. props.setProperty("bootstrap.servers", "11.0.24.216:9092");
  16. props.setProperty("group.id", "test1111");
  17. System.out.println("--2-2. 构建FlinkKafkaConsumer实例对象--");
  18. // 2-2. 构建FlinkKafkaConsumer实例对象
  19. FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(
  20. "testtopic", //
  21. new SimpleStringSchema(), //
  22. props
  23. );
  24. System.out.println("--2-3. 添加Source--");
  25. // 2-3. 添加Source
  26. DataStream<String> kafkaStream = env.addSource(kafkaConsumer);
  27. // 3. 数据转换-transformation
  28. // 4. 数据接收器-sink
  29. System.out.println("--kafkaStream.print();--");
  30. kafkaStream.print();
  31. // 5. 触发执行-execute
  32. env.execute("ConnectorFlinkKafkaConsumerDemo") ;
  33. }
  34. }

执行报错如下:

  1. Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
  2. at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
  3. at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
  4. at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
  5. at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
  6. at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
  7. at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
  8. at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
  9. at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
  10. at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
  11. at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
  12. at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
  13. at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1061)
  14. at akka.dispatch.OnComplete.internal(Future.scala:264)
  15. at akka.dispatch.OnComplete.internal(Future.scala:261)
  16. at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
  17. at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
  18. at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
  19. at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
  20. at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
  21. at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
  22. at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
  23. at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
  24. at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
  25. at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
  26. at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
  27. at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:536)
  28. at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
  29. at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
  30. at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
  31. at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
  32. at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
  33. at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  34. at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
  35. at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
  36. at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
  37. at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
  38. at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  39. at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  40. at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  41. at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
  42. Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
  43. at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
  44. at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
  45. at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
  46. at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
  47. at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
  48. at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:666)
  49. at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
  50. at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:446)
  51. at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  52. at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  53. at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  54. at java.lang.reflect.Method.invoke(Method.java:498)
  55. at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
  56. at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
  57. at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
  58. at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
  59. at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
  60. at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
  61. at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
  62. at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
  63. at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
  64. at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
  65. at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
  66. at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
  67. at akka.actor.Actor.aroundReceive(Actor.scala:517)
  68. at akka.actor.Actor.aroundReceive$(Actor.scala:515)
  69. at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
  70. at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
  71. at akka.actor.ActorCell.invoke(ActorCell.scala:561)
  72. at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
  73. at akka.dispatch.Mailbox.run(Mailbox.scala:225)
  74. at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
  75. ... 4 more
  76. Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition testtopic-0 could be determined
  77. Process finished with exit code 1

经排查,找到文章:flink连接kafka报:org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic_未来的资深Java架构师的博客-CSDN博客

因为工程中log4j默认等级是error,所以先在 resources/log4j.properties 中把日志等级改成info,以便看到底层连接的详细日志:

  1. log4j.rootLogger = info,console
  2. ### 配置输出到控制台 ###
  3. log4j.appender.console = org.apache.log4j.ConsoleAppender
  4. log4j.appender.console.Target = System.out
  5. log4j.appender.console.layout = org.apache.log4j.PatternLayout
  6. #log4j.appender.console.layout.ConversionPattern = %d{ABSOLUTE} %5p %c{ 1 }:%L - %m%n

再运行,发现如下错误:

  1. java.net.UnknownHostException: kafka
  2. at java.net.InetAddress.getAllByName0(InetAddress.java:1281)
  3. at java.net.InetAddress.getAllByName(InetAddress.java:1193)
  4. at java.net.InetAddress.getAllByName(InetAddress.java:1127)
  5. at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:104)
  6. at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403)
  7. at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363)
  8. at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151)
  9. at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:949)
  10. at org.apache.kafka.clients.NetworkClient.access$500(NetworkClient.java:71)
  11. at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1122)
  12. at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1010)
  13. at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:545)
  14. at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
  15. at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
  16. at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
  17. at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:161)
  18. at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:240)
  19. at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.fetchCommittedOffsets(ConsumerCoordinator.java:788)
  20. at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.refreshCommittedOffsetsIfNeeded(ConsumerCoordinator.java:750)
  21. at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2338)
  22. at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1725)
  23. at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1684)
  24. at org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.reassignPartitions(KafkaConsumerThread.java:446)
  25. at org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:243)

因为服务器的kafka我是装的docker版,用于测试:

.env配置如下:

  1. KAFKA_LOG_RETENTION_HOURS=16
  2. KAFKA_BROKER_ID=0
  3. KAFKA_ZOOKEEPER_CONNECT=zk-host:2181
  4. # 这个参数不能用0.0.0.0,不然会报错,需要设置成实际访问的ip
  5. KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
  6. KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092

查了下KAFKA_ADVERTISED_LISTENERS这个参数的解释:

深入理解KAFKA_ADVERTISED_LISTENERS_kafka_cfg_listeners_jason.zeng@1502207的博客-CSDN博客

KAFKA_ADVERTISED_LISTENERS:负责发布外网地址,这个地址会发布到zookeeper中。

所以消费者客户端从broker元数据里拿到的地址是 kafka 这个主机名,最终会去访问 kafka;而本地并不能解析该主机名,于是报 java.net.UnknownHostException: kafka。

目前有两种方案:

1、在本地的hosts中配置域名指向;

  1. C:\Windows\System32\drivers\etc\HOSTS
  2. 11.0.24.216 kafka

2、修改kafka安装的配置,把kafka改成具体的ip

我通过方案1(配置本地hosts)成功解决。

具体代码参考(gitee上):flink-java-1.12.7: flink1.12.7的java demo,包括flink wordcount示例,如何连接kafka

参考:flink连接kafka报:org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic_未来的资深Java架构师的博客-CSDN博客

docker安装kafka:Docker 安装Kafka_在奋斗的大道的博客-CSDN博客

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/秋刀鱼在做梦/article/detail/816140?site
推荐阅读
相关标签
  

闽ICP备14008679号