
An 80,000-Word Deep Dive into the StreamPark Source Code

The source code discussed in this article is based on the latest stable StreamPark branch (dev-2.1.5).

To make the article easier to read, I have simplified the more involved descriptions and drawn a number of flow diagrams. Everything here was worked out step by step in the debugger, so this write-up took considerable effort; if you want to repost it, please credit the original source!


Copyright notice:

Original author: Yang Linwei (杨林伟, "阿甘兄")
Author's blog: https://yanglinwei.blog.csdn.net


Contents:

|---- 1. Importing the project
|-------- 1.1 Environment requirements
|-------- 1.2 IDEA setup
|-------- 1.3 Starting the project
|-------- 1.4 Miscellaneous
|---- 2. Project structure
|-------- 2.1 Module relationship diagram
|-------- 2.2 Module breakdown
|------------ 2.2.1 parent
|------------ 2.2.2 base
|------------ 2.2.3 console
|------------ 2.2.4 flink
|------------ 2.2.5 spark
|---- 3. Table schema
|---- 4. Application startup
|-------- 4.1 Initialization
|-------- 4.2 Scheduled tasks
|------------ 4.2.1 Job metrics update
|------------ 4.2.2 Job metrics update (K8s)
|------------ 4.2.3 Periodic cleanup of expired resources
|-------- 4.3 WebSocket
|---- 5. Core source code analysis
|-------- 5.1 Starting a Flink cluster
|-------- 5.2 Stopping a Flink cluster
|-------- 5.3 Releasing a job
|-------- 5.4 Starting a job
|-------- 5.5 Stopping a job
|-------- 5.6 Job mapping
|-------- 5.7 Job snapshots
|-------- 5.8 Project build
|-------- 5.9 Core annotations
|-------- 5.10 Directory layout
|---- 6. Q&A


1. Importing the Project

1.1 Environment requirements

Note: during a Maven build, the node and pnpm versions specified in the build configuration are downloaded and used to run the frontend build; the versions below only apply to local development.

| Type    | Required version | Local dev version | Notes |
| ------- | ---------------- | ----------------- | ----- |
| JDK     | JDK 1.8+         | JDK 11            |       |
| Maven   | -                | 3.9.0             |       |
| Scala   | 2.12.x           | 2.12.8            |       |
| Node.js | 16.14.x ~ 18     | v18.12.0          | https://nodejs.org/dist/v18.12.0/node-v18.12.0.pkg |
| pnpm    | 7.11.2           | 9.1.4             | sudo npm install -g pnpm@9.1.4 |

1.2 IDEA setup

JDK settings (screenshot omitted).

Scala settings (screenshot omitted).

Maven settings (screenshot omitted): point IDEA at your local Maven installation and repository for packaging instead of the bundled default.

Plugin settings (screenshot omitted).

Packaging profiles (screenshot omitted): select the scala-2.12 profile, since Scala 2.11 is no longer supported after Flink 1.14. The shaded profile resolves dependency conflicts and unifies dependencies, webapp builds the frontend package, and dist assembles the compressed package into the dist directory.
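
For reference, the same packaging can also be run from the command line; this is only a sketch, so double-check the exact profile names against the Maven tool window before running it:

# full build from the repository root, skipping tests; profiles as selected above
mvn clean package -DskipTests -Pscala-2.12,shaded,webapp,dist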

1.3 Starting the project

Before the project starts you will likely hit a number of compilation problems; to keep the main flow readable, those problems and their fixes are collected at the end of this article.

Package the project as shown above. After packaging, a compressed archive appears under the dist directory; extract it (screenshot omitted).

Configure the run configuration and add a VM option, i.e. -Dapp.home="<full path of the dist directory>". Pointing at this directory lets the debugger dynamically load the already-compiled jars and configuration files. For details see the official guide: https://streampark.apache.org/docs/development/development (screenshot omitted).

1.3.1 Starting the backend

Run the StreamParkConsoleBootstrap class (screenshot omitted).

1.3.2 Starting the frontend

Enter the streampark-console-webapp directory and install the dependencies:

pnpm install

Then start the frontend dev server:

npm run dev

Startup succeeded (screenshot omitted).

1.4 Miscellaneous

Commands for cleaning up the Node toolchain:

# remove pnpm, npm, node, nvm, etc.
sudo rm -rf /Users/yanglinwei/.nvm/versions/node/v18.12.0/lib/node_modules/pnpm
sudo npm uninstall -g pnpm
nvm deactivate
# nvm uninstall --all
nvm uninstall 17.9.1
nvm unload
rm -rf ~/.nvm

## uninstall node
sudo rm -rf /usr/local/lib/node_modules
sudo rm /usr/local/bin/node
sudo rm /usr/local/bin/npm

2. Project Structure

2.1 Module relationship diagram

(diagram omitted)

2.2 Module breakdown

2.2.1 parent

streampark-parent

  • Submodules: common, shaded, flink, spark, console
  • Dependencies: force-shading (dependency-conflict handling), Scala, ...
  • Plugins: build-helper, compiler, shade (packages and relocates dependencies to avoid conflicts), apache-rat (license compliance audit), checkstyle (code quality checks), spotless (code formatting), dependency-check (vulnerability scanning)
  • Description: the Maven parent module

2.2.2 base

streampark-common

  • Submodules: none
  • Dependencies: junit, enumeratum (Scala enum library), jedis, HikariCP, big-data clients (hbase-client, hadoop-client), ...
  • Plugins: build-helper, scala, shade
  • Description: shared dependency library

streampark-shaded

  • Submodules: jackson, slf4j
  • Dependencies: slf4j, logback
  • Plugins: shade
  • Description: shared shaded dependencies; resolves logging and Jackson conflicts by relocating package paths

2.2.3 console

streampark-console

  • Submodules: service, webapp
  • Dependencies: none
  • Plugins: none
  • Description: the StreamPark console (API + frontend)

streampark-console-service

  • Submodules: none
  • Dependencies: Scala, Spring Boot, the Docker client, databases (MySQL, H2, PostgreSQL), common libraries (yml, Lombok, Caffeine, etc.), other StreamPark modules
  • Plugins: compiler, surefire (tests), clean, frontend (frontend build), resources, dependency, assembly, antrun
  • Description: the StreamPark console backend API service

streampark-console-webapp

  • Submodules: none
  • Dependencies: ant-... (Ant Design colors and icons), iconify (generic icon library), vue-... (Vue ecosystem), axios (HTTP requests), lodash-es (JS utilities), monaco-editor (code editor), penpal (cross-window communication), sweetalert2 (dialogs), ...
  • Plugins/scripts: start and dev (bootstrap, serve, dev, ...), build (build, report), clean, code quality (type, lint), test, generation (log, gen, reinstall)
  • Description: the StreamPark console frontend

2.2.4 flink

streampark-flink

  • Submodules: shims, core, connector, sqlclient, udf, client, proxy, packer, kubernetes
  • Dependencies: scalatest, junit
  • Plugins: scala
  • Description: parent of the Flink-related modules

streampark-flink-core

  • Submodules: none

  • Dependencies: StreamPark modules (common, flink-shims), Flink (core, scala, table, connectors, RocksDB, etc.)

  • Plugins: none

  • Description: core Flink-related dependencies

streampark-flink-shims

  • Submodules: base, test, flink-1.12 → flink-1.14 (Scala 2.11 supported), flink-1.15 → flink-1.19 (Scala 2.12 only)
  • Dependencies: none
  • Plugins: none
  • Description: parent module of the flink-shims

streampark-flink-shims_flink-*

  • Submodules: none
  • Dependencies: streampark-flink-shims-base, Flink (core, scala, table, RocksDB, etc.)
  • Plugins: shade
  • Description: one module per Flink version; at runtime the jars matching the selected Flink version are loaded into a dedicated class loader, so versions can be switched dynamically (see the sketch below)
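
The version-switching idea described above can be sketched as follows (a minimal illustration with hypothetical class and method names, not the actual StreamPark implementation): collect the jars of the selected Flink version plus its shims jar into a URLClassLoader, switch the thread context class loader, and call the target entry point reflectively.

import java.io.File;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;

public class FlinkShimsLoaderSketch {

  /** Build a class loader over $FLINK_HOME/lib plus the shims jar of the selected version. */
  static ClassLoader buildFlinkClassLoader(String flinkHome, File shimsJar) throws Exception {
    List<URL> urls = new ArrayList<>();
    File[] jars = new File(flinkHome, "lib").listFiles((dir, name) -> name.endsWith(".jar"));
    if (jars != null) {
      for (File jar : jars) {
        urls.add(jar.toURI().toURL());
      }
    }
    urls.add(shimsJar.toURI().toURL());
    // parent-first here; the real implementation also offers a child-first strategy
    return new URLClassLoader(urls.toArray(new URL[0]), FlinkShimsLoaderSketch.class.getClassLoader());
  }

  /** Switch the context class loader and invoke a method reflectively inside it. */
  static Object invokeInFlinkContext(ClassLoader flinkLoader, String className,
                                     String methodName, Object arg) throws Exception {
    ClassLoader previous = Thread.currentThread().getContextClassLoader();
    Thread.currentThread().setContextClassLoader(flinkLoader);
    try {
      Class<?> clazz = flinkLoader.loadClass(className);
      Method method = clazz.getMethod(methodName, Object.class); // hypothetical signature
      return method.invoke(null, arg);
    } finally {
      Thread.currentThread().setContextClassLoader(previous);
    }
  }
}
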

streampark-flink-kubernetes

  • Submodules: none
  • Dependencies: StreamPark modules (common, flink-shims), Flink (clients, kubernetes), base libraries (http, caffeine, guava, lombok, etc.)
  • Plugins: build-helper
  • Description: Flink-on-Kubernetes support

streampark-flink-udf

  • Submodules: none
  • Dependencies: streampark-common, flink-table-common
  • Plugins: scala
  • Description: UDF support

streampark-flink-connector

  • Submodules: base, clickhouse, doris, hbase, http, elasticsearch, influx, jdbc, kafka, mongo
  • Dependencies: none
  • Plugins: none
  • Description: parent of the connectors

streampark-flink-connector-*

  • Submodules: none
  • Dependencies: StreamPark modules (connector_base, shims_flink), Flink (core, scala, table-common, etc.)
  • Plugins: none
  • Description: the individual connectors

streampark-flink-packer

  • Submodules: none
  • Dependencies: StreamPark modules (common, kubernetes), shade (packaging), aether-* (dependency resolution), the Docker client
  • Plugins: build-helper
  • Description: dependency resolution and packaging via the Maven Shade plugin and Aether

streampark-flink-sqlclient

  • Submodules: none
  • Dependencies: Scala libraries (enumeratum, ...), StreamPark modules (core, shims, etc.), Flink (table, scala, uber, etc.)
  • Plugins: scala, shade
  • Description: the SQL client (executable program)

streampark-flink-client

  • Submodules: api, core
  • Dependencies: none
  • Plugins: none
  • Description: the Flink submission client

streampark-flink-proxy

  • Submodules: none
  • Dependencies: streampark-common
  • Plugins: none
  • Description: the proxy submission client; it switches to the class loader of the selected Flink environment and invokes the corresponding method reflectively

2.2.5 spark

Core, CLI, Connector (Spark is not supported yet; to be completed...)

3. Table Schema

| Category        | Table              | Description |
| --------------- | ------------------ | ----------- |
| RBAC            | t_role             | Role information: role name, description, etc. |
|                 | t_menu             | Menu and button information: menu name, path, component, permission id, icon, type, visibility, sort order, etc. |
|                 | t_role_menu        | Role-to-menu associations |
|                 | t_user             | User information: login account, password, etc. |
|                 | t_team             | Team information: team name, description, etc. |
|                 | t_member           | Team membership: team id, user id, role id, etc. |
| Authentication  | t_access_token     | User access tokens: token content, expiry, status, etc. |
| Alerting        | t_alert_config     | Alert configuration: alert type and parameters (email, SMS, DingTalk, WeCom, etc.) |
|                 | t_message          | Notifications: application id, user id, message type, title, content, etc. |
| System settings | t_setting          | System settings: key, value, name, description, type, etc. |
|                 | t_external_link    | External links: label name, label color, link URL, etc. |
|                 | t_variable         | Variable management fields |
|                 | t_yarn_queue       | YARN queue management fields |
|                 | t_app_backup       | Application backups: application id, SQL id, config id, version, path, description, etc. |
| Job release     | t_app_build_pipe   | Application build pipelines: pipeline type, status, steps, error message, etc. |
| Job management  | t_flink_cluster    | Flink clusters: cluster address, cluster id, cluster name, execution mode, version, namespace, description, etc. |
|                 | t_flink_env        | Flink environments: name, Flink home, version, Scala version, config, description, etc. |
|                 | t_flink_project    | Flink projects: team id, project name, URL, branch, username, password, build args, type, description, etc. |
|                 | t_flink_app        | Flink applications: name, execution mode, resource origin, module, jar, args, dependencies, state, etc. |
|                 | t_flink_effective  | The effective configuration or SQL of an application: application id, target type, target id, etc. |
|                 | t_flink_config     | Flink configuration: application id, content, format, version, etc. |
|                 | t_flink_sql        | Flink SQL: application id, SQL content, dependencies, version, candidate state, etc. |
|                 | t_flink_savepoint  | Savepoints: application id, checkpoint id, type, path, trigger time, etc. |
|                 | t_flink_log        | Application logs: application id, YARN application id, JobManager URL, success flag, exception, etc. |

4. Application Startup

4.1 Initialization

StreamParkConsoleBootstrap: the bootstrap class initializes the Spring properties.

@Slf4j
@SpringBootApplication
@EnableScheduling
public class StreamParkConsoleBootstrap extends SpringBootServletInitializer {

  public static void main(String[] args) throws Exception {
    new SpringApplicationBuilder()
        .properties(SpringProperties.get()) // 这里初始化了全局配置
        .sources(StreamParkConsoleBootstrap.class)
        .run(args);
  }
}

SpringProperties: builds the global configuration, from application.yml (deprecated) or config.yaml.

/**
 * 初始化配置
 *
 * @author : YangLinWei
 * @createTime: 2024/6/11 13:40
 * @version: 1.0.0
 */
public class SpringProperties {

  private static final Logger log = LoggerFactory.getLogger(SpringProperties.class);

  public static Properties get() {
    File oldConfig = getOldConfig();
    if (oldConfig != null) { // 使用旧的配置(application.yml)
      log.warn(
          "in the \"conf\" directory, found the \"application.yml\" file. The \"application.yml\" file is deprecated. "
              + "For compatibility, this \"application.yml\" file will be used preferentially. The latest configuration file is \"config.yaml\". "
              + "It is recommended to use \"config.yaml\". Note: \"application.yml\" will be completely deprecated in version 2.2.0. ");
      SystemPropertyUtils.set("spring.config.location", oldConfig.getAbsolutePath());
      return new Properties();
    } else {  // 使用新的配置(config.yaml)
      // 1) 获取用户信息
      Properties userConfig = getUserConfig();
      // 2) 获取spring配置
      Properties springConfig = getSpringConfig();
      // 3) 合并配置
      mergeConfig(userConfig, springConfig);
      // 4) 数据库连接配置
      dataSourceConfig(userConfig, springConfig);
      // 5) 配置设置进System Property
      springConfig.forEach((k, v) -> SystemPropertyUtils.set(k.toString(), v.toString()));
      return springConfig;
    }
  }

EnvInitializer: initializes the working directories (uploads, jars, shims, etc.).

/**
 * 初始化监听
 * 
 * @author : YangLinWei
 * @createTime: 2024/6/11 13:50
 * @version: 1.0.0
 */
@Order
@Slf4j
@Component
public class EnvInitializer implements ApplicationRunner {

  @Override
  public void run(ApplicationArguments args) throws Exception {
    Optional<String> profile =
        Arrays.stream(context.getEnvironment().getActiveProfiles()).findFirst();
    if ("test".equals(profile.orElse(null))) {
      return;
    }

    String appHome = WebUtils.getAppHome();
    if (appHome == null) {
      throw new ExceptionInInitializerError(
          String.format(
              "[StreamPark] System initialization check failed,"
                  + " The system initialization check failed. If started local for development and debugging,"
                  + " please ensure the -D%s parameter is clearly specified,"
                  + " more detail: https://streampark.apache.org/docs/user-guide/development",
              ConfigConst.KEY_APP_HOME()));
    }

    // 初始化系统环境配置
    initInternalConfig(context.getEnvironment());
    // 重置默认的HADOOP_USER_NAME
    String hadoopUserName = InternalConfigHolder.get(CommonConfig.STREAMPARK_HADOOP_USER_NAME());
    overrideSystemProp(ConfigConst.KEY_HADOOP_USER_NAME(), hadoopUserName);
    // 初始化本地系统资源
    storageInitialize(LFS);
  }
  
  -------
  public synchronized void storageInitialize(StorageType storageType) {

    if (initialized.contains(storageType)) {
      return;
    }

    final String mkdirLog = "storage initialize, now mkdir [{}] starting ...";

    FsOperator fsOperator = FsOperator.of(storageType);
    Workspace workspace = Workspace.of(storageType);

    // 1. 创建工作目录(支持本地和hdfs,目前仅支持本地)
    if (storageType.equals(LFS)) {
      String localDist = Workspace.APP_LOCAL_DIST();
      if (!fsOperator.exists(localDist)) {
        log.info(mkdirLog, localDist);
        fsOperator.mkdirs(localDist);
      }
    }

    // 创建uploads目录
    String appUploads = workspace.APP_UPLOADS();
    if (!fsOperator.exists(appUploads)) {
      log.info(mkdirLog, appUploads);
      fsOperator.mkdirs(appUploads);
    }

    // 创建workspace目录
    String appWorkspace = workspace.APP_WORKSPACE();
    if (!fsOperator.exists(appWorkspace)) {
      log.info(mkdirLog, appWorkspace);
      fsOperator.mkdirs(appWorkspace);
    }

    // 创建backups目录
    String appBackups = workspace.APP_BACKUPS();
    if (!fsOperator.exists(appBackups)) {
      log.info(mkdirLog, appBackups);
      fsOperator.mkdirs(appBackups);
    }

    // 创建savepoints目录
    String appSavePoints = workspace.APP_SAVEPOINTS();
    if (!fsOperator.exists(appSavePoints)) {
      log.info(mkdirLog, appSavePoints);
      fsOperator.mkdirs(appSavePoints);
    }

    // 创建jars目录
    String appJars = workspace.APP_JARS();
    if (!fsOperator.exists(appJars)) {
      log.info(mkdirLog, appJars);
      fsOperator.mkdirs(appJars);
    }

    // 2. 上传jar包.
    // 2.1) 上传客户端jar包至client目录
    File client = WebUtils.getAppClientDir();
    Utils.required(
        client.exists() && client.listFiles().length > 0,
        client.getAbsolutePath().concat(" is not exists or empty directory "));

    String appClient = workspace.APP_CLIENT();
    fsOperator.mkCleanDirs(appClient);

    for (File file : client.listFiles(fileFilter)) {
      log.info("load client:{} to {}", file.getName(), appClient);
      fsOperator.upload(file.getAbsolutePath(), appClient);
    }

    // 2.2) 上传 shims jar至shims目录
    File[] shims =
        WebUtils.getAppLibDir()
            .listFiles(pathname -> pathname.getName().matches(PATTERN_FLINK_SHIMS_JAR.pattern()));

    Utils.required(shims != null && shims.length > 0, "streampark-flink-shims jar not exist");

    String appShims = workspace.APP_SHIMS();
    fsOperator.delete(appShims);

    for (File file : shims) {
      Matcher matcher = PATTERN_FLINK_SHIMS_JAR.matcher(file.getName());
      if (matcher.matches()) {
        String version = matcher.group(1);
        String shimsPath = appShims.concat("/flink-").concat(version);
        fsOperator.mkdirs(shimsPath);
        log.info("load shims:{} to {}", file.getName(), shimsPath);
        fsOperator.upload(file.getAbsolutePath(), shimsPath);
      }
    }

    // 2.3) 创建本地maven仓库目录(Users/xxx/.m2/repository)
    String localMavenRepo = Workspace.MAVEN_LOCAL_PATH();
    if (FsOperator.lfs().exists(localMavenRepo)) {
      FsOperator.lfs().mkdirs(localMavenRepo);
    }

    initialized.add(storageType);
  }   

4.2 Scheduled tasks

4.2.1 Job metrics update

FlinkAppHttpWatcher#start: scheduled every second; the job status is queried first via the Flink REST API and, if that fails, via the YARN REST API.

 /**
   * 每秒查询一次
   * 
   * @author : YangLinWei
   * @createTime: 2024/6/11 14:10
   * @version: 1.0.0
   */
 @Scheduled(fixedDelay = 1000)
  public void start() {
    // The application has been started at the first time, or the front-end is operating start/stop,
    // need to return status info immediately.
    if (lastWatchingTime == null || !OPTIONING.isEmpty()) {
      watch();
    } else if (System.currentTimeMillis() - lastOptionTime <= OPTION_INTERVAL) {
      // The last operation time is less than option interval.(10 seconds)
      watch();
    } else if (System.currentTimeMillis() - lastWatchingTime >= WATCHING_INTERVAL) {
      // Normal information obtain, check if there is 5 seconds interval between this time and the
      // last time.(once every 5 seconds)
      watch();
    }
  }

  private void watch() {
    lastWatchingTime = System.currentTimeMillis();
    for (Application application : WATCHING_APPS.values()) {
      watchExecutor.execute(
          () -> {
            try {
              // 从rest api查询
              getFromFlinkRestApi(application);
              cleanupLost(application);
            } catch (Exception flinkException) {
              // query status from yarn rest api
              try {
                // 从yarn rest api查询  
                getFromYarnRestApi(application);
                cleanupLost(application);
              } catch (Exception yarnException) {
                doStateFailed(application);
              }
            }
          });
    }
  }

The metrics are then updated from the returned job overview details:

private void handleJobOverview(Application application, JobsOverview.Job jobOverview)
    throws IOException {
  // 更新作业启动实际时间
  long startTime = jobOverview.getStartTime();
  long endTime = jobOverview.getEndTime();
  if (application.getStartTime() == null || startTime != application.getStartTime().getTime()) {
    application.setStartTime(new Date(startTime));
  }
  if (endTime != -1) {
    if (application.getEndTime() == null || endTime != application.getEndTime().getTime()) {
      application.setEndTime(new Date(endTime));
    }
  }

  application.setJobId(jobOverview.getId());
  application.setDuration(jobOverview.getDuration());
  application.setTotalTask(jobOverview.getTasks().getTotal());
  application.setOverview(jobOverview.getTasks());

  // 更新作业资源占用情况
  if (STARTING_CACHE.getIfPresent(application.getId()) != null) {
    Overview override = httpOverview(application);
    if (override != null && override.getSlotsTotal() > 0) {
      application.setTotalTM(override.getTaskmanagers());
      application.setTotalSlot(override.getSlotsTotal());
      application.setAvailableSlot(override.getSlotsAvailable());
    }
    STARTING_CACHE.invalidate(application.getId());
  }
}

Handling of the latest checkpoint (save it as a savepoint, trigger an alert, or restart):

private void handleCheckPoints(Application application) throws Exception {
  CheckPoints checkPoints = httpCheckpoints(application);
  if (checkPoints != null) {
    checkpointProcessor.process(application, checkPoints);
  }
}

private void process(Application application, @Nonnull CheckPoints.CheckPoint checkPoint) {
    // 获取应用程序的JobID和应用ID
    String jobID = application.getJobId();
    Long appId = application.getId();

    // 获取当前检查点的状态
    CheckPointStatus status = checkPoint.getCheckPointStatus();
    
    // 创建一个检查点键,用于缓存和标识
    CheckPointKey checkPointKey = new CheckPointKey(appId, jobID, checkPoint.getId());

    // 如果检查点状态为完成
    if (CheckPointStatus.COMPLETED.equals(status)) {
        // 检查是否应该将其保存为保存点
        if (checkSaveAsSavepoint(checkPointKey, checkPoint)) {
            // 将保存点信息存入缓存并保存到数据库
            savepointedCache.put(checkPointKey.getSavePointId(), DEFAULT_FLAG_BYTE);
            saveSavepoint(checkPoint, application.getId());
            flinkAppHttpWatcher.cleanSavepoint(application);
            return;
        }

        // 获取最新的检查点ID
        Long latestChkId = getLatestCheckpointedId(appId, checkPointKey.getCheckPointId());
        // 检查是否应该将其保存为检查点
        if (checkSaveAsCheckpoint(checkPoint, latestChkId)) {
            checkPointCache.put(checkPointKey.getCheckPointId(), checkPoint.getId());
            saveSavepoint(checkPoint, application.getId());
        }
    } else if (shouldProcessFailedTrigger(checkPoint, application.cpFailedTrigger(), status)) {
        // 处理失败的检查点
        Counter counter = checkPointFailedCache.get(appId);
        if (counter == null) {
            // 初始化失败计数器
            checkPointFailedCache.put(appId, new Counter(checkPoint.getTriggerTimestamp()));
        } else {
            // 计算失败时间间隔
            long minute = counter.getDuration(checkPoint.getTriggerTimestamp());
            // 如果在指定时间内失败次数超过阈值
            if (minute <= application.getCpFailureRateInterval()
                && counter.getCount() >= application.getCpMaxFailureInterval()) {
                // 从缓存中移除该应用的失败记录
                checkPointFailedCache.remove(appId);
                // 获取故障恢复策略
                FailoverStrategy failoverStrategy = FailoverStrategy.of(application.getCpFailureAction());
                if (failoverStrategy == null) {
                    throw new IllegalArgumentException(
                        "Unexpected cpFailureAction: " + application.getCpFailureAction());
                }
                // 根据恢复策略采取相应行动
                switch (failoverStrategy) {
                    case ALERT: // 告警
                        alertService.alert(application, CheckPointStatus.FAILED);
                        break;
                    case RESTART: // 重启
                        try {
                            applicationService.restart(application);
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                        break;
                    default:
                        break;
                }
            } else {
                // 失败次数加一
                counter.increment();
            }
        }
    }
}

Handling of the running and non-running job states (clearing obsolete savepoints, marking jobs that need to be started/restarted, etc.):

private void handleRunningState(
    Application application, OptionState optionState, FlinkAppState currentState) {
  /*
   if the last recorded state is STARTING and the latest state obtained this time is RUNNING,
   which means it is the first tracking after restart.
   Then: the following the job status needs to be updated to the restart status:
   NEED_RESTART_AFTER_CONF_UPDATE (Need to restart  after modified configuration)
   NEED_RESTART_AFTER_SQL_UPDATE (Need to restart  after modified flink sql)
   NEED_RESTART_AFTER_ROLLBACK (Need to restart after rollback)
   NEED_RESTART_AFTER_DEPLOY (Need to rollback after deploy)
  */
  Long appId = application.getId();
  if (OptionState.STARTING.equals(optionState)) {
    Application latestApp = WATCHING_APPS.get(appId);
    ReleaseState releaseState = latestApp.getReleaseState();
    switch (releaseState) {
      case NEED_RESTART:
      case NEED_ROLLBACK:
        LambdaUpdateWrapper<Application> updateWrapper =
            new LambdaUpdateWrapper<Application>()
                .eq(Application::getId, appId)
                .set(Application::getRelease, ReleaseState.DONE.get());
        applicationService.update(updateWrapper);
        break;
      default:
        break;
    }
  }

  // The current state is running, and there is a current task in the savePointCache,
  // indicating that the task is doing savepoint
  if (SAVEPOINT_CACHE.getIfPresent(appId) != null) {
    application.setOptionState(OptionState.SAVEPOINTING.getValue());
  } else {
    application.setOptionState(OptionState.NONE.getValue());
  }
  application.setState(currentState.getValue());
  doPersistMetrics(application, false);
  cleanOptioning(optionState, appId);
}


private void handleNotRunState(
    Application application,
    OptionState optionState,
    FlinkAppState currentState,
    StopFrom stopFrom)
    throws Exception {
  switch (currentState) {
    case CANCELLING:
      CANCELING_CACHE.put(application.getId(), DEFAULT_FLAG_BYTE);
      cleanSavepoint(application);
      application.setState(currentState.getValue());
      doPersistMetrics(application, false);
      break;
    case CANCELED:
      log.info(
          "[StreamPark][FlinkAppHttpWatcher] getFromFlinkRestApi, job state {}, stop tracking and delete stopFrom!",
          currentState.name());
      cleanSavepoint(application);
      application.setState(currentState.getValue());
      if (StopFrom.NONE.equals(stopFrom) || applicationService.checkAlter(application)) {
        if (StopFrom.NONE.equals(stopFrom)) {
          log.info(
              "[StreamPark][FlinkAppHttpWatcher] getFromFlinkRestApi, job cancel is not form StreamPark,savePoint expired!");
          savePointService.expire(application.getId());
        }
        stopCanceledJob(application.getId());
        alertService.alert(application, FlinkAppState.CANCELED);
      }
      STOP_FROM_MAP.remove(application.getId());
      doPersistMetrics(application, true);
      cleanOptioning(optionState, application.getId());
      break;
    case FAILED:
      cleanSavepoint(application);
      STOP_FROM_MAP.remove(application.getId());
      application.setState(FlinkAppState.FAILED.getValue());
      doPersistMetrics(application, true);
      alertService.alert(application, FlinkAppState.FAILED);
      applicationService.start(application, true);
      break;
    case RESTARTING:
      log.info(
          "[StreamPark][FlinkAppHttpWatcher] getFromFlinkRestApi, job state {},add to starting",
          currentState.name());
      STARTING_CACHE.put(application.getId(), DEFAULT_FLAG_BYTE);
      break;
    default:
      application.setState(currentState.getValue());
      doPersistMetrics(application, false);
      break;
  }
}

4.2.2 Job metrics update (K8s)

FlinkK8sWatcherWrapper#registerFlinkK8sWatcher: registers the status watcher for jobs running on Kubernetes.

@Bean(destroyMethod = "close")
public FlinkK8sWatcher registerFlinkK8sWatcher() {
  // 启动监听
  FlinkK8sWatcher flinkK8sWatcher =
      FlinkK8sWatcherFactory.createInstance(FlinkTrackConfig.fromConfigHub(), true);
  initFlinkK8sWatcher(flinkK8sWatcher);

  /* Dev scaffold: watch flink k8s tracking cache,
     see org.apache.streampark.flink.kubernetes.helper.KubernetesWatcherHelper for items.
     Example:
         KubernetesWatcherHelper.watchTrackIdsCache(flinkK8sWatcher);
         KubernetesWatcherHelper.watchJobStatusCache(flinkK8sWatcher);
         KubernetesWatcherHelper.watchAggClusterMetricsCache(flinkK8sWatcher);
         KubernetesWatcherHelper.watchClusterMetricsCache(flinkK8sWatcher);
  */
  return flinkK8sWatcher;
}

// -------- 初始化观察者(EventBus模式)------------
private void initFlinkK8sWatcher(@Nonnull FlinkK8sWatcher trackMonitor) {
  // 注册事件监听观察者(观察者接收到消息后,处理作业指标)
  trackMonitor.registerListener(flinkK8sChangeEventListener);
  // 获取监听的作业,并开始监听
  List<TrackId> k8sApp = getK8sWatchingApps();
  k8sApp.forEach(trackMonitor::doWatching);
}

FlinkJobStatusWatcher#doWatch: the watching logic; eventBus.postSync pushes the job status information.

  /** 单个 Flink 作业状态跟踪任务 */
  override def doWatch(): Unit = {
    this.synchronized {
      // 获取所有合法的跟踪 ID
      val trackIds = Try(watchController.getAllWatchingIds())
        .filter(_.nonEmpty)
        .getOrElse(return
        )

      // 1) k8s application 模式
      val appFuture: Set[Future[Option[JobStatusCV]]] =
        trackIds.filter(_.executeMode == FlinkK8sExecuteMode.APPLICATION).map {
          id =>
            val future = Future(touchApplicationJob(id))
            future.onComplete(_.getOrElse(None) match {
              case Some(jobState) =>
                updateState(id.copy(jobId = jobState.jobId), jobState)
              case _ =>
            })
            future
        }

      // 2) k8s session 模式
      val sessionIds = trackIds.filter(_.executeMode == FlinkK8sExecuteMode.SESSION)
      val sessionCluster = sessionIds.groupBy(_.toClusterKey.toString).flatMap(_._2).toSet
      val sessionFuture = sessionCluster.map {
        trackId =>
          val future = Future(touchSessionAllJob(trackId))
          future.onComplete(_.toOption match {
            case Some(map) =>
              map.find(_._1.jobId == trackId.jobId) match {
                case Some(job) =>
                  updateState(job._1.copy(appId = trackId.appId), job._2)
                case _ =>
                  touchSessionJob(trackId) match {
                    case Some(state) =>
                      if (FlinkJobState.isEndState(state.jobState)) {
                        // can't find that job in the k8s cluster.
                        watchController.unWatching(trackId)
                      }
                      //. 推送作业状态消息
                      eventBus.postSync(FlinkJobStatusChangeEvent(trackId, state))
                    case _ =>
                  }
              }
            case _ =>
          })
          future
      }

      // 阻塞,直到所有 future 完成或超时。
      Try(Await.ready(Future.sequence(appFuture), conf.requestTimeoutSec seconds)).failed.map {
        _ =>
          logWarn(
            s"[FlinkJobStatusWatcher] tracking flink job status on kubernetes native application mode timeout," +
              s" limitSeconds=${conf.requestTimeoutSec}," +
              s" trackIds=${trackIds.mkString(",")}")
      }

      Try(Await.ready(Future.sequence(sessionFuture), conf.requestTimeoutSec seconds)).failed.map {
        _ =>
          logWarn(
            s"[FlinkJobStatusWatcher] tracking flink job status on kubernetes native session mode timeout," +
              s" limitSeconds=${conf.requestTimeoutSec}," +
              s" trackIds=${trackIds.mkString(",")}")
      }
    }
  }

FlinkK8sChangeEventListener#subscribeJobStatusChange: consumes the status-change event, persists the job metrics, and raises alerts when necessary.

 /** 捕获作业状态,并持久化指标到数据库*/
  @SuppressWarnings("UnstableApiUsage")
  @AllowConcurrentEvents
  @Subscribe
  public void subscribeJobStatusChange(FlinkJobStatusChangeEvent event) {
    JobStatusCV jobStatus = event.jobStatus();
    TrackId trackId = event.trackId();
    // 获取应用记录
    Application app = applicationService.getById(trackId.appId());
    if (app == null) {
      return;
    }

    // 更新应用记录
    setByJobStatusCV(app, jobStatus);

    applicationService.persistMetrics(app);

    FlinkAppState state = app.getFlinkAppStateEnum();
    // 如果必要时,邮箱告警
    if (FlinkAppState.FAILED.equals(state)
        || FlinkAppState.LOST.equals(state)
        || FlinkAppState.RESTARTING.equals(state)
        || FlinkAppState.FINISHED.equals(state)) {
      alertService.alert(app, state);
    }
  }

4.2.3 Periodic cleanup of expired resources

PackerResourceGCTask#collectGarbage:

@Slf4j
@Component
public class PackerResourceGCTask {

  @Value("${streampark.packer-gc.max-resource-expired-hours:120}")
  public Integer maxResourceIntervalHours;

  // 每6小时执行一次
  @Scheduled(cron = "${streampark.packer-gc.exec-cron:0 0 0/6 * * ?}")
  public void collectGarbage() {
    log.info("[streampark-packer] Starting Packer Resource GC Task.");
    PackerResourceGC.startGc(maxResourceIntervalHours);
  }
}

PackerResourceGC#startGc:

/**
   * 开始一个资源垃圾收集过程,清理过期的构建资源。
   *
   * @param expiredHours 资源的预期过期时间(小时)。
   */
  def startGc(expiredHours: Integer): Unit = {
    val appWorkspace = new File(appWorkspacePath)
    // 如果工作空间目录不存在,则退出方法
    if (!appWorkspace.exists()) return
    
    // 计算过期资源的时间阈值(当前时间减去过期时间)
    val evictedBarrier = System.currentTimeMillis - expiredHours * 3600 * 1000

    // 查找应被清除的 flink 构建路径,根据文件名模式匹配
    val evictedFiles = appWorkspace.listFiles
      .filter(_.isDirectory)
      .filter(_.getName.contains("@"))
      .flatMap(findLastModifiedOfSubFile)
      .filter(_._2 < evictedBarrier)
      .map(_._1)

    // 如果没有需要清除的文件,则退出方法
    if (evictedFiles.isEmpty) return
    
    // 记录将要删除的文件日志
    logInfo(s"删除过期的构建资源: ${evictedFiles.mkString(", ")}")
    
    // 尝试删除这些目录,记录任何发生的错误
    evictedFiles.foreach(path => Try(FileUtils.deleteDirectory(path)))
  }

4.3 WebSocket

WebSocketEndpoint#pushNotice: used mainly to push messages to the frontend.

@Slf4j
@Component
@ServerEndpoint(value = "/websocket/{id}")
public class WebSocketEndpoint {

  private static final Map<String, Session> SOCKET_SESSIONS = new CopyOnWriteMap<>();

  @Getter private String id;

  @Getter private Session session;

  @OnOpen
  public void onOpen(Session session, @PathParam("id") String id) {
    log.debug("websocket onOpen....");
    this.id = id;
    this.session = session;
    SOCKET_SESSIONS.put(id, session);
  }

  @OnClose
  public void onClose() throws IOException {
    log.debug("websocket onClose....");
    this.session.close();
    SOCKET_SESSIONS.remove(this.id);
  }

  @OnError
  public void onError(Session session, Throwable e) {
    log.error(e.getMessage(), e);
  }

  public static void writeMessage(String socketId, String message) {
    try {
      Session session = SOCKET_SESSIONS.get(socketId);
      if (session != null) {
        session.getBasicRemote().sendText(message);
      }
    } catch (IOException e) {
      log.error(e.getMessage(), e);
    }
  }

  public static void pushNotice(Message message) {
    try {
      Session session = SOCKET_SESSIONS.get(message.getUserId().toString());
      if (session != null) {
        session.getBasicRemote().sendObject(message);
      }
    } catch (Exception e) {
      log.error(e.getMessage(), e);
    }
  }
}

5. Core Source Code Analysis

5.1 Starting a Flink cluster

Note: the start-cluster operation is not supported in standalone mode.

(flow diagram omitted)

Flow description:

  1. FlinkClusterController receives the request to start a Flink cluster;
  2. Before starting the cluster, FlinkClusterService checks, for a YARN session cluster, whether the application already exists via the YARN client, identifying the application by "state + type + tag", which must be unique (see the sketch below);
  3. If that check passes, the request parameters are built (their content differs per deployment mode) and FlinkClient's deploy method is called;
  4. Based on the request, FlinkClient picks the Flink version and creates the matching class loader (loading the jars under that Flink version's lib directory);
  5. That class loader extends URLClassLoader (note: it supports "child-first" and "parent-first" strategies, so classes can be resolved with different precedence);
  6. The code then switches to the class loader created above and invokes the deploy method via reflection;
  7. Finally FlinkClientHelper's deploy dispatches to the concrete implementation according to the execution mode (YARN session or Kubernetes session): YarnSessionClient or KubernetesNativeSessionClient.

The concrete implementations are discussed below.
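
Step 2 above (the YARN-side existence check) can be illustrated with a small sketch against the YARN client API. This is an illustration only: the application type and the state set are assumptions, and the real check lives in the service layer rather than in this form.

import java.util.Collections;
import java.util.EnumSet;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;

public class YarnSessionLookupSketch {

  /** Returns true if a YARN application of the assumed type, in an active state, carries the given tag. */
  static boolean sessionClusterExists(String tag) throws Exception {
    YarnClient yarnClient = YarnClient.createYarnClient();
    yarnClient.init(new Configuration());
    yarnClient.start();
    try {
      // filter by state + type, then by tag -- the three attributes combined in the check
      List<ApplicationReport> reports = yarnClient.getApplications(
          Collections.singleton("Apache Flink"), // assumed application type
          EnumSet.of(YarnApplicationState.RUNNING, YarnApplicationState.ACCEPTED));
      return reports.stream().anyMatch(r -> r.getApplicationTags().contains(tag));
    } finally {
      yarnClient.stop();
    }
  }
}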

5.1.1 YARN session

The UI configuration looks like this (screenshot omitted).

5.1.1.1 Core logic

The core logic lives in the YarnSessionClient class:

  def deploy(deployRequest: DeployRequest): DeployResponse = {
    logInfo(
      s"""
         |--------------------------------------- flink yarn sesion start ---------------------------------------
         |    userFlinkHome    : ${deployRequest.flinkVersion.flinkHome}
         |    flinkVersion     : ${deployRequest.flinkVersion.version}
         |    execMode         : ${deployRequest.executionMode.name()}
         |    clusterId        : ${deployRequest.clusterId}
         |    properties       : ${deployRequest.properties.mkString(" ")}
         |-------------------------------------------------------------------------------------------
         |""".stripMargin)
    var clusterDescriptor: YarnClusterDescriptor = null
    var client: ClusterClient[ApplicationId] = null
    try {
      // 获取flink配置
      val flinkConfig =
        extractConfiguration(deployRequest.flinkVersion.flinkHome, deployRequest.properties)

      // 获取部署配置(kerberos认证、flink相关(ship file、dist jar、config dir等))
      deployClusterConfig(deployRequest, flinkConfig)

      val yarnClusterDescriptor = getSessionClusterDeployDescriptor(flinkConfig)
      clusterDescriptor = yarnClusterDescriptor._2
      if (StringUtils.isNotBlank(deployRequest.clusterId)) {
        try {
          val applicationStatus =
            clusterDescriptor.getYarnClient
              .getApplicationReport(ConverterUtils.toApplicationId(deployRequest.clusterId))
              .getFinalApplicationStatus
          if (FinalApplicationStatus.UNDEFINED.equals(applicationStatus)) {
            // application is running
            val yarnClient = clusterDescriptor
              .retrieve(ApplicationId.fromString(deployRequest.clusterId))
              .getClusterClient
            if (yarnClient.getWebInterfaceURL != null) {
              return DeployResponse(yarnClient.getWebInterfaceURL, yarnClient.getClusterId.toString)
            }
          }
        } catch {
          case e: Exception => return DeployResponse(error = e)
        }
      }
      
      // 获取Yarn Session集群
      val clientProvider = clusterDescriptor.deploySessionCluster(yarnClusterDescriptor._1)
      client = clientProvider.getClusterClient
      if (client.getWebInterfaceURL != null) {
        DeployResponse(
          address = client.getWebInterfaceURL,
          clusterId = client.getClusterId.toString)
      } else {
        DeployResponse(error = new RuntimeException("get the cluster getWebInterfaceURL failed."))
      }
    } catch {
      case e: Exception => DeployResponse(error = e)
    } finally {
      Utils.close(client, clusterDescriptor)
    }
  }

5.1.2 K8s session

The UI configuration looks like this (screenshot omitted).

5.1.2.1 RBAC setup

Before starting a Kubernetes session cluster, RBAC has to be configured; the steps are as follows.

Step 1: create the namespace

kubectl create namespace streampark

Step 2: create the service account

kubectl -n streampark create serviceaccount flink-service-account

Step 3: create the cluster role

kubectl apply -f flink-cluster-role.yaml

The content of flink-cluster-role.yaml:

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: flink-cluster-role
rules:
- apiGroups: [""]
  resources: ["pods", "services", "configmaps", "persistentvolumeclaims", "events", "secrets"]
  verbs: ["create", "delete", "get", "list", "patch", "update", "watch"]
- apiGroups: ["apps"]
  resources: ["deployments", "statefulsets"]
  verbs: ["create", "delete", "get", "list", "patch", "update", "watch"]
- apiGroups: ["batch"]
  resources: ["jobs"]
  verbs: ["create", "delete", "get", "list", "patch", "update", "watch"]
- apiGroups: ["rbac.authorization.k8s.io"]
  resources: ["roles", "rolebindings"]
  verbs: ["create", "delete", "get", "list", "patch", "update", "watch"]


Step 4: bind the service account to the role

kubectl apply -f flink-cluster-role-binding.yaml

The content of flink-cluster-role-binding.yaml:

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: flink-cluster-role-binding
subjects:
- kind: ServiceAccount
  name: flink-service-account
  namespace: streampark
roleRef:
  kind: ClusterRole
  name: flink-cluster-role
  apiGroup: rbac.authorization.k8s.io

Step 5: export and adjust the kube config file

## export the kubernetes config file
kubectl config current-context
## output: kubernetes-admin@kubernetes

## export the configuration of the current context into a new file
kubectl config view --minify --flatten --context=kubernetes-admin@kubernetes > ./config

## change the server value in the config file; it is usually exported as a domain name, which you can check as follows:
kubectl cluster-info
## output: Kubernetes master is running at https://apiserver.cluster.local:6443
## output: KubeDNS is running at https://apiserver.cluster.local:6443/api/v1/namespaces/kube-system/services/kube-dns:dns/proxy
## output: To further debug and diagnose cluster problems, use 'kubectl cluster-info dump'.

## look up the actual address behind that domain, e.g. apiserver.cluster.local maps to 10.194.183.113
cat /etc/hosts
## output: 127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
## output: ::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
## output: 10.194.183.113 node-10-194-183-113
## output: 10.194.183.113 apiserver.cluster.local

Step 6: back up the config file in the .kube directory on the StreamPark server and replace it with the one exported above.

5.1.2.2 Core logic

The core logic lives in the KubernetesNativeSessionClient class:

  def deploy(deployReq: DeployRequest): DeployResponse = {
    val deployRequest = deployReq.asInstanceOf[KubernetesDeployRequest]
    logInfo(
      s"""
         |--------------------------------------- kubernetes session cluster start ---------------------------------------
         |    userFlinkHome    : ${deployRequest.flinkVersion.flinkHome}
         |    flinkVersion     : ${deployRequest.flinkVersion.version}
         |    execMode         : ${deployRequest.executionMode.name()}
         |    clusterId        : ${deployRequest.clusterId}
         |    namespace        : ${deployRequest.kubernetesNamespace}
         |    exposedType      : ${deployRequest.flinkRestExposedType}
         |    serviceAccount   : ${deployRequest.serviceAccount}
         |    flinkImage       : ${deployRequest.flinkImage}
         |    properties       : ${deployRequest.properties.mkString(" ")}
         |-------------------------------------------------------------------------------------------
         |""".stripMargin)

    // 构建k8s配置
    val flinkConfig = getFlinkK8sConfig(deployRequest)
    // 从配置构建Kubernetes客户端
    val kubeClient = FlinkKubeClientFactory.getInstance.fromConfiguration(flinkConfig, "client")

    var clusterDescriptor: KubernetesClusterDescriptor = null
    var client: ClusterClient[String] = null

    try {
      val kubernetesClusterDescriptor = getK8sClusterDescriptorAndSpecification(flinkConfig)
      clusterDescriptor = kubernetesClusterDescriptor._1

      // 初始化Kubernetes服务
      val kubeClientWrapper = new FlinkKubernetesClient(kubeClient)
      val kubeService = kubeClientWrapper.getService(deployRequest.clusterId)

      // 启动kubernetes session集群
      if (kubeService.isPresent) {
        client = clusterDescriptor.retrieve(deployRequest.clusterId).getClusterClient
      } else {
        client =
          clusterDescriptor.deploySessionCluster(kubernetesClusterDescriptor._2).getClusterClient
      }
      DeployResponse(address = client.getWebInterfaceURL, clusterId = client.getClusterId)
    } catch {
      case e: Exception => DeployResponse(error = e)
    } finally {
      Utils.close(client, clusterDescriptor, kubeClient)
    }

5.2 Stopping a Flink Cluster

Note: the stop-cluster operation is not supported in standalone mode.

(flow diagram omitted)

Stopping a Flink cluster follows largely the same flow as starting one; the final logic again runs in YarnSessionClient or KubernetesNativeSessionClient.

5.2.1 YARN session

5.2.1.1 Core logic

The core logic is in YarnSessionClient#shutdown:

/**
 * 停止flink集群
 */
def shutdown(shutDownRequest: DeployRequest): ShutDownResponse = {
  var clusterDescriptor: YarnClusterDescriptor = null
  var client: ClusterClient[ApplicationId] = null
  try {
    // 获取并设置flink默认参数
    val flinkConfig = getFlinkDefaultConfiguration(shutDownRequest.flinkVersion.flinkHome)
    shutDownRequest.properties.foreach(
      m =>
        m._2 match {
          case v if v != null => flinkConfig.setString(m._1, m._2.toString)
          case _ =>
        })
    flinkConfig.safeSet(YarnConfigOptions.APPLICATION_ID, shutDownRequest.clusterId)
    flinkConfig.safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)

    // 获取yarn客户端
    val yarnClusterDescriptor = getSessionClusterDescriptor(flinkConfig)
    clusterDescriptor = yarnClusterDescriptor._2
    if (
      FinalApplicationStatus.UNDEFINED.equals(
        clusterDescriptor.getYarnClient
          .getApplicationReport(ApplicationId.fromString(shutDownRequest.clusterId))
          .getFinalApplicationStatus)
    ) {
      val clientProvider = clusterDescriptor.retrieve(yarnClusterDescriptor._1)
      client = clientProvider.getClusterClient
      // 停止Flink Session集群(RestClusterClient)
      client.shutDownCluster()
    }
    logInfo(s"the ${shutDownRequest.clusterId}'s final status is ${clusterDescriptor.getYarnClient
        .getApplicationReport(ConverterUtils.toApplicationId(shutDownRequest.clusterId))
        .getFinalApplicationStatus}")
    ShutDownResponse()
  } catch {
    case e: Exception => ShutDownResponse(e)
  } finally {
    Utils.close(client, clusterDescriptor)
  }
}

5.2.2 K8s session

5.2.2.1 Core logic

The core logic is in KubernetesNativeSessionClient#shutdown:

  /**
   * 停止Flink集群
   */
  @throws[Exception]
  def shutdown(deployRequest: DeployRequest): ShutDownResponse = {
    val shutDownRequest = deployRequest.asInstanceOf[KubernetesDeployRequest]
    logInfo(
      s"""
         |--------------------------------------- kubernetes session cluster shutdown ---------------------------------------
         |    userFlinkHome     : ${shutDownRequest.flinkVersion.version}
         |    namespace         : ${shutDownRequest.kubernetesNamespace}
         |    clusterId         : ${shutDownRequest.clusterId}
         |    execMode          : ${shutDownRequest.executionMode.getName}
         |    flinkImage        : ${shutDownRequest.flinkImage}
         |    exposedType       : ${shutDownRequest.flinkRestExposedType.getName}
         |    kubeConf          : ${shutDownRequest.kubeConf}
         |    serviceAccount    : ${shutDownRequest.serviceAccount}
         |    properties        : ${shutDownRequest.properties.mkString(" ")}
         |-------------------------------------------------------------------------------------------
         |""".stripMargin)

    // 构造提交参数
    val flinkConfig = this.getFlinkK8sConfig(shutDownRequest)
    val clusterDescriptor = getK8sClusterDescriptor(flinkConfig)
    Try(
      // 获取k8s提交客户端
      clusterDescriptor
        .retrieve(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID))
        .getClusterClient
    ) match {
      case Failure(e) => ShutDownResponse(e)
      case Success(c) =>
        // 停止Flink集群
        Try(c.shutDownCluster()) match {
          case Success(_) => ShutDownResponse()
          case Failure(e) => ShutDownResponse(e)
        }
    }
  }

5.3 Releasing a Job

(flow diagram omitted)

Description:

  1. checkVersion: checks the Flink version; it must be between 1.12 and 1.19;

  2. checkEnv:

    • 2.1 Check the Flink environment
      • 2.1.1 Application#getStorageType(java.lang.Integer): derives the storage type from the execution mode (yarn-application mode uses HDFS storage)
      • 2.1.2 EnvInitializer#checkFlinkEnv: obtains the StorageOperator through the facade, creates the storage directory (e.g. "/tmp/streampark/flink" for LFS, "/streampark/flink/flink-1.19.0" for HDFS), and then uploads the files into it.
    • 2.2 storageInitialize: initializes the storage
      • 2.2.1 EnvInitializer#storageInitialize: this has in fact already run at application startup (EnvInitializer#run)
      • 2.2.2 Directories created: dist, uploads, workspace, backups, savepoints, jars (global shared jars)
      • 2.2.3 Upload the client jar into the client directory
      • 2.2.4 Upload the shims jars into the shims directory
      • 2.2.5 Create the local Maven repository directory
    • 2.3 Check the connection: in standalone or yarn-session mode, fetch the job overview via the REST API to verify that the connection is healthy.
  3. allowToBuildNow: checks whether building is allowed; a build that is already in progress blocks a new one

  4. checkBuildAndUpdate

    • 4.1 Update the release state

    • 4.2 For SQL jobs, take a backup (version management).

    • 4.3 Decide whether a build is needed

  5. buildApplication

    • 5.1 If it is a SQL job, set the dependencies

    • 5.2 createPipelineInstance: creates the pipeline instance; depending on the execution mode this is one of FlinkYarnApplicationBuildPipeline, FlinkRemoteBuildPipeline, FlinkK8sSessionBuildPipeline or FlinkK8sApplicationBuildPipeline

    • 5.3 Remove the previous build history

    • 5.4 registerWatcher: registers the build watcher

    • 5.5 buildPipelineExecutor.submit(pipeline::launch): starts the build

      • 5.5.1 BuildPipeline#launch: runs the build (a minimal template-method sketch of this structure follows the table at the end of this section)

        • onStart(): build started

          • Update the build state
          • Initialize FlinkAppHttpWatcher (FlinkAppHttpWatcher#initialize)
          • Check the cluster state
          • Prepare the jars: copy the dependency jars (POM dependencies and manually uploaded jars) to the uploads directory; in Yarn Application mode, also upload them to HDFS.
        • onStepStateChange(): a build step changed state

          • buildProcess runs inside the executor; each mode has its own steps (asynchronous)
          • Base class: BuildPipeline#execStep
          • Step enum: PipelineType
        • onFinish(): build finished; save and update the state

Note: in the K8s application mode a DockerProgressWatcher is additionally registered, and buildProcess is executed in the same way.


Summary:

  • 4 build pipeline implementations: FlinkK8sApplicationBuildPipeline, FlinkK8sSessionBuildPipeline, FlinkRemoteBuildPipeline, FlinkYarnApplicationBuildPipeline

  • 2 progress watchers: PipeWatcher, DockerProgressWatcher

  • 1 step enum: PipelineType

Mapping of build pipeline implementations to execution modes:

| Build pipeline implementation     | Execution mode |
| --------------------------------- | -------------- |
| FlinkYarnApplicationBuildPipeline | YARN_APPLICATION |
| FlinkRemoteBuildPipeline          | YARN_PER_JOB, YARN_SESSION, REMOTE |
| FlinkK8sSessionBuildPipeline      | KUBERNETES_NATIVE_SESSION |
| FlinkK8sApplicationBuildPipeline  | KUBERNETES_NATIVE_APPLICATION |
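
To make the launch flow in step 5.5.1 easier to picture, here is a minimal template-method sketch of the structure (hypothetical types and method names; the real skeleton is BuildPipeline#launch with execStep and the steps defined in PipelineType):

import java.util.function.Supplier;

/** A template-method sketch of the build pipeline skeleton described above (not the real class). */
abstract class BuildPipelineSketch {

  /** Hook: runs before the first step (update state, prepare jars, ...). */
  protected abstract void onStart() throws Exception;

  /** Hook: notified whenever a step changes state. */
  protected abstract void onStepStateChange(int step, String state);

  /** Hook: runs once at the end; persists the final build result. */
  protected abstract void onFinish(boolean success);

  /** The mode-specific build body, composed of execStep(...) calls. */
  protected abstract Object buildProcess() throws Exception;

  /** Runs one numbered step and reports its state transitions. */
  protected <T> T execStep(int step, Supplier<T> body) {
    onStepStateChange(step, "RUNNING");
    try {
      T result = body.get();
      onStepStateChange(step, "SUCCESS");
      return result;
    } catch (RuntimeException e) {
      onStepStateChange(step, "FAILURE");
      throw e;
    }
  }

  /** The template method: the skeleton every concrete pipeline shares. */
  public final void launch() {
    try {
      onStart();
      buildProcess();
      onFinish(true);
    } catch (Exception e) {
      onFinish(false);
    }
  }
}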

5.3.1 standalone

The PipelineType enum entry:

  FLINK_STANDALONE(
      3,
      "flink standalone session mode task building pipeline",
      ImmutableMap.<Integer, String>builder()
          .put(1, "Create building workspace")
          .put(2, "Build shaded flink app jar")
          .build(),
      ShadedBuildResponse.class),

Core class: FlinkRemoteBuildPipeline (builds a single fat jar):

/**
 *  打包:(主程序+依赖)
 */  
@throws[Throwable]
override protected def buildProcess(): ShadedBuildResponse = {
  execStep(1) {
    // 删除工作目录,例如:/tmp/streampark/workspace/100001
    LfsOperator.mkCleanDirs(request.workspace)
    logInfo(s"recreate building workspace: ${request.workspace}")
  }.getOrElse(throw getError.exception)
  // build flink job shaded jar
  val shadedJar =
    execStep(2) {
      // 把执行程序和依赖都打成一个fat包,并存放在一个目录,例如:/tmp/streampark/workspace/100001/streampark-flinkjob_standalone-jar.jar
      val output = MavenTool.buildFatJar(
        request.mainClass,
        request.providedLibs,
        request.getShadedJarPath(request.workspace))
      logInfo(s"output shaded flink job jar: ${output.getAbsolutePath}")
      output
    }.getOrElse(throw getError.exception)
  ShadedBuildResponse(request.workspace, shadedJar.getAbsolutePath)
}

5.3.2 yarn

The PipelineType enum entry:

  FLINK_YARN_APPLICATION(
      4,
      "flink yarn application mode task building pipeline",
      ImmutableMap.<Integer, String>builder()
          .put(1, "Prepare hadoop yarn environment and building workspace")
          .put(2, "Resolve maven dependencies")
          .put(3, "upload jar to yarn.provided.lib.dirs")
          .build(),
      SimpleBuildResponse.class);

Core class: FlinkYarnApplicationBuildPipeline (only the YARN application SQL mode takes this path):

  override protected def buildProcess(): SimpleBuildResponse = {
    if (request.developmentMode == DevelopmentMode.FLINK_SQL) {// 仅支持SQL模式
      execStep(1) {
        HdfsOperator.mkCleanDirs(request.yarnProvidedPath) // 清除依赖库目录
        logInfo(s"recreate building workspace: ${request.yarnProvidedPath}")
      }.getOrElse(throw getError.exception)

      val mavenJars =
        execStep(2) { // 处理maven依赖
          val mavenArts = MavenTool.resolveArtifacts(request.dependencyInfo.mavenArts)
          mavenArts.map(_.getAbsolutePath) ++ request.dependencyInfo.extJarLibs
        }.getOrElse(throw getError.exception)

      execStep(3) { // 上传
        mavenJars.foreach(jar => uploadToHdfs(FsOperator.hdfs, jar, request.yarnProvidedPath))
      }.getOrElse(throw getError.exception)
    }
    SimpleBuildResponse()
  }

5.3.3 k8s

5.3.3.1 session

The PipelineType enum entry:

FLINK_NATIVE_K8S_SESSION(
      1,
      "flink native kubernetes session mode task building pipeline",
      ImmutableMap.<Integer, String>builder()
          .put(1, "Create building workspace")
          .put(2, "Build shaded flink app jar")
          .build(),
      ShadedBuildResponse.class),

Core class: FlinkK8sSessionBuildPipeline:

override protected def buildProcess(): ShadedBuildResponse = {

    // 创建空间
    // 子目录类型类似于: APP_WORKSPACE/k8s-clusterId@k8s-namespace/,例如:APP_WORKSPACE/flink-cluster-local@streampark
    val buildWorkspace =
      execStep(1) {
        val buildWorkspace = s"${request.workspace}/${request.clusterId}@${request.k8sNamespace}"
        LfsOperator.mkCleanDirs(buildWorkspace) // 删除目录,例如:/tmp/streampark/workspace/100011/flink-cluster-local@streampark
        logInfo(s"recreate building workspace: $buildWorkspace")
        buildWorkspace
      }.getOrElse(throw getError.exception)

    // 构建shaded jar.
    val shadedJar =
      execStep(2) {
        val output = MavenTool.buildFatJar(
          request.mainClass,
          request.providedLibs,
          request.getShadedJarPath(buildWorkspace))
        logInfo(s"output shaded flink job jar: ${output.getAbsolutePath}")
        output
      }.getOrElse(throw getError.exception)

    ShadedBuildResponse(buildWorkspace, shadedJar.getAbsolutePath)
  }
}

5.3.3.2 application

The PipelineType enum entry:

FLINK_NATIVE_K8S_APPLICATION(
      2,
      "flink native kubernetes session mode task building pipeline",
      ImmutableMap.<Integer, String>builder()
          .put(1, "Create building workspace")
          .put(2, "Export kubernetes pod template")
          .put(3, "Build shaded flink app jar")
          .put(4, "Export flink app dockerfile")
          .put(5, "Pull flink app base docker image")
          .put(6, "Build flink app docker image")
          .put(7, "Push flink app docker image")
          .build(),
      DockerImageBuildResponse.class),

Core class: FlinkK8sApplicationBuildPipeline:

  override protected def buildProcess(): DockerImageBuildResponse = {

    // Step-1: 初始化flink作业工作目录,例如:/tmp/streampark/workspace/100013/k8s-application-jar@streampark

    val buildWorkspace =
      execStep(1) {
        val buildWorkspace = s"${request.workspace}/${request.clusterId}@${request.k8sNamespace}"
        LfsOperator.mkCleanDirs(buildWorkspace)
        logInfo(s"recreate building workspace: $buildWorkspace")
        buildWorkspace
      }.getOrElse(throw getError.exception)

    // Step-2: 导出k8s pod模板文件集合
    val podTemplatePaths = request.flinkPodTemplate match {
      case podTemplate if podTemplate.isEmpty =>
        skipStep(2)
        Map[String, String]()
      case podTemplate =>
        execStep(2) {
          val podTemplateFiles =
            PodTemplateTool.preparePodTemplateFiles(buildWorkspace, podTemplate).tmplFiles
          logInfo(s"export flink podTemplates: ${podTemplateFiles.values.mkString(",")}")
          podTemplateFiles
        }.getOrElse(throw getError.exception)
    }

    // Step-3: 构建shaded jar和处理依赖包
    // 输出目录:/tmp/streampark/workspace/100013/k8s-application-jar@streampark/streampark-flinkjob_k8s-application-jar.jar
    val (shadedJar, extJarLibs) =
      execStep(3) {
        val shadedJarOutputPath = request.getShadedJarPath(buildWorkspace)
        val extJarLibs = request.developmentMode match {
          case DevelopmentMode.FLINK_SQL => request.dependency.extJarLibs
          case DevelopmentMode.CUSTOM_CODE => Set[String]()
        }
        val shadedJar =
          MavenTool.buildFatJar(request.mainClass, request.providedLibs, shadedJarOutputPath)
        logInfo(s"output shaded flink job jar: ${shadedJar.getAbsolutePath}")
        shadedJar -> extJarLibs
      }.getOrElse(throw getError.exception)

    // Step-4: 生成和导出flink镜像相关的DockerFile文件,例如:/tmp/streampark/workspace/100013/k8s-application-jar@streampark/Dockerfile
    val (dockerfile, dockerFileTemplate) =
      execStep(4) {
        val dockerFileTemplate = {
          if (request.integrateWithHadoop) {
            FlinkHadoopDockerfileTemplate.fromSystemHadoopConf(
              buildWorkspace,
              request.flinkBaseImage,
              shadedJar.getAbsolutePath,
              extJarLibs)
          } else {
            FlinkDockerfileTemplate(
              buildWorkspace,
              request.flinkBaseImage,
              shadedJar.getAbsolutePath,
              extJarLibs)
          }
        }
        val dockerFile = dockerFileTemplate.writeDockerfile
        logInfo(
          s"output flink dockerfile: ${dockerFile.getAbsolutePath}, content: \n${dockerFileTemplate.offerDockerfileContent}")
        dockerFile -> dockerFileTemplate
      }.getOrElse(throw getError.exception)

    val dockerConf = request.dockerConfig
    val baseImageTag = request.flinkBaseImage.trim
    val pushImageTag = {
      val expectedImageTag = s"streampark-${request.k8sNamespace}-${request.clusterId}"
      compileTag(expectedImageTag, dockerConf.registerAddress, dockerConf.imageNamespace)
    }

    // Step-5: 推送flink基础镜像
    execStep(5) {
      usingDockerClient {
        dockerClient =>
          val pullImageCmd = {
            // when the register address prefix is explicitly identified on base image tag,
            // the user's pre-saved docker register auth info would be used.
            if (
              dockerConf.registerAddress != null && !baseImageTag.startsWith(
                dockerConf.registerAddress)
            ) {
              dockerClient.pullImageCmd(baseImageTag)
            } else {
              dockerClient.pullImageCmd(baseImageTag).withAuthConfig(dockerConf.toAuthConf)
            }
          }
          val pullCmdCallback = pullImageCmd
            .asInstanceOf[HackPullImageCmd]
            .start(watchDockerPullProcess {
              pullRsp =>
                dockerProcess.pull.update(pullRsp)
                Future(dockerProcessWatcher.onDockerPullProgressChange(dockerProcess.pull.snapshot))
            })
          pullCmdCallback.awaitCompletion
          logInfo(s"already pulled docker image from remote register, imageTag=$baseImageTag")
      }(err => throw new Exception(s"pull docker image failed, imageTag=$baseImageTag", err))
    }.getOrElse(throw getError.exception)

    // Step-6: 构建应用镜像
    execStep(6) {
      usingDockerClient {
        dockerClient =>
          val buildImageCmd = dockerClient
            .buildImageCmd()
            .withBaseDirectory(new File(buildWorkspace))
            .withDockerfile(dockerfile)
            .withTags(Sets.newHashSet(pushImageTag))

          val buildCmdCallback = buildImageCmd
            .asInstanceOf[HackBuildImageCmd]
            .start(watchDockerBuildStep {
              buildStep =>
                dockerProcess.build.update(buildStep)
                Future(
                  dockerProcessWatcher.onDockerBuildProgressChange(dockerProcess.build.snapshot))
            })
          val imageId = buildCmdCallback.awaitImageId
          logInfo(s"built docker image, imageId=$imageId, imageTag=$pushImageTag")
      }(err => throw new Exception(s"build docker image failed. tag=$pushImageTag", err))
    }.getOrElse(throw getError.exception)

    // Step-7: 推送镜像至harbor
    execStep(7) {
      usingDockerClient {
        dockerClient =>
          val pushCmd: PushImageCmd = dockerClient
            .pushImageCmd(pushImageTag)
            .withAuthConfig(dockerConf.toAuthConf)

          val pushCmdCallback = pushCmd
            .asInstanceOf[HackPushImageCmd]
            .start(watchDockerPushProcess {
              pushRsp =>
                dockerProcess.push.update(pushRsp)
                Future(dockerProcessWatcher.onDockerPushProgressChange(dockerProcess.push.snapshot))
            })
          pushCmdCallback.awaitCompletion
          logInfo(s"already pushed docker image, imageTag=$pushImageTag")
      }(err => throw new Exception(s"push docker image failed. tag=$pushImageTag", err))
    }.getOrElse(throw getError.exception)

    // Step-8:  初始化ingress的构建目录
    request.ingressTemplate match {
      case ingress if StringUtils.isBlank(ingress) => skipStep(8)
      case _ =>
        execStep(8) {
          val path = IngressController.prepareIngressTemplateFiles(
            buildWorkspace,
            request.ingressTemplate
          )
          logInfo(s"export flink ingress: $path")
        }.getOrElse(throw getError.exception)
    }

    DockerImageBuildResponse(
      buildWorkspace,
      pushImageTag,
      podTemplatePaths,
      dockerFileTemplate.innerMainJarPath)
  }
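
上面 Step 5~7 构建、推送镜像前用到的 compileTag,作用是把期望的镜像名与 Docker 仓库地址、镜像命名空间拼接成最终推送用的 imageTag。下面是一个简化示意(拼接规则与真实源码可能存在出入,仅说明思路):

/** 简化示意:根据仓库地址与命名空间拼接最终推送的镜像 tag(规则以真实源码为准) */
object ImageTagSketch extends App {

  def compileTag(tag: String, registerAddress: String, imageNamespace: String): String = {
    var imageTag = tag
    // 假设:配置了私有仓库地址且 tag 未带该前缀时,补上 "仓库地址/命名空间/" 前缀
    if (registerAddress != null && registerAddress.nonEmpty && !imageTag.startsWith(registerAddress)) {
      imageTag = s"$registerAddress/$imageNamespace/$imageTag"
    }
    imageTag.toLowerCase
  }

  // 例如:streampark-streampark-k8s-application-jar -> registry.example.com/streampark/streampark-...
  println(compileTag("streampark-streampark-k8s-application-jar", "registry.example.com", "streampark"))
}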

构建完成后的工作目录如下(例如:/tmp/streampark/workspace/100013/k8s-application-jar@streampark):

在这里插入图片描述

5.4 作业启动

在这里插入图片描述

描述

  1. ApplicationController#start:启动作业入口;
  2. 在入口处,首先会检测环境(流程跟前面一样,可参考 5.3 作业上线),接着开始启动作业;
  3. 启动作业前,会检查作业的状态、环境以及不同模式下的检测;
  4. 接着开始构建请求,不同的执行模式,需要构建不同的参数,同时还需要结合上线操作的记录来构建;
  5. 然后异步提交,提交还是使用代理的方式(跟前面一样,参考5.1),根据flink版本加载不同的类加载器,最后使用反射去触发不同执行模式对应的submit方法(map集合存储对应的实现,执行模式→不同类型的客户端);
  6. 最后,异步等待提交的结果,并把future放入map集合(应用ID→future),接着保存执行结果。

5.4.1 FlinkClientTrait

以下是不同执行模式对应的客户端实现

| 执行模式编码 | 执行模式 | 客户端实现 | 描述 |
| --- | --- | --- | --- |
| 0 | LOCAL | LocalClient | 应该是已经废弃了,本地启动一个Flink集群,后台运行 |
| 1 | STANDALONE | RemoteClient |  |
| 2 | YARN_PER_JOB | YarnPerJobClient |  |
| 3 | YARN_SESSION | YarnSessionClient |  |
| 4 | YARN_APPLICATION | YarnApplicationClient |  |
| 5 | KUBERNETES_NATIVE_SESSION | KubernetesNativeSessionClient |  |
| 6 | KUBERNETES_NATIVE_APPLICATION | KubernetesNativeApplicationClient |  |
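
提交作业时,FlinkClientHandler 会根据执行模式找到上表中对应的客户端实现,再调用其 submit 方法,本质上就是维护一个“执行模式 → 客户端实现”的映射。下面是一个简化示意(类名与方法签名均为演示用的假设,真实实现以 FlinkClientHandler 源码为准):

/** 简化示意:执行模式到客户端实现的分发 */
object ClientDispatchSketch extends App {

  trait FlinkClientLike { def submit(jobName: String): String }

  object RemoteClientSketch extends FlinkClientLike {
    override def submit(jobName: String): String = s"submit $jobName to standalone cluster"
  }
  object YarnSessionClientSketch extends FlinkClientLike {
    override def submit(jobName: String): String = s"submit $jobName to yarn session cluster"
  }

  // 执行模式 -> 客户端实现
  private val clients: Map[String, FlinkClientLike] = Map(
    "remote" -> RemoteClientSketch,
    "yarn-session" -> YarnSessionClientSketch)

  def submit(executionMode: String, jobName: String): String =
    clients.get(executionMode) match {
      case Some(client) => client.submit(jobName)
      case None => throw new UnsupportedOperationException(s"Unsupported mode: $executionMode")
    }

  println(submit("remote", "flink-sql-demo"))
}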

其中,每种客户端都继承了FlinkClientTrait基类,里面封装了公共的方法实现以及行为(模板方法设计模式),具体代码如下:

  /*** 提交作业模板方法 **/
  def submit(submitRequest: SubmitRequest): SubmitResponse = {
    logInfo(
      s"""
         |--------------------------------------- flink job start ---------------------------------------
         |    userFlinkHome    : ${submitRequest.flinkVersion.flinkHome}
         |    flinkVersion     : ${submitRequest.flinkVersion.version}
         |    appName          : ${submitRequest.appName}
         |    devMode          : ${submitRequest.developmentMode.name()}
         |    execMode         : ${submitRequest.executionMode.name()}
         |    k8sNamespace     : ${submitRequest.kubernetesNamespace}
         |    flinkExposedType : ${submitRequest.flinkRestExposedType}
         |    clusterId        : ${submitRequest.clusterId}
         |    applicationType  : ${submitRequest.applicationType.getName}
         |    savePoint        : ${submitRequest.savePoint}
         |    properties       : ${submitRequest.properties.mkString(" ")}
         |    args             : ${submitRequest.args}
         |    appConf          : ${submitRequest.appConf}
         |    flinkBuildResult : ${submitRequest.buildResult}
         |-------------------------------------------------------------------------------------------
         |""".stripMargin)

    val (commandLine, flinkConfig) = getCommandLineAndFlinkConfig(submitRequest)
    if (submitRequest.userJarFile != null) {
      val uri = PackagedProgramUtils.resolveURI(submitRequest.userJarFile.getAbsolutePath)
      val programOptions = ProgramOptions.create(commandLine)
      val executionParameters = ExecutionConfigAccessor.fromProgramOptions(
        programOptions,
        Collections.singletonList(uri.toString))
      executionParameters.applyToConfiguration(flinkConfig)
    }

    // 设置通用的flink配置
    flinkConfig
      .safeSet(PipelineOptions.NAME, submitRequest.effectiveAppName)
      .safeSet(DeploymentOptions.TARGET, submitRequest.executionMode.getName)
      .safeSet(SavepointConfigOptions.SAVEPOINT_PATH, submitRequest.savePoint)
      .safeSet(ApplicationConfiguration.APPLICATION_MAIN_CLASS, submitRequest.appMain)
      .safeSet(ApplicationConfiguration.APPLICATION_ARGS, extractProgramArgs(submitRequest))
      .safeSet(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, submitRequest.jobId)

    if (
      !submitRequest.properties.containsKey(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key())
    ) {
      val flinkDefaultConfiguration = getFlinkDefaultConfiguration(
        submitRequest.flinkVersion.flinkHome)
      // state.checkpoints.num-retained
      val retainedOption = CheckpointingOptions.MAX_RETAINED_CHECKPOINTS
      flinkConfig.safeSet(retainedOption, flinkDefaultConfiguration.get(retainedOption))
    }

    // 设置savepoint相关的参数
    if (submitRequest.savePoint != null) {
      flinkConfig.safeSet(SavepointConfigOptions.SAVEPOINT_PATH, submitRequest.savePoint)
      flinkConfig.setBoolean(
        SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
        submitRequest.allowNonRestoredState)
    }

    // 设置JVM相关的参数
    setJvmOptions(submitRequest, flinkConfig)

    // 其它设置由具体的提交客户端子类实现
    setConfig(submitRequest, flinkConfig)

    // 提交也是由具体的提交客户端子类实现
    doSubmit(submitRequest, flinkConfig)

  }
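
基于这个模板方法,各客户端子类只需要实现 setConfig 与 doSubmit 两个钩子即可。下面用一个与源码无关的最小骨架来示意这种模板方法模式(类名与逻辑均为假设):

import scala.collection.mutable

/** 简化示意:模板方法模式 —— 父类固定提交流程,子类只实现差异化的钩子 */
object TemplateMethodSketch extends App {

  trait ClientTemplate {
    // 模板方法:流程固定,不允许子类覆盖
    final def submit(jobName: String): String = {
      val config = mutable.Map("pipeline.name" -> jobName)
      setConfig(config) // 钩子1:子类补充自己的配置
      doSubmit(config)  // 钩子2:子类完成真正的提交
    }
    def setConfig(config: mutable.Map[String, String]): Unit
    def doSubmit(config: mutable.Map[String, String]): String
  }

  object DemoRemoteClient extends ClientTemplate {
    override def setConfig(config: mutable.Map[String, String]): Unit =
      config += ("execution.target" -> "remote")
    override def doSubmit(config: mutable.Map[String, String]): String =
      s"submitted with config: $config"
  }

  println(DemoRemoteClient.submit("flink-sql-demo"))
}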

继承FlinkClientTrait基类的客户端实现有多种,接下来以典型的RemoteClient为例。

5.4.2 RemoteClient

接着前面的 doSubmit 方法,RemoteClient的实现方式如下:

  override def doSubmit(
      submitRequest: SubmitRequest,
      flinkConfig: Configuration): SubmitResponse = {

    // 2) submit job
    super.trySubmit(submitRequest, flinkConfig, submitRequest.userJarFile)(
      jobGraphSubmit,
      restApiSubmit)
  }

可以看到有两种提交方式,一个是jobGraph,另外一种是restApitrySubmit这个方法具体实现如下:

  def trySubmit(submitRequest: SubmitRequest, flinkConfig: Configuration, jarFile: File)(
      jobGraphFunc: (SubmitRequest, Configuration, File) => SubmitResponse,
      restApiFunc: (SubmitRequest, Configuration, File) => SubmitResponse): SubmitResponse = {
    // 优先使用jobGraph方式提交作业,RestApi作为备选方案
    Try {
      logInfo(s"[flink-submit] Submit job with JobGraph Plan.")
      // 使用jobGraph方式提交
      jobGraphFunc(submitRequest, flinkConfig, jarFile)
    } match {
      case Failure(e) =>
       // 出了异常,使用restApi方式提交作业。
        Try(restApiFunc(submitRequest, flinkConfig, jarFile)) match {
          case Success(r) => r
          case Failure(e1) =>
            throw new RuntimeException(
              s"""\n
                 |[flink-submit] Both JobGraph submit plan and Rest API submit plan all failed!
                 |JobGraph Submit plan failed detail:
                 |------------------------------------------------------------------
                 |${Utils.stringifyException(e)}
                 |------------------------------------------------------------------
                 |
                 | RestAPI Submit plan failed detail:
                 | ------------------------------------------------------------------
                 |${Utils.stringifyException(e1)}
                 |------------------------------------------------------------------
                 |""".stripMargin)
        }
      case Success(v) => v
    }
  }

可以看到,客户端优先使用jobGraph方式提交作业,RestApi作为备选方案

5.4.2.1 JobGraph

JobGraph提交方式的代码如下:

  /*** JobGraph提交方式**/
  def jobGraphSubmit(
      submitRequest: SubmitRequest,
      flinkConfig: Configuration,
      jarFile: File): SubmitResponse = {
    var clusterDescriptor: StandaloneClusterDescriptor = null;
    var packageProgram: PackagedProgram = null
    var client: ClusterClient[StandaloneClusterId] = null
    try {
      val standAloneDescriptor = getStandAloneClusterDescriptor(flinkConfig)
      clusterDescriptor = standAloneDescriptor._2
      
      // 构建JobGraph,这里会触发到org.apache.streampark.flink.cli.SqlClient$.mode方法来构建JobGraph
      val packageProgramJobGraph = super.getJobGraph(flinkConfig, submitRequest, jarFile)
      packageProgram = packageProgramJobGraph._1
      val jobGraph = packageProgramJobGraph._2
      client = clusterDescriptor.retrieve(standAloneDescriptor._1).getClusterClient
      
      // 提交作业
      val jobId = client.submitJob(jobGraph).get().toString
      logInfo(
        s"${submitRequest.executionMode} mode submit by jobGraph, WebInterfaceURL ${client.getWebInterfaceURL}, jobId: $jobId")
      val result = SubmitResponse(null, flinkConfig.toMap, jobId, client.getWebInterfaceURL)
      result
    } catch {
      case e: Exception =>
        logError(s"${submitRequest.executionMode} mode submit by jobGraph fail.")
        e.printStackTrace()
        throw e
    } finally {
      if (submitRequest.safePackageProgram) {
        Utils.close(packageProgram)
      }
      Utils.close(client, clusterDescriptor)
    }
  }

可以看到,使用JobGraph方式提交作业,需要先构建作业图(JobGraph),构建过程中会触发org.apache.streampark.flink.cli.SqlClient$.mode方法来生成JobGraph。

备注:SQL类型的作业,运行在集群的入口类也是org.apache.streampark.flink.cli.SqlClient

5.4.2.2 RestApi

RestApi的提交方式,使用的是FlinkSessionSubmitHelper工具类,本质就是一个HTTP工具类,这里不再详述其逻辑。

   /*** RestApi提交方式**/
  def restApiSubmit(
      submitRequest: SubmitRequest,
      flinkConfig: Configuration,
      fatJar: File): SubmitResponse = {
    // retrieve standalone session cluster and submit flink job on session mode
    var clusterDescriptor: StandaloneClusterDescriptor = null;
    var client: ClusterClient[StandaloneClusterId] = null
    val standAloneDescriptor = getStandAloneClusterDescriptor(flinkConfig)
    val yarnClusterId: StandaloneClusterId = standAloneDescriptor._1
    clusterDescriptor = standAloneDescriptor._2

    client = clusterDescriptor.retrieve(yarnClusterId).getClusterClient
    val jobId =
      FlinkSessionSubmitHelper.submitViaRestApi(client.getWebInterfaceURL, fatJar, flinkConfig)
    logInfo(
      s"${submitRequest.executionMode} mode submit by restApi, WebInterfaceURL ${client.getWebInterfaceURL}, jobId: $jobId")
    SubmitResponse(null, flinkConfig.toMap, jobId, client.getWebInterfaceURL)
  }
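
FlinkSessionSubmitHelper.submitViaRestApi 的本质是调用 Flink 标准 REST 接口:先通过 POST /jars/upload 上传 fat jar,再通过 POST /jars/{jarId}/run 运行。下面是一个与源码无关的简化示意(省略了 jar 上传的 multipart 细节,参数也做了简化):

import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

/** 简化示意:通过 Flink REST API 在 Session 集群上运行已上传的 jar */
object RestSubmitSketch {

  private val client = HttpClient.newHttpClient()

  /** 假设 fat jar 已通过 POST /jars/upload 上传并得到 jarId,这里触发运行并返回响应(形如 {"jobid":"..."}) */
  def runJar(webInterfaceUrl: String, jarId: String, entryClass: String): String = {
    val body = s"""{"entryClass":"$entryClass","parallelism":1}"""
    val request = HttpRequest.newBuilder()
      .uri(URI.create(s"$webInterfaceUrl/jars/$jarId/run"))
      .header("Content-Type", "application/json")
      .POST(HttpRequest.BodyPublishers.ofString(body))
      .build()
    client.send(request, HttpResponse.BodyHandlers.ofString()).body()
  }

  def main(args: Array[String]): Unit =
    println(runJar("http://localhost:8081", "demo.jar", "org.example.DemoJob"))
}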

那么,作业提交之后是怎么执行的呢?接下来继续讲解。

5.4.3 SqlClient

5.4.3.1 流程图

SQL Client的核心类流程逻辑如下:

在这里插入图片描述

描述:

  • SqlClient会结合 sql + 入参 + flink配置 来判断执行的模式;
  • 如果为流处理模式(Streaming Mode),则执行StreamSqlApp的main方法,该main方法是模板流程方法,按步骤执行,最后调用FlinkSqlExecutor的executeSql方法;
  • 同理,批处理模式的流程大致一样。

下面具体讲讲这个流程。

5.4.3.2 执行逻辑

以下是提交时的调试结果,可以看到公共的配置大致如下(做了分类简化):

## 基础配置
env.java.opts.all=--add-exports=java.base/sun.net.util=ALL-UNNAMED ...,
classloader.resolve-order=parent-first,
state.checkpoints.num-retained=1, 
parallelism.default=1, 
execution.savepoint.ignore-unclaimed-state=false,
execution.savepoint-restore-mode=NO_CLAIM,
execution.target=remote,
execution.attached=true,
execution.shutdown-on-attached-exit=false,  
rest.bind-address=localhost,
rest.port=8081,
rest.address=127.0.0.1,

## jobmanager配置
jobmanager.execution.failover-strategy=region, 
jobmanager.rpc.address=localhost, 
jobmanager.bind-host=localhost, 
jobmanager.memory.process.size=1600m, 
jobmanager.rpc.port=6123,

## taskmanager配置
taskmanager.host=localhost,taskmanager.numberOfTaskSlots=1,
taskmanager.memory.process.size=1728m, 
taskmanager.bind-host=localhost,

## 程序配置
$internal.application.program-args=[--flink.conf, eNrtW...w==, --app.name, eNorLknMS0nMyc9L1S0uzAEAKyQFpw==, --parallelism.default, 1, --sql, eNqlU...nE8bw],
$internal.application.main=org.apache.streampark.flink.cli.SqlClient, 
$internal.pipeline.job-id=a923bafe56e2ace404f7daf0b899bd52,
$internal.deployment.config-dir=/Users/yanglinwei/Desktop/StreamPark/sdk/flink-1.19.0/conf}

## 依赖配置
pipeline.classpaths=[],pipeline.name=standalone-sql,
pipeline.jars=[file:/tmp/streampark/workspace/100000/streampark-flinkjob_Flink_SQL_Demo.jar],

通过上面的配置,可以看到执行程序的jar包为:streampark-flinkjob_Flink_SQL_Demo.jar,入口类为:org.apache.streampark.flink.cli.SqlClient,其实这个Jar包对应的就是“streampark-flink-sqlclient”这个目录下的代码。继续来研读这块代码。

备注:SQL模式,入口类都为org.apache.streampark.flink.cli.SqlClient

SqlClient入口类,主体逻辑如下:

object SqlClient extends App {

  val arguments = ArrayBuffer(args: _*)

  /**
   * 参数解析工具类
   */
  private[this] val parameterTool = ParameterTool.fromArgs(args)

  /**
   * 解析FlinkSQL(Base64解密+解压)
   */
  private[this] val flinkSql = {
    val sql = parameterTool.get(KEY_FLINK_SQL())
    require(StringUtils.isNotBlank(sql), "Usage: flink sql cannot be null")
    Try(DeflaterUtils.unzipString(sql)) match {
      case Success(value) => value
      case Failure(_) =>
        throw new IllegalArgumentException("Usage: flink sql is invalid or null, please check")
    }
  }

  /**
   * sql集合
   */
  private[this] val sets = SqlCommandParser.parseSQL(flinkSql).filter(_.command == SqlCommand.SET)

  /**
   * 默认模式:Streaming or Batch
   */
  private[this] val defaultMode = RuntimeExecutionMode.STREAMING.name()

  /**
   * 获取执行模式(从3个地方判断)
   */
  private[this] val mode = sets.find(_.operands.head == ExecutionOptions.RUNTIME_MODE.key()) match {
    case Some(e) =>
      // 1) flink sql的execution.runtime-mode配置有最高优先级
      val m = e.operands(1).toUpperCase()
      arguments += s"-D${ExecutionOptions.RUNTIME_MODE.key()}=$m"
      m
    case None =>
      // 2) 动态参数(execution.runtime-mode)
      parameterTool.get(ExecutionOptions.RUNTIME_MODE.key(), null) match {
        case null =>
          val m = parameterTool.get(KEY_APP_CONF(), null) match {
            case null => defaultMode
            case f =>
              val parameter = PropertiesUtils.fromYamlText(DeflaterUtils.unzipString(f.drop(7)))
              // 3) flink-conf.yaml配置(即:execution.runtime-mode)
              parameter.getOrElse(KEY_FLINK_TABLE_MODE, defaultMode).toUpperCase()
          }
          arguments += s"-D${ExecutionOptions.RUNTIME_MODE.key()}=$m"
          m
        case m => m
      }
  }

  /**
   * 不同执行模式,对应不同的启动方式
   */
  mode match {
    case "STREAMING" | "AUTOMATIC" => StreamSqlApp.main(arguments.toArray)
    case "BATCH" => BatchSqlApp.main(arguments.toArray)
    case _ =>
      throw new IllegalArgumentException(
        "Usage: runtime execution-mode invalid, optional [STREAMING|BATCH|AUTOMATIC]")
  }

  /**
   * Batch 执行模式(继承FlinkTable)
   */
  private[this] object BatchSqlApp extends FlinkTable {
    override def handle(): Unit = context.sql()
  }

  /**
   * SQL 执行模式(继承FlinkStreamTable)
   */
  private[this] object StreamSqlApp extends FlinkStreamTable {
    override def handle(): Unit = context.sql()
  }

}


可以看到,这两个类分别对应流处理(Streaming)和批处理(Batch)。从前面的图可以看到两者流程大致相同,这里以Streaming模式来讲解。

在前面的代码中,可以看到StreamSqlApp继承了FlinkStreamTable,并用context.sql()覆盖了handle方法的实现。

/**
* SQL 执行模式(继承FlinkStreamTable)
*/ 
private[this] object StreamSqlApp extends FlinkStreamTable {
    override def handle(): Unit = context.sql()
  }

看看FlinkStreamTable的执行流程:

trait FlinkStreamTable extends Logger {

  implicit final def tableExt(table: Table): TableExt.Table = new TableExt.Table(table)

  implicit final def tableConversions(table: Table): TableExt.TableConversions =
    new TableExt.TableConversions(table)

  implicit final lazy val parameter: ParameterTool = context.parameter

  implicit var context: StreamTableContext = _

  private[this] def init(args: Array[String]): Unit = {
    SystemPropertyUtils.setAppHome(KEY_APP_HOME, classOf[FlinkStreamTable])
    context = new StreamTableContext(
      FlinkTableInitializer.initialize(args, configStream, configTable))
  }

  /**
   * 执行流程定义(模板方法模式)
   */
  def main(args: Array[String]): Unit = {
    //初始化参数,包括初始化StreamTableContext
    init(args)
    //准备操作(子类可实现)
    ready()
    //处理(上面定义这个类的时候,有声明覆盖了)
    handle()
    //执行操作
    context.start()
    //销毁操作(子类可实现)
    destroy()
  }

  def configStream(env: StreamExecutionEnvironment, parameter: ParameterTool): Unit = {}

  def configTable(tableConfig: TableConfig, parameter: ParameterTool): Unit = {}

  def ready(): Unit = {}

  def handle(): Unit

  def destroy(): Unit = {}

}

继续看看handle方法做了什么?也就是StreamTableContext的sql方法(StreamTableContext继承了FlinkStreamTableTrait):

在这里插入图片描述

可以看到,该方法传入了几个参数:要执行的sql、StreamTableContext以及parameter配置,最终目的是执行context.executeSql(x.originSql),逻辑如下:

/*** Flink SQL执行器 **/
object FlinkSqlExecutor extends Logger {

  private[this] val lock = new ReentrantReadWriteLock().writeLock

  // 执行Flink SQL
  private[streampark] def executeSql(
      sql: String,
      parameter: ParameterTool,
      context: TableEnvironment)(implicit callbackFunc: String => Unit = null): Unit = {

    val flinkSql: String =
      if (StringUtils.isBlank(sql)) parameter.get(KEY_FLINK_SQL()) else parameter.get(sql)
    require(StringUtils.isNotBlank(flinkSql), "verify failed: flink sql cannot be empty")
    
    //...........

    // 获取执行模式
    val runMode = parameter.get(ExecutionOptions.RUNTIME_MODE.key())

    var hasInsert = false
    val statementSet = context.createStatementSet()
    
    // 解析并执行SQL
    SqlCommandParser
      .parseSQL(flinkSql)
      .foreach(
        x => {
          val args = if (x.operands.isEmpty) null else x.operands.head
          val command = x.command.name
          x.command match {
            // For display sql statement result information
            case SHOW_CATALOGS =>
              val catalogs = context.listCatalogs
              callback(s"$command: ${catalogs.mkString("\n")}")
            //...........
            case INSERT =>
              statementSet.addInsertSql(x.originSql)
              hasInsert = true
            case SELECT =>
              logError("StreamPark dose not support 'SELECT' statement now!")
              throw new RuntimeException("StreamPark dose not support 'select' statement now!")
            case DELETE | UPDATE =>
              if (runMode == "STREAMING") {
                throw new UnsupportedOperationException(
                  s"Currently, ${command.toUpperCase()} statement only supports in batch mode, " +
                    s"and it requires the target table connector implements the SupportsRowLevelDelete, " +
                    s"For more details please refer to: https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/$command")
              }
            case _ =>
              try {
                lock.lock()
                // 执行Flink SQL
                val result = context.executeSql(x.originSql)
                logInfo(s"$command:$args")
              } finally {
                if (lock.isHeldByCurrentThread) {
                  lock.unlock()
                }
              }
          }
        })

    if (hasInsert) {
      statementSet.execute() match {
        case t if t != null =>
          Try(t.getJobClient.get.getJobID).getOrElse(null) match {
            case x if x != null => logInfo(s"jobId:$x")
            case _ =>
          }
        case _ =>
      }
    } else {
      logError("No 'INSERT' statement to trigger the execution of the Flink job.")
      throw new RuntimeException("No 'INSERT' statement to trigger the execution of the Flink job.")
    }

    logInfo(
      s"\n\n\n==============flinkSql==============\n\n $flinkSql\n\n============================\n\n\n")
  }

}

OK,这就是作业启动之后的整体流程。

5.5 作业停止

备注:作业停止的流程与作业启动类似,前面已经讲得比较详细,这里直接贴出流程图,不再深入探讨。

在这里插入图片描述

描述

  1. ApplicationController#cancel:停止作业入口;
  2. ApplicationService#cancel:更新作业日志,通知监听器需要取消的应用;
  3. 构造请求并使用FlinkClient提交,后续跟作业启动一样的流程,都是根据版本去获取对应的类加载器;
  4. 结束后,更新记录并解除对普通应用以及k8s应用的监听(FlinkAppHttpWatcherFlinkK8sWatcher)。

5.6 作业映射

作业映射,对应的界面如下:

在这里插入图片描述

作业映射,我理解是把已经在运行的作业映射到选中的作业上,当作是该作业在跑,可看作是一种补偿操作。

演示方式:可以先跑一个作业,然后把数据库 t_flink_app 表中该作业的状态改为失败,再通过映射操作关联上一次正在运行的作业即可。

核心方法:ApplicationServiceImpl#mapping

  @Override
  public boolean mapping(Application appParam) {
    //更新应用的application_id or jobId
    boolean mapping = this.baseMapper.mapping(appParam);
    Application application = getById(appParam.getId());
    if (application.isKubernetesModeJob()) {
      // 加入监听,实时更新作业状态(k8s作业)
      flinkK8sWatcher.doWatching(k8sWatcherWrapper.toTrackId(application));
    } else {
       // 加入监听,实时更新作业状态(非k8s作业)
      FlinkAppHttpWatcher.doWatching(application);
    }
    return mapping;
  }

5.7 作业快照

在这里插入图片描述

保存点是 Flink 中的一种机制,用于将流式作业的状态持久化到外部存储系统。这样可以在需要时(例如故障恢复、程序更新或迁移时)恢复作业的状态,从而继续处理数据。

描述:上图是触发savepoint的流程,跟前面描述的主体流程差不多,最终还是调用Flink API去触发savepoint的生成。

在StreamPark中,作业的状态快照备份与恢复有两种实现方式:

  • 手动触发 Savepoint(ClusterClient#triggerSavepoint):程序运行时可以手动触发;恢复时可以指定Savepoint路径;停止作业时也可以指定备份路径。快照可以存储在指定的文件系统目录(如:hdfs://xxx:8020/streampark/savepoints);
  • 自动触发 Checkpoint

5.7.1 手动触发(Savepoint)

触发时机:在StreamPark中,手动触发Savepoint可以发生在作业运行时,也可以发生在停止作业时。


触发时机(运行时)

  • SavePointController#trigger →
  • SavePointService#trigger →
  • FlinkClient#triggerSavepoint →
  • FlinkClientHandler#triggerSavepoint →
  • 不同模式的实现(如:RemoteClient、YarnClient等) →
  • RestClusterClient#triggerSavepoint

触发时机(停止作业时)

  • ApplicationController#cancel →

  • ApplicationService#cancel →

  • FlinkClient#cancel →

  • 不同模式的实现(如:RemoteClient、YarnClient等) →

  • drain为false(默认false)时:RestClusterClient#cancelWithSavepoint

    drain为true(在触发 savepoint 和停止作业之前发送 MAX_WATERMARK,也就是指示 Flink 是否应该将作业的时间推进到其可能的最大值)时:RestClusterClient#stopWithSavepoint

备注:停止作业并触发savepoint时,如果drain为true,Flink 会尝试将作业的时间推进到所有时间特征的最大值,这通常用于确保在创建保存点之前,所有与时间相关的操作都已经完成。
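
RestClusterClient#triggerSavepoint 底层对应的是 Flink 标准 REST 接口 POST /jobs/{jobId}/savepoints。下面给出一个与 StreamPark 源码无关的简化示意(直接调用该 REST 接口,参数做了简化):

import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

/** 简化示意:通过 Flink REST API 手动触发 savepoint */
object SavepointTriggerSketch {

  private val client = HttpClient.newHttpClient()

  /** POST /jobs/{jobId}/savepoints,返回 {"request-id":"..."},之后可凭该 id 轮询触发结果 */
  def triggerSavepoint(webInterfaceUrl: String, jobId: String, targetDir: String): String = {
    val body = s"""{"target-directory":"$targetDir","cancel-job":false}"""
    val request = HttpRequest.newBuilder()
      .uri(URI.create(s"$webInterfaceUrl/jobs/$jobId/savepoints"))
      .header("Content-Type", "application/json")
      .POST(HttpRequest.BodyPublishers.ofString(body))
      .build()
    client.send(request, HttpResponse.BodyHandlers.ofString()).body()
  }

  def main(args: Array[String]): Unit =
    println(triggerSavepoint("http://localhost:8081", "a923bafe56e2ace404f7daf0b899bd52",
      "hdfs://xxx:8020/streampark/savepoints"))
}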

5.7.2 自动触发(Checkpoint)

自动checkpoint触发通过作业配置实现,示例配置内容如下:

execution:
      checkpointing:
        mode: EXACTLY_ONCE
        interval: 30s
        timeout: 10min
        unaligned: false
        externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
        tolerable-failed-checkpoints: 99
    # state backend
    state:
      backend: hashmap # Special note: flink1.12 optional configuration ('jobmanager', 'filesystem', 'rocksdb'), flink1.12+ optional configuration ('hashmap', 'rocksdb'),
      backend.incremental: true
      checkpoint-storage: filesystem
      savepoints.dir: hdfs://xxx:8020/streampark/savepoints
      checkpoints.dir: hdfs://xxx:8020/streampark/checkpoints

5.7.3 状态监听(最新Checkpoint保存、告警or重启)

在前面《4.2.1 作业指标更新》中提到过,FlinkAppHttpWatcher每秒定时查询一次,查询方式分为flink rest api和yarn rest api,其中也包括对检查点的监听,通过Rest API的方式实现,对应方法为FlinkAppHttpWatcher#handleCheckPoints:

  /** 获取最新的CheckPoint*/
  private void handleCheckPoints(Application application) throws Exception {
    CheckPoints checkPoints = httpCheckpoints(application); // 地址:jobs/%s/checkpoints
    if (checkPoints != null) {
      checkpointProcessor.process(application, checkPoints);
    }
  }

CheckpointProcessor#process实际逻辑如下(失败重启或告警):

public void process(Application application, @Nonnull CheckPoints checkPoints) {
  checkPoints.getLatestCheckpoint().forEach(checkPoint -> process(application, checkPoint));
}

----
/*** 这里根据Checkpoint的状态去做处理,失败重启或告警 **/  
private void process(Application application, @Nonnull CheckPoints.CheckPoint checkPoint) {
    String jobID = application.getJobId();
    Long appId = application.getId();
    CheckPointStatus status = checkPoint.getCheckPointStatus();
    CheckPointKey checkPointKey = new CheckPointKey(appId, jobID, checkPoint.getId());

    if (CheckPointStatus.COMPLETED.equals(status)) { // 如果CheckPoint状态为已完成,保存CheckPoint
      if (checkSaveAsSavepoint(checkPointKey, checkPoint)) {
        savepointedCache.put(checkPointKey.getSavePointId(), DEFAULT_FLAG_BYTE);
        saveSavepoint(checkPoint, application.getId());
        flinkAppHttpWatcher.cleanSavepoint(application);
        return;
      }

      Long latestChkId = getLatestCheckpointId(appId, checkPointKey.getCheckPointId());
      if (checkSaveAsCheckpoint(checkPoint, latestChkId)) {
        checkPointCache.put(checkPointKey.getCheckPointId(), checkPoint.getId());
        saveSavepoint(checkPoint, application.getId());
      }
    }
    // 如果CheckPoint状态为失败,根据配置的条件去做告警或重启
    else if (shouldProcessFailedTrigger(checkPoint, application.cpFailedTrigger(), status)) { 
      Counter counter = checkPointFailedCache.get(appId);
      if (counter == null) {
        checkPointFailedCache.put(appId, new Counter(checkPoint.getTriggerTimestamp()));
      } else {
        long minute = counter.getDuration(checkPoint.getTriggerTimestamp());
        if (minute <= application.getCpFailureRateInterval()
            && counter.getCount() >= application.getCpMaxFailureInterval()) { // 如果失败超过特定配置的次数
          checkPointFailedCache.remove(appId);
          FailoverStrategy failoverStrategy = FailoverStrategy.of(application.getCpFailureAction());
          if (failoverStrategy == null) { // 失败策略为:重启或告警
            throw new IllegalArgumentException(
                "Unexpected cpFailureAction: " + application.getCpFailureAction());
          }
          switch (failoverStrategy) { 
            case ALERT: // 告警
              alertService.alert(application, CheckPointStatus.FAILED);
              break;
            case RESTART: // 重启
              try {
                applicationService.restart(application);
              } catch (Exception e) {
                throw new RuntimeException(e);
              }
              break;
            default:
              // do nothing
              break;
          }
        } else {
          counter.increment();
        }
      }
    }
  }

5.8 项目构建

Project模块执行项目构建:

在这里插入图片描述

5.8.1 流程图

相关的流程图如下:

在这里插入图片描述

描述:

  • 项目构建的入口是ProjectController的build方法,实际由ProjectService的build方法完成构建;
  • 构建时会启动一个ProjectBuildTask任务,构建最大时长为20分钟;该任务继承AbstractLogFileTask,内部初始化了logger,日志输出到日志文件夹,例如:/tmp/streampark/logs/build_logs/100000/build.log;
  • 构建的逻辑:clone源码 → maven build(maven构建) → deploy(发布);
  • 上述步骤使用到了Git工具类和Command命令工具类,其中 maven build 一步可参考下方的简化示意。
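
下面是 maven build 这一步的简化示意:本质上就是在克隆下来的项目目录中执行构建命令,并把输出逐行写入构建日志(与 ProjectBuildTask 的真实实现无关,命令与路径均为示例):

import java.io.File
import scala.io.Source

/** 简化示意:在项目目录下执行构建命令,并把输出逐行交给日志回调 */
object ProjectBuildSketch {

  def runBuild(projectDir: String, command: Seq[String])(logLine: String => Unit): Int = {
    val process = new ProcessBuilder(command: _*)
      .directory(new File(projectDir))
      .redirectErrorStream(true) // 合并 stderr 与 stdout,便于统一写日志
      .start()
    val source = Source.fromInputStream(process.getInputStream)
    try source.getLines().foreach(logLine)
    finally source.close()
    process.waitFor() // 返回 0 表示构建成功
  }

  def main(args: Array[String]): Unit = {
    val exitCode = runBuild("/tmp/streampark/project/100000/demo",
      Seq("mvn", "clean", "package", "-DskipTests"))(println)
    println(s"build exit code: $exitCode")
  }
}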

5.8.2 代码分析

构建服务:

// 构建项目
public void build(Long id) throws Exception {
    Long currentBuildCount = this.baseMapper.getBuildingCount();

    // 限制最大构建数量
    ApiAlertException.throwIfTrue(
        maxProjectBuildNum > -1 && currentBuildCount > maxProjectBuildNum,
        String.format(
            "The number of running Build projects exceeds the maximum number: %d of max-build-num",
            maxProjectBuildNum));

    Project project = getById(id);
    this.baseMapper.updateBuildState(project.getId(), BuildState.BUILDING.get());
    
    // 获取构建日志路径
    String logPath = getBuildLogPath(id);
    
    // 初始化构建日志任务
    ProjectBuildTask projectBuildTask =
        new ProjectBuildTask(
            logPath,
            project,
            buildState -> {
              baseMapper.updateBuildState(id, buildState.get());
              if (buildState == BuildState.SUCCESSFUL) {
                baseMapper.updateBuildTime(id);
              }
              flinkAppHttpWatcher.initialize();
            },
            fileLogger -> {
              List<Application> applications =
                  this.applicationService.getByProjectId(project.getId());
              applications.forEach(
                  (app) -> {
                    fileLogger.info(
                        "update deploy by project: {}, appName:{}",
                        project.getName(),
                        app.getJobName());
                    app.setRelease(ReleaseState.NEED_RELEASE.get());
                    app.setBuild(true);
                    this.applicationService.updateRelease(app);
                  });
              flinkAppHttpWatcher.initialize();
            });
    
    // 开始构建,构建超时时间20分钟
    CompletableFuture<Void> buildTask =
        CompletableFuture.runAsync(projectBuildTask, projectBuildExecutor);
    // TODO May need to define parameters to set the build timeout in the future.
    CompletableFutureUtils.runTimeout(buildTask, 20, TimeUnit.MINUTES);
  }

构建核心逻辑:

  @Override
  protected void doRun() throws Throwable {
    log.info("Project {} start build", project.getName());
    fileLogger.info(project.getLog4BuildStart());
    // 克隆源码
    boolean cloneSuccess = cloneSourceCode(project);
    if (!cloneSuccess) {
      fileLogger.error("[StreamPark] clone or pull error.");
      stateUpdateConsumer.accept(BuildState.FAILED);
      return;
    }
    // 项目构建
    boolean build = projectBuild(project);
    if (!build) {
      stateUpdateConsumer.accept(BuildState.FAILED);
      fileLogger.error("build error, project name: {} ", project.getName());
      return;
    }
    stateUpdateConsumer.accept(BuildState.SUCCESSFUL);
    // 项目发布
    this.deploy(project);
    notifyReleaseConsumer.accept(fileLogger);
  }

5.9 核心注解

StreamPark的核心注解在src/main/java/org/apache/streampark/console/core/annotation目录,截图如下:

在这里插入图片描述

其中:

  • ApiAccess注解:用于判断能否直接通过curl等外部方式调用,只有带该注解的方法才能被外部调用。
  • AppUpdated注解:方法上声明了该注解时,方法执行后需要重新初始化作业监听(即:flinkAppHttpWatcher.initialize())。
  • PermissionScope注解:被该注解声明的方法,需要校验当前用户是否有权限操作(RBAC)。

我们可以通过ApplicationController和StreamParkAspect去理解这些注解。首先是ApplicationController的启动作业方法:

@Operation(
      summary = "Start application",
      tags = {ApiDocConstant.OPENAPI_TAG})
  @Parameters({
    @Parameter(
        name = "Authorization",
        description = "Access authorization token",
        in = ParameterIn.HEADER,
        required = true,
        schema = @Schema(implementation = String.class)),
    @Parameter(
        name = "id",
        description = "start app id",
        in = ParameterIn.QUERY,
        required = true,
        example = "100000",
        schema = @Schema(implementation = Long.class)),
    @Parameter(
        name = "teamId",
        description = "current user teamId",
        in = ParameterIn.QUERY,
        required = true,
        example = "100000",
        schema = @Schema(implementation = Long.class)),
    @Parameter(
        name = "savePointed",
        description = "restored app from the savepoint or latest checkpoint",
        in = ParameterIn.QUERY,
        example = "false",
        schema = @Schema(implementation = boolean.class, defaultValue = "false")),
    @Parameter(
        name = "savePoint",
        description = "savepoint or checkpoint path",
        in = ParameterIn.QUERY,
        required = false,
        schema = @Schema(implementation = String.class)),
    @Parameter(
        name = "allowNonRestored",
        description = "ignore savepoint if cannot be restored",
        in = ParameterIn.QUERY,
        schema = @Schema(implementation = boolean.class, defaultValue = "false"))
  })
  @ApiAccess // 可以通过外部cURL直接访问
  @PermissionScope(app = "#app.id", team = "#app.teamId") // 权限注解
  @PostMapping(value = "start")
  @RequiresPermissions("app:start")
  public RestResponse start(@Parameter(hidden = true) Application app) {
    try {
      applicationService.checkEnv(app);
      applicationService.start(app, false);
      return RestResponse.success(true);
    } catch (Exception e) {
      return RestResponse.success(false).message(e.getMessage());
    }
  }

针对这些注解进行的操作均在StreamParkAspect切面进行拦截操作:

@Slf4j
@Component
@Aspect
public class StreamParkAspect {

  @Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher;
  @Autowired private ServiceHelper serviceHelper;
  @Autowired private MemberService memberService;
  @Autowired private ApplicationService applicationService;

  // --------------apiAccess注解相关操作--------------
  @Pointcut(
      "execution(public"
          + " org.apache.streampark.console.base.domain.RestResponse"
          + " org.apache.streampark.console.*.controller.*.*(..))")
  public void apiAccess() {}

  @SuppressWarnings("checkstyle:SimplifyBooleanExpression")
  @Around(value = "apiAccess()")
  public RestResponse apiAccess(ProceedingJoinPoint joinPoint) throws Throwable {
    MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
    log.debug("restResponse aspect, method:{}", methodSignature.getName());
    Boolean isApi =
        (Boolean) SecurityUtils.getSubject().getSession().getAttribute(AccessToken.IS_API_TOKEN);
    if (isApi != null && isApi) {
      ApiAccess apiAccess = methodSignature.getMethod().getAnnotation(ApiAccess.class);
      if (apiAccess == null) {
        throw new ApiAlertException("current api unsupported!");
      }
    }
    return (RestResponse) joinPoint.proceed();
  }

   // --------------AppUpdated注解相关操作--------------
  @Pointcut("@annotation(org.apache.streampark.console.core.annotation.AppUpdated)")
  public void appUpdated() {}

  @Around("appUpdated()")
  public Object appUpdated(ProceedingJoinPoint joinPoint) throws Throwable {
    MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
    log.debug("appUpdated aspect, method:{}", methodSignature.getName());
    Object target = joinPoint.proceed();
    flinkAppHttpWatcher.initialize();
    return target;
  }

   // --------------PermissionScope注解相关操作--------------
  @Pointcut("@annotation(org.apache.streampark.console.core.annotation.PermissionScope)")
  public void permissionAction() {}

  @Around("permissionAction()")
  public RestResponse permissionAction(ProceedingJoinPoint joinPoint) throws Throwable {
    MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
    PermissionScope permissionScope =
        methodSignature.getMethod().getAnnotation(PermissionScope.class);

    User currentUser = serviceHelper.getLoginUser();
    ApiAlertException.throwIfNull(currentUser, "Permission denied, please login first.");

    boolean isAdmin = currentUser.getUserType() == UserType.ADMIN;

    if (!isAdmin) {
      // 1) check userId
      Long userId = getId(joinPoint, methodSignature, permissionScope.user());
      ApiAlertException.throwIfTrue(
          userId != null && !currentUser.getUserId().equals(userId),
          "Permission denied, operations can only be performed with the permissions of the currently logged-in user.");

      // 2) check team
      Long teamId = getId(joinPoint, methodSignature, permissionScope.team());
      if (teamId != null) {
        Member member = memberService.findByUserName(teamId, currentUser.getUsername());
        ApiAlertException.throwIfTrue(
            member == null,
            "Permission denied, only members of this team can access this permission");
      }

      // 3) check app
      Long appId = getId(joinPoint, methodSignature, permissionScope.app());
      if (appId != null) {
        Application app = applicationService.getById(appId);
        ApiAlertException.throwIfTrue(app == null, "Invalid operation, application is null");
        if (!currentUser.getUserId().equals(app.getUserId())) {
          Member member = memberService.findByUserName(app.getTeamId(), currentUser.getUsername());
          ApiAlertException.throwIfTrue(
              member == null,
              "Permission denied, this job not created by the current user, And the job cannot be found in the current user's team.");
        }
      }
    }

    return (RestResponse) joinPoint.proceed();
  }
  // ......
}

5.10 目录

最后,StreamPark运行一段时间后,在配置文件指定的workspace下可以看到相关的目录,目录层次如下:

├── backups   ## 备份目录
├── client    ## streampark执行程序(streampark-flink-sqlclient)
├── dist      ## 项目构建目录(构建成功)
│   └── 100000
│       └── quickstart-flink-1.2.3
├── flink     ## flink 版本
│   └── flink-1.19.0
├── jars  
├── logs      ## 构建日志
│   └── build_logs
│       └── 100000 ## 应用ID
├── plugins   ## 插件
├── project   ## 项目构建目录
│   └── 100000
│       └── streamx-quickstart-main
├── savepoints  ## savepoints元数据存储目录
├── shims       ## 不同的flink版本(作业提交时代理获取不同flink版本的类加载器)
│   ├── flink-1.12
│   ├── flink-1.xxx.....
│   └── flink-1.19
├── uploads     ## 上传目录
└── workspace   ## 工作空间
    ├── 100013  ## 应用ID
    │   └── k8s-application-jar@streampark
    └── 100015  ## 应用ID

这是本地的截图:

在这里插入图片描述

当然,如果是yarn模式,对应的hdfs上也会有相应的目录:

在这里插入图片描述

对应的代码用到了不同的文件存储Operator。目前StreamPark仅支持如下两种文件存储类型:

// 存储类型
public enum StorageType implements Serializable {

  /** hdfs */
  HDFS("hdfs"),

  /**本地文件系统 */
  LFS("lfs");
}

使用FsOperator来获取对应的文件存储工具类:

object FsOperator {

  lazy val lfs: FsOperator = FsOperator.of(StorageType.LFS)

  lazy val hdfs: FsOperator = FsOperator.of(StorageType.HDFS)

  def of(storageType: StorageType): FsOperator = {
    storageType match {
      case StorageType.HDFS => HdfsOperator
      case StorageType.LFS => LfsOperator
      case _ => throw new UnsupportedOperationException(s"Unsupported storageType:$storageType")
    }
  }

}
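
FsOperator 的典型用法大致如下(import 的包路径按 streampark-common 模块推测,仅为示意;方法以前文出现过的 mkCleanDirs 为例,其余方法以实际源码为准):

// 假设:这些类位于 streampark-common 模块的 fs / enums 包下,实际包路径以源码为准
import org.apache.streampark.common.enums.StorageType
import org.apache.streampark.common.fs.{FsOperator, LfsOperator}

object FsOperatorUsageSketch extends App {

  // 清空并重建本地目录(前文构建流水线中出现过的用法)
  LfsOperator.mkCleanDirs("/tmp/streampark/demo-workspace")

  // 按存储类型选择对应的操作器:LFS -> LfsOperator,HDFS -> HdfsOperator
  val lfs: FsOperator = FsOperator.of(StorageType.LFS)
  val hdfs: FsOperator = FsOperator.hdfs
  // 具体的文件操作方法(exists / upload / delete 等)以 FsOperator 的实际定义为准
}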

6. Q&A

最后,以下是我在调试过程中遇到的一些问题,这里做个记录,或许能帮助到大家。

6.1 Unmet peer dependencies

备注:为了避免历史依赖问题,我这边直接把maven关联的repo清空了,让maven重新下载依赖。

问题描述:执行maven reload project后,IDEA直接执行上图操作进行打包,出现如下错误:

[INFO]  ERR_PNPM_PEER_DEP_ISSUES  Unmet peer dependencies
[INFO] 
[INFO] .
[INFO] └─┬ @antfu/eslint-config
[INFO]   ├─┬ eslint-processor-vue-blocks
[INFO]   │ └── ✕ missing peer @vue/compiler-sfc@^3.3.0
[INFO]   ├─┬ @stylistic/eslint-plugin
[INFO]   │ └─┬ @stylistic/eslint-plugin-plus
[INFO]   │   └─┬ @typescript-eslint/utils
[INFO]   │     └── ✕ unmet peer eslint@^8.56.0: found 9.4.0
[INFO]   ├─┬ @typescript-eslint/eslint-plugin
[INFO]   │ ├── ✕ unmet peer eslint@^8.56.0: found 9.4.0
[INFO]   │ └─┬ @typescript-eslint/type-utils
[INFO]   │   └── ✕ unmet peer eslint@^8.56.0: found 9.4.0
[INFO]   ├─┬ @typescript-eslint/parser
[INFO]   │ └── ✕ unmet peer eslint@^8.56.0: found 9.4.0
[INFO]   └─┬ eslint-plugin-unused-imports
[INFO]     └── ✕ unmet peer eslint@8: found 9.4.0
[INFO] Peer dependencies that should be installed:
[INFO]   @vue/compiler-sfc@^3.3.0
[INFO] 
[INFO] hint: If you want peer dependencies to be automatically installed, add "auto-install-peers=true" to an .npmrc file at the root of your project.
[INFO] hint: If you don't want pnpm to fail on peer dependency issues, add "strict-peer-dependencies=false" to an .npmrc file at the root of your project.

问题原因:主要是由于一些前端依赖项的版本不匹配或缺失导致的。

解决方式:升级pnpm版本或在streampark-console-webapp目录新建 .npmrc 文件。这里新建 .npmrc 文件,并添加如下内容:

auto-install-peers=true
strict-peer-dependencies=false

6.2 TypeError

问题描述:编译,出现如下错误:

vite build

error during build:
TypeError: (0 , _presetWind.presetWind) is not a function
    at /Users/yanglinwei/Desktop/StreamPark/code/streampark-learning/streampark-console/streampark-console-webapp/node_modules/.pnpm/@unocss+preset-uno@0.60.4/node_modules/@unocss/preset-uno/dist/index.mjs:69:43
    at /Users/yanglinwei/Desktop/StreamPark/code/streampark-learning/streampark-console/streampark-console-webapp/uno.config.ts:32:25
.......

问题原因:错误表示在 uno.config.ts 文件中调用 presetWind 时出现了问题(第32行)。这可能是因为导入或使用 presetWind 的方式不正确。

解决方式:删掉uno.config.ts文件(这种做法有点粗暴)。对应官方issue:https://github.com/unocss/unocss/issues/3869

6.3 Could not load @purge-icons/generated

问题描述:编译,出现如下错误:

x Build failed in 1m 12s
error during build:
[purge-icons] Could not load @purge-icons/generated (imported by src/components/Icon/Icon.vue?vue&type=script&setup=true&lang.ts): Client network socket disconnected before secure TLS connection was established
Error: Client network socket disconnected before secure TLS connection was established
    at TLSSocket.onConnectEnd (node:_tls_wrap:1727:19)
    at TLSSocket.emit (node:events:531:35)
    at endReadableNT (node:internal/streams/readable:1696:12)
    at process.processTicksAndRejections (node:internal/process/task_queues:82:21)


问题原因:在构建过程中出现了与TLS连接相关的问题。这可能是由于网络问题导致的连接断开。试过换其它镜像源、cnpm也不行。

解决方式:修改Icon.vue,引入iconify

// 旧:import Iconify from '@purge-icons/generated';
import Iconify from '@iconify/iconify';

6.4 xx is not a member of package…

问题描述:启动StreamParkConsoleBootstrap,出现如下错误:

/Users/yanglinwei/Desktop/StreamPark/code/streampark-learning/streampark-common/src/main/scala/org/apache/streampark/common/util/JsonUtils.scala:19:30
object shaded is not a member of package org.apache.streampark
import org.apache.streampark.shaded.com.fasterxml.jackson.annotation.JsonInclude

问题原因:代码引用的是streampark-shaded包下的内容,需要先把该模块安装到本地maven仓库(因为编译打包后类的包路径加了“org.apache.streampark.shaded”前缀),才能正常读取。

在这里插入图片描述

解决方式:在IDEA中将streampark-shaded模块打包安装到本地仓库,然后去掉名为shaded的profile勾选,截图如下:

在这里插入图片描述

注意:操作之后,需要maven reload project(或unlink maven),如果还有问题,需要关闭IDEA,删除项目的.idea,重新导入即可。注意导入之后,相当于新的项目,需要配置JDK、Scala和maven。

后续如果出现 is not a member of package 之类的错误,都可以使用mvn clean install的方式把对应模块打进本地仓库。

6.5 system initialization check failed.

问题描述:启动StreamParkConsoleBootstrap,出现如下错误:

Exception in thread "main" java.lang.ExceptionInInitializerError: [StreamPark] The system initialization check failed. If started local for development and debugging, please ensure the -Dapp.home parameter is clearly specified, more detail: https://streampark.apache.org/docs/user-guide/deployment
	at org.apache.streampark.console.base.config.SpringProperties.getUserConfig(SpringProperties.java:129)
	at org.apache.streampark.console.base.config.SpringProperties.get(SpringProperties.java:50)
	at org.apache.streampark.console.StreamParkConsoleBootstrap.main(StreamParkConsoleBootstrap.java:54)

问题原因:是因为VM Options没有配置app.home变量。

解决方式:参考https://streampark.apache.org/zh-CN/docs/user-guide/development

-Dapp.home=<项目dist目录下压缩包的解压路径>/apache-streampark_2.12-2.1.4-incubating-bin

6.6 Client network socket disconnected

问题描述:浏览器访问前端页面,一直转圈打不开,连接超时。

在这里插入图片描述

问题原因:前端引入的外部资源无法访问(可能需要科学上网)。

解决方式:修改Icon.vue,引入iconify

// 旧:import Iconify from '@purge-icons/generated';
import Iconify from '@iconify/iconify';

6.7 InaccessibleObjectException

问题描述:启动yarn Session集群报了如下错误:

Detail exception: 
org.apache.streampark.console.base.exception.ApiDetailException: Detail exception: 
deploy cluster yarn-flink-cluster-1.19.0failed, exception:
java.lang.reflect.InaccessibleObjectException: Unable to make field final jdk.internal.loader.URLClassPath jdk.internal.loader.ClassLoaders$AppClassLoader.ucp accessible: module java.base does not "opens jdk.internal.loader" to unnamed module @c24ae30
	at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:340)
	at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:280)
	at java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:176)
	at java.base/java.lang.reflect.Field.setAccessible(Field.java:170)


问题原因:这个错误通常发生在尝试通过反射修改JDK内部API时,而这些API在Java 9及以后的版本中可能由于模块系统的限制而不可用。

解决方式:IDEA修改VM Options配置,添加:--add-opens java.base/jdk.internal.loader=ALL-UNNAMED,完整修改如下:

-Dapp.home=/Users/yanglinwei/Desktop/StreamPark/code/incubator-streampark/dist/apache-streampark_2.12-2.1.5-incubating-bin --add-opens java.base/jdk.internal.loader=ALL-UNNAMED 

在这里插入图片描述

6.8 Could not find or load main class …YarnApplicationClusterEntryPoint

问题描述:使用yarn application模式、flink1.19.0版本,报了如下错误:

...
Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn Application Cluster
	at org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:547)
	at org.apache.streampark.flink.client.impl.YarnApplicationClient$$anon$1.call(YarnApplicationClient.scala:120)
	at org.apache.streampark.flink.client.impl.YarnApplicationClient$$anon$1.call(YarnApplicationClient.scala:101)
	at java.base/java.security.AccessController.doPrivileged(Native Method)
	at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
	... 24 more
Caused by: org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The YARN application unexpectedly switched to state FAILED during deployment. 
Diagnostics from YARN: Application application_1715312146458_1346 failed 1 times (global limit =2; local limit is =1) due to AM Container for appattempt_1715312146458_1346_000001 exited with  exitCode: 1
Failing this attempt.Diagnostics: [2024-06-07 09:45:01.231]Exception from container-launch.
Container id: container_1715312146458_1346_01_000001
Exit code: 1

[2024-06-07 09:45:01.235]Container exited with a non-zero exit code 1. Error file: prelaunch.err.
Last 4096 bytes of prelaunch.err :

[2024-06-07 09:45:01.237]Container exited with a non-zero exit code 1. Error file: prelaunch.err.
Last 4096 bytes of prelaunch.err :

For more detailed output, check the application tracking page: http://node-10-194-186-216:8088/cluster/app/application_1715312146458_1346 Then click on links to logs of each attempt.
. Failing the application.
If log aggregation is enabled on your cluster, use this command to further investigate the issue:
yarn logs -applicationId application_1715312146458_1346
	at org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:1305)
	at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:692)
	at org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:540)
	... 29 more


问题原因:保存成功之后返回列表页,点击上线按钮没有响应反馈,实际可能已经在上线中;此时如果再次点击(或用户连续暴力点击),就有可能导致这个问题。

错误详情:

在这里插入图片描述

解决方式:删除hdfs的/streampark/flink目录的flink-1.19.0版本,然后重新上线(不要连续点)。

## 需要在把环境变量文件bash_profile的HADOOP_USER_NAME改为hdfs,然后source生效
vi ~/.bash_profile
source ~/.bash_profile

## 删除目录命令
hdfs dfs -rm -r /streampark/flink/flink-1.19.0

对应hdfs目录如下:

在这里插入图片描述

6.9 ClassCastException

问题描述:使用yarn PerJob模式、flink1.19.0版本,报了如下错误(使用1.13.6版本不会有问题):

java.util.concurrent.CompletionException: java.lang.reflect.InvocationTargetException
	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run$$$capture(CompletableFuture.java:1702)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.reflect.InvocationTargetException
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.apache.streampark.flink.client.FlinkClient$.$anonfun$proxy$1(FlinkClient.scala:87)
	at org.apache.streampark.flink.proxy.FlinkShimsProxy$.$anonfun$proxy$1(FlinkShimsProxy.scala:60)
	at org.apache.streampark.common.util.ClassLoaderUtils$.runAsClassLoader(ClassLoaderUtils.scala:38)
	at org.apache.streampark.flink.proxy.FlinkShimsProxy$.proxy(FlinkShimsProxy.scala:60)
	at org.apache.streampark.flink.client.FlinkClient$.proxy(FlinkClient.scala:82)
	at org.apache.streampark.flink.client.FlinkClient$.submit(FlinkClient.scala:53)
	at org.apache.streampark.flink.client.FlinkClient.submit(FlinkClient.scala)
	at org.apache.streampark.console.core.service.impl.ApplicationServiceImpl.lambda$start$8(ApplicationServiceImpl.java:1658)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run$$$capture(CompletableFuture.java:1700)
	... 4 more
Caused by: java.lang.ClassCastException: class java.io.File cannot be cast to class org.apache.hadoop.fs.Path (java.io.File is in module java.base of loader 'bootstrap'; org.apache.hadoop.fs.Path is in unnamed module of loader 'app')
	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:73)
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
	at java.base/java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1812)
	at java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:127)
	at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:502)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:488)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
	at java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
	at java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.base/java.util.stream.ReferencePipeline.anyMatch(ReferencePipeline.java:528)
	at org.apache.flink.yarn.YarnClusterDescriptor.isUsrLibDirIncludedInShipFiles(YarnClusterDescriptor.java:1913)
	at org.apache.flink.yarn.YarnClusterDescriptor.addShipFiles(YarnClusterDescriptor.java:349)
	at org.apache.streampark.flink.client.impl.YarnPerJobClient$.doSubmit(YarnPerJobClient.scala:74)
	at org.apache.streampark.flink.client.trait.FlinkClientTrait.submit(FlinkClientTrait.scala:123)
	at org.apache.streampark.flink.client.trait.FlinkClientTrait.submit$(FlinkClientTrait.scala:60)
	at org.apache.streampark.flink.client.impl.YarnPerJobClient$.submit(YarnPerJobClient.scala:40)
	at org.apache.streampark.flink.client.FlinkClientHandler$.submit(FlinkClientHandler.scala:40)
	at org.apache.streampark.flink.client.FlinkClientHandler.submit(FlinkClientHandler.scala)
	... 17 more


Probable cause: a ClassCastException. At first glance it looks like a Hadoop version problem, but the stack trace shows java.io.File objects being passed where Flink 1.19's YarnClusterDescriptor expects org.apache.hadoop.fs.Path in its ship-files handling, which points to an incompatibility between the Per-Job submission code path and the Flink 1.19 API rather than the Hadoop client itself.

Possible fix 1: keep the local Hadoop client version consistent with the server's:

(screenshot)

Download: https://mirrors.huaweicloud.com/apache/hadoop/common/

This did not seem to help. The more likely explanation is that YARN Per-Job mode simply does not work with Flink 1.19.0 (Per-Job mode has been deprecated since Flink 1.15), so switching the execution mode to YARN Application is the safer workaround; a quick way to confirm the API mismatch is shown below.
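
To confirm the mismatch yourself, you can dump the ship-files related methods of YarnClusterDescriptor from both distributions and compare them. This is only a diagnostic sketch; the jar paths are assumptions and depend on where your Flink distributions live (YarnClusterDescriptor is normally bundled in the flink-dist jar):

## Flink 1.19.x: the ship-files handling expects org.apache.hadoop.fs.Path (matches the cast in the stack trace)
javap -cp $FLINK_HOME/lib/flink-dist-1.19.0.jar org.apache.flink.yarn.YarnClusterDescriptor | grep -i shipfiles

## Flink 1.13.x: the same methods accept java.io.File, which is what the failing code path passes
javap -cp /path/to/flink-1.13.6/lib/flink-dist_2.12-1.13.6.jar org.apache.flink.yarn.YarnClusterDescriptor | grep -i shipfiles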

6.10 Could not get the rest endpoint of flink-cluster

Problem description: when submitting a job in Kubernetes session mode, the following error is reported:

java.lang.RuntimeException: org.apache.flink.client.deployment.ClusterRetrieveException: Could not get the rest endpoint of flink-cluster
	at org.apache.flink.kubernetes.KubernetesClusterDescriptor.lambda$createClusterClientProvider$1(KubernetesClusterDescriptor.java:114)
	at org.apache.flink.kubernetes.KubernetesClusterDescriptor.retrieve(KubernetesClusterDescriptor.java:157)
	at org.apache.streampark.flink.client.impl.KubernetesNativeSessionClient$.jobGraphSubmit(KubernetesNativeSessionClient.scala:108)
	at org.apache.streampark.flink.client.impl.KubernetesNativeSessionClient$.$anonfun$doSubmit$2(KubernetesNativeSessionClient.scala:58)
	at org.apache.streampark.flink.client.trait.FlinkClientTrait.$anonfun$trySubmit$1(FlinkClientTrait.scala:207)
	at scala.util.Try$.apply(Try.scala:209)
	at org.apache.streampark.flink.client.trait.FlinkClientTrait.trySubmit(FlinkClientTrait.scala:205)
	at org.apache.streampark.flink.client.trait.FlinkClientTrait.trySubmit$(FlinkClientTrait.scala:201)
	at org.apache.streampark.flink.client.impl.KubernetesNativeSessionClient$.doSubmit(KubernetesNativeSessionClient.scala:59)
	at org.apache.streampark.flink.client.trait.FlinkClientTrait.submit(FlinkClientTrait.scala:123)
	at org.apache.streampark.flink.client.trait.FlinkClientTrait.submit$(FlinkClientTrait.scala:60)
	at org.apache.streampark.flink.client.impl.KubernetesNativeSessionClient$.submit(KubernetesNativeSessionClient.scala:46)
	at org.apache.streampark.flink.client.FlinkClientHandler$.submit(FlinkClientHandler.scala:40)
	at org.apache.streampark.flink.client.FlinkClientHandler.submit(FlinkClientHandler.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.apache.streampark.flink.client.FlinkClient$.$anonfun$proxy$1(FlinkClient.scala:87)
	at org.apache.streampark.flink.proxy.FlinkShimsProxy$.$anonfun$proxy$1(FlinkShimsProxy.scala:60)
	at org.apache.streampark.common.util.ClassLoaderUtils$.runAsClassLoader(ClassLoaderUtils.scala:38)
	at org.apache.streampark.flink.proxy.FlinkShimsProxy$.proxy(FlinkShimsProxy.scala:60)
	at org.apache.streampark.flink.client.FlinkClient$.proxy(FlinkClient.scala:82)
	at org.apache.streampark.flink.client.FlinkClient$.submit(FlinkClient.scala:53)
	at org.apache.streampark.flink.client.FlinkClient.submit(FlinkClient.scala)
	at org.apache.streampark.console.core.service.impl.ApplicationServiceImpl.lambda$start$8(ApplicationServiceImpl.java:1658)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run$$$capture(CompletableFuture.java:1700)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.client.deployment.ClusterRetrieveException: Could not get the rest endpoint of flink-cluster
	... 31 more

Probable cause: the submission is probably reading the kubeconfig from ~/.kube/config on the StreamPark host rather than the kube config bound to the cluster, so it cannot resolve the session cluster's REST endpoint.

Solution: replacing the ~/.kube/config file directly solved the problem.
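
To double-check which kubeconfig actually takes effect and whether the session cluster's REST service is reachable, the following commands may help. This is a sketch assuming the cluster-id is flink-cluster (as in the error message) and the cluster runs in the default namespace; Flink's native Kubernetes integration exposes the REST endpoint through a service named <cluster-id>-rest:

## Which cluster does the local kubeconfig point at?
kubectl config current-context
kubectl cluster-info

## Does the session cluster's REST service exist and have an endpoint?
kubectl get svc flink-cluster-rest -o wide
kubectl get deployment flink-cluster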

7. Closing

This article is quite long, roughly 85,000 characters in total, and everything in it was worked out step by step through hands-on practice (see the screenshot below). I hope it helps you. Thanks for reading, and that wraps up this article!
(screenshot: word count statistics)
