- This article explains how to configure Kerberos authentication for Kafka, and how to connect from Java using JAAS for the Kerberos handshake.
- The demonstration uses a single-node setup.
Check the Kerberos version with:
klist -V
Software | Version |
---|---|
jdk | 1.8.0_202 |
kafka | 2.12-2.2.1 |
kerberos | 1.15.1 |
- Kerberos is a network authentication protocol developed at MIT. It is designed to provide strong authentication for client/server applications using secret-key cryptography.
- Kerberos is an encrypted-ticket-based authentication protocol made up of three parts: the Key Distribution Center (KDC), the Client, and the Service.
The client first contacts the KDC twice (once for a ticket-granting ticket, once for a service ticket) before accessing the target service, e.g. an HTTP, Zookeeper, or Kafka service.
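To make the client side of that exchange concrete, here is a minimal Java sketch of obtaining a Kerberos ticket through JAAS. The entry name "KafkaClient" and both file paths are assumptions for illustration; they must match your own JAAS file and environment.

import javax.security.auth.Subject;
import javax.security.auth.login.LoginContext;

public class KerberosLoginDemo {
    public static void main(String[] args) throws Exception {
        // Point the JVM at the Kerberos and JAAS configuration
        // (hypothetical paths, adjust to your environment).
        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
        System.setProperty("java.security.auth.login.config", "/path/to/jaas.conf");
        // "KafkaClient" must name an entry in the JAAS file; the
        // Krb5LoginModule it configures talks to the KDC and stores
        // the resulting ticket in the Subject.
        LoginContext lc = new LoginContext("KafkaClient");
        lc.login();
        Subject subject = lc.getSubject();
        System.out.println("Authenticated principals: " + subject.getPrincipals());
    }
}

This is essentially what the Kafka client does internally when it starts a GSSAPI SASL handshake.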
yum install krb5-server krb5-workstation krb5-libs -y
Add a mapping for this machine to the /etc/hosts file (my hostname here is monkey):

127.0.0.1 monkey
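Kerberos is sensitive to hostname resolution, so it can be worth verifying the mapping from code; a tiny sketch (the hostname monkey comes from the mapping above):

import java.net.InetAddress;

public class HostMappingCheck {
    public static void main(String[] args) throws Exception {
        // Resolve the hostname added to /etc/hosts; principals such as
        // kafka-server/monkey@WLF.COM rely on this name resolving.
        InetAddress addr = InetAddress.getByName("monkey");
        System.out.println("monkey resolves to " + addr.getHostAddress());
    }
}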
Edit the /etc/krb5.conf file as needed; WLF.COM can be changed to whatever realm you want, and monkey is your host mapping:

# Configuration snippets may be placed in this directory as well
includedir /etc/krb5.conf.d/

[logging]
 default = FILE:/var/log/krb5libs.log
 kdc = FILE:/var/log/krb5kdc.log
 admin_server = FILE:/var/log/kadmind.log

[libdefaults]
 dns_lookup_realm = false
 ticket_lifetime = 24h
 renew_lifetime = 7d
 forwardable = true
 rdns = false
 pkinit_anchors = FILE:/etc/pki/tls/certs/ca-bundle.crt
 default_realm = WLF.COM
 #default_ccache_name = KEYRING:persistent:%{uid}

[realms]
 WLF.COM = {
  kdc = monkey
  admin_server = monkey
 }

[domain_realm]
 .monkey = WLF.COM
 monkey = WLF.COM
- Edit /var/kerberos/krb5kdc/kdc.conf, the KDC's own configuration file.
- Java needs extra JCE policy jars to use the aes256-cts encryption type, so for convenience it is left out here (see the sketch after the config for checking your JDK's AES limit):
[kdcdefaults]
kdc_ports = 88
kdc_tcp_ports = 88
[realms]
WLF.COM = {
#master_key_type = aes256-cts
acl_file = /var/kerberos/krb5kdc/kadm5.acl
dict_file = /usr/share/dict/words
admin_keytab = /var/kerberos/krb5kdc/kadm5.keytab
max_life = 24h
max_renewable_life = 7d
supported_enctypes = aes128-cts:normal des3-hmac-sha1:normal arcfour-hmac:normal camellia256-cts:normal camellia128-cts:normal des-hmac-sha1:normal des-cbc-md5:normal des-cbc-crc:normal
}
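To check whether your JDK could handle aes256-cts at all, a small sketch using the standard JCE API:

import javax.crypto.Cipher;

public class CheckAesStrength {
    public static void main(String[] args) throws Exception {
        // Older JDK 8 builds cap AES at 128 bits unless the JCE
        // Unlimited Strength policy files are installed; aes256-cts
        // only works when this prints a value >= 256.
        System.out.println("Max allowed AES key length: "
                + Cipher.getMaxAllowedKeyLength("AES"));
    }
}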
- Edit the ACL configuration file /var/kerberos/krb5kdc/kadm5.acl.
- The first * is a wildcard: any principal named like "abc/admin" or "xxx/admin" can administer the Kerberos database with this tool (remotely or locally). The trailing * is the permission field and grants all permissions. WLF.COM is the realm configured above.
*/admin@WLF.COM *
Create the Kerberos database for the realm (you will be prompted to set a master password):
kdb5_util create -r WLF.COM -s
ll -a /var/kerberos/krb5kdc/
Then start the KDC and admin services and check that they are running:
systemctl start krb5kdc kadmin
systemctl status krb5kdc kadmin
On the Kerberos server machine, you can use kadmin.local to perform all administrative operations.
We put all Kerberos-related configuration files (needed for the Java connection) under Kafka's config/kerberos/ directory (the kerberos directory must be created first), and copy krb5.conf there as well:

cd /opt/kafka_2.12-2.2.1/
mkdir -p config/kerberos
cp /etc/krb5.conf config/kerberos/
kadmin.local -q "add_principal -randkey kafka-server/monkey@WLF.COM"
kadmin.local -q "add_principal -randkey kafka-client@WLF.COM"
kadmin.local -q "xst -k config/kerberos/kafka-server.keytab kafka-server/monkey@WLF.COM"
kadmin.local -q "xst -k config/kerberos/kafka-client.keytab kafka-client@WLF.COM"
Edit the config/server.properties configuration file and add the following:
listeners=SASL_PLAINTEXT://monkey:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI
sasl.kerberos.service.name=kafka-server
Create a config/kerberos/kafka-server-jaas.conf file with the following content:
KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/opt/kafka_2.12-2.2.1/config/kerberos/kafka-server.keytab"
storeKey=true
useTicketCache=false
principal="kafka-server/monkey@WLF.COM";
};

Create a config/kerberos/kafka-client-jaas.conf file in the same way (this is the file referenced by the client scripts and Java code below), with the following content:

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/opt/kafka_2.12-2.2.1/config/kerberos/kafka-client.keytab"
storeKey=true
useTicketCache=false
principal="kafka-client@WLF.COM";
};
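As an alternative to a separate JAAS file, kafka-clients (since 0.10.2) also accepts the same login module settings inline via the sasl.jaas.config client property, which avoids the java.security.auth.login.config system property entirely; a sketch:

import java.util.Properties;

public class InlineJaasExample {
    public static Properties clientProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "monkey:9092");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "GSSAPI");
        props.put("sasl.kerberos.service.name", "kafka-server");
        // Inline JAAS configuration: the same content as the
        // KafkaClient entry in kafka-client-jaas.conf.
        props.put("sasl.jaas.config",
                "com.sun.security.auth.module.Krb5LoginModule required "
                        + "useKeyTab=true storeKey=true "
                        + "keyTab=\"/opt/kafka_2.12-2.2.1/config/kerberos/kafka-client.keytab\" "
                        + "principal=\"kafka-client@WLF.COM\";");
        return props;
    }
}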
In bin/kafka-server-start.sh, add the following as the second-to-last line (just before the final exec):

export KAFKA_OPTS="-Dzookeeper.sasl.client=false -Dzookeeper.sasl.client.username=zk-server -Djava.security.krb5.conf=/opt/kafka_2.12-2.2.1/config/kerberos/krb5.conf -Djava.security.auth.login.config=/opt/kafka_2.12-2.2.1/config/kerberos/kafka-server-jaas.conf"
In bin/kafka-topics.sh, bin/kafka-console-producer.sh, and bin/kafka-console-consumer.sh, add the following as the second-to-last line:

export KAFKA_OPTS="-Djava.security.krb5.conf=/opt/kafka_2.12-2.2.1/config/kerberos/krb5.conf -Djava.security.auth.login.config=/opt/kafka_2.12-2.2.1/config/kerberos/kafka-client-jaas.conf"
Create a config/kerberos/client.properties file with the following content:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka-server
A helper script, operate.sh, to start/stop Kafka and manage topics on this single node:

#!/bin/bash
:<<!
[Script description]
1. This script manages Kafka: start, stop, status, restart, view logs, list topics, create/delete topics, and produce/consume topic data;
2. It is recommended to place the script in the Kafka installation directory;
3. For single-node deployments only.
Note: after installing Kafka, adjust the config file:
# Replace the IP with the host Kafka runs on
sed -i '31 a listeners=PLAINTEXT://localhost:9092' config/server.properties
!

# Kafka installation directory
KAFKA_HOME=/opt/kafka_2.12-2.2.1
# Zookeeper address
ZK_SERVER=monkey
# Kafka address
KAFKA_SERVER=monkey
# Zookeeper startup log
LOG_ZK=$KAFKA_HOME/logs/zookeeper-run.log
# Kafka startup log
LOG_KAFKA=$KAFKA_HOME/logs/kafka-run.log
# SASL client config
CONF_SASL=config/kerberos/client.properties
# Operation
operate=$1
# Parameter
param=$2
# Process IDs
pids=`ps -ef | egrep "Kafka|QuorumPeerMain" | egrep -v grep | awk '{print $2}'`
# Usage message
msg='Please input params [<run>|<kil>|<res>|<sta>|<log> [zk]|<list>|<add> <{topic}>|<del> [{topic}]|<consume> <{topic}>|<produce> <{topic}>]'

# Customized shell output
function custom_print(){
    echo -e "\033[5;34m ***** \033[0m"
    echo -e "\033[32m $@ ! \033[0m"
    echo -e "\033[5;34m ***** \033[0m"
}

function run(){
    rm -rf $LOG_ZK $LOG_KAFKA
    # Start zookeeper first
    echo "start zookeeper ..."
    nohup $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties > $LOG_ZK 2>&1 &
    sleep 5
    # Then start kafka
    nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties > $LOG_KAFKA 2>&1 &
}

function stop(){
    if [[ $pids ]]; then
        kill -9 $pids
        msg='Stopped success'
        custom_print $msg
    else
        msg='The service is already down'
        custom_print $msg
    fi
}

function restart(){
    if [[ $pids ]]; then
        kill -9 $pids
    fi
    run
    msg='Restart success'
    custom_print $msg
}

function status(){
    jps | egrep "Kafka|QuorumPeerMain"
    if [[ $pids ]]; then
        # Green text
        msg='RUNNING'
        custom_print $msg
    else
        # Red STOPPED banner
        echo -e "\033[5;34m ***** \033[0m"
        echo -e "\033[31m STOPPED ! \033[0m"
        echo -e "\033[5;34m ***** \033[0m"
    fi
}

function log(){
    if [[ -e $1 ]]; then
        tail -f $1
    else
        msg='No log has been generated yet'
        custom_print $msg
    fi
}

# Dispatch on the requested operation
if [[ $operate = "run" || $operate = "start" ]]; then
    if [[ $pids ]]; then
        msg='The service is already running'
        custom_print $msg
    else
        run
        msg='Start success'
        custom_print $msg
    fi
elif [[ $operate = "kil" || $operate = "stop" ]]; then
    stop
elif [[ $operate = "res" || $operate = "restart" ]]; then
    restart
elif [[ $operate = "sta" || $operate = "status" ]]; then
    status
elif [[ $operate = "log" ]]; then
    if [[ $param = "zk" ]]; then
        log $LOG_ZK
    else
        log $LOG_KAFKA
    fi
elif [[ $operate = "list" ]]; then
    $KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZK_SERVER:2181 --list
elif [[ $operate = "add" && ! -z $param ]]; then
    $KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server $KAFKA_SERVER:9092 --replication-factor 1 --partitions 1 --topic $param
    msg="$param create success"
    custom_print $msg
elif [[ $operate = "del" ]]; then
    if [[ -z $param ]]; then
        topics=`$KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZK_SERVER:2181 --list`
        for topic in $topics; do
            if [[ $topic != "__consumer_offsets" ]]; then
                $KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZK_SERVER:2181 --delete --topic $topic > /dev/null
                msg="$topic delete success"
                custom_print $msg
            fi
        done
    else
        $KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZK_SERVER:2181 --delete --topic $param > /dev/null
        msg="$param delete success"
        custom_print $msg
    fi
elif [[ $operate = "consume" && ! -z $param ]]; then
    if [[ -z $CONF_SASL ]]; then
        $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server $KAFKA_SERVER:9092 --from-beginning --topic $param
    else
        $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server $KAFKA_SERVER:9092 --from-beginning --topic $param --consumer.config $CONF_SASL
    fi
elif [[ $operate = "produce" && ! -z $param ]]; then
    if [[ -z $CONF_SASL ]]; then
        $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list $KAFKA_SERVER:9092 --topic $param
    else
        $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list $KAFKA_SERVER:9092 --topic $param --producer.config $CONF_SASL
    fi
else
    custom_print $msg
fi
- The KDC service listens on port 88 by default, so open it in the firewall. (Note that kdc.conf above also sets kdc_tcp_ports = 88; if your clients use TCP, open 88/tcp the same way.)
firewall-cmd --zone=public --add-port=88/udp --permanent
firewall-cmd --reload
Add the Kafka client dependency to the project's pom.xml:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.1</version>
</dependency>
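With the dependency in place, a quick connectivity check against the secured broker can save debugging time; here is a minimal, hypothetical AdminClient sketch reusing the client settings from above (the Windows-style paths mirror the full example that follows):

import org.apache.kafka.clients.admin.AdminClient;
import java.util.Properties;

public class VerifyKerberosConnection {
    public static void main(String[] args) throws Exception {
        System.setProperty("java.security.krb5.conf", "F:\\test\\kerberos\\krb5.conf");
        System.setProperty("java.security.auth.login.config", "F:\\test\\kerberos\\kafka-client-jaas.conf");
        Properties props = new Properties();
        props.put("bootstrap.servers", "monkey:9092");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "GSSAPI");
        props.put("sasl.kerberos.service.name", "kafka-server");
        // Listing topics forces a full SASL/GSSAPI handshake, so success
        // here means the Kerberos setup works end to end.
        try (AdminClient admin = AdminClient.create(props)) {
            System.out.println("Topics: " + admin.listTopics().names().get());
        }
    }
}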
Note: you need to adjust the kafka-client.keytab path configured in the kafka-client-jaas.conf file to wherever the keytab lives on the client machine!
package com.cloudansys;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.Future;

public class TestKafkaKerberos {

    public static void main(String[] args) {
        // Producer first: testConsumer() polls forever, so it must come last.
        testProducer();
        testConsumer();
    }

    private static void testConsumer() {
        // JAAS and Kerberos configuration file paths
        System.setProperty("java.security.auth.login.config", "F:\\test\\kerberos\\kafka-client-jaas.conf");
        System.setProperty("java.security.krb5.conf", "F:\\test\\kerberos\\krb5.conf");
        Properties props = new Properties();
        props.put("bootstrap.servers", "monkey:9092");
        props.put("group.id", "test_group");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // SASL/Kerberos settings
        props.put("sasl.mechanism", "GSSAPI");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.kerberos.service.name", "kafka-server");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        String topic = "test";
        consumer.subscribe(Collections.singletonList(topic));
        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, partition = %d, key = %s, value = %s%n",
                            record.offset(), record.partition(), record.key(), record.value());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private static void testProducer() {
        // JAAS and Kerberos configuration file paths
        System.setProperty("java.security.auth.login.config", "F:\\test\\kerberos\\kafka-client-jaas.conf");
        System.setProperty("java.security.krb5.conf", "F:\\test\\kerberos\\krb5.conf");
        // Kafka properties
        Properties props = new Properties();
        props.put("bootstrap.servers", "monkey:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Kerberos authentication
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "GSSAPI");
        props.put("sasl.kerberos.service.name", "kafka-server");
        String topic = "test";
        String msg = "this is a test msg";
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg);
        // Send the record and wait for the acknowledgement metadata
        Future<RecordMetadata> future = kafkaProducer.send(record);
        try {
            RecordMetadata metadata = future.get();
            System.out.printf("Message sent to Kafka topic=%s, partition=%d, offset=%d%n",
                    metadata.topic(), metadata.partition(), metadata.offset());
        } catch (Exception e) {
            e.printStackTrace();
        }
        kafkaProducer.close();
    }
}