赞
踩
1.1 提交方式
Flink Job 可以通过以下两种方式进行提交:
(1)使用命令行工具 flink run 提交 Job
(2)使用 Flink 提供的 Java 或 Scala API 进行提交
1.2 提交流程
(1)Flink Job 提交时,首先需要配置 JobManager 的地址和端口号,以便与 JobManager 建立连接。
(2)JobManager 接收到 Job 提交请求后,会对 Job 进行解析和验证,确保 Job 的正确性。
(3)JobManager 将解析后的 Job 图发送给相应的 TaskManager,TaskManager 根据 Job 图进行任务的调度和执行。
2.1 JobManager 参数配置
JobManager 是 Flink Job 的调度中心,负责接收和处理 Job 的提交请求。
(1)jobmanager.rpc.address:JobManager 的主机地址,默认为 localhost。
(2)jobmanager.rpc.port:JobManager 的端口号,默认为 6123。
(3)jobmanager.memory.process.size:JobManager 进程的内存大小,默认为 1024m。
(4)jobmanager.memory.jvm-overhead.size:JobManager JVM 的堆外内存大小,默认为 192m。
(5)jobmanager.memory.framework.heap.size:JobManager 的堆内存大小,默认为 512m。
(6)jobmanager.memory.framework.off-heap.size:JobManager 的堆外内存大小,默认为 0m。
(7)jobmanager.memory.network.buffer.size:JobManager 网络缓冲区的大小,默认为 64k。
(8)jobmanager.execution.failover-strategy:JobManager 失败恢复策略,默认为 none。
2.2 TaskManager 参数配置
TaskManager 负责执行 Flink Job 中的任务。
(1)taskmanager.memory.process.size:TaskManager 进程的内存大小,默认为 1024m。
(2)taskmanager.memory.jvm-overhead.size:TaskManager JVM 的堆外内存大小,默认为 192m。
(3)taskmanager.memory.framework.heap.size:TaskManager 的堆内存大小,默认为 1024m。
(4)taskmanager.memory.framework.off-heap.size:TaskManager 的堆外内存大小,默认为 0m。
(5)taskmanager.memory.network.buffer.size:TaskManager 网络缓冲区的大小,默认为 64k。
(6)taskmanager.execution.slots:TaskManager 可用的任务槽数量,默认为 1。
(7)taskmanager.execution.memory.preallocate:是否预分配 TaskManager 内存,默认为 false。
2.3 Job 参数配置
Flink Job 的参数配置主要包括并行度、时间特性、重启策略等。
(1)ExecutionConfig.setParallelism(int parallelism):设置 Job 的并行度。
(2)ExecutionConfig.setAutoWatermarkInterval(long interval):设置自动生成 Watermark 的时间间隔。
(3)ExecutionConfig.setRestartStrategy(RestartStrategies.RestartStrategyConfiguration restartStrategy):设置 Job 的重启策略。
以下是一个使用 Flink Java API 提交 Job 的示例代码:
import org.apache.flink.api.common.ExecutionConfig; public class FlinkJobSubmitExample { // 设置 Job 的并行度为 4 env.getConfig().setParallelism(4); // 设置自动生成 Watermark 的时间间隔为 1000 毫秒 env.getConfig().setAutoWatermarkInterval(1000); // 设置 Job 的重启策略为固定延迟重启策略,最大重启次数为 3,重启间隔为 5000 毫秒 env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000)); // 添加具体的业务逻辑处理 env.fromElements(1, 2, 3, 4) .map(i -> i * 2) .print(); // 提交 Job env.execute("Flink Job Submit Example"); } } |
以上代码中,首先使用 StreamExecutionEnvironment.getExecutionEnvironment() 获取 StreamExecutionEnvironment 对象,然后通过 env.getConfig() 获取 ExecutionConfig 对象进行参数配置。接着,通过 env.fromElements(1, 2, 3, 4) 创建一个数据源,然后对数据进行 map 操作并使用 print() 打印结果。最后,通过 env.execute("Flink Job Submit Example") 提交 Job。
在实际运行时,可以通过命令行工具 flink run 提交该 Job,也可以使用 Flink 提供的 API 进行提交。例如,使用 Maven 打包后,可以使用以下命令进行提交:
./bin/flink run -c com.example.FlinkJobSubmitExample /path/to/job.jar |
以上就是关于 Flink Job 提交和参数配置的理论解释、标题序号分点说明以及完整的代码案例。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。