Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
    at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$$Lambda$152/1531429858.apply(Unknown Source)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    ... 4 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
// 2. Read data from Kafka
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "xx.xxx.xxx.xx:9092");
properties.setProperty("group.id", "consumer-group");
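1. I searched online for a long time. Most posts say you have to edit Kafka's server.properties and bind the listener to 0.0.0.0, as in the line below, so that the broker is reachable from the public network (this turned out not to be the fix: nothing needed to change, and I left the parameters exactly as Kafka was installed).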
listeners=PLAINTEXT://0.0.0.0:9092
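2. Other posts say the Scala version of the dependencies in the program must match the Scala version bundled with the Kafka running on Linux (these do need to correspond), but changing it made no difference.
3. The port was already open in the Alibaba Cloud security group.
4. Eventually I found another article and tried its suggestion. The cause really was the host name alias configured on the Alibaba Cloud server.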
1. From Windows, check connectivity with telnet. The connection succeeds, so the port is reachable and the cause is not in the Kafka configuration.
telnet xx.xxx.xxx.xx 9092
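If telnet is not installed on the Windows machine, roughly the same reachability check can be done from Java. The following is a minimal sketch, not part of the original troubleshooting; the class name `PortCheck` and the method `canConnect` are made up for illustration, and the host and port are passed in as arguments.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not from the original post): a telnet-style reachability check.
class PortCheck {

    // Returns true if a TCP connection to host:port can be opened within timeoutMs.
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and host names that cannot be resolved.
            return false;
        }
    }

    public static void main(String[] args) {
        // Pass the address used in bootstrap.servers, e.g. the server IP and 9092.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " reachable = " + canConnect(host, port, 3000));
    }
}
```

A successful connection only shows that the port is open from this machine. It says nothing about which host name the broker hands back to the client afterwards.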
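2. Next, switch the program's logging to debug level (it defaulted to error level, which hides many important messages). The log keeps reporting an unknown-host error, and the host name in it turns out to be the server's own alias, even though the program was configured with an IP address. The broker returns its own host name in the metadata response, and the client then connects to that name rather than to the bootstrap address.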
5464 [Kafka Fetcher for Source: Custom Source -> Sink: Print to Std. Out (6/16)#0] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-consumer-group-27, groupId=consumer-group] Error connecting to node gZervbrgme7rrth0rthjrthu545:9092 (id: 0 rack: null)
java.net.UnknownHostException: gZervbrgme7rrth0rthjrthu545
    at java.net.InetAddress.getAllByName0(InetAddress.java:1259)
    at java.net.InetAddress.getAllByName(InetAddress.java:1171)
    at java.net.InetAddress.getAllByName(InetAddress.java:1105)
    at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:104)
    at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403)
    at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363)
    at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:949)
    at org.apache.kafka.clients.NetworkClient.access$500(NetworkClient.java:71)
    at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1122)
    at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1010)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:545)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
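The trace shows the failure coming from `InetAddress.getAllByName`, that is, from ordinary JVM name resolution on the client machine. The same lookup can be reproduced outside Flink with a few lines of Java. This is only a sketch: the class name `HostResolveCheck` and the method `resolve` are hypothetical, and the host name to test is whatever appears in your own log.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper (not from the original post): reproduces the client-side name lookup.
class HostResolveCheck {

    // Returns the resolved IP address as text, or null if the name cannot be resolved.
    static String resolve(String host) {
        try {
            return InetAddress.getByName(host).getHostAddress();
        } catch (UnknownHostException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        // Pass the host name from the "Error connecting to node ..." log line.
        String host = args.length > 0 ? args[0] : "localhost";
        String ip = resolve(host);
        System.out.println(ip == null ? host + " cannot be resolved" : host + " -> " + ip);
    }
}
```

If this reports that the name cannot be resolved, the Kafka client on the same machine will presumably fail the same way. Running it again after fixing the name mapping is a quick way to confirm the change took effect.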
3. I then added a mapping for that host name to the hosts file on the Windows machine, after which the job ran normally.
xx.xxx.xxx.xx gZervbrgme7rrth0rthjrthu545
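To double-check that the entry was actually saved, the hosts file can be scanned for the alias. The sketch below assumes the usual hosts format (an IP address followed by one or more names, with `#` starting a comment); the class `HostsFileLookup` and the method `findIp` are made-up names, and the file path and host name are supplied as arguments.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Optional;

// Hypothetical helper (not from the original post): looks up a name in hosts-file lines.
class HostsFileLookup {

    // Returns the IP mapped to hostName in the given lines, if any entry lists that name.
    static Optional<String> findIp(List<String> lines, String hostName) {
        for (String raw : lines) {
            int hash = raw.indexOf('#');
            // Drop trailing comments, then surrounding whitespace.
            String line = (hash >= 0 ? raw.substring(0, hash) : raw).trim();
            if (line.isEmpty()) {
                continue;
            }
            String[] fields = line.split("\\s+");
            // fields[0] is the IP; every following field is a name for it.
            for (int i = 1; i < fields.length; i++) {
                if (fields[i].equalsIgnoreCase(hostName)) {
                    return Optional.of(fields[0]);
                }
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 2) {
            System.out.println("usage: HostsFileLookup <path-to-hosts-file> <host-name>");
            return;
        }
        List<String> lines = Files.readAllLines(Paths.get(args[0]));
        System.out.println(findIp(lines, args[1]).orElse("no entry for " + args[1]));
    }
}
```

On Windows the file is normally at C:\Windows\System32\drivers\etc\hosts, and editing it requires administrator rights.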
Reference: https://blog.csdn.net/maoyuanming0806/article/details/80553632