赞
踩
目录
2.1 getExecutionEnvironment(推荐)
2.2 createLocalEnvironment (本地环境对象)
2.3 createRemoteEnvironment (远程环境对象)
2.4 createLocalEnvironmentWithWebUI (本地环境+Web监控UI)
Flink计算任务是用`特定的语言(java、scala、python、sql)`,根据业务需求编写的代码
最终需要将编写好的代码发送到可运行的环境中,计算出所需要的结果
Flink计算任务支持在多种环境中运行:
集群环境:Yarn、Standalone、K8s
本地环境(调试使用)
这就需要提交Flink计算任务时,为其找到一个`合适`的执行环境对象
语法:
通过 配置文件对象 自动获取相适应 的 执行环境对象
如果在IDE中执行 或者 作为一般的java程序执行 ,则返回 本地执行环境
如果你将 jar包提交到集群, 则返回 集群执行环境
public static StreamExecutionEnvironment getExecutionEnvironment() {
return getExecutionEnvironment(new Configuration());
}
语法:
创建 本地执行环境对象
tips:
1. 在本地的JVM中 以多线程的方式执行flink计算任务
2. 不指定并行度时,用当前机器CPU核数作为并行度
语法:
创建 远程执行环境对象
tips:
1.不指定并行度时,将使用 flink-conf.yaml 设置的并行度 parallelism.default=1
说明:
创建一个本地环境对象,并启动WEB UI监控服务,可以登入 localhost:8081 查看flink任务运行情况,这种方式在IDE开发测试时,常用
开发语言:Java1.8
Flink版本:1.17.0
- package com.baidu.datastream.env;
-
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
- /*
- * TODO 思考:flink的计算任务(flink application) 可以在那些环境运行呢?
- * 1.集群环境:yarn、Standalone、K8s
- * 2.本地环境:本地的JVM中
- * */
-
- public class ExecutionEnvironment {
- public static void main(String[] args) throws Exception {
- // 方式1
- //getExecutionEnvironment();
- // 方式2
- //createLocalEnvironment();
- // 方式3
- //createRemoteEnvironment();
- // 方式4
- createLocalEnvironmentWithWebUI();
- }
-
- // 方式1(推荐): getExecutionEnvironment 方法
- public static void getExecutionEnvironment() {
- /*
- * TODO 根据运行的上下文(配置文件),来判断当前运行环境
- * 如果在IDE中执行 或者 作为一般的java程序执行 ,则返回 本地执行环境
- * 如果你将 jar包提交到集群, 则返回 集群执行环境
- *
- * */
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- System.out.println("并行度: " + env.getParallelism());
- }
-
- // 方式2(不推荐): createLocalEnvironment 方法
- public static void createLocalEnvironment() {
- /*
- * TODO 返回本地执行环境
- * */
- LocalStreamEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();
- System.out.println("并行度: " + localEnv.getParallelism());
- }
-
- // 方式3(不推荐): createRemoteEnvironment 方法
- public static void createRemoteEnvironment() {
- /*
- * TODO 返回集群执行环境
- * */
- StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(
- "localhost"
- , 8081
- , 8
- , "../FlinkAPI/target/FlinkAPI-1.0-SNAPSHOT.jar"
- );
- System.out.println("并行度: " + remoteEnv.getParallelism());
- }
-
- // 方式4(推荐):createLocalEnvironmentWithWebUI
- public static void createLocalEnvironmentWithWebUI() throws Exception {
- /*
- * TODO 根据运行的上下文(配置文件),来判断当前运行环境
- * 如果在IDE中执行 或者 作为一般的java程序执行 ,则返回 本地执行环境
- * 如果你将 jar包提交到集群, 则返回 集群执行环境
- *
- * */
- Configuration configuration = new Configuration();
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
- // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
- env.socketTextStream("localhost", 9999).print();
-
-
- // 3.触发程序执行
- env.execute();
- }
-
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。