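When a jar is submitted to a Flink cluster with "bin/flink run", the "bin/flink" script actually calls org.apache.flink.client.cli.CliFrontend#main, which triggers the real execution.
The source of org.apache.flink.client.cli.CliFrontend#main is as follows: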
public static void main(final String[] args) {
    // Mainly logs environment details such as JVM and Hadoop information
    EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
    // 1. find the configuration directory
    //    (resolve the path of Flink's conf directory)
    final String configurationDirectory = getConfigurationDirectoryFromEnv();
    // 2. load the global configuration
    //    (load flink-conf.yaml from the conf directory and store the parsed entries
    //    in a Configuration, which is essentially backed by a HashMap)
    final Configuration configuration =
            GlobalConfiguration.loadConfiguration(configurationDirectory);
    // 3. load the custom command lines
    //    (wrap the command-line interfaces in the order GenericCLI, FlinkYarnSessionCli,
    //    DefaultCLI, and keep them in a List)
    final List<CustomCommandLine> customCommandLines =
            loadCustomCommandLines(configuration, configurationDirectory);
    int retCode = 31;
    try {
        // Create the CliFrontend object (wraps configuration and customCommandLines)
        final CliFrontend cli = new CliFrontend(configuration, customCommandLines);
        // Install the security context
        SecurityUtils.install(new SecurityConfiguration(cli.configuration));
        // NOTE: run parseAndRun, which eventually invokes the main method
        // of the user jar through reflection
        retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
    } catch (Throwable t) {
        final Throwable strippedThrowable =
                ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
        LOG.error("Fatal error while running command line interface.", strippedThrowable);
        strippedThrowable.printStackTrace();
    } finally {
        System.exit(retCode);
    }
}
In summary, org.apache.flink.client.cli.CliFrontend#main does two things:
(a) It reads the flink-conf.yaml configuration.
(b) Based on that configuration, it creates the CliFrontend that is used to submit the job.
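To make point (a) concrete, here is a minimal sketch of reading flat "key: value" lines into a HashMap. It is not Flink's GlobalConfiguration: the class FlatConfSketch and its parse method are hypothetical names, and the sketch assumes the simple one-entry-per-line layout and treats everything after a '#' as a comment.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration only; this is not Flink's GlobalConfiguration. */
final class FlatConfSketch {

    /** Parses "key: value" lines into a map, skipping blank lines and '#' comments. */
    static Map<String, String> parse(List<String> lines) {
        Map<String, String> conf = new HashMap<>();
        for (String raw : lines) {
            // Simplification: everything after '#' is treated as a comment.
            int hash = raw.indexOf('#');
            String line = (hash >= 0 ? raw.substring(0, hash) : raw).trim();
            if (line.isEmpty()) {
                continue;
            }
            // Split on the first ": " only, so values may themselves contain colons.
            int sep = line.indexOf(": ");
            if (sep <= 0) {
                continue; // not a key/value line; ignored in this sketch
            }
            conf.put(line.substring(0, sep).trim(), line.substring(sep + 2).trim());
        }
        return conf;
    }

    public static void main(String[] args) {
        Map<String, String> conf = parse(List.of(
                "# sample entries",
                "jobmanager.memory.process.size: 1600m",
                "classloader.resolve-order: child-first"));
        System.out.println(conf.get("classloader.resolve-order"));
    }
}
```

Keeping the parsed entries in a plain map is what lets later steps look up individual keys such as "classloader.resolve-order".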
The source of org.apache.flink.client.cli.CliFrontend#parseAndRun is as follows:
/**
 * Example: "bin/flink run -t yarn-per-job /etc/flink-1.14/examples/streaming/SocketWindowWordCount.jar --port 8888".
 * Everything after "bin/flink" is part of args, including "run".
 *
 * @param args the arguments the user passed when submitting the job
 * @return the exit code
 */
public int parseAndRun(String[] args) {
    // check for action
    // Running "bin/flink" without any arguments prints the help text
    if (args.length < 1) {
        CliFrontendParser.printHelp(customCommandLines);
        System.out.println("Please specify an action.");
        return 1;
    }
    // get action
    // In the example above, the action is "run"
    String action = args[0];
    // remove action from parameters
    // Copy the remaining arguments (everything after the action) into params
    final String[] params = Arrays.copyOfRange(args, 1, args.length);
    try {
        // do action
        switch (action) {
            // This case matches when the action is "run"
            case ACTION_RUN:
                run(params);
                return 0;
            case ACTION_RUN_APPLICATION:
            ...
    } catch (CliArgsException ce) {
        return handleArgException(ce);
    } catch (ProgramParametrizationException ppe) {
        return handleParametrizationException(ppe);
    } catch (ProgramMissingJobException pmje) {
        return handleMissingJobException();
    } catch (Exception e) {
        return handleError(e);
    }
}
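The dispatch pattern in parseAndRun (take the first argument as the action, pass the rest on as parameters) can be sketched in isolation. The class ActionDispatchSketch is a hypothetical stand-in: it only prints what it would do and handles a single action.

```java
import java.util.Arrays;

/** Hypothetical stand-in for the action dispatch in CliFrontend#parseAndRun. */
final class ActionDispatchSketch {

    /** Returns 0 on success and 1 when the action is missing or unknown. */
    static int parseAndRun(String[] args) {
        if (args.length < 1) {
            System.out.println("Please specify an action.");
            return 1;
        }
        // The first argument is the action; everything after it is a parameter.
        String action = args[0];
        String[] params = Arrays.copyOfRange(args, 1, args.length);
        switch (action) {
            case "run":
                System.out.println("run with " + Arrays.toString(params));
                return 0;
            default:
                System.out.println("Unknown action: " + action);
                return 1;
        }
    }

    public static void main(String[] args) {
        int code = parseAndRun(new String[] {"run", "-t", "yarn-per-job", "job.jar"});
        System.out.println("exit code: " + code);
    }
}
```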
The source of org.apache.flink.client.cli.CliFrontend#run is as follows:
protected void run(String[] args) throws Exception {
    LOG.info("Running 'run' command.");
    // 1. Collect the default options, the options Flink recognizes, and the savepoint options into Options
    final Options commandOptions = CliFrontendParser.getRunCommandOptions();
    // 2. Match the user's args against Options and produce a CommandLine
    final CommandLine commandLine = getCommandLine(commandOptions, args, true);
    // evaluate help flag
    // If the arguments contain "-h", print the help text
    if (commandLine.hasOption(HELP_OPTION.getOpt())) {
        CliFrontendParser.printHelpForRun(customCommandLines);
        return;
    }
    // 3. Based on the startup arguments, check GenericCLI, FlinkYarnSessionCli and DefaultCLI
    //    one by one to find the active one
    final CustomCommandLine activeCommandLine =
            validateAndGetActiveCommandLine(checkNotNull(commandLine));
    // 4. Parse the arguments again and wrap them in ProgramOptions
    final ProgramOptions programOptions = ProgramOptions.create(commandLine);
    // 5. Collect the URLs of the submitted jar and its dependency jars into this List
    final List<URL> jobJars = getJobJarAndDependencies(programOptions);
    // NOTE: 6. Build the effective configuration, including JobManager memory,
    //    TaskManager memory, and the number of slots per TaskManager
    final Configuration effectiveConfiguration =
            getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);
    LOG.debug("Effective executor configuration: {}", effectiveConfiguration);
    // 7. Create the PackagedProgram, which wraps the user's submission parameters
    //    together with the configuration from flink-conf.yaml
    try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
        // NOTE: 8. Execute the program
        executeProgram(effectiveConfiguration, program);
    }
}
Following the executeProgram call in step 8 of (3) further down, execution reaches this source:
public static void executeProgram(
        PipelineExecutorServiceLoader executorServiceLoader,
        Configuration configuration,
        PackagedProgram program,
        boolean enforceSingleJobExecution,
        boolean suppressSysout)
        throws ProgramInvocationException {
    checkNotNull(executorServiceLoader);
    // NOTE: 1. This step swaps the class loader: the current thread's context class loader is
    // replaced with a ChildFirstClassLoader or a ParentFirstClassLoader, so user code actually
    // runs under ChildFirstClassLoader (the default) or ParentFirstClassLoader
    final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
    final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
    try {
        Thread.currentThread().setContextClassLoader(userCodeClassLoader);
        LOG.info(
                "Starting program (detached: {})",
                !configuration.getBoolean(DeploymentOptions.ATTACHED));
        // 2. Set up the environment context; getExecutionEnvironment in user code picks this up
        ContextEnvironment.setAsContext(
                executorServiceLoader,
                configuration,
                userCodeClassLoader,
                enforceSingleJobExecution,
                suppressSysout);
        // 3. Wrap the Configuration (flink-conf.yaml plus user-supplied parameters),
        // the userCodeClassLoader (the class loader that runs user code), and so on into
        // StreamContextEnvironment, so they can be retrieved from it later
        StreamContextEnvironment.setAsContext(
                executorServiceLoader,
                configuration,
                userCodeClassLoader,
                enforceSingleJobExecution,
                suppressSysout);
        try {
            // 4. Run the user code
            program.invokeInteractiveModeForExecution();
        } finally {
            ContextEnvironment.unsetAsContext();
            StreamContextEnvironment.unsetAsContext();
        }
    } finally {
        // 5. Restore the thread's original context class loader
        Thread.currentThread().setContextClassLoader(contextClassLoader);
    }
}
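Steps 1 and 5 above follow a common pattern: remember the thread's context class loader, install another one, and restore the original in a finally block. A minimal sketch of just that pattern, where ContextClassLoaderSwapSketch and callWith are hypothetical names and an empty URLClassLoader stands in for the user-code class loader:

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Supplier;

/** Hypothetical helper showing the swap-and-restore pattern only. */
final class ContextClassLoaderSwapSketch {

    /** Runs body with the given context class loader, then restores the previous one. */
    static <T> T callWith(ClassLoader loader, Supplier<T> body) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return body.get();
        } finally {
            // Restored even if body throws, mirroring the finally block above.
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        // An empty URLClassLoader stands in for the user-code class loader.
        ClassLoader userLoader = new URLClassLoader(
                new URL[0], ContextClassLoaderSwapSketch.class.getClassLoader());
        boolean swapped = callWith(
                userLoader, () -> Thread.currentThread().getContextClassLoader() == userLoader);
        boolean restored = Thread.currentThread().getContextClassLoader() != userLoader;
        System.out.println("swapped inside body: " + swapped);
        System.out.println("restored afterwards: " + restored);
    }
}
```

The restore has to live in finally; otherwise an exception thrown by user code would leave the client thread running with the user-code class loader.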
The rest of this part focuses on step 1, obtaining the class loader that runs the user code:
(a) Look at the source of org.apache.flink.client.program.PackagedProgram#getUserCodeClassLoader
public ClassLoader getUserCodeClassLoader() {
    return this.userCodeClassLoader;
}
It simply executes "return this.userCodeClassLoader;", and this.userCodeClassLoader is assigned in (3).7, when the PackagedProgram is created.
(b) The source that creates the PackagedProgram:
private PackagedProgram(
        @Nullable File jarFile,
        List<URL> classpaths,
        @Nullable String entryPointClassName,
        Configuration configuration,
        SavepointRestoreSettings savepointRestoreSettings,
        String... args)
        throws ProgramInvocationException {
    ...
    // Create the ClassLoader that loads the user code, where:
    // getJobJarAndDependencies(): returns the jar being submitted to Flink plus its dependency jars
    // classpaths: the classpath entries given with the "-C" startup option
    this.userCodeClassLoader =
            ClientUtils.buildUserCodeClassLoader(
                    getJobJarAndDependencies(),
                    classpaths,
                    getClass().getClassLoader(),
                    configuration);
    // load the entry point class
    this.mainClass =
            loadMainClass(
                    // if no entryPointClassName name was given, we try and look one up through
                    // the manifest
                    entryPointClassName != null
                            ? entryPointClassName
                            : getEntryPointClassNameFromJar(this.jarFile),
                    userCodeClassLoader);
    ...
}
(c) The source that creates the class loader for the user jar:
/**
 * @param jars the jar being submitted to the Flink cluster and its dependency jars
 * @param classpaths the classpath entries given with the "-C" startup option
 * @param parent the ClassLoader of the PackagedProgram class
 * @param configuration the Configuration built from flink-conf.yaml and the submission parameters
 * @return the class loader used to load the user code
 */
public static URLClassLoader buildUserCodeClassLoader(
        List<URL> jars, List<URL> classpaths, ClassLoader parent, Configuration configuration) {
    // Add all of the following to the URL[]:
    // (1). the jar submitted by the user
    // (2). its dependency jars
    // (3). the classpath entries given with the "-C" startup option
    URL[] urls = new URL[jars.size() + classpaths.size()];
    for (int i = 0; i < jars.size(); i++) {
        urls[i] = jars.get(i);
    }
    for (int i = 0; i < classpaths.size(); i++) {
        urls[i + jars.size()] = classpaths.get(i);
    }
    final String[] alwaysParentFirstLoaderPatterns =
            CoreOptions.getParentFirstLoaderPatterns(configuration);
    // Read "classloader.resolve-order" from the Configuration; it decides whether classes
    // are loaded child-first or parent-first
    final String classLoaderResolveOrder =
            configuration.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER);
    FlinkUserCodeClassLoaders.ResolveOrder resolveOrder =
            FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder);
    final boolean checkClassloaderLeak =
            configuration.getBoolean(CoreOptions.CHECK_LEAKED_CLASSLOADER);
    // Create the ClassLoader
    return FlinkUserCodeClassLoaders.create(
            resolveOrder,
            urls,
            parent,
            alwaysParentFirstLoaderPatterns,
            NOOP_EXCEPTION_HANDLER,
            checkClassloaderLeak);
}
(d) The source of org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders#create
public static URLClassLoader create(
        ResolveOrder resolveOrder,
        URL[] urls,
        ClassLoader parent,
        String[] alwaysParentFirstPatterns,
        Consumer<Throwable> classLoadingExceptionHandler,
        boolean checkClassLoaderLeak) {
    // ChildFirstClassLoader and ParentFirstClassLoader both extend FlinkUserCodeClassLoader
    switch (resolveOrder) {
        // With child-first, the returned loader is a ChildFirstClassLoader. It overrides
        // loadClassWithoutExceptionHandling, which FlinkUserCodeClassLoader.loadClass calls,
        // so this loader breaks the parent-delegation model and loads classes itself first
        case CHILD_FIRST:
            return childFirst(
                    urls,
                    parent,
                    alwaysParentFirstPatterns,
                    classLoadingExceptionHandler,
                    checkClassLoaderLeak);
        // With parent-first, the returned loader is a ParentFirstClassLoader. It hands classes
        // to the parent class loader first, that is, loading ends up going through the
        // standard URLClassLoader delegation
        case PARENT_FIRST:
            return parentFirst(
                    urls, parent, classLoadingExceptionHandler, checkClassLoaderLeak);
        default:
            throw new IllegalArgumentException(
                    "Unknown class resolution order: " + resolveOrder);
    }
}
So the "classloader.resolve-order" setting in flink-conf.yaml decides whether Flink breaks the parent-delegation model when loading user classes.
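To illustrate what "breaking parent delegation" means, here is a simplified child-first loader. It is a sketch and not Flink's ChildFirstClassLoader: the class ChildFirstSketch, its constructor, and the helper loadOrNull are hypothetical, and it overrides loadClass directly instead of loadClassWithoutExceptionHandling. It looks in its own URLs first, falls back to the parent, and always delegates names that match an "always parent-first" prefix.

```java
import java.net.URL;
import java.net.URLClassLoader;

/** Simplified child-first loader; a sketch, not Flink's ChildFirstClassLoader. */
final class ChildFirstSketch extends URLClassLoader {

    private final String[] alwaysParentFirstPrefixes;

    ChildFirstSketch(URL[] urls, ClassLoader parent, String[] alwaysParentFirstPrefixes) {
        super(urls, parent);
        this.alwaysParentFirstPrefixes = alwaysParentFirstPrefixes;
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                if (isAlwaysParentFirst(name)) {
                    // Standard delegation: parent first, then our own URLs.
                    return super.loadClass(name, resolve);
                }
                try {
                    // Child-first: search our own URLs before asking the parent.
                    c = findClass(name);
                } catch (ClassNotFoundException e) {
                    // Not in our URLs; fall back to the normal delegation path.
                    return super.loadClass(name, resolve);
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }

    private boolean isAlwaysParentFirst(String name) {
        for (String prefix : alwaysParentFirstPrefixes) {
            if (name.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    /** Convenience for the demo: returns null instead of throwing. */
    Class<?> loadOrNull(String name) {
        try {
            return loadClass(name);
        } catch (ClassNotFoundException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        ChildFirstSketch loader = new ChildFirstSketch(
                new URL[0], ChildFirstSketch.class.getClassLoader(), new String[] {"java."});
        // "java." is on the parent-first list, so the parent resolves it.
        System.out.println(loader.loadOrNull("java.lang.String") == String.class);
    }
}
```

With real jar URLs, a class present both in the jar and on the parent's classpath would be taken from the jar, which is the behavior the child-first setting selects.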
The call "program.invokeInteractiveModeForExecution();" in (3).4 (step 4 of executeProgram) eventually reaches this code:
/**
 * @param entryClass the main class to execute
 * @param args the arguments supplied by the user
 * @throws ProgramInvocationException
 */
private static void callMainMethod(Class<?> entryClass, String[] args)
        throws ProgramInvocationException {
    Method mainMethod;
    ...
    try {
        mainMethod = entryClass.getMethod("main", String[].class);
    } catch (NoSuchMethodException e) {
        throw new ProgramInvocationException(
                "The class " + entryClass.getName() + " has no main(String[]) method.");
    } catch (Throwable t) {
        throw new ProgramInvocationException(
                "Could not look up the main(String[]) method from the class "
                        + entryClass.getName()
                        + ": "
                        + t.getMessage(),
                t);
    }
    ...
    try {
        mainMethod.invoke(null, (Object) args);
    }
    ...
    catch (Throwable t) {
        throw new ProgramInvocationException(
                "An error occurred while invoking the program's main method: "
                        + t.getMessage(),
                t);
    }
}
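This method invokes the main method of the user jar through reflection, which is the point where the logic in the user code actually starts to run.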
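The reflective call can be tried on its own with a small stand-in class. In this sketch, ReflectiveMainSketch, DemoJob and callMain are hypothetical names; DemoJob plays the role of the entry class from the user jar, and failures are wrapped in IllegalStateException rather than Flink's ProgramInvocationException.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

/** Hypothetical illustration of invoking a main method through reflection. */
final class ReflectiveMainSketch {

    /** Stand-in for the entry class of a user jar. */
    public static final class DemoJob {
        public static String lastArgs = "";

        public static void main(String[] args) {
            lastArgs = String.join(",", args);
        }
    }

    /** Looks up public static main(String[]) on entryClass and invokes it. */
    static void callMain(Class<?> entryClass, String[] args) {
        try {
            Method mainMethod = entryClass.getMethod("main", String[].class);
            if (!Modifier.isStatic(mainMethod.getModifiers())) {
                throw new IllegalStateException(
                        entryClass.getName() + " declares a non-static main method.");
            }
            // The cast to Object passes the array as a single argument
            // instead of spreading it over the varargs parameter.
            mainMethod.invoke(null, (Object) args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                    "Could not invoke main of " + entryClass.getName(), e);
        }
    }

    public static void main(String[] args) {
        callMain(DemoJob.class, new String[] {"--port", "8888"});
        System.out.println(DemoJob.lastArgs);
    }
}
```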