
Flink Caused by:org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException


My Flink job reads data from Kafka for processing. The moment the job started, it failed with the errors below, which made no sense to me at first. I worked on it until 9 PM without success, then came in half an hour early the next day and went through the error messages again. The details are as follows:

Error message 1:

  20/12/17 09:31:07 WARN NetworkClient: [Consumer clientId=consumer-14, groupId=qa_topic_flink_group] Error connecting to node 172.16.40.233:9092 (id: -3 rack: null)
  java.nio.channels.ClosedByInterruptException
      at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
      at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:659)
      at org.apache.kafka.common.network.Selector.doConnect(Selector.java:278)
      at org.apache.kafka.common.network.Selector.connect(Selector.java:256)
      at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:920)
      at org.apache.kafka.clients.NetworkClient.access$700(NetworkClient.java:67)
      at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1090)
      at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:976)
      at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:533)
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
      at org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:292)
      at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1803)
      at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1771)
      at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.getAllPartitionsForTopics(KafkaPartitionDiscoverer.java:77)
      at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:131)
      at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:508)
      at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
      at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
      at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
      at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
      at java.lang.Thread.run(Thread.java:748)

Error message 2:

  20/12/17 09:31:27 WARN StreamTask: Error while canceling task.
  org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
      at org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182)
      at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175)
      at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:818)
      at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:147)
      at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:136)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:602)
      at org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1355)
      at java.lang.Thread.run(Thread.java:748)
  20/12/17 09:31:27 WARN StreamTask: Error while canceling task.
  org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
      at org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182)
      at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175)
      at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:818)
      at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:147)
      at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:136)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:602)
      at org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1355)
      at java.lang.Thread.run(Thread.java:748)

Solution: our Kafka topic has 9 partitions, so I set the Flink job's parallelism to 9 as well.

After that change the job ran normally.
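The rule of thumb behind this fix is that each Kafka partition is consumed by exactly one source subtask, so the source parallelism should line up with the topic's partition count. Here is a minimal sketch of that reasoning in plain Java (the helper name and the "prefer an even divisor" heuristic are my own, not a Flink API):

```java
public class PickParallelism {
    /**
     * Hypothetical helper: pick a source parallelism no larger than the
     * available cores, preferring a value that divides the partition count
     * evenly so every subtask reads the same number of partitions.
     */
    public static int chooseSourceParallelism(int topicPartitions, int availableCores) {
        int p = Math.min(topicPartitions, availableCores);
        while (p > 1 && topicPartitions % p != 0) {
            p--; // back off until partitions distribute evenly across subtasks
        }
        return p;
    }

    public static void main(String[] args) {
        // 9 partitions on a 4-core machine: 9 % 4 != 0, 9 % 3 == 0, so pick 3,
        // which happens to match the parallelism that finally worked here.
        System.out.println(chooseSourceParallelism(9, 4)); // prints 3
    }
}
```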

Then the problem came back. With the parallelism set to 9, my data had to be split into many side-output streams, and the moment I wrote to the last side output, the same error appeared again. I tried everything: I commented out all the side outputs and re-enabled them one at a time, and every time the last one was switched on, the job failed.

Very frustrating. I was starting to suspect black magic.

Then it hit me: check the resource monitor. My first suspicion was insufficient memory, but the error appeared the instant the job started, before anything had accumulated in memory. At the moment of startup, though, I noticed the chart below (a screenshot of CPU usage, omitted here): CPU usage spiked to 100% the instant the job launched.

So now the CPU was the suspect. Sure enough, after lowering the parallelism to 3, the job worked:

env.setParallelism(3);
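When running locally, a cheap way to catch this class of failure before submitting is to compare the requested parallelism against the cores the JVM actually sees. A small sketch in plain Java (the method name is hypothetical; on a real cluster you would compare against available TaskManager slots instead of local cores):

```java
public class CapacityCheck {
    /** Hypothetical check: does the requested parallelism exceed capacity? */
    public static boolean exceedsCapacity(int requestedParallelism, int cores) {
        return requestedParallelism > cores;
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        int requested = 9;
        if (exceedsCapacity(requested, cores)) {
            System.out.println("Warning: parallelism " + requested + " exceeds "
                    + cores + " local cores; expect CPU saturation at startup.");
        }
    }
}
```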

After restarting with the lower parallelism, the CPU usage chart looked normal (screenshot omitted): usage no longer hit 100% at startup, and the job ran fine.

Problem solved.
