当前位置:   article > 正文

Flink运行启动脚本解析_flink 脚本

flink 脚本

概述:

flink运行时包含ResourceManager、JobManager、TaskManager、Dispatcher等组件

一、JobManager

JobManager是Flink集群任务管理和调度的核心,是控制应用执行的主进程,一个applicationId 都有与之对应的JobManager,JobManager 又包含四个组件JobMaster、ResourceManager、Dispatcher、WebmonitorEndPoint.
Standalone

1.1 JobMaster

JobMaster是JobManager中最核心的组件,负责处理单独的作业(job),所以JobMaster和具体的Job是一一对应的,多个job可以同时运行在一个FLink集群中,每个job都有自己的JobMaster。
在作业提交的时候,JobMaster会先接收到要执行的应用。JobMaster会把JobGraph转换成一个物理执行图。包含了并发执行的任务。JobMaster会向资源管理器ResourceManager发出请求。申请执行任务所需要的资源,然后将执行图分发给TaskManager去真正的执行。在运行过程中,JobMaster会负责所有的中央协调操作,比如检查点checkpoint的执行。

1.2 资源管理器ResourceManager

ResourceManager主要负责资源的分配和管理,在flink集群中只有一个所谓的资源主要是指TaskManager的任务槽(task slots)。任务槽是Flink资源调配单位,包含了机器来执行计算的一组CPU和内存资源。每一个任务Task都需要分配到一个slot上执行。这里所说的ResourceManager是FLink内置的ResourceManager而不是Yarn的ResourceManager。

1.3 分发器Dispatcher

1、负责接收用户提交的JobGraph,然后启动一个JobMaster,类似于Yarn中的AppMaster。
2、内有一个持久化服务:JobGraphStore,负责存储JobGraph,当构建执行图或者物理图时主节点宕机并恢复,则可以从这里重新拉取作业JobGraph
3、WebMonitorEndpoint:提供Rest服务,内部有一个Netty服务,客户端的所有请求都由该组件接收处理。
当客户端Client提交一个Job到集群时(Client会把Job构建成一个JobGraph),主节点接收到提交的job的Rest请求,WebMonitorEndpoint会通过Router进行解析找到对应的Handler来执行处理。处理完毕后交由Dispatcher,DIspatcher负责拉起JobMaster来负责这个job内部的Task的部署执行,执行Task所需要的资源,JobMaster像
ResourceManager申请资源。

1.4 任务管理器TaskManager

TaskManager是Flink中的工作进程,数据流的具体计算就是它来做的。Flink集群中必须至少有一个TaskManager,每个TaskManager都包含了一定数量的任务槽task slots。slot是资源调度的最小单位,slot的数量限制了TaskManger能够并行处理的任务数量。启动之后TaskManager会向资源管理器注册它的slot;收到资源管理的指令后,TaskManager就会将一个或者多个槽位提供给JobMaster调用,JobMaster就可以分配任务执行。在执行过程中,TaskManager可以缓冲数据,还可以跟其他运行同一应用的TaskManager交换数据。

二、Flink 之per-job脚本启动解析 版本号 1.18

在flink-dist包下的flink-bin/bin 目录下 进入flink.sh

#!/usr/bin/env bash
target="$0"

iteration=0
while [ -L "$target" ]; do
    if [ "$iteration" -gt 100 ]; then
        echo "Cannot resolve path: You have a cyclic symlink in $target."
        break
    fi
    ls=`ls -ld -- "$target"`
    target=`expr "$ls" : '.* -> \(.*\)$'`
    iteration=$((iteration + 1))
done

# Convert relative path to absolute path
bin=`dirname "$target"`

# get flink config
. "$bin"/config.sh

if [ "$FLINK_IDENT_STRING" = "" ]; then
        FLINK_IDENT_STRING="$USER"
fi

CC_CLASSPATH=`constructFlinkClassPath`

log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-client-$HOSTNAME.log
log_setting=(-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlog4j.configurationFile=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml)

# Add Client-specific JVM options
FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_CLI}"

# Add HADOOP_CLASSPATH to allow the usage of Hadoop file systems
exec "${JAVA_RUN}" $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

执行java-cp就会开启虚拟机,在虚拟机上开启CliFronted进程,并且执行main方法,这里使用的是java -classpath指定类运行锁依赖的其他类的路径
说明 java -cp —>开启JVM虚拟机—>开启Process(CliFrontend)—>程序入口CliFrontend.main()
FLink提交任务的入口类为CliFrontend 在flink-clients模块下

    /** Submits the job based on the arguments. */
    // 根据args参数提交作业
    public static void main(final String[] args) {
    	private static final int INITIAL_RET_CODE = 31;
        int retCode = INITIAL_RET_CODE;
        try {
        	// 进入mainInternal()方法
            retCode = mainInternal(args);
        } finally {
            System.exit(retCode);
        }
    }
    
    static int mainInternal(final String[] args) {
        EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

        // 1. find the configuration directory
        final String configurationDirectory = getConfigurationDirectoryFromEnv();

        // 2. load the global configuration
        final Configuration configuration =
                GlobalConfiguration.loadConfiguration(configurationDirectory);

        // 3. load the custom command lines
        final List<CustomCommandLine> customCommandLines =
                loadCustomCommandLines(configuration, configurationDirectory);

        int retCode = INITIAL_RET_CODE;
        try {
        // CliFrontend客户端实例化
            final CliFrontend cli = new CliFrontend(configuration, customCommandLines);
            CommandLine commandLine =
                    cli.getCommandLine(
                            new Options(),
                            Arrays.copyOfRange(args, min(args.length, 1), args.length),
                            true);
            Configuration securityConfig = new Configuration(cli.configuration);
            DynamicPropertiesUtil.encodeDynamicProperties(commandLine, securityConfig);
            SecurityUtils.install(new SecurityConfiguration(securityConfig));
            // cli.parseAndRun()进行flink自定义参数的解析,并进行application的提交
            retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
        } catch (Throwable t) {
            final Throwable strippedThrowable =
                    ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
            LOG.error("Fatal error while running command line interface.", strippedThrowable);
            strippedThrowable.printStackTrace();
        }
        return retCode;
    }

// 先对CliFrontend进行实例化
    public CliFrontend(Configuration configuration, List<CustomCommandLine> customCommandLines) {
        this(configuration, new DefaultClusterClientServiceLoader(), customCommandLines);
    }

    public CliFrontend(
            Configuration configuration,
            ClusterClientServiceLoader clusterClientServiceLoader,
            List<CustomCommandLine> customCommandLines) {
        this.configuration = checkNotNull(configuration);
        this.customCommandLines = checkNotNull(customCommandLines);
        this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);

        // 进行客户端的文件系统初始化。Flink的文件系统采用了插件的方式,以支持不同的文件系统
        FileSystem.initialize(
                configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));

        this.customCommandLineOptions = new Options();

        // 添加zookeeperNamespace、jobmanager、-D传递的参数的key和value到customCommandLineOptions
        // 和GenericCli、FlinkYarnSessionCli的参数
        for (CustomCommandLine customCommandLine : customCommandLines) {
            customCommandLine.addGeneralOptions(customCommandLineOptions);
            customCommandLine.addRunOptions(customCommandLineOptions);
        }

        // 获取客户端的超时时间和flink application的默认并行度
        this.clientTimeout = configuration.get(ClientOptions.CLIENT_TIMEOUT);
        this.defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
    }



// 进入cli.parseAndRun()方法 查看commondline的具体解析 主要包含flink-conf-dir中的配置文件 以及flink run -t yarn-per-job -p 4 等命令行参数的解析
  public int parseAndRun(String[] args) {

        // check for action
        if (args.length < 1) {
        	// 参数打印输出 System.out.println("./flink <ACTION> [OPTIONS] [ARGUMENTS]");
            CliFrontendParser.printHelp(customCommandLines);
            System.out.println("Please specify an action.");
            return 1;
        }

        // get action flink run -t 
        String action = args[0];

        // remove action from parameters
        final String[] params = Arrays.copyOfRange(args, 1, args.length);

        try {
            // do action 这里进行模式匹配 run run-application list cancel stop 
            switch (action) {
                case ACTION_RUN:
                	// 进入run()方法
                    run(params);
                    return 0;
                case ACTION_RUN_APPLICATION:
                    runApplication(params);
                    return 0;
                case ACTION_LIST:
                    list(params);
                    return 0;
                case ACTION_INFO:
                    info(params);
                    return 0;
                case ACTION_CANCEL:
                    cancel(params);
                    return 0;
                case ACTION_STOP:
                    stop(params);
                    return 0;
                case ACTION_SAVEPOINT:
                    savepoint(params);
                    return 0;
                case "-h":
                case "--help":
                    CliFrontendParser.printHelp(customCommandLines);
                    return 0;
                case "-v":
                case "--version":
                    String version = EnvironmentInformation.getVersion();
                    String commitID = EnvironmentInformation.getRevisionInformation().commitId;
                    System.out.print("Version: " + version);
                    System.out.println(
                            commitID.equals(EnvironmentInformation.UNKNOWN)
                                    ? ""
                                    : ", Commit ID: " + commitID);
                    return 0;
                default:
                    System.out.printf("\"%s\" is not a valid action.\n", action);
                    System.out.println();
                    System.out.println(
                            "Valid actions are \"run\", \"run-application\", \"list\", \"info\", \"savepoint\", \"stop\", or \"cancel\".");
                    System.out.println();
                    System.out.println(
                            "Specify the version option (-v or --version) to print Flink version.");
                    System.out.println();
                    System.out.println(
                            "Specify the help option (-h or --help) to get help on the command.");
                    return 1;
            }
        } catch (CliArgsException ce) {
            return handleArgException(ce);
        } catch (ProgramParametrizationException ppe) {
            return handleParametrizationException(ppe);
        } catch (ProgramMissingJobException pmje) {
            return handleMissingJobException();
        } catch (Exception e) {
            return handleError(e);
        }
    }

// 进入run()方法
    protected void run(String[] args) throws Exception {
        LOG.info("Running 'run' command.");
		// 获取默认运行的参数 help、verbose、fromSavepoint、allowNonRestoredState、restoreMode、jarfile、
        // class、classpath、parallelism、arguments、detached、shutdownOnAttachedExit、yarndetached、
        // python、pyFiles、pyModule、pyRequirements、pyArchives、pyExecutable、pyClientExecutable  大概这么多
        final Options commandOptions = CliFrontendParser.getRunCommandOptions();
        // 通过默认参数解析出只包含flink命令自定义的参数的commandLine 进入getCommandLine()
        // 先获取默认的flink运行参数,在解析出只包含flink命令自定义的参数的commadnLine
        final CommandLine commandLine = getCommandLine(commandOptions, args, true);

        // evaluate help flag
        if (commandLine.hasOption(HELP_OPTION.getOpt())) {
            CliFrontendParser.printHelpForRun(customCommandLines);
            return;
        }

        final CustomCommandLine activeCommandLine =
                validateAndGetActiveCommandLine(checkNotNull(commandLine));

        final ProgramOptions programOptions = ProgramOptions.create(commandLine);

        final List<URL> jobJars = getJobJarAndDependencies(programOptions);

        final Configuration effectiveConfiguration =
                getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);

        LOG.debug("Effective executor configuration: {}", effectiveConfiguration);

        try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
            executeProgram(effectiveConfiguration, program);
        }
    }

// 先进入getCommandLine()方法
    public CommandLine getCommandLine(
    // 将flink默认的参数 和我们在CliFrontend构造函数创建的customCommandLineOPtions进行追加
    // customCommandLineOptions包含zookeeperNamespace、jobmabager、-D传递的参数的key和value 和 GenericCli、FlinkYarnSessionCli的参数
            final Options commandOptions, final String[] args, final boolean stopAtNonOptions)
            throws CliArgsException {
        final Options commandLineOptions =
        		
                CliFrontendParser.mergeOptions(commandOptions, customCommandLineOptions);
        // 通过flink的参数,解析出只包含flink命令自定义的参数的commandLine
        return CliFrontendParser.parse(commandLineOptions, args, stopAtNonOptions);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209

CliFrontendParser

public class CliFrontendParser {
	// 选项列表
    static final Option HELP_OPTION =
            new Option(
                    "h",
                    "help",
                    false,
                    "Show the help message for the CLI Frontend or the action.");

    static final Option JAR_OPTION = new Option("j", "jarfile", true, "Flink program JAR file.");

    static final Option CLASS_OPTION =
            new Option(
                    "c",
                    "class",
                    true,
                    "Class with the program entry point (\"main()\" method). Only needed if the "
                            + "JAR file does not specify the class in its manifest.");

    static final Option CLASSPATH_OPTION =
            new Option(
                    "C",
                    "classpath",
                    true,
                    "Adds a URL to each user code "
                            + "classloader  on all nodes in the cluster. The paths must specify a protocol (e.g. file://) and be "
                            + "accessible on all nodes (e.g. by means of a NFS share). You can use this option multiple "
                            + "times for specifying more than one URL. The protocol must be supported by the "
                            + "{@link java.net.URLClassLoader}.");

	
    public static CommandLine parse(Options options, String[] args, boolean stopAtNonOptions)
            throws CliArgsException {
        final DefaultParser parser = new DefaultParser();

        try {
        	// DefaultParser.parse() 进入DefaultParser类
            return parser.parse(options, args, stopAtNonOptions);
        } catch (ParseException e) {
            throw new CliArgsException(e.getMessage());
        }
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

DefaultParser类

 public CommandLine parse(final Options options, final String[] arguments, final Properties properties, final boolean stopAtNonOption)
        throws ParseException {
        this.options = options;
        this.stopAtNonOption = stopAtNonOption;
        skipParsing = false;
        currentOption = null;
        expectedOpts = new ArrayList<>(options.getRequiredOptions());

        // clear the data from the groups
        for (final OptionGroup group : options.getOptionGroups()) {
            group.setSelected(null);
        }

        cmd = new CommandLine();

        if (arguments != null) {
        	// 循环处理每个参数
            for (final String argument : arguments) {
            	// 处理参数
                handleToken(argument);
            }
        }

        // check the arguments of the last option
        checkRequiredArgs();

        // add the default options
        handleProperties(properties);

        checkRequiredOptions();

        return cmd;
    }
 
 private void handleToken(final String token) throws ParseException {
        currentToken = token;

        if (skipParsing) {c
            cmd.addArg(token);
        } else if ("--".equals(token)) {
            skipParsing = true;
         // flink命令传递的参数的值(非key的处理)将flink命令传递的参数值,添加到设置的currentToken
        } else if (currentOption != null && currentOption.acceptsArg() && isArgument(token)) {
            currentOption.addValueForProcessing(stripLeadingAndTrailingQuotesDefaultOn(token));
            // 长参数--arg 的处理,并设置currentToken 对currentOption的修改就是commandLineOptions的修改
        } else if (token.startsWith("--")) {
            handleLongOption(token);
        // 短参数-arg的处理,并设置为this.currentOption。对currentOption的修改就是对commandLineOptions的修改
        } else if (token.startsWith("-") && !"-".equals(token)) {
            handleShortAndLongOption(token);
        } else {
            handleUnknownToken(token);
        }

        if (currentOption != null && !currentOption.acceptsArg()) {
            currentOption = null;
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58

handleToken的主要处理逻辑:
分别对长短参数-arg进行处理
如果flink命令传递的参数是key,且在之前的commandLineOptions中则设置currentOption为该key 对currentOption的修改就是对commandLineOptions的修改
如果flink命令传递的参数是value,则将该value添加到上一步设置的currentOption

创建哪种类型的客户端

在CliFrontend.java中 在new CliFrontend()初始化之前会 loadCustomCommandLines(Configuration,configurationDirectory)

// 这里一次添加了Generic、Yarn、Default三种命令行客户端
 	public static List<CustomCommandLine> loadCustomCommandLines(
            Configuration configuration, String configurationDirectory) {
        List<CustomCommandLine> customCommandLines = new ArrayList<>();
        customCommandLines.add(new GenericCLI(configuration, configurationDirectory));

        //	Command line interface of the YARN session, with a special initialization here
        //	to prefix all options with y/yarn.
        final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
        try {
            customCommandLines.add(
                    loadCustomCommandLine(
                            flinkYarnSessionCLI,
                            configuration,
                            configurationDirectory,
                            "y",
                            "yarn"));
        } catch (NoClassDefFoundError | Exception e) {
            final String errorYarnSessionCLI = "org.apache.flink.yarn.cli.FallbackYarnSessionCli";
            try {
                LOG.info("Loading FallbackYarnSessionCli");
                customCommandLines.add(loadCustomCommandLine(errorYarnSessionCLI, configuration));
            } catch (Exception exception) {
                LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
            }
        }

        //	Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get
        // the
        //	      active CustomCommandLine in order and DefaultCLI isActive always return true.
        customCommandLines.add(new DefaultCLI());

        return customCommandLines;
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
 protected void run(String[] args) throws Exception {
        LOG.info("Running 'run' command.");

        final Options commandOptions = CliFrontendParser.getRunCommandOptions();
        final CommandLine commandLine = getCommandLine(commandOptions, args, true);

        // evaluate help flag
        if (commandLine.hasOption(HELP_OPTION.getOpt())) {
            CliFrontendParser.printHelpForRun(customCommandLines);
            return;
        }
        // 此处进行客户端的选择
        final CustomCommandLine activeCommandLine =
                validateAndGetActiveCommandLine(checkNotNull(commandLine));

        final ProgramOptions programOptions = ProgramOptions.create(commandLine);

        final List<URL> jobJars = getJobJarAndDependencies(programOptions);

        final Configuration effectiveConfiguration =
                getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);

        LOG.debug("Effective executor configuration: {}", effectiveConfiguration);

        try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
            executeProgram(effectiveConfiguration, program);
        }
    }
public CustomCommandLine validateAndGetActiveCommandLine(CommandLine commandLine) {
... ...
for (CustomCommandLine cli : customCommandLines) {
... ...
//在 FlinkYarnSessionCli 为 active 时优先返回 FlinkYarnSessionCli。
//对于 DefaultCli,它的 isActive 方法总是返回 true。
if (cli.isActive(commandLine)) {
	return cli;
		}
	}
// 点击进入isaActive()方法
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

isActive方在接口CustomCommandLine中 ,具体实现类查看AstractYarnCli

    public boolean isActive(CommandLine commandLine) {

        final String jobManagerOption = commandLine.getOptionValue(addressOption.getOpt(), null);
        // 是否指定为per-job模式,即指定-m yarn-cluster ID='yarn-cluster'
        final boolean yarnJobManager = ID.equals(jobManagerOption);
        // 是否存在flink的applicationID 即yarn-session模式是否启动
        final boolean hasYarnAppId =
                commandLine.hasOption(applicationId.getOpt())
                        || configuration.getOptional(YarnConfigOptions.APPLICATION_ID).isPresent();
        // executor的名字为yarn-session 或 "yarn-per-job"
        final boolean hasYarnExecutor =
                YarnSessionClusterExecutor.NAME.equalsIgnoreCase(
                                configuration.get(DeploymentOptions.TARGET))
                        || YarnJobClusterExecutor.NAME.equalsIgnoreCase(
                                configuration.get(DeploymentOptions.TARGET));
        return hasYarnExecutor || yarnJobManager || hasYarnAppId;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

获取有效配置
还是在CliFrontend.java中的run()方法内

protected void run(String[] args) throws Exception {

final Configuration effectiveConfiguration = getEffectiveConfiguration( activeCommandLine, commandLine, programOptions, jobJars);

}

// 进入getEffectiveConfigration

private <T> Configuration getEffectiveConfiguration(
            final CustomCommandLine activeCustomCommandLine,
            final CommandLine commandLine,
            final ProgramOptions programOptions,
            final List<T> jobJars)
            throws FlinkException {

        final Configuration effectiveConfiguration =
        		// 进入getEffectiveConfigration
                getEffectiveConfiguration(activeCustomCommandLine, commandLine);

        final ExecutionConfigAccessor executionParameters =
                ExecutionConfigAccessor.fromProgramOptions(
                        checkNotNull(programOptions), checkNotNull(jobJars));

        executionParameters.applyToConfiguration(effectiveConfiguration);

        LOG.debug(
                "Effective configuration after Flink conf, custom commandline, and program options: {}",
                effectiveConfiguration);
        return effectiveConfiguration;
    }

//getEffectiveConfigration

    private <T> Configuration getEffectiveConfiguration(
            final CustomCommandLine activeCustomCommandLine, final CommandLine commandLine)
            throws FlinkException {

        final Configuration effectiveConfiguration = new Configuration(configuration);

        final Configuration commandLineConfiguration =
        		// 进入toConfiguration()
                checkNotNull(activeCustomCommandLine).toConfiguration(commandLine);

        effectiveConfiguration.addAll(commandLineConfiguration);

        return effectiveConfiguration;
    }
// 这个是YarnSessionCli下的toConfiguration的实现
 public Configuration toConfiguration(CommandLine commandLine) throws FlinkException {
        // we ignore the addressOption because it can only contain "yarn-cluster"
        final Configuration effectiveConfiguration = new Configuration();

        applyDescriptorOptionToConfig(commandLine, effectiveConfiguration);

        final ApplicationId applicationId = getApplicationId(commandLine);
        if (applicationId != null) {
            final String zooKeeperNamespace;
            if (commandLine.hasOption(zookeeperNamespace.getOpt())) {
                zooKeeperNamespace = commandLine.getOptionValue(zookeeperNamespace.getOpt());
            } else {
                zooKeeperNamespace =
                        effectiveConfiguration.getString(HA_CLUSTER_ID, applicationId.toString());
            }

            effectiveConfiguration.setString(HA_CLUSTER_ID, zooKeeperNamespace);
            effectiveConfiguration.setString(
                    YarnConfigOptions.APPLICATION_ID, applicationId.toString());
             // TARGET 就是 execution.target,目标执行器
			//决定后面什么类型的执行器提交任务:yarn-session、yarn-per-job
            effectiveConfiguration.setString(
                    DeploymentOptions.TARGET, YarnSessionClusterExecutor.NAME);
        } else {
            effectiveConfiguration.setString(DeploymentOptions.TARGET, YarnJobClusterExecutor.NAME);
        }

        if (commandLine.hasOption(jmMemory.getOpt())) {
            String jmMemoryVal = commandLine.getOptionValue(jmMemory.getOpt());
            if (!MemorySize.MemoryUnit.hasUnit(jmMemoryVal)) {
                jmMemoryVal += "m";
            }
            effectiveConfiguration.set(
                    JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse(jmMemoryVal));
        }

        if (commandLine.hasOption(tmMemory.getOpt())) {
            String tmMemoryVal = commandLine.getOptionValue(tmMemory.getOpt());
            if (!MemorySize.MemoryUnit.hasUnit(tmMemoryVal)) {
                tmMemoryVal += "m";
            }
            effectiveConfiguration.set(
                    TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse(tmMemoryVal));
        }

        if (commandLine.hasOption(slots.getOpt())) {
            effectiveConfiguration.setInteger(
                    TaskManagerOptions.NUM_TASK_SLOTS,
                    Integer.parseInt(commandLine.getOptionValue(slots.getOpt())));
        }

        dynamicPropertiesEncoded = encodeDynamicProperties(commandLine);
        if (!dynamicPropertiesEncoded.isEmpty()) {
            Map<String, String> dynProperties = getDynamicProperties(dynamicPropertiesEncoded);
            for (Map.Entry<String, String> dynProperty : dynProperties.entrySet()) {
                effectiveConfiguration.setString(dynProperty.getKey(), dynProperty.getValue());
            }
        }

        if (isYarnPropertiesFileMode(commandLine)) {
            return applyYarnProperties(effectiveConfiguration);
        } else {
            return effectiveConfiguration;
        }
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114

调用用户代码中的main()方法

 protected void run(String[] args) throws Exception {
        // 调用用户的main()方法
        try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
            executeProgram(effectiveConfiguration, program);
        }
    }

    protected void executeProgram(final Configuration configuration, final PackagedProgram program)
            throws ProgramInvocationException {
        ClientUtils.executeProgram(
                new DefaultExecutorServiceLoader(), configuration, program, false, false);
    }

// ClientUtils工具类
public static void executeProgram(
            PipelineExecutorServiceLoader executorServiceLoader,
            Configuration configuration,
            PackagedProgram program,
            boolean enforceSingleJobExecution,
            boolean suppressSysout)
            throws ProgramInvocationException {
        checkNotNull(executorServiceLoader);
        final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
        final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        try {
        	//设置当前的 classloader 为用户代码的 classloader
            Thread.currentThread().setContextClassLoader(userCodeClassLoader);

            LOG.info(
                    "Starting program (detached: {})",
                    !configuration.getBoolean(DeploymentOptions.ATTACHED));
			//用户代码中的 getExecutionEnvironment 会返回该 Environment 
            ContextEnvironment.setAsContext(
                    executorServiceLoader,
                    configuration,
                    userCodeClassLoader,
                    enforceSingleJobExecution,
                    suppressSysout);

            StreamContextEnvironment.setAsContext(
                    executorServiceLoader,
                    configuration,
                    userCodeClassLoader,
                    enforceSingleJobExecution,
                    suppressSysout);

            try {
            	// 调用用户的main方法 在invoke方法内进行反射执行
                program.invokeInteractiveModeForExecution();
            } finally {
                ContextEnvironment.unsetAsContext();
                StreamContextEnvironment.unsetAsContext();
            }
        } finally {
            Thread.currentThread().setContextClassLoader(contextClassLoader);
        }
    }
public void invokeInteractiveModeForExecution() throws ProgramInvocationException {
        FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
        try {
            callMainMethod(mainClass, args);
        } finally {
            FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
        }
    }
private static void callMainMethod(Class<?> entryClass, String[] args)
            throws ProgramInvocationException {
        Method mainMethod;
        if (!Modifier.isPublic(entryClass.getModifiers())) {
            throw new ProgramInvocationException(
                    "The class " + entryClass.getName() + " must be public.");
        }

        try {
            mainMethod = entryClass.getMethod("main", String[].class);
        } catch (NoSuchMethodException e) {
            throw new ProgramInvocationException(
                    "The class " + entryClass.getName() + " has no main(String[]) method.");
        } catch (Throwable t) {
            throw new ProgramInvocationException(
                    "Could not look up the main(String[]) method from the class "
                            + entryClass.getName()
                            + ": "
                            + t.getMessage(),
                    t);
        }

        if (!Modifier.isStatic(mainMethod.getModifiers())) {
            throw new ProgramInvocationException(
                    "The class " + entryClass.getName() + " declares a non-static main method.");
        }
        if (!Modifier.isPublic(mainMethod.getModifiers())) {
            throw new ProgramInvocationException(
                    "The class " + entryClass.getName() + " declares a non-public main method.");
        }

        try {
        	// 反射调用main函数
            mainMethod.invoke(null, (Object) args);
        } catch (IllegalArgumentException e) {
            throw new ProgramInvocationException(
                    "Could not invoke the main method, arguments are not matching.", e);
        } catch (IllegalAccessException e) {
            throw new ProgramInvocationException(
                    "Access to the main method was denied: " + e.getMessage(), e);
        } catch (InvocationTargetException e) {
            Throwable exceptionInMethod = e.getTargetException();
            if (exceptionInMethod instanceof Error) {
                throw (Error) exceptionInMethod;
            } else if (exceptionInMethod instanceof ProgramParametrizationException) {
                throw (ProgramParametrizationException) exceptionInMethod;
            } else if (exceptionInMethod instanceof ProgramInvocationException) {
                throw (ProgramInvocationException) exceptionInMethod;
            } else {
                throw new ProgramInvocationException(
                        "The main method caused an error: " + exceptionInMethod.getMessage(),
                        exceptionInMethod);
            }
        } catch (Throwable t) {
            throw new ProgramInvocationException(
                    "An error occurred while invoking the program's main method: " + t.getMessage(),
                    t);
        }
    }


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126

调用执行环境的 execute 方法
StreamExecutionEnvironment.java

public JobExecutionResult execute(String jobName) throws Exception {
        final List<Transformation<?>> originalTransformations = new ArrayList<>(transformations);
        StreamGraph streamGraph = getStreamGraph();
        if (jobName != null) {
            streamGraph.setJobName(jobName);
        }

        try {
            return execute(streamGraph);
        } catch (Throwable t) {
            Optional<ClusterDatasetCorruptedException> clusterDatasetCorruptedException =
                    ExceptionUtils.findThrowable(t, ClusterDatasetCorruptedException.class);
            if (!clusterDatasetCorruptedException.isPresent()) {
                throw t;
            }

            // Retry without cache if it is caused by corrupted cluster dataset.
            invalidateCacheTransformations(originalTransformations);
            streamGraph = getStreamGraph(originalTransformations);
            return execute(streamGraph);
        }
    }

public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
		// 进入executeAsync()方法
        final JobClient jobClient = executeAsync(streamGraph);

        try {
            final JobExecutionResult jobExecutionResult;

            if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
                jobExecutionResult = jobClient.getJobExecutionResult().get();
            } else {
                jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
            }

            jobListeners.forEach(
                    jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));

            return jobExecutionResult;
        } catch (Throwable t) {
            // get() on the JobExecutionResult Future will throw an ExecutionException. This
            // behaviour was largely not there in Flink versions before the PipelineExecutor
            // refactoring so we should strip that exception.
            Throwable strippedException = ExceptionUtils.stripExecutionException(t);

            jobListeners.forEach(
                    jobListener -> {
                        jobListener.onJobExecuted(null, strippedException);
                    });
            ExceptionUtils.rethrowException(strippedException);

            // never reached, only make javac happy
            return null;
        }
    }

public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
        checkNotNull(streamGraph, "StreamGraph cannot be null.");
        //根据提交模式选择匹配的factory
        final PipelineExecutor executor = getPipelineExecutor();
		
		// 选择合适的executor提交任务
		// Session 模式对应于 AbstractSessionClusterExecutor,Per-Job 模式对应于 AbstractJobClusterExecutor。
        CompletableFuture<JobClient> jobClientFuture =
                executor.execute(streamGraph, configuration, userClassloader);

        try {
            JobClient jobClient = jobClientFuture.get();
            jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
            collectIterators.forEach(iterator -> iterator.setJobClient(jobClient));
            collectIterators.clear();
            return jobClient;
        } catch (ExecutionException executionException) {
            final Throwable strippedException =
                    ExceptionUtils.stripExecutionException(executionException);
            jobListeners.forEach(
                    jobListener -> jobListener.onJobSubmitted(null, strippedException));

            throw new FlinkException(
                    String.format("Failed to execute job '%s'.", streamGraph.getJobName()),
                    strippedException);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/548735
推荐阅读
相关标签
  

闽ICP备14008679号