当前位置:   article > 正文

FlinkJob提交和参数配置_jobmanager.rpc.address

jobmanager.rpc.address

一、Flink Job 的提交

1.1 提交方式
Flink Job 可以通过以下两种方式进行提交:
1)使用命令行工具 flink run 提交 Job
2)使用 Flink 提供的 Java Scala API 进行提交

1.2 提交流程
1Flink Job 提交时,首先需要配置 JobManager 的地址和端口号,以便与 JobManager 建立连接。
2JobManager 接收到 Job 提交请求后,会对 Job 进行解析和验证,确保 Job 的正确性。
3JobManager 将解析后的 Job 图发送给相应的 TaskManagerTaskManager 根据 Job 图进行任务的调度和执行。

二、Flink Job 的参数配置

2.1 JobManager 参数配置
JobManager Flink Job 的调度中心,负责接收和处理 Job 的提交请求。

1jobmanager.rpc.addressJobManager 的主机地址,默认为 localhost
2jobmanager.rpc.portJobManager 的端口号,默认为 6123
3jobmanager.memory.process.sizeJobManager 进程的内存大小,默认为 1024m
4jobmanager.memory.jvm-overhead.sizeJobManager JVM 的堆外内存大小,默认为 192m
5jobmanager.memory.framework.heap.sizeJobManager 的堆内存大小,默认为 512m
6jobmanager.memory.framework.off-heap.sizeJobManager 的堆外内存大小,默认为 0m
7jobmanager.memory.network.buffer.sizeJobManager 网络缓冲区的大小,默认为 64k
8jobmanager.execution.failover-strategyJobManager 失败恢复策略,默认为 none

2.2 TaskManager 参数配置
TaskManager 负责执行 Flink Job 中的任务。

1taskmanager.memory.process.sizeTaskManager 进程的内存大小,默认为 1024m
2taskmanager.memory.jvm-overhead.sizeTaskManager JVM 的堆外内存大小,默认为 192m
3taskmanager.memory.framework.heap.sizeTaskManager 的堆内存大小,默认为 1024m
4taskmanager.memory.framework.off-heap.sizeTaskManager 的堆外内存大小,默认为 0m
5taskmanager.memory.network.buffer.sizeTaskManager 网络缓冲区的大小,默认为 64k
6taskmanager.execution.slotsTaskManager 可用的任务槽数量,默认为 1
7taskmanager.execution.memory.preallocate:是否预分配 TaskManager 内存,默认为 false

2.3 Job 参数配置
Flink Job 的参数配置主要包括并行度、时间特性、重启策略等。

1ExecutionConfig.setParallelism(int parallelism):设置 Job 的并行度。
2ExecutionConfig.setAutoWatermarkInterval(long interval):设置自动生成 Watermark 的时间间隔。
3ExecutionConfig.setRestartStrategy(RestartStrategies.RestartStrategyConfiguration restartStrategy):设置 Job 的重启策略。

三、完整代码案例

以下是一个使用 Flink Java API 提交 Job 的示例代码:

 

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkJobSubmitExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 设置 Job 的并行度为 4

env.getConfig().setParallelism(4);

// 设置自动生成 Watermark 的时间间隔为 1000 毫秒

env.getConfig().setAutoWatermarkInterval(1000);

// 设置 Job 的重启策略为固定延迟重启策略,最大重启次数为 3,重启间隔为 5000 毫秒

env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));

// 添加具体的业务逻辑处理

env.fromElements(1, 2, 3, 4)

   .map(i -> i * 2)

   .print();

// 提交 Job

env.execute("Flink Job Submit Example");

}

}

以上代码中,首先使用 StreamExecutionEnvironment.getExecutionEnvironment() 获取 StreamExecutionEnvironment 对象,然后通过 env.getConfig() 获取 ExecutionConfig 对象进行参数配置。接着,通过 env.fromElements(1, 2, 3, 4) 创建一个数据源,然后对数据进行 map 操作并使用 print() 打印结果。最后,通过 env.execute("Flink Job Submit Example") 提交 Job

在实际运行时,可以通过命令行工具 flink run 提交该 Job,也可以使用 Flink 提供的 API 进行提交。例如,使用 Maven 打包后,可以使用以下命令进行提交:

./bin/flink run -c com.example.FlinkJobSubmitExample /path/to/job.jar

以上就是关于 Flink Job 提交和参数配置的理论解释、标题序号分点说明以及完整的代码案例。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/457958
推荐阅读
  

闽ICP备14008679号