赞
踩
Kafka支持以下SASL机制:GSSAPI 、PLAIN、 SCRAM-SHA-256、 SCRAM-SHA-512、 OAUTHBEARER。
本指南主要以SCRAM机制配置为主。
当使用SCRAM机制时,Kafka使用Zookeeper存储用户加密后的凭证,所以需要先使用Kafka提供的脚本进行用户的创建。
比如创建用户名为kafkaAdmin,密码为admin用户的操作命令如下:
> bin/Kafka-configs.sh --zookeeper localhost:2182 --alter --add-config 'SCRAM-SHA-256=[password=admin],SCRAM-SHA-512=[password=admin]' --entity-type users --entity-name KafkaAdmin
Kafka使用Java的JAAS机制进行安全验证文件的加载,首先创建文件命名为broker_jaas.config,内容如下:
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="KafkaAdmin"
password="admin";
};
然后编辑/bin/kafka-server-start.sh文件,使用Java的-Djava.security.auth.login.config参数指定JAAS文件位置。
[kafka@CTSP1 bin]$ cat kafka-server-start.sh #!/bin/bash export JMX_PORT=9876 export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=10.10.92.17" #指定JAAS文件 export KAFKA_OPTS="-Djava.security.auth.login.config=/home/kafka/config/broker_jaas.config" if [ $# -lt 1 ]; then echo "USAGE: $0 [-daemon] server.properties [--override property=value]*" exit 1 fi base_dir=$(dirname $0) if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties" fi if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G " fi EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'} COMMAND=$1 case $COMMAND in -daemon) EXTRA_ARGS="-daemon "$EXTRA_ARGS shift ;; *) ;; esac exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
修改/config/server.properties文件,启用安全认证协议。修改项如下:
#指定客户端连接使用的协议以及地址,端口
listeners=SASL_PLAINTEXT://127.0.0.1:9092
#指定服务器之间使用的安全连接协议,可选的协议有PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL。
security.inter.broker.protocol=SASL_PLAINTEXT
#指定启用的SASL认证机制,可选项有PLAIN,GSSAPI,SCRAM。
sasl.enabled.mechanisms=SCRAM-SHA-256
#指定集群内部的认证机制
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
客户端可以通过指定JAAS文件的方式使用或者在客户端创建的Producer或者Consumer的属性中添加对应的键值对使用。
配置客户端JAAS格式文件如下:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafkaAdmin" password="admin";
在bin目录中的脚本文件,可以通过–command-config参数指定JAAS文件,或者是生产者脚本的–producer.config或者消费者脚本的–consumer.config参数指定JAAS文件进行验证。
为集群的每一个broker生成对应节点的密钥对,执行命令后会要求设置密码,这个密码需要记下来已备后面使用。
以下命令为生成一个10年的有效期的密钥对,密码为123456 :
keytool -keystore server.keystore.jks -alias localhost -validity 3650 -genkey -keyalg RSA -storetype pkcs12
注意点:Kafka默认启用了SSL的主机名验证功能,可以设置配置文件参数ssl.endpoint.identification.algorithm为空方便测试。启用验证的时候,制作密钥对需要添加SAN信息,握手过程中会对当前连接的域名和地址进行校验,如果不是预期的域名和IP则握手失败。
keytool -keystore server.keystore.jks -alias localhost -validity 3650 -genkey -keyalg RSA -storetype pkcs12 -ext SAN=DNS:{填写域名},IP:{填写ip地址}
CA证书用于给不同的通信端提供权威授权的认证,当某个节点接收到SSL请求时,先验证是否证书是否为CA签名,如果CA签名则信任该请求。一般公司内部,可以设置一台机器作为CA认证服务中心。
由于OpenSSL的bug,x509 模块不会将请求的扩展字段从 CSR 复制到最终证书中。所以制作ca需要依靠指定config的方式。
ca.cnf文件内容如下:
HOME = . RANDFILE = $ENV::HOME/.rnd #################################################################### [ ca ] default_ca = CA_default # The default ca section [ CA_default ] base_dir = . certificate = $base_dir/ca-cert # The CA certifcate private_key = $base_dir/cakey.pem # The CA private key new_certs_dir = $base_dir # Location for new certs after signing database = $base_dir/index.txt # Database index file serial = $base_dir/serial.txt # The current serial number default_days = 1000 # How long to certify for default_crl_days = 30 # How long before next CRL default_md = sha256 # Use public key default MD preserve = no # Keep passed DN ordering x509_extensions = ca_extensions # The extensions to add to the cert email_in_dn = no # Don't concat the email in the DN copy_extensions = copy # Required to copy SANs from CSR to cert #################################################################### [ req ] default_bits = 4096 default_keyfile = cakey.pem distinguished_name = ca_distinguished_name x509_extensions = ca_extensions string_mask = utf8only #################################################################### [ ca_distinguished_name ] countryName = Country Name (2 letter code) countryName_default = DE stateOrProvinceName = State or Province Name (full name) stateOrProvinceName_default = Test Province localityName = Locality Name (eg, city) localityName_default = Test Town organizationName = Organization Name (eg, company) organizationName_default = Test Company organizationalUnitName = Organizational Unit (eg, division) organizationalUnitName_default = Test Unit commonName = Common Name (e.g. server FQDN or YOUR name) commonName_default = Test Name emailAddress = Email Address emailAddress_default = test@test.com #################################################################### [ ca_extensions ] subjectKeyIdentifier = hash authorityKeyIdentifier = keyid:always, issuer basicConstraints = critical, CA:true keyUsage = keyCertSign, cRLSign #################################################################### [ signing_policy ] countryName = optional stateOrProvinceName = optional localityName = optional organizationName = optional organizationalUnitName = optional commonName = supplied emailAddress = optional #################################################################### [ signing_req ] subjectKeyIdentifier = hash authorityKeyIdentifier = keyid,issuer basicConstraints = CA:FALSE keyUsage = digitalSignature, keyEncipherment
然后创建数据库和序列号文件,这些文件将用于跟踪使用该 CA 签署的证书。这两个文件都是简单的文本文件,与您的 CA 密钥位于同一目录中。
echo 01 > serial.txt
touch index.txt
#需要主机认证的时候
openssl req -x509 -config ca.cnf -newkey rsa:4096 -sha256 -nodes -out ca-cert -outform PEM
#不需要主机认证
openssl req -new -x509 -keyout ca-key -out ca-cert -days 3650
将生成的CA添加到服务端和客户端的信任库,以便服务端和客户端可以信任这个CA:
keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
用步骤2生成的CA签名步骤1生成的证书。首先需要导出请求文件。
keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
cert-file: 服务器的未签名证书
然后用CA进行签名认证。
#不需要主机认证的时候
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 3650 -CAcreateserial -passin pass:123456
#需要主机验证的时候
openssl ca -config ca.cnf -policy signing_policy -extensions signing_req -out cert-signed -infiles cert-file
最后,你需要导入CA的证书和已签名的证书到密钥仓库:
keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed
备注:如果服务端对客户端的连接有认证需求,可以使用相同的步骤生成客户端的keystore文件,然后使用CA证书进行认证。
如果需要启用SSL,生成对应的证书文件后,server.properties如下配置:
#如果broker之间不需要启用SSL,则同时需要配置SSL和PLAINTEXT
listeners=PLAINTEXT://host.name:port,SSL://host.name:port
#如果broker之间也使用SSL,则需要设置
#security.inter.broker.protocol=SSL
#listeners=SSL://host.name:port
ssl.keystore.location=/var/private/ssl/server.keystore.jks
ssl.keystore.password=123456
ssl.key.password=123456
ssl.truststore.location=/var/private/ssl/server.truststore.jks
ssl.truststore.password=123456
#关闭主机名验证时设置以下参数
ssl.endpoint.identification.algorithm=
但从 Kafka 2.0.0 开始,默认为客户端连接以及代理间连接启用服务器主机名验证,可以通过设置ssl.endpoint.identification.algorithm为空字符串来关闭主机名验证。
security.protocol=SSL
ssl.truststore.location=/var/private/ssl/client.truststore.jks
ssl.truststore.password=123456
#关闭主机名验证时设置以下参数
ssl.endpoint.identification.algorithm=
kafka-console-producer.sh --bootstrap-server localhost:9095 --topic test --producer.config client-ssl.properties
kafka-console-consumer.sh --bootstrap-server localhost:9095 --topic test --consumer.config client-ssl.properties
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。