赞
踩
一、背景
任务是基于 Flink 1.13 版本的 Java API 实现的,主要功能是将业务数据写到 HDFS 上。
二、问题
主要的报错信息如下
- 2022-10-24 18:18:42,206 WARN org.apache.flink.runtime.taskmanager.Task
- [] - Source: Kafka Source -> Flat Map -> Process -> Flat Map -> Sink: Unnamed (4/6)#676 (ea645f328d5a2ee473927cd39c92e050) switched from RUNNING to FAILED with failure
- cause: org.apache.flink.util.FlinkRuntimeException: Could not find a public truncate method on the Hadoop File System.
- at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.ensureTruncateInitialized(HadoopRecoverableFsDataOutputStream.java:186)
- at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.<init>(HadoopRecoverableFsDataOutputStream.java:71)
- at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.open(HadoopRecoverableWriter.java:80)
- at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.openNewInProgressFile(OutputStreamBasedPartFileWriter.java:90)
- at org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNewInProgressFile(BulkBucketWriter.java:36)
- at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:243)
- at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:220)
- at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:305)
- at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:103)
- at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:492)
- at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
- at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
- at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
- at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
- at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
- at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
- at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
- at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
- at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
- at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
- at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
- at org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:103)
- at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
- at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
- at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
- at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
- at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
- at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
- at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
- at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
- at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
- at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
- at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:180)
- at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
- at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:36)
- at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:27)
- at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:128)
- at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294)
- at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69)
- at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
- at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
- at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
- at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
- at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
- at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
- at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
- at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
- at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
- at java.lang.Thread.run(Thread.java:748)

报错信息已经很明显,就是找不到对应的方法。查看了相关的源码及调用关系,也没有发现异常。后来又查看了一下 Flink 的客户端,发现 lib 下面没有 Hadoop 相关的 shaded 包。
因此,初步判定是缺包导致的问题。将 hadoop 相关的 shaded 包导入后,任务就可以正常运行了。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。