当前位置:   article > 正文

Flink|提交 PyFlink 作业的踩坑记录_pyflink flinkkafkaconsumer

pyflink flinkkafkaconsumer

【本地】Java 类 FlinkKafkaConsumer 不存在报错

问题现象

在最初的 PyFlink 作业中,没有使用任何方式在命令行参数、代码中或 pyflink 仓库路径中添加 Kafka 所需的 jar 包。此时,运行 PyFlink 任务后报错如下:

TypeError: Could not found the Java class 'org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer'. The Java dependencies could be specified via command line argument '--jarfile' or the config option 'pipeline.jars'
  • 1

提示 Java 类 org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer 不存在,提示用 --jarfile 参数导入所需的 jar 包。

解决方法

首先查找文件所在位置:

Step 1|在 apache 的 Github 账号搜索,发现 FlinkKafkaConsumer 类在 flink-connector-kafka 仓库中

然后制作 PyFlink 作业所需的 Jar 包,使用以下两种方法均可:

【解决方法 1】使用 flink-connector-kafka 源码制作 PyFlink 作业所需的 Jar 包

Step 1|从 Github 上拉取源码

从 Github 上拉取 flink-connector-kafka 的源码:

$ git clone https://github.com/apache/flink-connector-kafka.git
  • 1

如果 Github 无法直接访问,可以通过如下方法在拉取仓库时指定 http.proxy 配置信息 HTTP 代理(例如 clash 的本地端口为 7890):

$ git clone https://github.com/apache/flink-connector-kafka.git --config http.proxy=127.0.0.1:7890
  • 1

以上命令的含义,是指定在新的仓库中 http.proxy 配置等于 127.0.0.1:7890,在 clone 时这个新仓库的配置也会生效

Step 2|查找需要添加的 Java 类

flink-connector-kafka 仓库中,可以找到 FlinkKafkaConsumer 类在如下目录中:

flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
  • 1

Step 3|将打包 fat jar 的添加到 pom.xml

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <version>3.1.1</version>
    <configuration>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>

    <executions>
        <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

Step 4|使用 Maven 打包

Maven 打包命令:

mvn clean package -DskipTests -T 3
  • 1

Maven 打包结果:

flink-connector-kafka 项目的 target 目录下。

Step 5|将生成的 Jar 包添加的 PyFlink 的依赖中

添加方法的官方文档:应用开发 - Python API - 依赖管理 - JAR Dependencies

例如:

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///" + FLINK_CONNECTOR_KAFKA_JAR_PATH)
env.add_classpaths("file:///" + FLINK_CONNECTOR_KAFKA_JAR_PATH)
  • 1
  • 2
  • 3
【解决方法 2】在 Maven 仓库中下载 flink-connector-kafka 及其依赖的 Jar 包

参考资料:流批一体计算引擎-4-[Flink]消费kafka实时数据

Step 1|在 https://mvnrepository.com/ 中搜索 flink-connector-kafka

Step 2|下载它的 jar 包及其 Compile Dependencies 依赖的 jar 包

Step 3|将这些 Jar 包添加到 <PYTTHON_PATH>/lib/python37/site-packages/pyflink/lib

使用这种方法,不需要在代码中做任何修改。

【本地】Java 反射报错(java.lang.reflect.InaccessibleObjectException

问题现象

在添加了所需的 Jar 包依赖后,运行 PyFlink 脚本报错如下:

py4j.protocol.Py4JJavaError: An error occurred while calling o28.addSource.
: java.lang.reflect.InaccessibleObjectException: Unable to make field private static final long java.util.Properties.serialVersionUID accessible: module java.base does not "opens java.util" to unnamed module @45afc369
  • 1
  • 2

看起来很像是使用了过高的 JDK 版本,不支持对 JDK 标准库的类库进行反射导致。因为我的本地环境中有多个 JDK 环境,所以推测 Flink 使用了 JDK8 或 JDK11 以外的版本。

指定 Java 环境的官方文档:Deployment - 配置参数

解决方法

参考资料:[已解决]Windows 下使用pyflink出现PyFlink module java.base does not “opens java.lang“ to unnamed module

conf/flink-conf.yaml 中添加 env.java.home 参数,并指向 JDK 8 环境。

在 Python 环境下运行时,conf/flink-conf.yaml 文件在 <PYTTHON_PATH>/lib/python37/site-packages/pyflink/conf 中。

无效方法
  • 修改制作 Jar 包时 pom.xml 文件中的 scala 版本
  • 切换本地 Flink 版本

【本地】java.net.MalformedURLException

问题现象

执行 PyFlink 作业时报错如下:

py4j.protocol.Py4JJavaError: An error occurred while calling None.java.net.URL.
: java.net.MalformedURLException: unknown protocol: d
  • 1
  • 2
报错原因

在代码中添加的 Jar 包路径不正确,使用本地存储系统中的文件路径,而没有添加 file:/// 的前缀。

解决方案

添加 jar 包依赖的 Flink 官方文档:应用开发 - Python API - 依赖管理

将本地存储系统中的文件路径,替换为以 file:/// 前缀的路径,例如:

stream_execution_environment.add_jars("file:///my/jar/path/connector1.jar", "file:///my/jar/path/connector2.jar")
  • 1

【本地】本地运行 UDF 使用其他 Python 环境

问题现象

在通过 IDE 执行 PyFlink 作业时,发现虽然在 conf/flink-conf.yaml 文件中配置了 python.executable 参数,但是在运行时还是使用了其他 Python 环境,在日志中打印:

org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager [] - Python interpreter path: python
  • 1
解决方案

推测在使用 IDE 直接启动 PyFlink 作业时,conf/flink-conf.yaml 文件中配置了 python.executable 参数实际不能生效。

在代码中直接指定,可以确保生效:

stream_execution_environment.set_python_executable(r"D:\py\Python37Flink14\python.exe"
  • 1

【本地】客户端运行缺少环境

问题现象

在已经指定了正确的 --pyRequirements 参数的前提下,在使用 flink run 方法执行 PyFlink 作业时客户端部分报错,提示缺少包。

问题原因

Flink 的 --pyRequirements 参数仅影响运行 Python UDF 的 Worker,而不影响 PyFlink 的客户端部分。

解决方案

在执行 flink run 的服务器的 Python 环境中,安装所需的包即可。

【集群】requirements.txt 缓存地址不存在的报错

问题现象

在使用 flink run 提交 PyFlink 作业时,出现 ClusterDeploymentException 报错,提示无法部署 Yarn job cluster,具体地是由于 FileNotFoundException 异常导致,提示是因为 cached 路径不存在:

py4j.protocol.Py4JJavaError: An error occurred while calling o22.execute.
: org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy Yarn job cluster.
	at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:482)
	at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:81)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2042)
	at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:137)
	at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: File cached_dir does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:641)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:930)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:631)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:469)
	at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:386)
	at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:2505)
	at org.apache.flink.yarn.YarnApplicationFileUploader.copyToRemoteApplicationDir(YarnApplicationFileUploader.java:400)
	at org.apache.flink.yarn.YarnApplicationFileUploader.uploadLocalFileToRemote(YarnApplicationFileUploader.java:200)
	at org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:868)
	at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:607)
	at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:475)
	... 15 more
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
原因定位

在启动命令中,包含 --pyRequirements .../requirements.txt#cached_dir 参数,其中指定了 Python 仓库的缓存地址 cached。出现这个报错是因为缓存地址的路径不存在导致的。

解决方案

这里的缓存地址是基于启动位置的相对路径 / 绝对路径。所以在启动位置创建 mkdir cached_dir 创建路径,并在目录中安装 Python UDF 运行所需的安装包。

【集群】PyFlink 作业提交后几十秒后就进入 FINISH 状态

问题现象

在使用 flink run 方法提交 PyFlink 作业后,大概 40 秒左右作业就进入 FINISH 状态。但是作业逻辑是消费 Kafka,原则上应该会永远跑下去。

原因定位

在任务启动后,就查看 Flink DataBoard,发现 Job failed during initialization of JobManager。查看日志后,发现是因为 Python 的依赖包不足。在 --pyRequirements 参数中指定了缓存路径后,PyFlink 的 Python UDF Worker 将只从该目录中读取安装包。

解决方案

参考文档:《Flink 官方文档 - 应用开发 - Python API - 依赖管理》

  1. 不再使用缓存地址,在任务部署到线上后由各个 Python UDF Worker 下载依赖包(但是这样 PyFlink 的作业启动速度会比较慢)
  2. 将所有依赖包下载到目录,命令如下:
pip download -d cached_dir -r requirements.txt --no-binary :all:
  • 1

【集群】使用 --no-binary :all: 下载 pip 包报错

问题现象

在使用 Flink 官方文档中的命令下载 pip 包时出现报错:

pip download -d cached_dir -r requirements.txt --no-binary :all:
  • 1

查看日志,发现报错出现在安装 importlib_metadata 时。

问题定位

推测可能是添加了 --no-binary :all: 参数,导致 pip 在尝试解包、打包时报错导致。

解决方案

在服务器上,不添加 --no-binary :all: 命令下载 pip 包:

pip download -d cached_dir -r requirements.txt --no-binary :all:
  • 1

此时,执行 PyFlink 任务时,观察 TaskManager 日志,很可能会出现一些包找不到的情况。日志类似于:

Could not find a version that satisfies the requirement greenlet!=0.4.17
  • 1

我们可以结合缓存地址中的安装包,观察是哪些包找不断,从而推断 Flink Python UDF 的运行环境。例如,我这里似乎是包含 manylinux2010manlinux2014 的 whl 文件无法被使用,而 manulinux1 的 whl 文件可以被使用。那么,整理在 TaskManager 日志中出现的所有无法被获取到的包,并使用 wget 直接从 pypi 上下载对应版本适合不同环境的包到缓存地址中。

也可以使用其他镜像,例如清华镜像:https://pypi.tuna.tsinghua.edu.cn/simple/{包名}/(建议添加包名使用,否则会很慢)

【集群】Jar 包与集群版本不匹配

报错信息
switched from INITIALIZING to FAILED with failure cause: java.lang.NoSuchMethodError: org.apache.flink.api.common.functions.RuntimeContext.getMetricGroup()Lorg/apache/flink/metrics/groups/OperatorMetricGroup;
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1402)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1379)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:1002)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:102)
	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:403)
	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:394)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1223)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
	at java.lang.Thread.run(Thread.java:748)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
报错原因

参考文档:【日常Exception】第三十三回:Flink运行jar包报错NoSuchMethodError: org.apache.flink.api.common.functions.Runtime…

jar 包使用的 1.14.3 版本,但 Flink 集群是在 1.13 版本上。

解决方法

切换集群即可。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/煮酒与君饮/article/detail/789273
推荐阅读
相关标签
  

闽ICP备14008679号