赞
踩
Kafka作为一个分布式的发布-订阅消息系统,在日常项目中被频繁使用,通常情况下无论是生产者还是消费者只要订阅Topic后,即可进行消息的发送和接收。而kafka在0.9.0.0版本后添加了身份认证和权限控制两种安全服务,本文主要介绍在实际项目使用过程中遇到第三方kafka需身份认证时如何解决,以及对可能会碰到的问题进行总结。
Kafka身份认证主要分为以下几种:
(1)客户端与broker之间的连接认证
(2)broker与broker之间的连接认证
(3)broker与zookeeper之间的连接认证
日常项目中,无论是生产者还是消费者,我们都是作为客户端与kafka进行交互,因此使用的最多的是客户端与broker之间的连接认证。图1是客户端与服务端broker之间的认证过程图,客户端提交认证数据,服务端会根据认证数据对当前客户端进行身份校验,校验成功后的客户端即可成功登录kafka,进行后续操作。
图1 客户端与broker之间认证过程图
目前Kafka提供了SASL、SSL、Delegation Tokem三种安全认证机制,而SASL认证又分为了以下几种方式:
(1)基于Kerberos的GSSAPI
SASL-GSSAPI提供了一种非常安全的身份验证方法,但使用前提是企业中有Kerberos基础,一般使用随机密码的keytab认证方式,密码是加密的,在0.9版本中引入,目前是企业中使用最多的认证方式。
(2)SASL-PLAIN
SASL-PLAIN方式是一个经典的用户名/密码的认证方式,其中用户名和密码是以明文形式保存在服务端的JAAS配置文件中的,当客户端使用PLAIN模式进行认证时,密码是明文传输的,因此安全性较低,但好处是足够简单,方便我们对其进行二次开发,在0.10版本引入。
(3)SASL-SCRAM
SASL-SCRAM是针对SASL-PLAIN方式的不足而提供的另一种认证方式,它将用户名/密码存储在zookeeper中,并且可以通过脚本动态增减用户,当客户端使用SCRAM模式进行认证时,密码会经过SHA-256或SHA-512哈希加密后传输到服务器,因此安全性较高,在0.10.2版本中引入。
对Kafka集群来说,要想实现完整的安全模式,首先为集群中的每台机器生成密钥和证书是第一步,其次利用SASL对客户端进行身份验证是第二步,最后对不同客户端进行读写操作的授权是第三步,这些步骤即可以单独运作也可以同时运作,从而提高kafka集群的安全性。
本文主要介绍作为kafka生产者,如何基于Kerberos进行身份认证给第三方kafka发送数据。
Kerberos主要由三个部分组成:密钥分发中心Key Distribution Center(即KDC)、客户端Client、服务端Service,大致关系图如下图2所示,其中KDC是实现身份认证的核心组件,其包含三个部分:
我们作为生产者向第三方kafka发送数据,因此需要第三方提供以下安全认证文件:
获取以上安全认证文件后,即可编写java代码连接第三方kafka,步骤如下:
1、将安全认证文件xx.keytab和krb5.conf放置于某一路径下,确保后续java代码可进行读取
2、添加kafka配置文件,开启安全模式认证,其中kerberos.path是第一步中认证文件所在的目录
3、修改Kafka生产者配置,开启安全连接
4、调用认证工具类进行登录认证
LoginUtil认证工具类的核心是根据第一步中提供的安全认证文件自动生成jaas配置文件,该文件是kafka安全模式下认证的核心。代码如下:
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- import java.io.File;
- import java.io.FileWriter;
- import java.io.IOException;
-
- /**
- * @ProjectName: stdp-security-demo
- * @Package:
- * @ClassName: LoginUtil
- * @Author: stdp
- * @Description: ${description}
- */
- public class LoginUtil {
-
- public enum Module {
- KAFKA("KafkaClient"), ZOOKEEPER("Client");
-
- private String name;
-
- Module(String name) {
- this.name = name;
- }
-
- public String getName() {
- return name;
- }
- }
-
- private static final Logger LOGGER = LoggerFactory.getLogger(LoginUtil.class);
-
- /**
- * line operator string
- */
- private static final String LINE_SEPARATOR = System.getProperty("line.separator");
-
- /**
- * jaas file postfix
- */
- private static final String JAAS_POSTFIX = ".jaas.conf";
-
- private static final String JAVA_SECURITY_KRB5_CONF_KEY = "java.security.krb5.conf";
-
- public static final String JAVA_SECURITY_LOGIN_CONF_KEY = "java.security.auth.login.config";
-
- private static final String ZOOKEEPER_SERVER_PRINCIPAL_KEY = "zookeeper.server.principal";
-
-
- private static final boolean IS_IBM_JDK = System.getProperty("java.vendor").contains("IBM");
-
- /**
- * oracle jdk login module
- */
- private static final String SUN_LOGIN_MODULE = "com.sun.security.auth.module.Krb5LoginModule required";
-
-
- public synchronized static void login(String userPrincipal, String userKeytabPath, String krb5ConfPath)
- throws IOException
- {
- // 1.check input parameters
- if ((userPrincipal == null) || (userPrincipal.length() <= 0))
- {
- LOGGER.error("input userPrincipal is invalid.");
- throw new IOException("input userPrincipal is invalid.");
- }
-
- if ((userKeytabPath == null) || (userKeytabPath.length() <= 0))
- {
- LOGGER.error("input userKeytabPath is invalid.");
- throw new IOException("input userKeytabPath is invalid.");
- }
-
- if ((krb5ConfPath == null) || (krb5ConfPath.length() <= 0))
- {
- LOGGER.error("input krb5ConfPath is invalid.");
- throw new IOException("input krb5ConfPath is invalid.");
- }
-
- // 2.check file exsits
- File userKeytabFile = new File(userKeytabPath);
- if (!userKeytabFile.exists())
- {
- LOGGER.error("userKeytabFile(" + userKeytabFile.getAbsolutePath() + ") does not exsit.");
- throw new IOException("userKeytabFile(" + userKeytabFile.getAbsolutePath() + ") does not exsit.");
- }
- if (!userKeytabFile.isFile())
- {
- LOGGER.error("userKeytabFile(" + userKeytabFile.getAbsolutePath() + ") is not a file.");
- throw new IOException("userKeytabFile(" + userKeytabFile.getAbsolutePath() + ") is not a file.");
- }
-
- File krb5ConfFile = new File(krb5ConfPath);
- if (!krb5ConfFile.exists())
- {
- LOGGER.error("krb5ConfFile(" + krb5ConfFile.getAbsolutePath() + ") does not exsit.");
- throw new IOException("krb5ConfFile(" + krb5ConfFile.getAbsolutePath() + ") does not exsit.");
- }
- if (!krb5ConfFile.isFile())
- {
- LOGGER.error("krb5ConfFile(" + krb5ConfFile.getAbsolutePath() + ") is not a file.");
- throw new IOException("krb5ConfFile(" + krb5ConfFile.getAbsolutePath() + ") is not a file.");
- }
-
- // 3.set and check krb5config
- setKrb5Config(krb5ConfFile.getAbsolutePath());
-
- // LOGGER.info("check zookeeper server Principal =============================================");
- setZookeeperServerPrincipal(userPrincipal);
- // LOGGER.info("check jaas.conf +++++++++++++++++++++++++++++++++++++++++++++++++");
- setJaasFile(userPrincipal,userKeytabPath);
- LOGGER.info("Login success!!!!!!!!!!!!!!");
- }
-
-
- public static void setKrb5Config(String krb5ConfigFile) throws IOException {
- System.setProperty(JAVA_SECURITY_KRB5_CONF_KEY,krb5ConfigFile);
- String ret = System.getProperty(JAVA_SECURITY_KRB5_CONF_KEY);
- if (ret == null) {
- LOGGER.error(JAVA_SECURITY_KRB5_CONF_KEY + " is null.");
- throw new IOException(JAVA_SECURITY_KRB5_CONF_KEY + " is null.");
- }
- if (!ret.equals(krb5ConfigFile)){
- LOGGER.error(JAVA_SECURITY_KRB5_CONF_KEY + " is " + ret + " is not " + krb5ConfigFile + ".");
- throw new IOException(JAVA_SECURITY_KRB5_CONF_KEY + " is " + ret + " is not " + krb5ConfigFile + ".");
- }
- }
-
- public static void setJaasFile(String userPrincipal,String userKeytabPath) throws IOException {
- String jaasPath = new File(System.getProperty("java.io.tmpdir")) + File.separator + System.getProperty("user.name") + JAAS_POSTFIX;
- LOGGER.info("jaasPath = {}",jaasPath);
- //windows路径下分隔符替换
- jaasPath = jaasPath.replace("\\","\\\\");
- userKeytabPath = userKeytabPath.replace("\\","\\\\");
- //删除jaas文件
- deleteJaasFile(jaasPath);
- writeJaasFile(jaasPath,userPrincipal,userKeytabPath);
- System.setProperty(JAVA_SECURITY_LOGIN_CONF_KEY,jaasPath);
- }
-
- private static void deleteJaasFile(String jaasPath) throws IOException {
- File jaasFile = new File(jaasPath);
- if (jaasFile.exists()){
- if (!jaasFile.delete()){
- throw new IOException("failed to delete exists jaas file.");
- }
- }
- }
-
- private static void writeJaasFile(String jaasPath,String userPrincipal,String userKeytabPath) throws IOException {
- FileWriter writer = new FileWriter(new File(jaasPath));
- try{
- writer.write(getJaasConfContext(userPrincipal,userKeytabPath));
- writer.flush();
- }catch (IOException e){
- throw new IOException("Failed to create jaas.conf File.");
- }finally {
- writer.close();
- }
- }
-
-
- private static String getJaasConfContext(String userPrincipal,String userKeytabPath) throws IOException{
- Module[] allModule = Module.values();
- StringBuffer builder = new StringBuffer();
- for (Module module: allModule){
- String serviceName = null;
- if ("Client".equals(module.getName())){
- serviceName = "zookeeper";
- }else if ("KafkaClient".equals(module.getName())){
- serviceName = "kafka";
- }
- builder.append(getModuleContext(userPrincipal,userKeytabPath,module,serviceName));
- }
- return builder.toString();
- }
-
- private static String getModuleContext(String userPrincipal,String userKeytabPath,Module module,String serviceName) throws IOException {
- StringBuffer builder = new StringBuffer();
- if (IS_IBM_JDK){
- builder.append(module.getName()).append(" {").append(LINE_SEPARATOR);
- builder.append("credsType=both").append(LINE_SEPARATOR);
- builder.append("principal=\"" + userPrincipal.trim() + "\"").append(LINE_SEPARATOR);
- builder.append("useKeytab=\"" + userKeytabPath + "\"").append(LINE_SEPARATOR);
- builder.append("serviceName=\""+serviceName + "\"").append(LINE_SEPARATOR);
- builder.append("debug=true;").append(LINE_SEPARATOR);
- builder.append("};").append(LINE_SEPARATOR);
- }else {
- builder.append(module.getName()).append(" {").append(LINE_SEPARATOR);
- builder.append(SUN_LOGIN_MODULE).append(LINE_SEPARATOR);
- builder.append("useKeyTab=true").append(LINE_SEPARATOR);
- builder.append("keyTab=\"" + userKeytabPath + "\"").append(LINE_SEPARATOR);
- builder.append("principal=\"" + userPrincipal.trim() + "\"").append(LINE_SEPARATOR);
- builder.append("serviceName=\""+serviceName + "\"").append(LINE_SEPARATOR);
- builder.append("useTicketCache=false").append(LINE_SEPARATOR);
- builder.append("storeKey=true").append(LINE_SEPARATOR);
- builder.append("debug=true;").append(LINE_SEPARATOR);
- builder.append("};").append(LINE_SEPARATOR);
- }
- return builder.toString();
- }
-
-
- public static void setZookeeperServerPrincipal(String zkServerPrincipal) throws IOException {
- System.setProperty(ZOOKEEPER_SERVER_PRINCIPAL_KEY,zkServerPrincipal);
- String ret = System.getProperty(ZOOKEEPER_SERVER_PRINCIPAL_KEY);
- if (ret == null) {
- LOGGER.error(ZOOKEEPER_SERVER_PRINCIPAL_KEY + " is null.");
- throw new IOException(ZOOKEEPER_SERVER_PRINCIPAL_KEY + " is null.");
- }
- if (!ret.equals(zkServerPrincipal)){
- LOGGER.error(ZOOKEEPER_SERVER_PRINCIPAL_KEY + " is " + ret + " is not " + zkServerPrincipal + ".");
- throw new IOException(ZOOKEEPER_SERVER_PRINCIPAL_KEY + " is " + ret + " is not " + zkServerPrincipal + ".");
- }
- }
- }
经过以上四步的配置,启动项目后即可自动连接kafka进行身份校验,若登录成功,会输出如下提示信息:Login success,并且会将生成的jaas文件路径打印出来。
1、认证文件找不到
这是因为步骤1中kerberos.path配置有问题,检查path路径下是否存在认证文件keytab和krb5.conf。
2、 principal和keytab不匹配
不同的用户名对应不同的密码,在身份校验时,需保证用户名principle和密码keytab的一致性,否则无法验证通过。而principal和keytab不匹配可能存在以下两种场景:
如果步骤1检查没用问题,则可根据日志中输出的jaas文件路径查看自动生成的jaas文件中的principal和配置文件中的kerberos.principle是否一致。比如我的这个项目中,就是由于现场技术配置kerberos.principle时后面多打了一个空格,导致自动生成的jaas文件中的principle后多一个空格,因此和keytab认证失败。
为了彻底解决这个误打空格的问题,可以直接修改认证工具类LoginUtil,在生成jaas文件的principle时去掉可能存在的空格。
3、用户密码keytab更新,导致出现checksum failed
这是由于principal对应的密码修改了,但是程序中使用的还是旧的密码,就会出现这个问题。解决办法是找第三方提供principal对应的最新的密码文件keytab。
4、jaas文件找不到
该问题是由于找不到jaas.conf 这个文件导致的,而基于kerberos认证时一般不会出现,这是因为kerberos认证时jaas文件是由LoginUtil工具类根据安全认证文件自动生成并且存储在指定路径下的。
该问题通常出现在SASL-PLAIN方式的认证中,因为该方式需要添加一个配置参数java.security.auth.login.config来标识jaas文件的路径,如果文件路径出错则会报以上错误。
在kafka身份认证的过程中,需要的principal,keytab,ServiceName等信息均配置在jaas文件中,因此保证认证的服务可以读取到正确的文件及正确的配置是kafka安全模式下认证的核心。
基于kerberos认证时,可根据安全认证文件自动生成jaas配置文件,从而保证了密码加密传输,相比于SASL-PLAIN模式更具安全性,并且认证实现过程也较为简单。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。