赞
踩
Flink 提供了一个命令行界面 (CLI) bin/flink 来运行打包为 JAR 文件的程序并控制其执行。CLI 是任何 Flink 设置的一部分,可在本地单节点设置和分布式设置中使用。它连接到在conf/flink-conf.yaml中指定的正在运行的JobManager。
本节中列出的命令工作的先决条件是运行 Flink 部署,如 Kubernetes、YARN 或任何其他可用的选项。您可以随意在本地启动 Flink 集群,在您自己的机器上尝试这些命令。
提交作业意味着将作业的 JAR 和相关依赖项上传到 Flink 集群并启动作业执行。对于此示例,我们选择一个长时间运行的作业,如 examples/streaming/StateMachineExample.jar。您可以随意从 examples/文件夹中选择任何其他JAR存档或部署自己的作业。
$ ./bin/flink run \
--detached \
./examples/streaming/StateMachineExample.jar
使用 --detached 提交作业将使该命令在提交完成后返回。输出包含(除其他内容外)新提交的作业的 ID。
Usage with built-in data generator: StateMachineExample [--error-rate <probability-of-invalid-transition>] [--sleep <sleep-per-record-in-ms>]
Usage with Kafka: StateMachineExample --kafka-topic <topic> [--brokers <brokers>]
Options for both the above setups:
[--backend <file|rocks>]
[--checkpoint-dir <filepath>]
[--async-checkpoints <true|false>]
[--incremental-checkpoints <true|false>]
[--output <filepath> OR null for stdout]
Using standalone source with error rate 0.000000 and sleep delay 1 millis
Job has been submitted with JobID cca7bc1061d61cf15238e92312c2fc20
打印的使用情况信息列出了与作业相关的参数,如有必要,可以将其添加到作业提交命令的末尾。为了便于阅读,我们假设返回的 JobID 存储在以下命令的变量JOB_ID中:
$ export JOB_ID="cca7bc1061d61cf15238e92312c2fc20"
还有另一个名为 run-application 的操作可用于在 Application Mode下运行作业。本文档不单独处理此操作,因为它在 CLI 前端方面的工作方式类似于运行操作。
run 和run-application程序命令支持通过 -D 参数传递其他配置参数。例如,可以通过设置 -Dpipeline.max-parallelism=120 来完成设置作业的最大并行度。此参数对于配置每个作业或应用程序模式群集非常有用,因为您可以将任何配置参数传递给群集,而无需更改配置文件。
将作业提交到现有会话集群时,仅支持执行配置参数。
您可以使用列表操作监视任何正在运行的作业:
$ ./bin/flink list
Waiting for response...
------------------ Running/Restarting Jobs -------------------
30.11.2020 16:02:29 : cca7bc1061d61cf15238e92312c2fc20 : State machine job (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
已提交但尚未启动的作业将列在“Scheduled Jobs”下。
可以创建保存点以保存作业所处的当前状态。所需要的只是工作 ID:
$ ./bin/flink savepoint $JOB_ID /tmp/flink-savepoints
Triggering savepoint for job cca7bc1061d61cf15238e92312c2fc20.
Waiting for response...
Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab
You can resume your program from this savepoint with the run command.
savepoint 文件夹是可选的,如果未设置 state.savepoints.dir,则需要指定该文件夹。
最后,您可以选择提供保存点的二进制格式。
保存点的路径可以在以后用于重新启动 Flink 作业。
保存点操作还可用于删除保存点。–dispose 需要添加相应的保存点路径:
$ ./bin/flink savepoint \
--dispose \
/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
$JOB_ID
Disposing savepoint '/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab'.
Waiting for response...
Savepoint '/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab' disposed.
如果使用自定义状态实例(例如自定义还原状态或 RocksDB 状态),则必须指定用于触发保存点的程序 JAR 的路径。否则,您将遇到一个 ClassNotFoundException:
$ ./bin/flink savepoint \
--dispose <savepointPath> \
--jarfile <jarFile>
通过保存点操作触发保存点处置不仅会从存储中删除数据,还会使 Flink 清理与保存点相关的元数据。
停止作业的另一个操作是 stop。这是一种在停止从源流向接收器时停止正在运行的流式处理作业的更优雅方式。当用户请求停止作业时,将请求所有源发送将触发保存点的最后一个检查点屏障,并且在该保存点成功完成后,它们将通过调用 cancel() 方法完成。
$ ./bin/flink stop \
--savepointPath /tmp/flink-savepoints \
$JOB_ID
Suspending job "cca7bc1061d61cf15238e92312c2fc20" with a savepoint.
Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab
如果未设置 state.savepoints.dir,我们必须使用 --savepointPath 指定 savepoint 文件夹。
如果指定了 --drain 标志,则在最后一个检查点屏障之前将发出MAX_WATERMARK。这将使所有已注册的事件时间计时器都处于触发状态,从而清除正在等待特定水印的任何状态,例如窗口。作业将继续运行,直到所有源正确关闭。这允许作业完成对所有动态数据的处理,这可以生成一些记录,以便在停止时获取的保存点之后进行处理。
如果要永久终止作业,请使用 --drain 标志。如果要在以后的某个时间点恢复作业,请不要排空管道,因为在恢复作业时可能会导致不正确的结果。
最后,您可以选择提供保存点的二进制格式。
取消作业可以通过取消操作实现:
$ ./bin/flink cancel $JOB_ID
Cancelling job cca7bc1061d61cf15238e92312c2fc20.
Cancelled job cca7bc1061d61cf15238e92312c2fc20.
相应作业的状态将从“正在运行”转换为“已取消”。任何计算都将停止。
–withSavepoint 标志允许在作业取消过程中创建保存点。此功能已弃用。请改用停止操作。
可以使用运行(和运行应用程序)操作从保存点启动作业。
$ ./bin/flink run \
--detached \
--fromSavepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
./examples/streaming/StateMachineExample.jar
Usage with built-in data generator: StateMachineExample [--error-rate <probability-of-invalid-transition>] [--sleep <sleep-per-record-in-ms>]
Usage with Kafka: StateMachineExample --kafka-topic <topic> [--brokers <brokers>]
Options for both the above setups:
[--backend <file|rocks>]
[--checkpoint-dir <filepath>]
[--async-checkpoints <true|false>]
[--incremental-checkpoints <true|false>]
[--output <filepath> OR null for stdout]
Using standalone source with error rate 0.000000 and sleep delay 1 millis
Job has been submitted with JobID 97b20a0a8ffd5c1d656328b0cd6436a6
查看该命令如何等于初始 run 命令,但 --fromSavepoint 参数除外,该参数用于引用以前停止的作业的状态。将生成一个新的作业 ID,可用于维护作业。
默认情况下,我们尝试将整个保存点状态与正在提交的作业进行匹配。如果要允许跳过无法使用新作业还原的保存点状态,可以设置 --allowNonRestoredState 标志。如果从程序中删除了在触发保存点时属于程序一部分的算子,并且您仍希望使用保存点,则需要允许此操作。
$ ./bin/flink run \
--fromSavepoint <savepointPath> \
--allowNonRestoredState ...
如果您的程序删除了作为保存点一部分的算子,这将非常有用。
您还可以选择应用于保存点的恢复模式。该模式控制谁将取得指定保存点文件的所有权。
以下是 Flink 的 CLI 工具支持的操作概述:
Action | Purpose(目的) |
---|---|
run | 此操作将执行作业。它至少需要包含作业的jar。如有必要,可以传递与 Flink 或作业相关的参数。 |
run-application | 此操作在Application Mode下执行作业。除此之外,它需要与运行操作相同的参数。 |
info | 此操作可用于打印已传递作业的优化执行图。同样,需要传递包含作业的 jar。 |
list | 此操作将列出所有正在运行的作业或计划的作业。 |
savepoint | 此操作可用于为给定作业创建或释放保存点。如果 conf/flink-conf.yaml 中未指定 state.savepoints.dir 参数,则可能需要指定 JobID 之外的 savepoint 目录。 |
cancel | 此操作可用于根据其作业 ID 取消正在运行的作业。 |
stop | 此操作将取消和保存点操作组合在一起以停止正在运行的作业,同时还会创建一个保存点以再次开始。 |
所有操作及其参数的更细粒度的描述可以通过 bin/flink --help 或每个单独的操作 bin/flink --help 的使用信息进行访问。
Flink 集群也可以使用 REST API 进行管理。前面几节中描述的命令是 Flink 的 REST 端点所提供命令的子集。因此,像curl这样的工具可以用来从Flink中获得更多。
Flink 与多个集群管理框架(如 Kubernetes 或 YARN)兼容,这些框架在资源提供程序部分有更详细的描述。可以在不同的部署模式下提交作业。作业提交的参数化因基础框架和部署模式而异。
bin/flink 提供了一个参数 --target 来处理不同的选项。除此之外,还必须使用run(对于Session and Per-Job Mode)或run-application(对于 Application Mode)提交作业。请参阅以下参数组合摘要:
–target 将覆盖 conf/flink-conf.yaml 中指定的 execute.target。
有关命令和可用选项的更多详细信息,请参阅文档的资源提供程序特定页面。
目前,用户可以通过 CLI 提交 PyFlink 作业。它不需要指定 JAR 文件路径或条目 main 类,这与 Java 作业提交不同。
当通过 flink run 提交 Python 作业时,Flink 将运行命令 “python”。请运行以下命令以确认当前环境中的 python 可执行文件指向受支持的 Python 版本 3.6+。
$ python --version
# the version printed here must be 3.6+
以下命令显示了不同的 PyFlink 作业提交用例:
$ ./bin/flink run --python examples/python/table/word_count.py
使用其他源文件和资源文件运行 PyFlink 作业。在 --pyFiles 中指定的文件将被添加到 PYTHONPATH 中,因此在 Python 代码中可用。
$ ./bin/flink run \
--python examples/python/table/word_count.py \
--pyFiles file:///user.txt,hdfs:///$namenode_address/username.txt
$ ./bin/flink run \
--python examples/python/table/word_count.py \
--jarfile <jarFile>
使用 pyFiles 和 --pyModule 中指定的主入口模块运行 PyFlink 作业:
$ ./bin/flink run \
--pyModule table.word_count \
--pyFiles examples/python/table
在主机上运行的特定 JobManager 上提交 PyFlink 作业(相应地调整命令):
$ ./bin/flink run \
--jobmanager <jobmanagerHost>:8081 \
--python examples/python/table/word_count.py
$ ./bin/flink run \
--target yarn-per-job
--python examples/python/table/word_count.py
$ ./bin/flink run-application -t yarn-application \
-Djobmanager.memory.process.size=1024m \
-Dtaskmanager.memory.process.size=1024m \
-Dyarn.application.name=<ApplicationName> \
-Dyarn.ship-files=/path/to/shipfiles \
-pyarch shipfiles/venv.zip \
-pyclientexec venv.zip/venv/bin/python3 \
-pyexec venv.zip/venv/bin/python3 \
-py shipfiles/word_count.py
注意 它假定执行作业所需的 Python 依赖项已放置在 /path/to/shipfiles 目录中。例如,对于上面的示例,它应该包含 venv.zip 和 word_count.py。
注意:当它在 YARN 应用程序模式下执行 JobManager 上的作业时,-pyarch 和 -py 中指定的路径是相对于 shipfile 的路径,shipfile 是已交付文件的目录名称。
注意:通过 -pyarch 指定的存档文件将通过 Blob 服务器分发到任务管理器,其中文件大小限制为 2 GB。如果归档文件的大小超过 2 GB,则可以将其上载到分布式文件系统,然后使用命令行选项 -pyarch 中的路径。
$ ./bin/flink run-application \
--target kubernetes-application \
--parallelism 8 \
-Dkubernetes.cluster-id=<ClusterId> \
-Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.taskmanager.cpu=2 \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dkubernetes.container.image=<PyFlinkImageName> \
--pyModule word_count \
--pyFiles /opt/flink/examples/python/table/word_count.py
要了解更多可用选项,请参阅资源提供程序部分中更详细地介绍的 Kubernetes 或 YARN。
除了上面提到的 --pyFiles、–pyModule 和 --python 之外,还有其他一些与 Python 相关的选项。以下是 Flink 的 CLI 工具支持的操作运行和运行应用程序的所有 Python 相关选项的概述:
Option | Description |
---|---|
-py,–python | 具有程序入口点的 Python 脚本。可以使用 --pyFiles 选项配置依赖资源。 |
-pym,–pyModule | 具有程序入口点的 Python 模块。此选项必须与 --pyFiles 结合使用。 |
-pyfs,–pyFiles | 附加作业的自定义文件。标准资源文件后缀,如.py/.egg/.zip/.whl 或目录。这些文件将被添加到本地客户端和远程 python UDF 工作线程的 PYTHONPATH 中。后缀为.zip的文件将被提取并添加到PYTHONPATH中。逗号(‘,’)可以用作指定多个文件的分隔符(例如,–pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip)。 |
-pyarch,–pyArchives | 为作业添加 python 存档文件。归档文件将被解压缩到 python UDF worker 的工作目录中。对于每个归档文件,指定一个目标目录。如果指定了目标目录名,则归档文件将被提取到具有指定名称的目录中。否则,存档文件将被提取到与存档文件同名的目录中。通过此选项上传的文件可通过相对路径访问。“#”可用作归档文件路径和目标目录名称的分隔符。逗号 (‘,’) 可用作指定多个存档文件的分隔符。此选项可用于上传虚拟环境,Python UDF中使用的数据文件(例如,–pyArchives file:///tmp/py37.zip,file:///tmp/data.zip#data --pyExecutable py37.zip/bin/python)。数据文件可以在Python UDF中访问,例如:f = open(‘data/data.txt’, ‘r’)。 |
-pyclientexec,–pyClientExecutable | Python 解释器的路径,用于在通过 “flink run”提交 Python 作业或编译包含 Python UDF 的 Java/Scala 作业时启动 Python 进程。(例如,–pyArchives file:///tmp/py37.zip --pyClientExecutable py37.zip/py37/python) |
-pyexec,–pyExecutable | 指定用于执行 python UDF worker 的 python 解释器的路径(例如:–pyExecutable /usr/local/bin/python3)。python UDF worker依赖于Python 3.6+,Apache Beam(版本== 2.27.0),Pip(版本>= 7.1.0)和SetupTools(版本>= 37.0.0)。请确保指定的环境符合上述要求。 |
-pyreq,–pyRequirements | 指定要求.txt定义第三方依赖项的文件。这些依赖项将被安装并添加到 python UDF worker 的 PYTHONPATH 中。可以选择指定包含这些依赖项的安装包的目录。如果可选参数存在,请使用“#”作为分隔符(例如,–pyRequirements file:///tmp/requirements.txt#file:///tmp/cached_dir)。 |
除了提交作业期间的命令行选项外,它还支持通过代码中的配置或 Python API 指定依赖项。有关更多详细信息,请参阅依赖项管理。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。