赞
踩
转载并且补充:Flink小坑之kerberos动态认证
这段内容摘抄自官网:Kerberos Authentication Setup and Configuration
配置:Kerberos-based Authentication / Authorization
Flink 通过 Kafka 连接器提供了一流的支持,可以对 Kerberos 配置的 Kafka 安装进行身份验证。只需在 flink-conf.yaml
中配置 Flink。像这样为 Kafka 启用 Kerberos 身份验证:
security.kerberos.login.use-ticket-cache
:默认情况下,这个值是 true
,Flink 将尝试在 kinit
管理的票据缓存中使用 Kerberos 票据。注意!在 YARN 上部署的 Flink jobs 中使用 Kafka 连接器时,使用票据缓存的 Kerberos 授权将不起作用。使用 Mesos 进行部署时也是如此,因为 Mesos 部署不支持使用票据缓存进行授权。security.kerberos.login.keytab
和 security.kerberos.login.principal
:要使用 Kerberos keytabs,需为这两个属性设置值。KafkaClient
追加到 security.kerberos.login.contexts
:这告诉 Flink 将配置的 Kerberos 票据提供给 Kafka 登录上下文以用于 Kafka 身份验证。一旦启用了基于 Kerberos 的 Flink 安全性后,只需在提供的属性配置中包含以下两个设置(通过传递给内部 Kafka 客户端),即可使用 Flink Kafka Consumer 或 Producer 向 Kafk a进行身份验证:
security.protocol
设置为 SASL_PLAINTEXT
(默认为 NONE
):用于与 Kafka broker 进行通信的协议。使用独立 Flink 部署时,也可以使用 SASL_SSL
;请在此处查看如何为 SSL 配置 Kafka 客户端。sasl.kerberos.service.name
设置为 kafka
(默认为 kafka
):此值应与用于 Kafka broker 配置的 sasl.kerberos.service.name
相匹配。客户端和服务器配置之间的服务名称不匹配将导致身份验证失败。有关 Kerberos 安全性 Flink 配置的更多信息,请参见[这里]({{< ref “docs/deployment/config” >}}})。你也可以在[这里]({{< ref “docs/deployment/security/security-kerberos” >}})进一步了解 Flink 如何在内部设置基于 kerberos 的安全性。
在 YARN 模式下,可以部署一个没有 keytab 的安全 Flink 集群,只使用票据缓存(由kinit管理)。这避免了生成密钥表的复杂性,并避免将其委托给集群管理器。在这种情况下,Flink CLI 获取 Hadoop 委托令牌(用于 HDFS 和 HBase等)。主要缺点是集群必然是短暂的,因为生成的委托令牌将过期(通常在一周内)。
使用以下步骤运行安全的 Flink 集群kinit:
注:tgt有一个有效期,过期了就无法使用了,这种方式不适合长期任务。
在原生 Kubernetes、YARN 和 Mesos 模式下运行安全 Flink 集群的步骤
1.在客户端的 Flink 配置文件中添加安全相关的配置选项(见这里)。
security.kerberos.login.keytab: /kerberos/flink.keytab
security.kerberos.login.principal: flink
security.kerberos.login.contexts: Client
security.kerberos.login.use-ticket-cache: true
注意:这里的principal是不带主机名的,如果带主机名可能如下
security.kerberos.login.use-ticket-cache: true
security.kerberos.login.keytab: /home/xx/xx.keytab
# todo: 这里需要hdfs的配置
security.kerberos.login.principal: mr/node1@域名.COM
security.kerberos.login.contexts: KafkaClient,Client
security.kerberos.login.use-ticket-cache: true
但是这个认证会有问题,因为如果你在node1
提交任务的时候,会找node1
所在机器的/home/xx/xx.keytab
文件,然后如果有的话,那么就可以使用,但是当你提交任务到集群后,因为是分布式运行,你传入的mr/node1@域名.COM
只适合Node1的keytab,而node2,node3的keytab是分别绑定到node2和Node3的,然后你用mr/node1@域名.COM
去认证,就会找不到
。
这里会报错 【kafka】kerberos client is being asked for a password not available to garner authentication informa
所以有以下几种方案解决
ktadd /home/mr/mr.keytab mr/node1@域名.COM
ktadd /home/mr/mr.keytab mr/node2@域名.COM
ktadd /home/mr/mr.keytab mr/node3@域名.COM
但是对方一直不生成,气死,网友也说需要生成一个全局的
后来我找到另外一种方案
解决
最后解决,我们不使用他们的keytab和jaas文件,把他们jaas文件拷贝下来一份
原本的
[root@zdh2 flink]# cat /etc/zdh/kafka/conf.zdh.kafka/jaas.conf KafkaServer { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="/home/mr/mr.keytab" # 这个文件在每个服务器上不相同 storeKey=true useTicketCache=false principal="mr/node2@ZCDDH.COM"; }; KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="/home/mr/mr.keytab" # 这个文件在每个服务器上不相同 storeKey=true useTicketCache=false principal="mr/node2@ZCDDH.COM"; }; Client { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="/home/mr/mr.keytab" # 这个文件在每个服务器上不相同 storeKey=true useTicketCache=false principal="mr/node2@ZCDDH.COM"; }; [root@zdh2 f
我们把zdh2服务的/home/mr/mr.keytab文件,拷贝下来,放到其他服务器的 /usr/hdp/soft/zaas/keytab/目录下
改成如下,
[root@zdh3 keytab]# pwd /usr/hdp/soft/zaas/keytab [root@zdh3 keytab]# cat jaas.conf KafkaServer { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="/usr/hdp/soft/zaas/keytab/mr.keytab" # 对应zdh2服务器的keytab storeKey=true useTicketCache=false principal="mr/node2@ZCDDH.COM"; }; KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="/usr/hdp/soft/zaas/keytab/mr.keytab" # 对应zdh2服务器的keytab storeKey=true useTicketCache=false principal="mr/node2@ZCDDH.COM"; }; Client { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="/usr/hdp/soft/zaas/keytab/mr.keytab" # 对应zdh2服务器的keytab storeKey=true useTicketCache=false principal="mr/node2@ZCDDH.COM"; };
我们的目录形式如下
[root@zdh2 keytab]# ll total 8 -rw-r--r--. 1 root root 637 Apr 14 02:36 jaas.conf -rwxr-xr-x. 1 root root 238 Apr 14 02:25 mr.keytab [root@zdh2 keytab]# [root@zdh3 keytab]# ll total 8 -rw-r--r--. 1 root root 637 Apr 14 02:37 jaas.conf -rwxrwxrwx. 1 root root 238 Apr 14 02:27 mr.keytab [root@zdh4 keytab]# ll total 8 -rw-r--r--. 1 root root 637 Apr 14 02:37 jaas.conf -rwxrwxrwx. 1 root root 238 Apr 14 02:27 mr.keytab
这里注意krb5.conf一般是所有服务器上一样的,如果不一样,也这样操作,拷贝一个服务器的放到一起。
这里确实有个小坑:【FLinlk】Flink小坑之kerberos动态认证
注: 这里就遇到了我们说的小坑,kerberos认证先于命令行解析,命令行定义的配置在认证阶段是无效的
2.确保密钥表文件存在于security.kerberos.login.keytab
客户端节点上的指示的路径中
3.正常部署 Flink 集群。
在 YARN、Mesos 和原生 Kubernetes 模式下,keytab 会自动从客户端复制到 Flink 容器。
注: 这里就遇到了我们说的小坑,kerberos认证先于命令行解析,命令行定义的配置在认证阶段是无效的
因此,我们目前无法通过在命令行指定不同的keytab和principal覆盖文件中的配置,从而达到多用户认证,也就是目前我们只能通过flink用户来向YARN提交作业,没有了用户区分,我们就无法进行权限控制。
那如何解决这个问题,从而达到每个用户都能以自己名义去提交作业呢,这里我们只要解决命令行解析先于kerberos认证就可以了(修改源码)
Flink自带的命令行解析器,如果我们借助自身命令行解析器(减少改动也就减少了bug)。 我们可能同时需要兼容所有的解析器,并且在新添加解析器时也增加了限制,可能在某个解析器没有对应参数,解析就不生效。
因此,我们采用最简单最暴力的办法,自己定义一个解析器,解析命令行的参数来覆盖配置文件里定义参数。
具体步骤如下:
1.自定义一个命令行解析器。
// 位于org.apache.flink.client.cli.CliFrontendParser下 public static class ExtendedGnuParser extends GnuParser { private final boolean ignoreUnrecognizedOption; public ExtendedGnuParser(boolean ignoreUnrecognizedOption) { // GnuParser、DefaultParser在遇到未定义的参数时都会抛出异常,这里是为了进行兼容 this.ignoreUnrecognizedOption = ignoreUnrecognizedOption; } protected void processOption(String arg, ListIterator<String> iter) throws ParseException { boolean hasOption = this.getOptions().hasOption(arg); if (hasOption || !this.ignoreUnrecognizedOption) { super.processOption(arg, iter); } } }
定义命令行参数并解析, 同时覆盖配置文件参数。
// 位于org.apache.flink.client.cli.CliFrontend // 其this.configuration为从配置文件中读取的配置。 public Configuration compatCommandLineSecurityConfiguration(String[] args) throws CliArgsException, FlinkException, ParseException { // 定义一个自定义都参数解析Options。 Options securityOptions = new Options() .addOption(Option.builder("D") .argName("property=value") .numberOfArgs(2) .valueSeparator('=') .desc("Allows specifying multiple generic configuration options.") .build()); // 解析命令行传入的参数 CommandLine commandLine = new CliFrontendParser.ExtendedGnuParser(true) .parse(securityOptions, args); // 从配置文件中读取的参数。 Configuration securityConfiguration = new Configuration(this.configuration); // 用命令行传入参数替换掉配置文件中的security参数 Properties dynamicProperties = commandLine.getOptionProperties("D"); for(Map.Entry<Object, Object> entry: dynamicProperties.entrySet()){ securityConfiguration.setString( entry.getKey().toString(), entry.getValue().toString() ); } return securityConfiguration; }
修改之后我们进行编译mvn clean package -DskipTests -Dfast
编译完成后,flink根目录下会出现一个build-target的软连接,指向编译后的flink真正的安装包。
我们修改一下flink安装包的名称,由flink-1.13.2 修改为flink-1.13.2.1 来区分,同时上传到线上就可以了。
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。