赞
踩
FlinkKafkaConsumer
不存在报错在最初的 PyFlink 作业中,没有使用任何方式在命令行参数、代码中或 pyflink 仓库路径中添加 Kafka 所需的 jar 包。此时,运行 PyFlink 任务后报错如下:
TypeError: Could not found the Java class 'org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer'. The Java dependencies could be specified via command line argument '--jarfile' or the config option 'pipeline.jars'
提示 Java 类 org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
不存在,提示用 --jarfile
参数导入所需的 jar 包。
首先查找文件所在位置:
Step 1|在 apache 的 Github 账号搜索,发现 FlinkKafkaConsumer
类在 flink-connector-kafka
仓库中
然后制作 PyFlink 作业所需的 Jar 包,使用以下两种方法均可:
flink-connector-kafka
源码制作 PyFlink 作业所需的 Jar 包Step 1|从 Github 上拉取源码
从 Github 上拉取 flink-connector-kafka
的源码:
$ git clone https://github.com/apache/flink-connector-kafka.git
如果 Github 无法直接访问,可以通过如下方法在拉取仓库时指定 http.proxy
配置信息 HTTP 代理(例如 clash 的本地端口为 7890):
$ git clone https://github.com/apache/flink-connector-kafka.git --config http.proxy=127.0.0.1:7890
以上命令的含义,是指定在新的仓库中 http.proxy
配置等于 127.0.0.1:7890
,在 clone 时这个新仓库的配置也会生效
Step 2|查找需要添加的 Java 类
在 flink-connector-kafka
仓库中,可以找到 FlinkKafkaConsumer
类在如下目录中:
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
Step 3|将打包 fat jar 的添加到 pom.xml
中
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.1.1</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
Step 4|使用 Maven 打包
Maven 打包命令:
mvn clean package -DskipTests -T 3
Maven 打包结果:
在 flink-connector-kafka
项目的 target
目录下。
Step 5|将生成的 Jar 包添加的 PyFlink 的依赖中
添加方法的官方文档:应用开发 - Python API - 依赖管理 - JAR Dependencies
例如:
env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///" + FLINK_CONNECTOR_KAFKA_JAR_PATH)
env.add_classpaths("file:///" + FLINK_CONNECTOR_KAFKA_JAR_PATH)
flink-connector-kafka
及其依赖的 Jar 包Step 1|在 https://mvnrepository.com/ 中搜索 flink-connector-kafka
包
Step 2|下载它的 jar 包及其 Compile Dependencies 依赖的 jar 包
Step 3|将这些 Jar 包添加到 <PYTTHON_PATH>/lib/python37/site-packages/pyflink/lib
中
使用这种方法,不需要在代码中做任何修改。
java.lang.reflect.InaccessibleObjectException
)在添加了所需的 Jar 包依赖后,运行 PyFlink 脚本报错如下:
py4j.protocol.Py4JJavaError: An error occurred while calling o28.addSource.
: java.lang.reflect.InaccessibleObjectException: Unable to make field private static final long java.util.Properties.serialVersionUID accessible: module java.base does not "opens java.util" to unnamed module @45afc369
看起来很像是使用了过高的 JDK 版本,不支持对 JDK 标准库的类库进行反射导致。因为我的本地环境中有多个 JDK 环境,所以推测 Flink 使用了 JDK8 或 JDK11 以外的版本。
指定 Java 环境的官方文档:Deployment - 配置参数
参考资料:[已解决]Windows 下使用pyflink出现PyFlink module java.base does not “opens java.lang“ to unnamed module
在 conf/flink-conf.yaml
中添加 env.java.home
参数,并指向 JDK 8 环境。
在 Python 环境下运行时,conf/flink-conf.yaml
文件在 <PYTTHON_PATH>/lib/python37/site-packages/pyflink/conf
中。
pom.xml
文件中的 scala 版本java.net.MalformedURLException
执行 PyFlink 作业时报错如下:
py4j.protocol.Py4JJavaError: An error occurred while calling None.java.net.URL.
: java.net.MalformedURLException: unknown protocol: d
在代码中添加的 Jar 包路径不正确,使用本地存储系统中的文件路径,而没有添加 file:///
的前缀。
添加 jar 包依赖的 Flink 官方文档:应用开发 - Python API - 依赖管理
将本地存储系统中的文件路径,替换为以 file:///
前缀的路径,例如:
stream_execution_environment.add_jars("file:///my/jar/path/connector1.jar", "file:///my/jar/path/connector2.jar")
在通过 IDE 执行 PyFlink 作业时,发现虽然在 conf/flink-conf.yaml
文件中配置了 python.executable
参数,但是在运行时还是使用了其他 Python 环境,在日志中打印:
org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager [] - Python interpreter path: python
推测在使用 IDE 直接启动 PyFlink 作业时,conf/flink-conf.yaml
文件中配置了 python.executable
参数实际不能生效。
在代码中直接指定,可以确保生效:
stream_execution_environment.set_python_executable(r"D:\py\Python37Flink14\python.exe"
在已经指定了正确的 --pyRequirements
参数的前提下,在使用 flink run
方法执行 PyFlink 作业时客户端部分报错,提示缺少包。
Flink 的 --pyRequirements
参数仅影响运行 Python UDF 的 Worker,而不影响 PyFlink 的客户端部分。
在执行 flink run
的服务器的 Python 环境中,安装所需的包即可。
requirements.txt
缓存地址不存在的报错在使用 flink run
提交 PyFlink 作业时,出现 ClusterDeploymentException
报错,提示无法部署 Yarn job cluster,具体地是由于 FileNotFoundException
异常导致,提示是因为 cached 路径不存在:
py4j.protocol.Py4JJavaError: An error occurred while calling o22.execute.
: org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy Yarn job cluster.
at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:482)
at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:81)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2042)
at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:137)
at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: File cached_dir does not exist
at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:641)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:930)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:631)
at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:469)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:386)
at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:2505)
at org.apache.flink.yarn.YarnApplicationFileUploader.copyToRemoteApplicationDir(YarnApplicationFileUploader.java:400)
at org.apache.flink.yarn.YarnApplicationFileUploader.uploadLocalFileToRemote(YarnApplicationFileUploader.java:200)
at org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:868)
at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:607)
at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:475)
... 15 more
在启动命令中,包含 --pyRequirements .../requirements.txt#cached_dir
参数,其中指定了 Python 仓库的缓存地址 cached
。出现这个报错是因为缓存地址的路径不存在导致的。
这里的缓存地址是基于启动位置的相对路径 / 绝对路径。所以在启动位置创建 mkdir cached_dir
创建路径,并在目录中安装 Python UDF 运行所需的安装包。
在使用 flink run
方法提交 PyFlink 作业后,大概 40 秒左右作业就进入 FINISH 状态。但是作业逻辑是消费 Kafka,原则上应该会永远跑下去。
在任务启动后,就查看 Flink DataBoard,发现 Job failed during initialization of JobManager
。查看日志后,发现是因为 Python 的依赖包不足。在 --pyRequirements
参数中指定了缓存路径后,PyFlink 的 Python UDF Worker 将只从该目录中读取安装包。
pip download -d cached_dir -r requirements.txt --no-binary :all:
--no-binary :all:
下载 pip 包报错在使用 Flink 官方文档中的命令下载 pip 包时出现报错:
pip download -d cached_dir -r requirements.txt --no-binary :all:
查看日志,发现报错出现在安装 importlib_metadata
时。
推测可能是添加了 --no-binary :all:
参数,导致 pip 在尝试解包、打包时报错导致。
在服务器上,不添加 --no-binary :all:
命令下载 pip 包:
pip download -d cached_dir -r requirements.txt --no-binary :all:
此时,执行 PyFlink 任务时,观察 TaskManager 日志,很可能会出现一些包找不到的情况。日志类似于:
Could not find a version that satisfies the requirement greenlet!=0.4.17
我们可以结合缓存地址中的安装包,观察是哪些包找不断,从而推断 Flink Python UDF 的运行环境。例如,我这里似乎是包含 manylinux2010
和 manlinux2014
的 whl 文件无法被使用,而 manulinux1
的 whl 文件可以被使用。那么,整理在 TaskManager 日志中出现的所有无法被获取到的包,并使用 wget 直接从 pypi 上下载对应版本适合不同环境的包到缓存地址中。
也可以使用其他镜像,例如清华镜像:https://pypi.tuna.tsinghua.edu.cn/simple/{包名}/
(建议添加包名使用,否则会很慢)
switched from INITIALIZING to FAILED with failure cause: java.lang.NoSuchMethodError: org.apache.flink.api.common.functions.RuntimeContext.getMetricGroup()Lorg/apache/flink/metrics/groups/OperatorMetricGroup;
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1402)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1379)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:1002)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:102)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:403)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:394)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1223)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.lang.Thread.run(Thread.java:748)
参考文档:【日常Exception】第三十三回:Flink运行jar包报错NoSuchMethodError: org.apache.flink.api.common.functions.Runtime…
jar 包使用的 1.14.3 版本,但 Flink 集群是在 1.13 版本上。
切换集群即可。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。