At runtime, Flink consists of components such as the ResourceManager, JobManager, TaskManager, and Dispatcher.

The JobManager is the core of task management and scheduling in a Flink cluster and is the main process that controls application execution; every applicationId has a JobManager associated with it. The JobManager in turn contains four components: the JobMaster, the ResourceManager, the Dispatcher, and the WebMonitorEndpoint.

The JobMaster is the most central component of the JobManager and is responsible for a single job, so JobMaster and job are in one-to-one correspondence. Several jobs can run in one Flink cluster at the same time, and each job has its own JobMaster.

When a job is submitted, the JobMaster first receives the application to be executed. It converts the JobGraph into a physical execution graph that contains the tasks to be executed concurrently, requests the resources needed to run those tasks from the ResourceManager, and then distributes the execution graph to the TaskManagers for actual execution. While the job is running, the JobMaster is responsible for all central coordination, for example triggering checkpoints.

The ResourceManager is mainly responsible for resource allocation and management; there is exactly one per Flink cluster. "Resources" here primarily means the task slots of the TaskManagers. A task slot is Flink's unit of resource scheduling and bundles a share of the machine's CPU and memory for running computations; every task must be assigned to a slot in order to execute. Note that this is Flink's built-in ResourceManager, not YARN's ResourceManager.
1. The Dispatcher receives the JobGraph submitted by the user and then starts a JobMaster for it, similar to the AppMaster in YARN.
2. It contains a persistence service, the JobGraphStore, which stores JobGraphs; if the master node fails while the execution graph or physical graph is being built and is later recovered, the JobGraph can be fetched again from this store.
3. The WebMonitorEndpoint provides the REST service; it runs an internal Netty server, and all client requests are received and handled by this component.
When a client submits a job to the cluster (the client builds the job into a JobGraph first), the master node receives the REST request for the submission; the WebMonitorEndpoint resolves it through its Router and finds the matching Handler to process it. After processing, the job is handed over to the Dispatcher, which starts a JobMaster responsible for deploying and executing the tasks of this job; the JobMaster requests the resources needed to execute those tasks from the ResourceManager.
The TaskManager is Flink's worker process; the actual computation over the data streams happens here. A Flink cluster must contain at least one TaskManager, and each TaskManager holds a certain number of task slots. A slot is the smallest unit of resource scheduling, and the number of slots limits how many tasks a TaskManager can process in parallel. After starting up, a TaskManager registers its slots with the ResourceManager; once it receives instructions from the ResourceManager, it offers one or more slots to the JobMaster, which can then assign tasks to them. During execution, a TaskManager buffers data and can exchange data with other TaskManagers running the same application (see the small sketch below).
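As a rough, self-contained illustration of how parallelism maps onto slots (a sketch, not code from this article; the job and the numbers are made up): a job whose maximum operator parallelism is 4 needs at least 4 slots in the cluster, for example two TaskManagers configured with two slots each.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlotDemoJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Each parallel subtask of an operator occupies (part of) a task slot;
        // with parallelism 4 the job needs at least 4 slots in total.
        env.setParallelism(4);

        env.fromElements(1, 2, 3, 4).print();
        env.execute("slot-demo");
    }
}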
In the flink-dist module, under the flink-bin/bin directory, open flink.sh:
#!/usr/bin/env bash

target="$0"
iteration=0
while [ -L "$target" ]; do
    if [ "$iteration" -gt 100 ]; then
        echo "Cannot resolve path: You have a cyclic symlink in $target."
        break
    fi
    ls=`ls -ld -- "$target"`
    target=`expr "$ls" : '.* -> \(.*\)$'`
    iteration=$((iteration + 1))
done

# Convert relative path to absolute path
bin=`dirname "$target"`

# get flink config
. "$bin"/config.sh

if [ "$FLINK_IDENT_STRING" = "" ]; then
    FLINK_IDENT_STRING="$USER"
fi

CC_CLASSPATH=`constructFlinkClassPath`

log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-client-$HOSTNAME.log
log_setting=(-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlog4j.configurationFile=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml)

# Add Client-specific JVM options
FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_CLI}"

# Add HADOOP_CLASSPATH to allow the usage of Hadoop file systems
exec "${JAVA_RUN}" $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"
Executing java -cp starts a JVM, launches the CliFrontend process in it, and runs its main() method; java -classpath here specifies the path of the other classes that the launched class depends on.

In short: java -cp —> start the JVM —> start the process (CliFrontend) —> program entry point CliFrontend.main()

The entry class for submitting a Flink job is CliFrontend, located in the flink-clients module.
private static final int INITIAL_RET_CODE = 31;

/** Submits the job based on the arguments. */
// submit the job according to the args arguments
public static void main(final String[] args) {
    int retCode = INITIAL_RET_CODE;
    try {
        // step into mainInternal()
        retCode = mainInternal(args);
    } finally {
        System.exit(retCode);
    }
}

static int mainInternal(final String[] args) {
    EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

    // 1. find the configuration directory
    final String configurationDirectory = getConfigurationDirectoryFromEnv();

    // 2. load the global configuration
    final Configuration configuration =
            GlobalConfiguration.loadConfiguration(configurationDirectory);

    // 3. load the custom command lines
    final List<CustomCommandLine> customCommandLines =
            loadCustomCommandLines(configuration, configurationDirectory);

    int retCode = INITIAL_RET_CODE;
    try {
        // instantiate the CliFrontend client
        final CliFrontend cli = new CliFrontend(configuration, customCommandLines);

        CommandLine commandLine =
                cli.getCommandLine(
                        new Options(),
                        Arrays.copyOfRange(args, min(args.length, 1), args.length),
                        true);

        Configuration securityConfig = new Configuration(cli.configuration);
        DynamicPropertiesUtil.encodeDynamicProperties(commandLine, securityConfig);
        SecurityUtils.install(new SecurityConfiguration(securityConfig));

        // cli.parseAndRun() parses the Flink-specific arguments and submits the application
        retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
    } catch (Throwable t) {
        final Throwable strippedThrowable =
                ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
        LOG.error("Fatal error while running command line interface.", strippedThrowable);
        strippedThrowable.printStackTrace();
    }
    return retCode;
}

// CliFrontend is instantiated first
public CliFrontend(Configuration configuration, List<CustomCommandLine> customCommandLines) {
    this(configuration, new DefaultClusterClientServiceLoader(), customCommandLines);
}

public CliFrontend(
        Configuration configuration,
        ClusterClientServiceLoader clusterClientServiceLoader,
        List<CustomCommandLine> customCommandLines) {
    this.configuration = checkNotNull(configuration);
    this.customCommandLines = checkNotNull(customCommandLines);
    this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);

    // Initialize the client-side file system. Flink loads file systems as plugins
    // so that different file system implementations can be supported.
    FileSystem.initialize(
            configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));

    this.customCommandLineOptions = new Options();

    // Add the options of each custom command line (GenericCLI, FlinkYarnSessionCli, ...)
    // to customCommandLineOptions: zookeeperNamespace, jobmanager, the key/value pairs
    // passed via -D, and so on.
    for (CustomCommandLine customCommandLine : customCommandLines) {
        customCommandLine.addGeneralOptions(customCommandLineOptions);
        customCommandLine.addRunOptions(customCommandLineOptions);
    }

    // Read the client timeout and the default parallelism of the Flink application.
    this.clientTimeout = configuration.get(ClientOptions.CLIENT_TIMEOUT);
    this.defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
}

// cli.parseAndRun() parses the command line: the configuration files in the Flink conf
// directory plus command-line arguments such as "flink run -t yarn-per-job -p 4".
public int parseAndRun(String[] args) {
    // check for action
    if (args.length < 1) {
        // print usage
        System.out.println("./flink <ACTION> [OPTIONS] [ARGUMENTS]");
        CliFrontendParser.printHelp(customCommandLines);
        System.out.println("Please specify an action.");
        return 1;
    }

    // get action, e.g. "run" in "flink run -t ..."
    String action = args[0];

    // remove action from parameters
    final String[] params = Arrays.copyOfRange(args, 1, args.length);

    try {
        // dispatch on the action: run, run-application, list, cancel, stop, ...
        switch (action) {
            case ACTION_RUN:
                // step into the run() method
                run(params);
                return 0;
            case ACTION_RUN_APPLICATION:
                runApplication(params);
                return 0;
            case ACTION_LIST:
                list(params);
                return 0;
            case ACTION_INFO:
                info(params);
                return 0;
            case ACTION_CANCEL:
                cancel(params);
                return 0;
            case ACTION_STOP:
                stop(params);
                return 0;
            case ACTION_SAVEPOINT:
                savepoint(params);
                return 0;
            case "-h":
            case "--help":
                CliFrontendParser.printHelp(customCommandLines);
                return 0;
            case "-v":
            case "--version":
                String version = EnvironmentInformation.getVersion();
                String commitID = EnvironmentInformation.getRevisionInformation().commitId;
                System.out.print("Version: " + version);
                System.out.println(
                        commitID.equals(EnvironmentInformation.UNKNOWN)
                                ? ""
                                : ", Commit ID: " + commitID);
                return 0;
            default:
                System.out.printf("\"%s\" is not a valid action.\n", action);
                System.out.println();
                System.out.println(
                        "Valid actions are \"run\", \"run-application\", \"list\", \"info\", \"savepoint\", \"stop\", or \"cancel\".");
                System.out.println();
                System.out.println(
                        "Specify the version option (-v or --version) to print Flink version.");
                System.out.println();
                System.out.println(
                        "Specify the help option (-h or --help) to get help on the command.");
                return 1;
        }
    } catch (CliArgsException ce) {
        return handleArgException(ce);
    } catch (ProgramParametrizationException ppe) {
        return handleParametrizationException(ppe);
    } catch (ProgramMissingJobException pmje) {
        return handleMissingJobException();
    } catch (Exception e) {
        return handleError(e);
    }
}

// step into the run() method
protected void run(String[] args) throws Exception {
    LOG.info("Running 'run' command.");

    // The default options of the run action: help, verbose, fromSavepoint,
    // allowNonRestoredState, restoreMode, jarfile, class, classpath, parallelism,
    // arguments, detached, shutdownOnAttachedExit, yarndetached, python, pyFiles,
    // pyModule, pyRequirements, pyArchives, pyExecutable, pyClientExecutable, ...
    final Options commandOptions = CliFrontendParser.getRunCommandOptions();

    // First merge the default run options with the custom command line options, then
    // parse the arguments into a CommandLine that contains only the Flink-specific
    // arguments. Step into getCommandLine().
    final CommandLine commandLine = getCommandLine(commandOptions, args, true);

    // evaluate help flag
    if (commandLine.hasOption(HELP_OPTION.getOpt())) {
        CliFrontendParser.printHelpForRun(customCommandLines);
        return;
    }

    final CustomCommandLine activeCommandLine =
            validateAndGetActiveCommandLine(checkNotNull(commandLine));

    final ProgramOptions programOptions = ProgramOptions.create(commandLine);

    final List<URL> jobJars = getJobJarAndDependencies(programOptions);

    final Configuration effectiveConfiguration =
            getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);

    LOG.debug("Effective executor configuration: {}", effectiveConfiguration);

    try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
        executeProgram(effectiveConfiguration, program);
    }
}

// step into getCommandLine()
public CommandLine getCommandLine(
        final Options commandOptions, final String[] args, final boolean stopAtNonOptions)
        throws CliArgsException {
    // Merge Flink's default options with the customCommandLineOptions built in the
    // CliFrontend constructor (zookeeperNamespace, jobmanager, the -D key/value pairs,
    // plus the GenericCLI and FlinkYarnSessionCli options).
    final Options commandLineOptions =
            CliFrontendParser.mergeOptions(commandOptions, customCommandLineOptions);
    // Parse the arguments against these options into a CommandLine that contains
    // only the Flink-specific arguments.
    return CliFrontendParser.parse(commandLineOptions, args, stopAtNonOptions);
}
CliFrontendParser
public class CliFrontendParser {

    // option definitions
    static final Option HELP_OPTION =
            new Option(
                    "h",
                    "help",
                    false,
                    "Show the help message for the CLI Frontend or the action.");

    static final Option JAR_OPTION = new Option("j", "jarfile", true, "Flink program JAR file.");

    static final Option CLASS_OPTION =
            new Option(
                    "c",
                    "class",
                    true,
                    "Class with the program entry point (\"main()\" method). Only needed if the "
                            + "JAR file does not specify the class in its manifest.");

    static final Option CLASSPATH_OPTION =
            new Option(
                    "C",
                    "classpath",
                    true,
                    "Adds a URL to each user code "
                            + "classloader on all nodes in the cluster. The paths must specify a protocol (e.g. file://) and be "
                            + "accessible on all nodes (e.g. by means of a NFS share). You can use this option multiple "
                            + "times for specifying more than one URL. The protocol must be supported by the "
                            + "{@link java.net.URLClassLoader}.");

    public static CommandLine parse(Options options, String[] args, boolean stopAtNonOptions)
            throws CliArgsException {
        final DefaultParser parser = new DefaultParser();
        try {
            // DefaultParser.parse(): step into the DefaultParser class
            return parser.parse(options, args, stopAtNonOptions);
        } catch (ParseException e) {
            throw new CliArgsException(e.getMessage());
        }
    }
}
The DefaultParser class
public CommandLine parse(final Options options, final String[] arguments,
        final Properties properties, final boolean stopAtNonOption) throws ParseException {
    this.options = options;
    this.stopAtNonOption = stopAtNonOption;
    skipParsing = false;
    currentOption = null;
    expectedOpts = new ArrayList<>(options.getRequiredOptions());

    // clear the data from the groups
    for (final OptionGroup group : options.getOptionGroups()) {
        group.setSelected(null);
    }

    cmd = new CommandLine();

    if (arguments != null) {
        // loop over every argument
        for (final String argument : arguments) {
            // handle one token
            handleToken(argument);
        }
    }

    // check the arguments of the last option
    checkRequiredArgs();

    // add the default options
    handleProperties(properties);

    checkRequiredOptions();

    return cmd;
}

private void handleToken(final String token) throws ParseException {
    currentToken = token;

    if (skipParsing) {
        cmd.addArg(token);
    } else if ("--".equals(token)) {
        skipParsing = true;
    // A value passed on the flink command line (i.e. not a key): add the value to the
    // currentOption that was set in a previous step.
    } else if (currentOption != null && currentOption.acceptsArg() && isArgument(token)) {
        currentOption.addValueForProcessing(stripLeadingAndTrailingQuotesDefaultOn(token));
    // A long option --arg: sets currentOption. Modifying currentOption modifies the
    // corresponding entry of commandLineOptions.
    } else if (token.startsWith("--")) {
        handleLongOption(token);
    // A short option -arg: sets this.currentOption. Modifying currentOption modifies the
    // corresponding entry of commandLineOptions.
    } else if (token.startsWith("-") && !"-".equals(token)) {
        handleShortAndLongOption(token);
    } else {
        handleUnknownToken(token);
    }

    if (currentOption != null && !currentOption.acceptsArg()) {
        currentOption = null;
    }
}
The main logic of handleToken:
Long options (--arg) and short options (-arg) are handled separately.
If the token passed on the flink command line is a key and it exists in the commandLineOptions built earlier, currentOption is set to that option; modifying currentOption modifies the corresponding entry of commandLineOptions.
If the token is a value, it is added to the currentOption set in the previous step (see the sketch below).
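To make the key/value pairing concrete, here is a minimal, self-contained sketch that uses the same commons-cli classes (Options, Option, DefaultParser). It is not Flink code; the -t/-p option definitions merely mimic the flink run options.

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;

public class CliParseSketch {
    public static void main(String[] args) throws Exception {
        Options options = new Options();
        // "-t/--target" and "-p/--parallelism" each take a value, like Flink's run options
        options.addOption(new Option("t", "target", true, "deployment target"));
        options.addOption(new Option("p", "parallelism", true, "job parallelism"));

        // handleToken() sees "-t" (key -> currentOption), "yarn-per-job" (value),
        // "-p" (key), "4" (value), in that order
        CommandLine cmd = new DefaultParser().parse(
                options, new String[] {"-t", "yarn-per-job", "-p", "4"});

        System.out.println(cmd.getOptionValue("t")); // yarn-per-job
        System.out.println(cmd.getOptionValue("p")); // 4
    }
}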
In CliFrontend.java, loadCustomCommandLines(configuration, configurationDirectory) is called before new CliFrontend() is instantiated:
// Three command-line clients are added here, in order: Generic, Yarn, and Default.
public static List<CustomCommandLine> loadCustomCommandLines(
        Configuration configuration, String configurationDirectory) {
    List<CustomCommandLine> customCommandLines = new ArrayList<>();
    customCommandLines.add(new GenericCLI(configuration, configurationDirectory));

    // Command line interface of the YARN session, with a special initialization here
    // to prefix all options with y/yarn.
    final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
    try {
        customCommandLines.add(
                loadCustomCommandLine(
                        flinkYarnSessionCLI,
                        configuration,
                        configurationDirectory,
                        "y",
                        "yarn"));
    } catch (NoClassDefFoundError | Exception e) {
        final String errorYarnSessionCLI = "org.apache.flink.yarn.cli.FallbackYarnSessionCli";
        try {
            LOG.info("Loading FallbackYarnSessionCli");
            customCommandLines.add(loadCustomCommandLine(errorYarnSessionCLI, configuration));
        } catch (Exception exception) {
            LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
        }
    }

    // Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get
    // the active CustomCommandLine in order and DefaultCLI isActive always return true.
    customCommandLines.add(new DefaultCLI());
    return customCommandLines;
}
protected void run(String[] args) throws Exception {
    LOG.info("Running 'run' command.");

    final Options commandOptions = CliFrontendParser.getRunCommandOptions();
    final CommandLine commandLine = getCommandLine(commandOptions, args, true);

    // evaluate help flag
    if (commandLine.hasOption(HELP_OPTION.getOpt())) {
        CliFrontendParser.printHelpForRun(customCommandLines);
        return;
    }

    // This is where the command-line client is selected.
    final CustomCommandLine activeCommandLine =
            validateAndGetActiveCommandLine(checkNotNull(commandLine));

    final ProgramOptions programOptions = ProgramOptions.create(commandLine);
    final List<URL> jobJars = getJobJarAndDependencies(programOptions);
    final Configuration effectiveConfiguration =
            getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);

    LOG.debug("Effective executor configuration: {}", effectiveConfiguration);

    try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
        executeProgram(effectiveConfiguration, program);
    }
}

public CustomCommandLine validateAndGetActiveCommandLine(CommandLine commandLine) {
    ...
    for (CustomCommandLine cli : customCommandLines) {
        ...
        // FlinkYarnSessionCli is returned first when it is active.
        // For DefaultCLI, isActive always returns true.
        if (cli.isActive(commandLine)) {
            return cli;
        }
    }
    // step into the isActive() method
}
isActive is declared in the CustomCommandLine interface; for the concrete implementation see AbstractYarnCli:
public boolean isActive(CommandLine commandLine) {
    final String jobManagerOption = commandLine.getOptionValue(addressOption.getOpt(), null);
    // Was per-job mode requested, i.e. "-m yarn-cluster"? (ID = "yarn-cluster")
    final boolean yarnJobManager = ID.equals(jobManagerOption);
    // Is a Flink application id present, i.e. has a yarn-session already been started?
    final boolean hasYarnAppId =
            commandLine.hasOption(applicationId.getOpt())
                    || configuration.getOptional(YarnConfigOptions.APPLICATION_ID).isPresent();
    // Is the executor name "yarn-session" or "yarn-per-job"?
    final boolean hasYarnExecutor =
            YarnSessionClusterExecutor.NAME.equalsIgnoreCase(
                            configuration.get(DeploymentOptions.TARGET))
                    || YarnJobClusterExecutor.NAME.equalsIgnoreCase(
                            configuration.get(DeploymentOptions.TARGET));
    return hasYarnExecutor || yarnJobManager || hasYarnAppId;
}
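As a rough illustration of the hasYarnExecutor condition (a sketch, not Flink source; it assumes YarnSessionClusterExecutor.NAME and YarnJobClusterExecutor.NAME are the strings "yarn-session" and "yarn-per-job"), setting execution.target alone is enough to make the YARN command line active:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;

public class TargetSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Equivalent to "flink run -t yarn-per-job ...": execution.target = yarn-per-job
        conf.set(DeploymentOptions.TARGET, "yarn-per-job");

        // Mirrors the hasYarnExecutor check in AbstractYarnCli#isActive
        boolean hasYarnExecutor =
                "yarn-session".equalsIgnoreCase(conf.get(DeploymentOptions.TARGET))
                        || "yarn-per-job".equalsIgnoreCase(conf.get(DeploymentOptions.TARGET));
        System.out.println(hasYarnExecutor); // true
    }
}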
Obtaining the effective configuration
Still inside the run() method of CliFrontend.java:
protected void run(String[] args) throws Exception {
    ...
    final Configuration effectiveConfiguration =
            getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);
    ...
}

// step into getEffectiveConfiguration()
private <T> Configuration getEffectiveConfiguration(
        final CustomCommandLine activeCustomCommandLine,
        final CommandLine commandLine,
        final ProgramOptions programOptions,
        final List<T> jobJars) throws FlinkException {

    final Configuration effectiveConfiguration =
            // step into the two-argument getEffectiveConfiguration()
            getEffectiveConfiguration(activeCustomCommandLine, commandLine);

    final ExecutionConfigAccessor executionParameters =
            ExecutionConfigAccessor.fromProgramOptions(
                    checkNotNull(programOptions), checkNotNull(jobJars));

    executionParameters.applyToConfiguration(effectiveConfiguration);

    LOG.debug(
            "Effective configuration after Flink conf, custom commandline, and program options: {}",
            effectiveConfiguration);
    return effectiveConfiguration;
}

// getEffectiveConfiguration(activeCustomCommandLine, commandLine)
private <T> Configuration getEffectiveConfiguration(
        final CustomCommandLine activeCustomCommandLine, final CommandLine commandLine)
        throws FlinkException {

    final Configuration effectiveConfiguration = new Configuration(configuration);

    final Configuration commandLineConfiguration =
            // step into toConfiguration()
            checkNotNull(activeCustomCommandLine).toConfiguration(commandLine);

    effectiveConfiguration.addAll(commandLineConfiguration);
    return effectiveConfiguration;
}

// The implementation of toConfiguration() in FlinkYarnSessionCli:
public Configuration toConfiguration(CommandLine commandLine) throws FlinkException {
    // we ignore the addressOption because it can only contain "yarn-cluster"
    final Configuration effectiveConfiguration = new Configuration();
    applyDescriptorOptionToConfig(commandLine, effectiveConfiguration);

    final ApplicationId applicationId = getApplicationId(commandLine);
    if (applicationId != null) {
        final String zooKeeperNamespace;
        if (commandLine.hasOption(zookeeperNamespace.getOpt())) {
            zooKeeperNamespace = commandLine.getOptionValue(zookeeperNamespace.getOpt());
        } else {
            zooKeeperNamespace =
                    effectiveConfiguration.getString(HA_CLUSTER_ID, applicationId.toString());
        }

        effectiveConfiguration.setString(HA_CLUSTER_ID, zooKeeperNamespace);
        effectiveConfiguration.setString(
                YarnConfigOptions.APPLICATION_ID, applicationId.toString());
        // TARGET is execution.target, the target executor. It decides which type of
        // executor submits the job later on: yarn-session or yarn-per-job.
        effectiveConfiguration.setString(DeploymentOptions.TARGET, YarnSessionClusterExecutor.NAME);
    } else {
        effectiveConfiguration.setString(DeploymentOptions.TARGET, YarnJobClusterExecutor.NAME);
    }

    if (commandLine.hasOption(jmMemory.getOpt())) {
        String jmMemoryVal = commandLine.getOptionValue(jmMemory.getOpt());
        if (!MemorySize.MemoryUnit.hasUnit(jmMemoryVal)) {
            jmMemoryVal += "m";
        }
        effectiveConfiguration.set(
                JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse(jmMemoryVal));
    }

    if (commandLine.hasOption(tmMemory.getOpt())) {
        String tmMemoryVal = commandLine.getOptionValue(tmMemory.getOpt());
        if (!MemorySize.MemoryUnit.hasUnit(tmMemoryVal)) {
            tmMemoryVal += "m";
        }
        effectiveConfiguration.set(
                TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse(tmMemoryVal));
    }

    if (commandLine.hasOption(slots.getOpt())) {
        effectiveConfiguration.setInteger(
                TaskManagerOptions.NUM_TASK_SLOTS,
                Integer.parseInt(commandLine.getOptionValue(slots.getOpt())));
    }

    dynamicPropertiesEncoded = encodeDynamicProperties(commandLine);
    if (!dynamicPropertiesEncoded.isEmpty()) {
        Map<String, String> dynProperties = getDynamicProperties(dynamicPropertiesEncoded);
        for (Map.Entry<String, String> dynProperty : dynProperties.entrySet()) {
            effectiveConfiguration.setString(dynProperty.getKey(), dynProperty.getValue());
        }
    }

    if (isYarnPropertiesFileMode(commandLine)) {
        return applyYarnProperties(effectiveConfiguration);
    } else {
        return effectiveConfiguration;
    }
}
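The layering of the effective configuration (flink-conf.yaml first, then the options derived from the active command line, then the program options) can be pictured with the following minimal sketch. It only relies on Flink's Configuration class; the keys and values are illustrative.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;

public class EffectiveConfigSketch {
    public static void main(String[] args) {
        // Layer 1: what was loaded from flink-conf.yaml (example values)
        Configuration flinkConf = new Configuration();
        flinkConf.set(CoreOptions.DEFAULT_PARALLELISM, 1);

        // Layer 2: what the active command line derived from the arguments (example values)
        Configuration commandLineConf = new Configuration();
        commandLineConf.set(DeploymentOptions.TARGET, "yarn-per-job");
        commandLineConf.set(CoreOptions.DEFAULT_PARALLELISM, 4);

        // Same pattern as getEffectiveConfiguration(): copy the base, then overlay
        Configuration effective = new Configuration(flinkConf);
        effective.addAll(commandLineConf);

        System.out.println(effective.get(DeploymentOptions.TARGET));        // yarn-per-job
        System.out.println(effective.get(CoreOptions.DEFAULT_PARALLELISM)); // 4
    }
}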
Invoking the main() method of the user code
protected void run(String[] args) throws Exception {
    ...
    // invoke the user's main() method
    try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
        executeProgram(effectiveConfiguration, program);
    }
}

protected void executeProgram(final Configuration configuration, final PackagedProgram program)
        throws ProgramInvocationException {
    ClientUtils.executeProgram(
            new DefaultExecutorServiceLoader(), configuration, program, false, false);
}

// the ClientUtils utility class
public static void executeProgram(
        PipelineExecutorServiceLoader executorServiceLoader,
        Configuration configuration,
        PackagedProgram program,
        boolean enforceSingleJobExecution,
        boolean suppressSysout) throws ProgramInvocationException {

    checkNotNull(executorServiceLoader);
    final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
    final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
    try {
        // set the current classloader to the user-code classloader
        Thread.currentThread().setContextClassLoader(userCodeClassLoader);

        LOG.info(
                "Starting program (detached: {})",
                !configuration.getBoolean(DeploymentOptions.ATTACHED));

        // getExecutionEnvironment() in the user code will return this Environment
        ContextEnvironment.setAsContext(
                executorServiceLoader,
                configuration,
                userCodeClassLoader,
                enforceSingleJobExecution,
                suppressSysout);

        StreamContextEnvironment.setAsContext(
                executorServiceLoader,
                configuration,
                userCodeClassLoader,
                enforceSingleJobExecution,
                suppressSysout);

        try {
            // invoke the user's main method; the actual call happens via reflection
            program.invokeInteractiveModeForExecution();
        } finally {
            ContextEnvironment.unsetAsContext();
            StreamContextEnvironment.unsetAsContext();
        }
    } finally {
        Thread.currentThread().setContextClassLoader(contextClassLoader);
    }
}

public void invokeInteractiveModeForExecution() throws ProgramInvocationException {
    FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
    try {
        callMainMethod(mainClass, args);
    } finally {
        FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
    }
}

private static void callMainMethod(Class<?> entryClass, String[] args)
        throws ProgramInvocationException {
    Method mainMethod;
    if (!Modifier.isPublic(entryClass.getModifiers())) {
        throw new ProgramInvocationException(
                "The class " + entryClass.getName() + " must be public.");
    }

    try {
        mainMethod = entryClass.getMethod("main", String[].class);
    } catch (NoSuchMethodException e) {
        throw new ProgramInvocationException(
                "The class " + entryClass.getName() + " has no main(String[]) method.");
    } catch (Throwable t) {
        throw new ProgramInvocationException(
                "Could not look up the main(String[]) method from the class "
                        + entryClass.getName() + ": " + t.getMessage(), t);
    }

    if (!Modifier.isStatic(mainMethod.getModifiers())) {
        throw new ProgramInvocationException(
                "The class " + entryClass.getName() + " declares a non-static main method.");
    }
    if (!Modifier.isPublic(mainMethod.getModifiers())) {
        throw new ProgramInvocationException(
                "The class " + entryClass.getName() + " declares a non-public main method.");
    }

    try {
        // invoke the main method via reflection
        mainMethod.invoke(null, (Object) args);
    } catch (IllegalArgumentException e) {
        throw new ProgramInvocationException(
                "Could not invoke the main method, arguments are not matching.", e);
    } catch (IllegalAccessException e) {
        throw new ProgramInvocationException(
                "Access to the main method was denied: " + e.getMessage(), e);
    } catch (InvocationTargetException e) {
        Throwable exceptionInMethod = e.getTargetException();
        if (exceptionInMethod instanceof Error) {
            throw (Error) exceptionInMethod;
        } else if (exceptionInMethod instanceof ProgramParametrizationException) {
            throw (ProgramParametrizationException) exceptionInMethod;
        } else if (exceptionInMethod instanceof ProgramInvocationException) {
            throw (ProgramInvocationException) exceptionInMethod;
        } else {
            throw new ProgramInvocationException(
                    "The main method caused an error: " + exceptionInMethod.getMessage(),
                    exceptionInMethod);
        }
    } catch (Throwable t) {
        throw new ProgramInvocationException(
                "An error occurred while invoking the program's main method: " + t.getMessage(),
                t);
    }
}
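For context, a user entry point that the reflection call above ends up invoking looks roughly like the following minimal sketch (not code from this article; the job logic and name are placeholders). Because StreamContextEnvironment.setAsContext() has already been called, getExecutionEnvironment() returns the environment prepared by the client rather than a local one:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WordCountJob {
    public static void main(String[] args) throws Exception {
        // Returns the StreamContextEnvironment installed by ClientUtils.executeProgram()
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("hello", "flink").print();

        // This is the call that the next section follows into the Flink client
        env.execute("word-count-sketch");
    }
}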
Invoking the execute() method of the execution environment
StreamExecutionEnvironment.java
public JobExecutionResult execute(String jobName) throws Exception {
    final List<Transformation<?>> originalTransformations = new ArrayList<>(transformations);
    StreamGraph streamGraph = getStreamGraph();
    if (jobName != null) {
        streamGraph.setJobName(jobName);
    }

    try {
        return execute(streamGraph);
    } catch (Throwable t) {
        Optional<ClusterDatasetCorruptedException> clusterDatasetCorruptedException =
                ExceptionUtils.findThrowable(t, ClusterDatasetCorruptedException.class);
        if (!clusterDatasetCorruptedException.isPresent()) {
            throw t;
        }

        // Retry without cache if it is caused by corrupted cluster dataset.
        invalidateCacheTransformations(originalTransformations);
        streamGraph = getStreamGraph(originalTransformations);
        return execute(streamGraph);
    }
}

public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
    // step into executeAsync()
    final JobClient jobClient = executeAsync(streamGraph);

    try {
        final JobExecutionResult jobExecutionResult;

        if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
            jobExecutionResult = jobClient.getJobExecutionResult().get();
        } else {
            jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
        }

        jobListeners.forEach(jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));
        return jobExecutionResult;
    } catch (Throwable t) {
        // get() on the JobExecutionResult Future will throw an ExecutionException. This
        // behaviour was largely not there in Flink versions before the PipelineExecutor
        // refactoring so we should strip that exception.
        Throwable strippedException = ExceptionUtils.stripExecutionException(t);
        jobListeners.forEach(
                jobListener -> {
                    jobListener.onJobExecuted(null, strippedException);
                });
        ExceptionUtils.rethrowException(strippedException);

        // never reached, only make javac happy
        return null;
    }
}

public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
    checkNotNull(streamGraph, "StreamGraph cannot be null.");

    // Pick the matching executor factory based on the submission mode.
    final PipelineExecutor executor = getPipelineExecutor();

    // Use the selected executor to submit the job: session mode corresponds to
    // AbstractSessionClusterExecutor, per-job mode to AbstractJobClusterExecutor.
    CompletableFuture<JobClient> jobClientFuture =
            executor.execute(streamGraph, configuration, userClassloader);

    try {
        JobClient jobClient = jobClientFuture.get();
        jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
        collectIterators.forEach(iterator -> iterator.setJobClient(jobClient));
        collectIterators.clear();
        return jobClient;
    } catch (ExecutionException executionException) {
        final Throwable strippedException =
                ExceptionUtils.stripExecutionException(executionException);
        jobListeners.forEach(
                jobListener -> jobListener.onJobSubmitted(null, strippedException));

        throw new FlinkException(
                String.format("Failed to execute job '%s'.", streamGraph.getJobName()),
                strippedException);
    }
}
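User code can also reach executeAsync() directly: StreamExecutionEnvironment.executeAsync(String) is a public API that returns the same JobClient that execute() waits on internally. A minimal sketch (the job logic and name are placeholders):

import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AsyncSubmitSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).print();

        // Submit without blocking; the client gets a JobClient back immediately
        JobClient jobClient = env.executeAsync("async-submit-sketch");
        System.out.println("Submitted job " + jobClient.getJobID());

        // Optionally block for the result, like execute() does in attached mode
        jobClient.getJobExecutionResult().get();
    }
}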