赞
踩
1、kafka sasl认证配置
配置环境
1.1 单机+身份验证配置(SASL/PLAIN认证方式)
1.1.1 服务端配置
1、服务器节点下配置服务器JASS文件,命名为kafka_server_jaas.conf
#cd /opt/modules/kafka_2.11-2.0.0/config #vi kafka_server_jaas.conf KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="eystar" password="eystar8888" user_eystar="eystar8888" user_yxp="yxp-secret"; }; |
说明:
username +password 表示kafka集群环境各个代理之间进行通信时使用的身份验证信息。
user_eystar="eystar8888" 表示定义客户端连接到代理的用户信息,即创建一个用户名为eystar,密码为eystar8888的用户身份信息,kafka代理对其进行身份验证,可以创建多个用户,格式user_XXX=”XXX”;
2、将JAAS配置到kafka服务器节点的JVM启动参数中,复制kafka的启动脚本kafka-server-start.sh,命名kafka-server-start-saal.sh,如下操作
#cd /opt/modules/kafka_2.11-2.0.0/bin #vi kafka-server-start-saal.sh if [ "x$KAFKA_OPTS" ]; then export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/modules/kafka_2.11-2.0.0/config/kafka_server_jaas.conf" fi |
3、配置服务器节点服务信息文件server-sasl.properties的SASL的mechanism和port,拷贝server.properties命名为server-sasl.properties
#cd /opt/modules/kafka_2.11-2.0.0/config #vi server-sasl.properties listeners=SASL_PLAINTEXT://192.168.1.176:7792 security.inter.broker.protocol=SASL_PLAINTEXT sasl.enabled.mechanisms=PLAIN sasl.mechanism.inter.broker.protocol=PLAIN |
其他配置信息,根据实际情况配置
broker.id=0
zookeeper.connect=192.168.1.176:7781
log.dirs=/opt/data/kafka
1.1.2 客户端配置
1、配置客户端JAAS文件,命名为kafka_client_jaas.conf
# vi kafka_client_jaas.conf KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="eystar" password="eystar8888"; }; |
2、JAVA调用的KafkaClient客户端连接时指定配置属性sasl.jaas.config
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="eystar" \ password="eystar8888"; 即配置属性: Pro.set(“sasl.jaas.config”,”org.apache.kafka.common.security.plain.PlainLoginModule required username=\"eystar\" password=\"eystar8888\";"; ”); |
3、命令行连接时,配置消费者consumer.properties
#cd /opt/modules/kafka_2.11-2.0.0/config #vi consumer.properties security.protocol=SASL_PLAINTEXT sasl.mechanism=PLAIN |
4、配置消费者脚本kafka-console-consumer.sh文件,添加JVM配置数据
#cd /opt/modules/kafka_2.11-2.0.0/bin #vi kafka-console-consumer.sh if [ "x$KAFKA_OPTS" ]; then export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/modules/kafka_2.11-2.0.0/config/kafka_client_jaas.conf" fi |
5、客户端生产者和消费者配置一样
1.2 集群+身份验证配置(SASL/PLAIN认证方式)
1.3 常用操作命令
1.3.1启动
指定SASL配置文件
1、 只输出错误日志
#nohup ./kafka-server-start-saal.sh ../config/server-sasl.properties >/dev/null 2>log & |
2、什么信息也不要
#nohup ./kafka-server-start-saal.sh ../config/server-sasl.properties >/dev/null 2>&1 & |
指定默认配置文件
#nohup ./kafka-server-start.sh ../config/server.properties >/dev/null 2>&1 & |
1.3.2 停止
1、通过脚本
# ./kafka-server-stop.sh |
2、通过kill pid
#jps 10765 Kafka #kill -9 10765 |
1.3.3 创建TOPIC
无限制权限TOPIC
# ./kafka-topics.sh --zookeeper test176:7781 --replication-factor 1 --partitions 6 --topic heartbeat_info --create |
添加用户权限TOPIC
#./kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.1.176:7781 --add --allow-principal User:eystar --operation Read --operation Write --topic heartbeat_info |
1.3.4 查看TOPIC授权
#./kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.1.176:7781 --list --topic heartbeat_info |
1.3.5 删除TOPIC
#./kafka-topics.sh --zookeeper test176:7781 --topic heartbeat_info --delete |
1.3.6 生产消息
#kafka-console-producer.sh --broker-list test176:7792 --topic heartbeat_info >123456data |
认证方式配置kafka-console-producer.sh的JVM参数,详见客户端配置
1.3.7 消费消息
#./kafka-console-consumer.sh --bootstrap-server test176:7792 --topic heartbeat_info |
认证方式配置kafka-console-consumer.sh的JVM参数,详见客户端配置
1.3.8 查看TOPIC列表
#./kafka-topics.sh --zookeeper test176:7781 --list |
1.3.9 查看TOPIC详情
#./kafka-topics.sh --describe --zookeeper test176:7781 --topic heartbeat_info |
2、JAVA中client身份认证SASL实现
方式一:配置Properties 属性
//安全模式 用户名 密码
props.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"usn\" password=\"pwd\";");
props.setProperty("security.protocol", "SASL_PLAINTEXT");
props.setProperty("sasl.mechanism", "PLAIN");
方式二:设置系统属性参数
// 指定kafka_client_jaas.conf文件路径
String confPath = TestKafkaComsumer.class.getResource("/").getPath()+ "/kafka_client_jaas.conf";
System.setProperty("java.security.auth.login.config", confPath);
方式三:配置程序启动时参数
可以在程序启动参数中设置java.security.auth.login.config
也就是启动参数加上-Djava.security.auth.login.config=D:/workspace/kafka_client_jaas.conf。
具体代码实现
消费者:
public class TestComsumer { public static void main(String[] args) { Properties props = new Properties(); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); } |
生产者:
public class TestProduce { public static void main(String args[]) { Properties props = new Properties(); props.put("bootstrap.servers", "192.168.1.176:9092"); //sasl Producer<String, String> producer = new KafkaProducer<>(props); try { } catch (InterruptedException e) { } } |
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。