当前位置:   article > 正文

flink 作业提交流程_yarnsessionclusterexecutor

yarnsessionclusterexecutor

之前给大家介绍了DataStream API中 Environment 和 Transformation 连个体系的源代码,今天来了小插曲,给大家宏观介绍下 Flink 作业的提交流程,希望对大家有帮助。

一、DataStream 作业提交流程

1)、首先,先给大家展示下流程图:
在这里插入图片描述

2)、提交流程说明:

FlinkCli 先创建一个 Flink 环境变量

然后将环境变量存入到ThreadLocal中

在启动 Flink 作业jar包的 main 方法

Flink 应用程序通过 StreamExecutionEnvironment.getExecutionEnvironment() 获取到相应的执行环境变量

Flink 应用程序将用户编写的作业转换成 jobGraph 提交给Flink 集群

3)、Flink 作业以哪种方式提交,取决于 StreamExecutionEnvironment 的配置信息;

起到主要作用的配置参数是 execution.target;

execution.target 取值:

remote

local

yarn-per-job

yarn-session

kubernetes-session

yarn-application

kubernetes-application

StreamExecutionEnvironment 会根据 execution.target 配置的不同取值创建相应的 PipelineExecutorFactory, 再由 PipelineExecutorFactory 创建相应的 PipelineExecutor, PipelineExecutor执行相应的作业提交工作;

源代码探究:

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute()

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(String jobName)

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamGraph streamGraph)

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamGraph streamGraph)

org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(final Configuration configuration) (见 代码 3-1)

ExecutorFactory 举例,org.apache.flink.yarn.executors.YarnSessionClusterExecutorFactory,(见代码 3-2)

代码 3-1

@Override
public PipelineExecutorFactory getExecutorFactory(final Configuration configuration) {
checkNotNull(configuration);

// 通过 java SPI 技术加载 实现了 PipelineExecutorFactory 接口的类
final ServiceLoader loader =
ServiceLoader.load(PipelineExecutorFactory.class);

final List<PipelineExecutorFactory> compatibleFactories = new ArrayList<>();
final Iterator<PipelineExecutorFactory> factories = loader.iterator();
while (factories.hasNext()) {
    try {
        final PipelineExecutorFactory factory = factories.next();
        // 根据 execution.target 的取值 过滤出匹配到的 PipelineExecutorFactory 
        if (factory != null && factory.isCompatibleWith(configuration)) {
            compatibleFactories.add(factory);
        }
    } catch (Throwable e) {}
}

if (compatibleFactories.size() > 1) {}
if (compatibleFactories.isEmpty()) {}
return compatibleFactories.get(0);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

}

代码 3-2

@Internal
public class YarnSessionClusterExecutorFactory implements PipelineExecutorFactory {

@Override
public boolean isCompatibleWith(@Nonnull final Configuration configuration) {
    return YarnSessionClusterExecutor.NAME.equalsIgnoreCase(
            configuration.get(DeploymentOptions.TARGET));
}
  • 1
  • 2
  • 3
  • 4
  • 5

}

// 配置选项
public static final ConfigOption TARGET =
key(“execution.target”)

4)、FlinkCli 创建 Flink 环境变量相关流程:

org.apache.flink.client.cli.CliFrontend.main()

org.apache.flink.client.cli.CliFrontend.executeProgram()

org.apache.flink.client.ClientUtils.executeProgram()

public static void executeProgram(
PipelineExecutorServiceLoader executorServiceLoader,
Configuration configuration,
PackagedProgram program,
boolean enforceSingleJobExecution,
boolean suppressSysout)
throws ProgramInvocationException {
checkNotNull(executorServiceLoader);
final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(userCodeClassLoader);

    LOG.info(
            "Starting program (detached: {})",
            !configuration.getBoolean(DeploymentOptions.ATTACHED));

    ContextEnvironment.setAsContext(
            executorServiceLoader,
            configuration,
            userCodeClassLoader,
            enforceSingleJobExecution,
            suppressSysout);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

// 设置流环境变量
StreamContextEnvironment.setAsContext(
executorServiceLoader,
configuration,
userCodeClassLoader,
enforceSingleJobExecution,
suppressSysout);

    try {
        // 启动用户程序的main方法
        program.invokeInteractiveModeForExecution();
    } finally {
        ContextEnvironment.unsetAsContext();
        StreamContextEnvironment.unsetAsContext();
    }
} finally {
    Thread.currentThread().setContextClassLoader(contextClassLoader);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

}

5)、StreamExecutionEnvironment.getExecutionEnvironment() 获取执行环境的逻辑:

先从 threadLocal 获取环境变量

如果 threadLocal 中没有相应的环境变量,则创建一个本地环境变量

return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
.map(StreamExecutionEnvironmentFactory::createExecutionEnvironment)
.orElseGet(StreamExecutionEnvironment::createLocalEnvironment);

public static Optional resolveFactory(ThreadLocal threadLocalFactory, @Nullable T staticFactory) {
final T localFactory = threadLocalFactory.get();
final T factory = localFactory == null ? staticFactory : localFactory;

return Optional.ofNullable(factory);
}

二、Flink Table

1)、flink Sql 作业提交流程
在这里插入图片描述

2)、提交流程说明

TableEnvironmentImpl 在创建的过程中创建了 Executor , ExecutorBase 中包含了StreamExecutionEnvironment 的实例, StreamExecutionEnvironment 的实例由 StreamExecutionEnvironment .getExecutionEnvironment() 方法创建。

TableEnvironmentImpl 作业的提交依赖 StreamExecutionEnvironment 的作业提交流程。

TableEnvironmentImpl 借助Parser组件将 SQL 语句转换成 Operation,然后借助 Planner组件将Operation转换成 List。

使用StreamExecutionEnvironment 将 List 转换成 StreamGraph。

后续操作与DataStream提交流程一样。

3)、 TableEnvironmentImpl .executeSql() 执行逻辑:

Sql 解析, 将Sql语句解析为 List 变量;

Transformation转换,将 List 转换为 List<Transformation<?>>

PipeLine转换, 将List<Transformation<?>> 转换为 PipeLine

4)、TableEnvironmentImpl 创建过程:

ModuleManager 的创建

CatalogManager 的创建

FunctionCatalog 的创建

Executor (执行环境)的创建, 先通过 java SPI 加载 Executor 工厂, 通过EnvironmentSettings.Builder.useBlinkPlanner() 指定为 org.apache.flink.table.planner.delegation.BlinkExecutorFactory

Planner的创建(包括Parser的构造),先通过 java SPI 加载 Planner 工厂,通过EnvironmentSettings.Builder.useBlinkPlanner() 指定为org.apache.flink.table.planner.delegation.BlinkPlannerFactory

构造TableEnvironmentImpl

5)、Sql解析 (Blink Planner: StreamPlanner / BatchPlanner)

基本流程:

Sql语句解析成Sql 抽象语法树

Planner对sql 语法树进行验证

将验证过的语法树转换成关系代数树

将关系代数树封装成Flink对应的Operation

public List parse(String statement) {
CalciteParser parser = calciteParserSupplier.get();
FlinkPlannerImpl planner = validatorSupplier.get();
// parse the sql query
SqlNode parsed = parser.parse(statement);

Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed)
.orElseThrow(() -> new TableException("Unsupported query: " + statement));
return Collections.singletonList(operation);

Calsite :Sql 解析框架

SqlNode 代表Sql 抽象语法树中的节点,CalciteParser 内部使用 SqlParser 将Sql语句解析成Sql 抽象语法树。

Operation (Flink Table API中抽象出来的概念) 代表任意类型的Sql操作行为,例如 Select 、Insert、Drop 等sql操作可以表示为QueryOperation、CatalogSinkModifyOperation、DropOperation。FlinkPlannerImpl内部使用 Calsite 的 SqlToRelConverter 将验证后的抽象语法树转换成关系代数树。

6)、Operation 转换为 Transformation 逻辑 (Blink Planner : StreamPlanner / BatchPlanner)

基本流程:

从Operation中 获取到 关系代数树

根据优化规则优化关系代数树

生成物理执行计划

将物理执行计划转换成 List<Transformation<?>>

override def translate(
modifyOperations: util.List[ModifyOperation]): util.List[Transformation[]] = {
validateAndOverrideConfiguration()
if (modifyOperations.isEmpty) {
return List.empty[Transformation[
]]
}

val relNodes = modifyOperations.map(translateToRel)
val optimizedRelNodes = optimize(relNodes)
val execGraph = translateToExecNodeGraph(optimizedRelNodes)
val transformations = translateToPlan(execGraph)
cleanupInternalConfigurations()
transformations
}

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小惠珠哦/article/detail/794370
推荐阅读
相关标签
  

闽ICP备14008679号