当前位置:   article > 正文

【Kafka】手把手SASL,SSL教学_kafka sasl_ssl

kafka sasl_ssl

Kafka配置SASL

1.确定使用的SASL协议

Kafka支持以下SASL机制:GSSAPI 、PLAIN、 SCRAM-SHA-256、 SCRAM-SHA-512、 OAUTHBEARER。

本指南主要以SCRAM机制配置为主。

2.准备用户凭证

当使用SCRAM机制时,Kafka使用Zookeeper存储用户加密后的凭证,所以需要先使用Kafka提供的脚本进行用户的创建。

比如创建用户名为kafkaAdmin,密码为admin用户的操作命令如下:

> bin/Kafka-configs.sh --zookeeper localhost:2182 --alter --add-config 'SCRAM-SHA-256=[password=admin],SCRAM-SHA-512=[password=admin]' --entity-type users --entity-name KafkaAdmin
  • 1

3.设置JAAS安全验证文件

Kafka使用Java的JAAS机制进行安全验证文件的加载,首先创建文件命名为broker_jaas.config,内容如下:

KafkaServer {
	org.apache.kafka.common.security.scram.ScramLoginModule required 
	username="KafkaAdmin" 
	password="admin";
};

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

然后编辑/bin/kafka-server-start.sh文件,使用Java的-Djava.security.auth.login.config参数指定JAAS文件位置。

[kafka@CTSP1 bin]$ cat kafka-server-start.sh 
#!/bin/bash
export JMX_PORT=9876
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false  -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=10.10.92.17"

#指定JAAS文件
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/kafka/config/broker_jaas.config"
if [ $# -lt 1 ];
then
	echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
	exit 1
fi
base_dir=$(dirname $0)


if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
    export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
fi

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G "
fi

EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}

COMMAND=$1
case $COMMAND in
  -daemon)
    EXTRA_ARGS="-daemon "$EXTRA_ARGS
    shift
    ;;
  *)
    ;;
esac

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

4.配置server.properties文件

修改/config/server.properties文件,启用安全认证协议。修改项如下:

#指定客户端连接使用的协议以及地址,端口
listeners=SASL_PLAINTEXT://127.0.0.1:9092

#指定服务器之间使用的安全连接协议,可选的协议有PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL。
security.inter.broker.protocol=SASL_PLAINTEXT

#指定启用的SASL认证机制,可选项有PLAIN,GSSAPI,SCRAM。
sasl.enabled.mechanisms=SCRAM-SHA-256

#指定集群内部的认证机制
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

5.配置Kafka客户端

客户端可以通过指定JAAS文件的方式使用或者在客户端创建的Producer或者Consumer的属性中添加对应的键值对使用。

配置客户端JAAS格式文件如下:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafkaAdmin" password="admin";
  • 1
  • 2
  • 3

在bin目录中的脚本文件,可以通过–command-config参数指定JAAS文件,或者是生产者脚本的–producer.config或者消费者脚本的–consumer.config参数指定JAAS文件进行验证。

Kafka配置SSL

1.为每个Broker生成 SSL 密钥和证书

为集群的每一个broker生成对应节点的密钥对,执行命令后会要求设置密码,这个密码需要记下来已备后面使用。

以下命令为生成一个10年的有效期的密钥对,密码为123456 :

keytool -keystore server.keystore.jks -alias localhost -validity 3650 -genkey -keyalg RSA -storetype pkcs12
  • 1

注意点:Kafka默认启用了SSL的主机名验证功能,可以设置配置文件参数ssl.endpoint.identification.algorithm为空方便测试。启用验证的时候,制作密钥对需要添加SAN信息,握手过程中会对当前连接的域名和地址进行校验,如果不是预期的域名和IP则握手失败。

keytool -keystore server.keystore.jks -alias localhost -validity 3650 -genkey -keyalg RSA -storetype pkcs12 -ext SAN=DNS:{填写域名},IP:{填写ip地址}
  • 1

2.生成CA证书

CA证书用于给不同的通信端提供权威授权的认证,当某个节点接收到SSL请求时,先验证是否证书是否为CA签名,如果CA签名则信任该请求。一般公司内部,可以设置一台机器作为CA认证服务中心。

由于OpenSSL的bug,x509 模块不会将请求的扩展字段从 CSR 复制到最终证书中。所以制作ca需要依靠指定config的方式。

ca.cnf文件内容如下:

HOME            = .
RANDFILE        = $ENV::HOME/.rnd

####################################################################
[ ca ]
default_ca    = CA_default      # The default ca section

[ CA_default ]

base_dir      = .
certificate   = $base_dir/ca-cert   # The CA certifcate
private_key   = $base_dir/cakey.pem    # The CA private key
new_certs_dir = $base_dir              # Location for new certs after signing
database      = $base_dir/index.txt    # Database index file
serial        = $base_dir/serial.txt   # The current serial number

default_days     = 1000         # How long to certify for
default_crl_days = 30           # How long before next CRL
default_md       = sha256       # Use public key default MD
preserve         = no           # Keep passed DN ordering

x509_extensions = ca_extensions # The extensions to add to the cert

email_in_dn     = no            # Don't concat the email in the DN
copy_extensions = copy          # Required to copy SANs from CSR to cert

####################################################################
[ req ]
default_bits       = 4096
default_keyfile    = cakey.pem
distinguished_name = ca_distinguished_name
x509_extensions    = ca_extensions
string_mask        = utf8only

####################################################################
[ ca_distinguished_name ]
countryName         = Country Name (2 letter code)
countryName_default = DE

stateOrProvinceName         = State or Province Name (full name)
stateOrProvinceName_default = Test Province

localityName                = Locality Name (eg, city)
localityName_default        = Test Town

organizationName            = Organization Name (eg, company)
organizationName_default    = Test Company

organizationalUnitName         = Organizational Unit (eg, division)
organizationalUnitName_default = Test Unit

commonName         = Common Name (e.g. server FQDN or YOUR name)
commonName_default = Test Name

emailAddress         = Email Address
emailAddress_default = test@test.com

####################################################################
[ ca_extensions ]

subjectKeyIdentifier   = hash
authorityKeyIdentifier = keyid:always, issuer
basicConstraints       = critical, CA:true
keyUsage               = keyCertSign, cRLSign

####################################################################
[ signing_policy ]
countryName            = optional
stateOrProvinceName    = optional
localityName           = optional
organizationName       = optional
organizationalUnitName = optional
commonName             = supplied
emailAddress           = optional

####################################################################
[ signing_req ]
subjectKeyIdentifier   = hash
authorityKeyIdentifier = keyid,issuer
basicConstraints       = CA:FALSE
keyUsage               = digitalSignature, keyEncipherment
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81

然后创建数据库和序列号文件,这些文件将用于跟踪使用该 CA 签署的证书。这两个文件都是简单的文本文件,与您的 CA 密钥位于同一目录中。

echo 01 > serial.txt
touch index.txt
  • 1
  • 2
#需要主机认证的时候
openssl req -x509 -config ca.cnf -newkey rsa:4096 -sha256 -nodes -out ca-cert -outform PEM
#不需要主机认证
openssl req -new -x509 -keyout ca-key -out ca-cert -days 3650
  • 1
  • 2
  • 3
  • 4

3.创建信任库

将生成的CA添加到服务端和客户端的信任库,以便服务端和客户端可以信任这个CA:

keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
  • 1
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
  • 1

4.签名证书

用步骤2生成的CA签名步骤1生成的证书。首先需要导出请求文件。

keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
  • 1

cert-file: 服务器的未签名证书

然后用CA进行签名认证。

#不需要主机认证的时候
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 3650 -CAcreateserial -passin pass:123456
#需要主机验证的时候
openssl ca -config ca.cnf -policy signing_policy -extensions signing_req -out cert-signed -infiles cert-file
  • 1
  • 2
  • 3
  • 4

最后,你需要导入CA的证书和已签名的证书到密钥仓库:

keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
  • 1
keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed
  • 1

备注:如果服务端对客户端的连接有认证需求,可以使用相同的步骤生成客户端的keystore文件,然后使用CA证书进行认证。

5.服务端配置

如果需要启用SSL,生成对应的证书文件后,server.properties如下配置:

#如果broker之间不需要启用SSL,则同时需要配置SSL和PLAINTEXT
listeners=PLAINTEXT://host.name:port,SSL://host.name:port

#如果broker之间也使用SSL,则需要设置
#security.inter.broker.protocol=SSL
#listeners=SSL://host.name:port

ssl.keystore.location=/var/private/ssl/server.keystore.jks
ssl.keystore.password=123456
ssl.key.password=123456
ssl.truststore.location=/var/private/ssl/server.truststore.jks
ssl.truststore.password=123456
#关闭主机名验证时设置以下参数
ssl.endpoint.identification.algorithm=
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

6.客户端配置文件

但从 Kafka 2.0.0 开始,默认为客户端连接以及代理间连接启用服务器主机名验证,可以通过设置ssl.endpoint.identification.algorithm为空字符串来关闭主机名验证。

security.protocol=SSL
ssl.truststore.location=/var/private/ssl/client.truststore.jks
ssl.truststore.password=123456
#关闭主机名验证时设置以下参数
ssl.endpoint.identification.algorithm=
  • 1
  • 2
  • 3
  • 4
  • 5

7.测试方法

kafka-console-producer.sh --bootstrap-server localhost:9095 --topic test --producer.config client-ssl.properties
kafka-console-consumer.sh --bootstrap-server localhost:9095 --topic test --consumer.config client-ssl.properties
  • 1
  • 2
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/391511
推荐阅读
相关标签
  

闽ICP备14008679号