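In the previous post, we covered the EnvironmentInformation.logEnvironmentInfo function: mainly how the log4j2 logging framework is bound to Flink, and how the log4j2 configuration file and the log path are defined.
In this post, we cover how Flink parses flink-conf.yaml and the custom parameters passed to the flink command.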
The main method of the flink-clients/src/org.apache.flink.client.cli.CliFrontend class starts by obtaining the path of Flink's conf directory:
/** Submits the job based on the arguments. */
public static void main(final String[] args) {
    EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

    // 1. find the configuration directory
    final String configurationDirectory = getConfigurationDirectoryFromEnv();

    // 2. load the global configuration
    final Configuration configuration =
            GlobalConfiguration.loadConfiguration(configurationDirectory);

    // ...... (part omitted) ......
}
This calls getConfigurationDirectoryFromEnv, a function of the CliFrontend class itself. Its implementation is as follows:
// --------------------------------------------------------------------------------------------
//  Miscellaneous Utilities
// --------------------------------------------------------------------------------------------

public static String getConfigurationDirectoryFromEnv() {
    String location = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);

    if (location != null) {
        if (new File(location).exists()) {
            return location;
        } else {
            throw new RuntimeException(
                    "The configuration directory '"
                            + location
                            + "', specified in the '"
                            + ConfigConstants.ENV_FLINK_CONF_DIR
                            + "' environment variable, does not exist.");
        }
    } else if (new File(CONFIG_DIRECTORY_FALLBACK_1).exists()) {
        location = CONFIG_DIRECTORY_FALLBACK_1;
    } else if (new File(CONFIG_DIRECTORY_FALLBACK_2).exists()) {
        location = CONFIG_DIRECTORY_FALLBACK_2;
    } else {
        throw new RuntimeException(
                "The configuration directory was not specified. "
                        + "Please specify the directory containing the configuration file through the '"
                        + ConfigConstants.ENV_FLINK_CONF_DIR
                        + "' environment variable.");
    }
    return location;
}
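When we ran the flink script earlier, it called the config.sh script, which runs export FLINK_CONF_DIR to set the Flink conf directory variable. Here the path is read back with System.getenv.
If the variable is not set, the function falls back to CONFIG_DIRECTORY_FALLBACK_1 and then to CONFIG_DIRECTORY_FALLBACK_2; if neither exists, it throws a RuntimeException. A variable that is set but points to a missing directory also causes a RuntimeException.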
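To make the lookup order concrete, here is a minimal Java sketch that mirrors getConfigurationDirectoryFromEnv. The class ConfDirResolver is hypothetical, the values "../conf" and "conf" for the two fallback constants are assumptions (the code above only shows the constant names), and the existence check is passed in as a Predicate so the logic can be exercised without touching the file system.

```java
import java.io.File;
import java.util.function.Predicate;

// Hypothetical sketch of the conf-directory lookup order; not Flink's own class.
class ConfDirResolver {
    static final String FALLBACK_1 = "../conf"; // assumed value of CONFIG_DIRECTORY_FALLBACK_1
    static final String FALLBACK_2 = "conf";    // assumed value of CONFIG_DIRECTORY_FALLBACK_2

    /**
     * @param envValue value of FLINK_CONF_DIR, or null if the variable is unset
     * @param exists   tells whether a path exists (injected so it can be tested)
     */
    static String resolve(String envValue, Predicate<String> exists) {
        if (envValue != null) {
            // An explicitly set variable must point to an existing directory.
            if (exists.test(envValue)) {
                return envValue;
            }
            throw new RuntimeException("FLINK_CONF_DIR points to a missing directory: " + envValue);
        }
        // Variable unset: try the two fallbacks in order.
        if (exists.test(FALLBACK_1)) {
            return FALLBACK_1;
        }
        if (exists.test(FALLBACK_2)) {
            return FALLBACK_2;
        }
        throw new RuntimeException("The configuration directory was not specified.");
    }

    public static void main(String[] args) {
        try {
            // Real lookup against the process environment and the local file system.
            System.out.println(resolve(System.getenv("FLINK_CONF_DIR"), p -> new File(p).exists()));
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Injecting the existence check is only a testing convenience; Flink itself calls new File(...).exists() directly, as shown above.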
Next, the main method of the flink-clients/src/org.apache.flink.client.cli.CliFrontend class loads the flink-conf.yaml configuration file:
/** Submits the job based on the arguments. */
public static void main(final String[] args) {
    EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

    // 1. find the configuration directory
    final String configurationDirectory = getConfigurationDirectoryFromEnv();

    // 2. load the global configuration
    final Configuration configuration =
            GlobalConfiguration.loadConfiguration(configurationDirectory);

    // 3. load the custom command lines
    final List<CustomCommandLine> customCommandLines =
            loadCustomCommandLines(configuration, configurationDirectory);

    // ...... (part omitted) ......
}
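This calls GlobalConfiguration.loadConfiguration, passing in the path of the Flink conf directory.
Following the overloads, the loadConfiguration function that finally does the work is implemented as follows: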
/**
 * Loads the configuration files from the specified directory. If the dynamic properties
 * configuration is not null, then it is added to the loaded configuration.
 *
 * @param configDir directory to load the configuration from
 * @param dynamicProperties configuration file containing the dynamic properties. Null if none.
 * @return The configuration loaded from the given configuration directory
 */
public static Configuration loadConfiguration(
        final String configDir, @Nullable final Configuration dynamicProperties) {

    if (configDir == null) {
        throw new IllegalArgumentException(
                "Given configuration directory is null, cannot load configuration");
    }

    final File confDirFile = new File(configDir);
    if (!(confDirFile.exists())) {
        throw new IllegalConfigurationException(
                "The given configuration directory name '"
                        + configDir
                        + "' ("
                        + confDirFile.getAbsolutePath()
                        + ") does not describe an existing directory.");
    }

    // get Flink yaml configuration file
    final File yamlConfigFile = new File(confDirFile, FLINK_CONF_FILENAME);

    if (!yamlConfigFile.exists()) {
        throw new IllegalConfigurationException(
                "The Flink config file '"
                        + yamlConfigFile
                        + "' ("
                        + yamlConfigFile.getAbsolutePath()
                        + ") does not exist.");
    }

    Configuration configuration = loadYAMLResource(yamlConfigFile);

    if (dynamicProperties != null) {
        configuration.addAll(dynamicProperties);
    }

    return configuration;
}
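In short, it reads the flink-conf.yaml file in the Flink conf directory and parses it line by line: comments are stripped, and each remaining line is split at the colon into a key and a value, which are stored in the configuration object. If dynamicProperties is not null, its entries are then added on top with configuration.addAll.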
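The following is a simplified sketch of that line-by-line parsing, not Flink's own loadYAMLResource. The class name SimpleConfParser is hypothetical, and the exact rules (drop everything after the first '#', then split at the first ": ", colon plus space) are assumptions chosen for illustration. Note that this naive comment handling would also cut off a value that itself contains '#'.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified parser for flink-conf.yaml-style "key: value" lines.
class SimpleConfParser {

    static Map<String, String> parse(List<String> lines) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String line : lines) {
            // 1. Strip the comment part: everything from the first '#' onwards.
            String conf = line.split("#", 2)[0].trim();
            if (conf.isEmpty()) {
                continue; // blank or comment-only line
            }
            // 2. Split at the first ": " into key and value.
            String[] kv = conf.split(": ", 2);
            if (kv.length != 2) {
                continue; // no valid key-value pair on this line
            }
            String key = kv[0].trim();
            String value = kv[1].trim();
            if (key.isEmpty() || value.isEmpty()) {
                continue;
            }
            // A later occurrence of the same key overwrites an earlier one.
            result.put(key, value);
        }
        return result;
    }
}
```

In Flink the lines come from reading the file and the pairs go into a Configuration object; a List<String> and a Map are used here only to keep the sketch self-contained.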
The main method of the flink-clients/src/org.apache.flink.client.cli.CliFrontend class then registers three kinds of Flink command-line clients:
/** Submits the job based on the arguments. */
public static void main(final String[] args) {
    EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

    // 1. find the configuration directory
    final String configurationDirectory = getConfigurationDirectoryFromEnv();

    // 2. load the global configuration
    final Configuration configuration =
            GlobalConfiguration.loadConfiguration(configurationDirectory);

    // 3. load the custom command lines
    final List<CustomCommandLine> customCommandLines =
            loadCustomCommandLines(configuration, configurationDirectory);

    int retCode = 31;
    // ...... (part omitted) ......
}
This calls loadCustomCommandLines, a function of the same class, passing in the configuration read from flink-conf.yaml and the conf directory.
loadCustomCommandLines is implemented as follows:
public static List<CustomCommandLine> loadCustomCommandLines(
        Configuration configuration, String configurationDirectory) {
    List<CustomCommandLine> customCommandLines = new ArrayList<>();
    customCommandLines.add(new GenericCLI(configuration, configurationDirectory));

    // Command line interface of the YARN session, with a special initialization here
    // to prefix all options with y/yarn.
    final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
    try {
        customCommandLines.add(
                loadCustomCommandLine(
                        flinkYarnSessionCLI,
                        configuration,
                        configurationDirectory,
                        "y",
                        "yarn"));
    } catch (NoClassDefFoundError | Exception e) {
        final String errorYarnSessionCLI = "org.apache.flink.yarn.cli.FallbackYarnSessionCli";
        try {
            LOG.info("Loading FallbackYarnSessionCli");
            customCommandLines.add(loadCustomCommandLine(errorYarnSessionCLI, configuration));
        } catch (Exception exception) {
            LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
        }
    }

    // Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get the
    // active CustomCommandLine in order and DefaultCLI isActive always return true.
    customCommandLines.add(new DefaultCLI());

    return customCommandLines;
}
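As we can see, this function creates a customCommandLines list and adds three command-line clients to it, in this order: GenericCLI, FlinkYarnSessionCli (or FallbackYarnSessionCli if FlinkYarnSessionCli cannot be loaded), and DefaultCLI. The order matters: the active client is picked by walking the list from the front, and DefaultCLI, which is always active, comes last. The three clients are described below.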
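The "first active client wins" rule from the Tips comment can be sketched as follows. ActiveCliSelector, its Cli holder class and the simplified activation tests are all hypothetical: the real clients look at the parsed CommandLine and at the configuration, not just at raw argument strings.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of "pick the first active client"; not Flink's API.
class ActiveCliSelector {

    // A client is just a name plus an activation test over the raw arguments.
    static final class Cli {
        final String name;
        final Predicate<List<String>> isActive;

        Cli(String name, Predicate<List<String>> isActive) {
            this.name = name;
            this.isActive = isActive;
        }
    }

    // Walk the list in order and return the first client whose test passes.
    static String selectActive(List<Cli> clis, List<String> args) {
        for (Cli cli : clis) {
            if (cli.isActive.test(args)) {
                return cli.name;
            }
        }
        throw new IllegalStateException("No active command line found.");
    }

    // Same order as loadCustomCommandLines; the activation tests are greatly simplified.
    static List<Cli> defaultOrder() {
        return List.of(
                new Cli("generic", a -> a.contains("-t") || a.contains("-e")),
                new Cli("yarn", a -> a.contains("-yid")),
                // Always active, so it only wins when nothing earlier matched.
                new Cli("default", a -> true));
    }

    public static void main(String[] args) {
        // prints "generic"
        System.out.println(selectActive(defaultOrder(), List.of("run", "-t", "remote", "job.jar")));
    }
}
```

If the always-true client were placed first, it would shadow every other client, which is exactly why the Tips comment insists on adding DefaultCLI last.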
flink-clients/src/main/java/org.apache.flink.client.cli.GenericCLI.isActive is implemented as follows:
@Override
public boolean isActive(CommandLine commandLine) {
    // Does flink-conf.yaml set execution.target: remote | local | yarn-per-job (deprecated) | yarn-session | kubernetes-session
    return configuration.getOptional(DeploymentOptions.TARGET).isPresent()
            // Does the flink command specify -e remote | local | yarn-per-job (deprecated) | yarn-session | kubernetes-session
            // The -e option has been deprecated
            || commandLine.hasOption(executorOption.getOpt())
            // Does the flink command specify -t remote | local | yarn-per-job (deprecated) | yarn-session | kubernetes-session | kubernetes-application | yarn-application
            // -t is equivalent to execution.target in flink-conf.yaml, but takes precedence over it
            || commandLine.hasOption(targetOption.getOpt());
}
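For GenericCLI we only need to look at the isActive method. Flink later uses this method to decide which command-line client to activate, and then reads only the options belonging to that client; the options of the other clients are not read.
GenericCLI determines the mode in which our application runs: a local single-machine cluster, a remote cluster, YARN, or Kubernetes. As the comments in the code show, GenericCLI is active if flink-conf.yaml sets execution.target, or if the flink command is given the -e or -t option.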
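A plain-Java sketch of the same activation test, plus the precedence rule stated in the comments (-t overrides execution.target). The class GenericCliActivation and its parameters are hypothetical: the configuration is modeled as a Map and the command line as the set of short option names that were given.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical restatement of GenericCLI.isActive plus the -t precedence rule.
class GenericCliActivation {

    static final String TARGET_KEY = "execution.target";

    // conf: key/value pairs loaded from flink-conf.yaml
    // cliOptions: short option names present on the command line, e.g. "t", "e", "m"
    static boolean isActive(Map<String, String> conf, Set<String> cliOptions) {
        return conf.containsKey(TARGET_KEY)
                || cliOptions.contains("e")   // deprecated -e
                || cliOptions.contains("t");  // -t
    }

    // When both are present, the -t value wins over execution.target from the file.
    static String effectiveTarget(Map<String, String> conf, String targetFromCli) {
        return targetFromCli != null ? targetFromCli : conf.get(TARGET_KEY);
    }
}
```

Passing -m alone does not activate GenericCLI in this model, which matches the three conditions in the code above.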
FlinkYarnSessionCli is instantiated via reflection, using the following method:
/**
 * Loads a class from the classpath that implements the CustomCommandLine interface.
 *
 * @param className The fully-qualified class name to load.
 * @param params The constructor parameters
 */
private static CustomCommandLine loadCustomCommandLine(String className, Object... params)
        throws Exception {

    Class<? extends CustomCommandLine> customCliClass =
            Class.forName(className).asSubclass(CustomCommandLine.class);

    // construct class types from the parameters
    Class<?>[] types = new Class<?>[params.length];
    for (int i = 0; i < params.length; i++) {
        checkNotNull(params[i], "Parameters for custom command-lines may not be null.");
        types[i] = params[i].getClass();
    }

    Constructor<? extends CustomCommandLine> constructor = customCliClass.getConstructor(types);

    return constructor.newInstance(params);
}
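The same reflection pattern can be tried in isolation. In the sketch below, Cli, DemoCli and ReflectiveLoaderDemo are hypothetical stand-ins for CustomCommandLine and its implementations. One consequence worth seeing: because the parameter types come from params[i].getClass(), the target class needs a public constructor whose declared types match the runtime classes exactly. That is why loadCustomCommandLines passes (configuration, configurationDirectory, "y", "yarn") for FlinkYarnSessionCli but only (configuration) for FallbackYarnSessionCli; a constructor declared with a primitive int or a supertype would not be found.

```java
import java.lang.reflect.Constructor;

// Hypothetical demo of loading an implementation by class name and constructor arguments.
class ReflectiveLoaderDemo {

    interface Cli {
        String describe();
    }

    // Declares (String, Integer): the exact runtime classes of the arguments used below.
    static class DemoCli implements Cli {
        private final String prefix;
        private final Integer port;

        public DemoCli(String prefix, Integer port) {
            this.prefix = prefix;
            this.port = port;
        }

        @Override
        public String describe() {
            return prefix + ":" + port;
        }
    }

    static Cli load(String className, Object... params) throws Exception {
        Class<? extends Cli> cls = Class.forName(className).asSubclass(Cli.class);
        // Derive the constructor signature from the runtime classes of the arguments.
        Class<?>[] types = new Class<?>[params.length];
        for (int i = 0; i < params.length; i++) {
            if (params[i] == null) {
                throw new NullPointerException("Parameters may not be null.");
            }
            types[i] = params[i].getClass();
        }
        Constructor<? extends Cli> ctor = cls.getConstructor(types); // NoSuchMethodException if no match
        return ctor.newInstance(params);
    }
}
```

Loading by name also explains the try/catch in loadCustomCommandLines: if the YARN classes are not on the classpath, Class.forName fails and the fallback CLI is loaded instead.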
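So the class to focus on is org.apache.flink.yarn.cli.FlinkYarnSessionCli, and in particular how its isActive method is implemented. FlinkYarnSessionCli inherits isActive from flink-yarn/src/main/java/org.apache.flink.yarn.cli.AbstractYarnCli, where it is implemented as follows: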
@Override
public boolean isActive(CommandLine commandLine) {
    final String jobManagerOption = commandLine.getOptionValue(addressOption.getOpt(), null);
    // Does the flink command have -m with the value yarn-cluster?
    final boolean yarnJobManager = ID.equals(jobManagerOption);
    // Does the flink command have -yid, or does flink-conf.yaml set yarn.application.id?
    final boolean hasYarnAppId =
            commandLine.hasOption(applicationId.getOpt())
                    || configuration.getOptional(YarnConfigOptions.APPLICATION_ID).isPresent();
    // Is execution.target in flink-conf.yaml equal to yarn-session or yarn-per-job?
    final boolean hasYarnExecutor =
            YarnSessionClusterExecutor.NAME.equalsIgnoreCase(
                            configuration.get(DeploymentOptions.TARGET))
                    || YarnJobClusterExecutor.NAME.equalsIgnoreCase(
                            configuration.get(DeploymentOptions.TARGET));
    // Satisfying any one of the three conditions above is enough
    return hasYarnExecutor || yarnJobManager || hasYarnAppId;
}
As the comments show, FlinkYarnSessionCli is activated when we submit our application to YARN. Submitting to YARN this way makes it convenient to pass many parameters on the flink command line.
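The three YARN conditions can be restated as a small pure function. YarnCliActivation and its parameters are hypothetical; the string values "yarn-cluster", "yarn-session" and "yarn-per-job" correspond to ID, YarnSessionClusterExecutor.NAME and YarnJobClusterExecutor.NAME as described in the comments above, and the configuration is modeled as a Map.

```java
import java.util.Map;

// Hypothetical restatement of AbstractYarnCli.isActive over plain values.
class YarnCliActivation {

    static final String ID = "yarn-cluster";

    /**
     * @param jobManagerOption value of -m, or null if -m was not given
     * @param hasYidOption     whether -yid was given
     * @param conf             key/value pairs from flink-conf.yaml
     */
    static boolean isActive(String jobManagerOption, boolean hasYidOption, Map<String, String> conf) {
        // -m yarn-cluster
        boolean yarnJobManager = ID.equals(jobManagerOption);
        // -yid, or yarn.application.id in the config file
        boolean hasYarnAppId = hasYidOption || conf.containsKey("yarn.application.id");
        // execution.target is yarn-session or yarn-per-job (case-insensitive; null-safe)
        String target = conf.get("execution.target");
        boolean hasYarnExecutor =
                "yarn-session".equalsIgnoreCase(target) || "yarn-per-job".equalsIgnoreCase(target);
        // Any one of the three conditions is enough.
        return hasYarnExecutor || yarnJobManager || hasYarnAppId;
    }
}
```

Note that a plain host:port value for -m does not activate the YARN client; in that case the choice falls through to DefaultCLI (unless GenericCLI matched first).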
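While learning, however, we submitted applications without specifying any of the parameters above, that is, in standalone mode. DefaultCLI is the command-line client for standalone clusters, and its isActive method always returns true.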
flink-clients/src/main/java/org.apache.flink.client.cli.DefaultCLI.isActive is implemented as follows:
/** The default CLI which is used for interaction with standalone clusters. */
public class DefaultCLI extends AbstractCustomCommandLine {

    public static final String ID = "default";

    private static final Option addressOption =
            new Option(
                    "m",
                    "jobmanager",
                    true,
                    "Address of the JobManager to which to connect. "
                            + "Use this flag to connect to a different JobManager than the one specified in the configuration. "
                            + "Attention: This option is respected only if the high-availability configuration is NONE.");

    @Override
    public boolean isActive(CommandLine commandLine) {
        // always active because we can try to read a JobManager address from the config
        return true;
    }

    // ...... (part omitted) ......
}
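Another important part of DefaultCLI is its toConfiguration function, which parses the options passed on the flink command line and converts them into a Flink Configuration object.
flink-clients/src/main/java/org.apache.flink.client.cli.DefaultCLI.toConfiguration is implemented as follows (its super call goes to AbstractCustomCommandLine.toConfiguration):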
@Override
public Configuration toConfiguration(CommandLine commandLine) throws FlinkException {
    // Add -z (zookeeperNamespace) and its value to resultingConfiguration
    final Configuration resultingConfiguration = super.toConfiguration(commandLine);

    // Add -m (jobmanager) and its value to resultingConfiguration
    if (commandLine.hasOption(addressOption.getOpt())) {
        String addressWithPort = commandLine.getOptionValue(addressOption.getOpt());
        InetSocketAddress jobManagerAddress = NetUtils.parseHostPortAddress(addressWithPort);
        setJobManagerAddressInConfig(resultingConfiguration, jobManagerAddress);
    }

    // Set execution.target=remote in resultingConfiguration by default
    resultingConfiguration.setString(DeploymentOptions.TARGET, RemoteExecutor.NAME);

    // Parse the -Dkey=value options and add them to resultingConfiguration
    DynamicPropertiesUtil.encodeDynamicProperties(commandLine, resultingConfiguration);

    return resultingConfiguration;
}
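To see what such a conversion produces, here is a hypothetical sketch of the same steps using plain strings instead of commons-cli and Flink's Configuration. The class DefaultCliToConfigSketch is invented for illustration, the key names jobmanager.rpc.address and jobmanager.rpc.port are assumptions about what setJobManagerAddressInConfig writes, and the host:port split is a simplification of NetUtils.parseHostPortAddress. It keeps the ordering of the code above: the target is set first and the -D entries are applied afterwards, so in this sketch a -D entry for the same key overwrites it.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of DefaultCLI.toConfiguration; key names for -m are assumptions.
class DefaultCliToConfigSketch {

    /**
     * @param jobManagerAddress value of -m ("host:port"), or null if -m was not given
     * @param dynamicProperties the raw "key=value" parts of each -D option
     */
    static Map<String, String> toConfiguration(String jobManagerAddress, List<String> dynamicProperties) {
        Map<String, String> result = new LinkedHashMap<>();

        // -m host:port -> host and port entries (simplified parsing)
        if (jobManagerAddress != null) {
            int colon = jobManagerAddress.lastIndexOf(':');
            if (colon <= 0 || colon == jobManagerAddress.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got " + jobManagerAddress);
            }
            String host = jobManagerAddress.substring(0, colon);
            int port = Integer.parseInt(jobManagerAddress.substring(colon + 1)); // validates the port
            result.put("jobmanager.rpc.address", host);
            result.put("jobmanager.rpc.port", String.valueOf(port));
        }

        // The target is always set to remote, before the -D properties are applied.
        result.put("execution.target", "remote");

        // -D entries are written last, so they overwrite anything set above.
        for (String prop : dynamicProperties) {
            int eq = prop.indexOf('=');
            if (eq > 0) {
                result.put(prop.substring(0, eq), prop.substring(eq + 1));
            } else if (eq < 0 && !prop.isEmpty()) {
                result.put(prop, "true"); // a bare -Dkey is treated as key=true in this sketch
            }
        }
        return result;
    }
}
```

For example, -m jm-host:6123 -Dparallelism.default=4 would yield the two address entries, execution.target=remote and parallelism.default=4 in this model.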