赞
踩
在配置flink任务时,不能通过对单个任务进行kerberos验证,只能在flink-conf文件中进行认证,这样遇到的麻烦就是,每次启动不同任务的时候,都需要进行依赖不同的conf文件
通过在flink github项目中查看,发现有pr提交了代码,可以在flink任务启动之初进行,conf文件加载
通过修改 flink/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java文件中的main方法
- public static void main(final String[] args) {
- EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
-
- // 1. find the configuration directory
- final String configurationDirectory = getConfigurationDirectoryFromEnv();
-
- // 2. load the global configuration
- final Configuration configuration =
- GlobalConfiguration.loadConfiguration(configurationDirectory);
-
- // 3. load the custom command lines
- final List<CustomCommandLine> customCommandLines =
- loadCustomCommandLines(configuration, configurationDirectory);
-
- int retCode = 31;
- try {
-
- // 更改当前代码内容,即可完成对conf文件的加载,进行conf文件的认证
- final CliFrontend cli = new CliFrontend(configuration, customCommandLines);
- CommandLine commandLine =
- cli.getCommandLine(
- new Options(), Arrays.copyOfRange(args, 1, args.length), true);
- Configuration securityConfig = new Configuration(cli.configuration);
- DynamicPropertiesUtil.encodeDynamicProperties(commandLine, securityConfig);
- SecurityUtils.install(new SecurityConfiguration(securityConfig));
- retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
- } catch (Throwable t) {
- final Throwable strippedThrowable =
- ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
- LOG.error("Fatal error while running command line interface.", strippedThrowable);
- strippedThrowable.printStackTrace();
- } finally {
- System.exit(retCode);
- }
- }
这样在任务提交通过:
flink run -t yarn-per-job -d \
-Dsecurity.kerberos.login.keytab=/data2/home/zhu.hh/kafka3u1.keytab \
-Dsecurity.kerberos.login.principal=kafka3u1@BELLE.COM \
-Dsecurity.kerberos.login.contexts=KafkaClient \
即可完成认证
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。