当前位置:   article > 正文

2、flink计算任务可以在哪些环境中运行_createlocalenvironmentwithwebui

createlocalenvironmentwithwebui

目录

1、说明

2、初始化 执行环境对象

2.1 getExecutionEnvironment(推荐)

2.2 createLocalEnvironment (本地环境对象)

2.3  createRemoteEnvironment (远程环境对象)

2.4 createLocalEnvironmentWithWebUI (本地环境+Web监控UI)

3、代码示例


1、说明

Flink计算任务是用`特定的语言(java、scala、python、sql)`,根据业务需求编写的代码

最终需要将编写好的代码发送到可运行的环境中,计算出所需要的结果

Flink计算任务支持在多种环境中运行:

        集群环境:Yarn、Standalone、K8s

        本地环境(调试使用)

这就需要提交Flink计算任务时,为其找到一个`合适`的执行环境对象


2、初始化 执行环境对象

2.1 getExecutionEnvironment(推荐)

语法:

通过 配置文件对象 自动获取相适应 的 执行环境对象
           如果在IDE中执行 或者 作为一般的java程序执行 ,则返回 本地执行环境
           如果你将 jar包提交到集群, 则返回 集群执行环境

public static StreamExecutionEnvironment getExecutionEnvironment() {
    return getExecutionEnvironment(new Configuration());
}


2.2 createLocalEnvironment (本地环境对象)

语法:

创建 本地执行环境对象
tips:
       1. 在本地的JVM中 以多线程的方式执行flink计算任务
       2. 不指定并行度时,用当前机器CPU核数作为并行度


2.3  createRemoteEnvironment (远程环境对象)

语法:

创建 远程执行环境对象
tips:
    1.不指定并行度时,将使用 flink-conf.yaml 设置的并行度 parallelism.default=1


2.4 createLocalEnvironmentWithWebUI (本地环境+Web监控UI)

说明:

        创建一个本地环境对象,并启动WEB UI监控服务,可以登入 localhost:8081 查看flink任务运行情况,这种方式在IDE开发测试时,常用


3、代码示例

开发语言:Java1.8

Flink版本:1.17.0

  1. package com.baidu.datastream.env;
  2. import org.apache.flink.configuration.Configuration;
  3. import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
  4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  5. /*
  6. * TODO 思考:flink的计算任务(flink application) 可以在那些环境运行呢?
  7. * 1.集群环境:yarn、Standalone、K8s
  8. * 2.本地环境:本地的JVM中
  9. * */
  10. public class ExecutionEnvironment {
  11. public static void main(String[] args) throws Exception {
  12. // 方式1
  13. //getExecutionEnvironment();
  14. // 方式2
  15. //createLocalEnvironment();
  16. // 方式3
  17. //createRemoteEnvironment();
  18. // 方式4
  19. createLocalEnvironmentWithWebUI();
  20. }
  21. // 方式1(推荐): getExecutionEnvironment 方法
  22. public static void getExecutionEnvironment() {
  23. /*
  24. * TODO 根据运行的上下文(配置文件),来判断当前运行环境
  25. * 如果在IDE中执行 或者 作为一般的java程序执行 ,则返回 本地执行环境
  26. * 如果你将 jar包提交到集群, 则返回 集群执行环境
  27. *
  28. * */
  29. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  30. System.out.println("并行度: " + env.getParallelism());
  31. }
  32. // 方式2(不推荐): createLocalEnvironment 方法
  33. public static void createLocalEnvironment() {
  34. /*
  35. * TODO 返回本地执行环境
  36. * */
  37. LocalStreamEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();
  38. System.out.println("并行度: " + localEnv.getParallelism());
  39. }
  40. // 方式3(不推荐): createRemoteEnvironment 方法
  41. public static void createRemoteEnvironment() {
  42. /*
  43. * TODO 返回集群执行环境
  44. * */
  45. StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(
  46. "localhost"
  47. , 8081
  48. , 8
  49. , "../FlinkAPI/target/FlinkAPI-1.0-SNAPSHOT.jar"
  50. );
  51. System.out.println("并行度: " + remoteEnv.getParallelism());
  52. }
  53. // 方式4(推荐):createLocalEnvironmentWithWebUI
  54. public static void createLocalEnvironmentWithWebUI() throws Exception {
  55. /*
  56. * TODO 根据运行的上下文(配置文件),来判断当前运行环境
  57. * 如果在IDE中执行 或者 作为一般的java程序执行 ,则返回 本地执行环境
  58. * 如果你将 jar包提交到集群, 则返回 集群执行环境
  59. *
  60. * */
  61. Configuration configuration = new Configuration();
  62. StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
  63. // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
  64. env.socketTextStream("localhost", 9999).print();
  65. // 3.触发程序执行
  66. env.execute();
  67. }
  68. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/633941
推荐阅读
相关标签
  

闽ICP备14008679号