
Error: KafkaStorageException: Too many open files


Problem description: a Flink cluster produces to and consumes from Kafka as part of a big-data pipeline. When the Flink job hits an exception, it recovers through its restart mechanism and checkpoint policy. After the project went live, more and more devices came online, Kafka topics were created dynamically at a growing rate, and the Flink job began to fail with the following exception:
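The restart mechanism and checkpoint policy mentioned above can be set in flink-conf.yaml; a minimal sketch with illustrative values (the interval, attempt count, and delay are assumptions, not taken from this cluster):

```yaml
# flink-conf.yaml -- illustrative values (assumptions, not from this cluster)
execution.checkpointing.interval: 60s        # take a checkpoint every minute
restart-strategy: fixed-delay                # restart the job on failure
restart-strategy.fixed-delay.attempts: 3     # give up after 3 attempts
restart-strategy.fixed-delay.delay: 10s      # wait 10s between attempts
```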

java.io.IOException: Could not perform checkpoint 87 for operator Sink: Unnamed (34/90)#99.
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1274)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:493)
    at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
    at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 87 for operator Sink: Unnamed (34/90)#99. Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:265)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:170)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:233)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:206)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:186)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:605)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:315)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1329)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1315)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1258)
    ... 22 more
Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Expiring 27 record(s) for topic-call_XAucjhIN-0:120000 ms has passed since batch creation
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1429)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:1117)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:1014)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:102)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:345)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:1122)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:87)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:219)
    ... 33 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 27 record(s) for topic-call_XAucjhIN-0:120000 ms has passed since batch creation
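The Flink-side TimeoutException is only a symptom; the root cause surfaces in the broker log. To check whether a broker is suffering descriptor exhaustion, a quick grep over its server log helps (a sketch; the log path is an assumption and depends on your install, and with a non-English JVM locale the message may appear translated, as in the broker log below):

```shell
# Count fd-exhaustion errors in the broker log.
# LOG is an assumed path; check your Kafka install's log4j configuration.
LOG=/opt/kafka/logs/server.log
grep -c "Too many open files" "$LOG"
```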

One of the brokers in the Kafka cluster then went down, with the following error in its log:

[2022-08-01 14:55:22,453] ERROR Error while writing to checkpoint file /home/kafka-logs/fan_sink_29-1/leader-epoch-checkpoint (kafka.server.LogDirFailureChannel)
java.io.FileNotFoundException: /home/kafka-logs/topic_min/leader-epoch-checkpoint.tmp (Too many open files)
    at java.io.FileOutputStream.open0(Native Method)
    at java.io.FileOutputStream.open(FileOutputStream.java:270)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
    at kafka.server.checkpoints.CheckpointFile.liftedTree1$1(CheckpointFile.scala:94)
    at kafka.server.checkpoints.CheckpointFile.write(CheckpointFile.scala:92)
    at kafka.server.checkpoints.LeaderEpochCheckpointFile.write(LeaderEpochCheckpointFile.scala:70)
    at kafka.server.epoch.LeaderEpochFileCache.flush(LeaderEpochFileCache.scala:292)
    at kafka.server.epoch.LeaderEpochFileCache.assign(LeaderEpochFileCache.scala:61)
    at kafka.log.Log.$anonfun$maybeAssignEpochStartOffset$1(Log.scala:1368)
    at kafka.log.Log.$anonfun$maybeAssignEpochStartOffset$1$adapted(Log.scala:1367)
    at scala.Option.foreach(Option.scala:437)
    at kafka.log.Log.maybeAssignEpochStartOffset(Log.scala:1367)
    at kafka.cluster.Partition.$anonfun$makeLeader$1(Partition.scala:592)
    at kafka.cluster.Partition.makeLeader(Partition.scala:547)
    at kafka.server.ReplicaManager.$anonfun$makeLeaders$5(ReplicaManager.scala:1568)
    at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
    at scala.collection.mutable.HashMap$Node.foreachEntry(HashMap.scala:633)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:499)
    at kafka.server.ReplicaManager.makeLeaders(ReplicaManager.scala:1566)
    at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:1411)
    at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:258)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:171)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
    at java.lang.Thread.run(Thread.java:748)
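Since topics were being created dynamically, and each partition directory keeps several files open (log segments, index files, checkpoint files), the partition count gives a useful lower bound on the broker's descriptor usage. A sketch for counting partition directories under the log dir that appears in the error above:

```shell
# Count partition directories under Kafka's log.dirs.
# /home/kafka-logs is the log directory from the error message above.
LOG_DIR=/home/kafka-logs
find "$LOG_DIR" -maxdepth 1 -mindepth 1 -type d | wc -l
```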

The fix is as follows:

# Raise the OS-level open-file limits
[root@kafka101 ~]# vi /etc/security/limits.conf

root soft nofile 65536
root hard nofile 65536
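Note that limits.conf only takes effect for new login sessions; after logging in again, the new limits can be verified with ulimit:

```shell
# Verify the per-session file-descriptor limits after re-login.
ulimit -n    # soft limit on open files
ulimit -Hn   # hard limit on open files
```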

# Find files or directories containing "kafka" (to locate kafka.service)

[root@kafka103 ~]# cd /

[root@kafka103 ~]# find / -name "*kafka*"

/etc/systemd/system/kafka.service

[root@kafka103 ~]# cd /etc/systemd/system/
# Edit the unit file to raise the open-file limit for the service

[root@kafka103 ~]# vi kafka.service

# Raise the maximum number of open files
LimitNOFILE=65535
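For context, LimitNOFILE belongs in the [Service] section of the unit file. A minimal sketch of what the relevant part of kafka.service might look like (the ExecStart paths are illustrative, not taken from this setup):

```ini
[Service]
# Illustrative paths -- your kafka.service will differ
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
# Raise the per-process open-file limit for the broker
LimitNOFILE=65535
```

Because systemd services do not go through PAM login, the limits.conf change alone does not apply to them; the unit-level LimitNOFILE setting is what governs the broker process.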

[root@kafka103 ~]# systemctl daemon-reload

# Restart Kafka

[root@kafka103 ~]# systemctl stop kafka

[root@kafka103 ~]# systemctl start kafka

# Check the Kafka process

[root@kafka103 system]# ps -ef | grep kafka
Here the Kafka process ID is 19694.

[root@kafka103 system]# cat /proc/19694/limits
Limit                     Soft Limit           Hard Limit           Units
Max cpu time              unlimited            unlimited            seconds
Max file size             unlimited            unlimited            bytes
Max data size             unlimited            unlimited            bytes
Max stack size            8388608              unlimited            bytes
Max core file size        0                    unlimited            bytes
Max resident set          unlimited            unlimited            bytes
Max processes             2062355              2062355              processes
Max open files            65535                65535                files
Max locked memory         65536                65536                bytes
Max address space         unlimited            unlimited            bytes
Max file locks            unlimited            unlimited            locks
Max pending signals       2062355              2062355              signals
Max msgqueue size         819200               819200               bytes
Max nice priority         0                    0
Max realtime priority     0                    0
Max realtime timeout      unlimited            unlimited

Max open files is now 65535.
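The same check can be scripted as a quick health probe that compares current descriptor usage against the soft limit (a sketch; 19694 is the example PID from this article):

```shell
# Compare current fd usage against the soft limit for a PID.
KAFKA_PID=19694   # example PID from this article; substitute your own
used=$(ls /proc/"$KAFKA_PID"/fd | wc -l)
limit=$(awk '/Max open files/ {print $4}' /proc/"$KAFKA_PID"/limits)
echo "open files: $used / $limit"
```

If `used` creeps toward `limit` as topics keep being created, the limit needs raising again (or topic growth needs to be reined in).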

This resolves the "too many open files" problem.

Source: reposted from https://www.wpsshop.cn/w/一键难忘520/article/detail/816169; copyright belongs to the original author.