Authentication with SASL/SCRAM

First, start Kafka without any authentication configured.
1. Create SCRAM Credentials
1.1 Create the inter-broker communication user (also called the super user)

```bash
bin/kafka-configs.sh --zookeeper centos1:2181 --alter --add-config 'SCRAM-SHA-256=[password=adminpwd],SCRAM-SHA-512=[password=adminpwd]' --entity-type users --entity-name admin
```
1.2 Create the client user dbcUser

```bash
bin/kafka-configs.sh --zookeeper centos1:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=changeit],SCRAM-SHA-512=[password=changeit]' --entity-type users --entity-name dbcUser
```
1.3 View the SCRAM credentials

```bash
bin/kafka-configs.sh --zookeeper centos1:2181 --describe --entity-type users --entity-name dbcUser
```
Deleting SCRAM credentials (shown for reference only, not executed here):

```bash
bin/kafka-configs.sh --zookeeper centos1:2181 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name dbcUser
```
2. Configure the Kafka Brokers
2.1 In the config directory of every Kafka broker, add a kafka_server_jaas.conf with the following content:

```
KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="adminpwd";
};
```

Note: do not leave out the semicolons.
2.2 Pass the location of the JAAS configuration file to each Kafka broker as a JVM parameter

Edit kafka/bin/kafka-server-start.sh:
Comment out the line `exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"` and add the following:

```bash
#exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=$base_dir/../config/kafka_server_jaas.conf kafka.Kafka "$@"
```
Alternatively, leave kafka-server-start.sh unmodified and add the following to ~/.bashrc:

```bash
export KAFKA_PLAIN_PARAMS="-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf"
export KAFKA_OPTS="$KAFKA_PLAIN_PARAMS $KAFKA_OPTS"
```
2.3 Configure the SASL port and SASL mechanisms in server.properties:

```properties
# Authentication
listeners=SASL_PLAINTEXT://centos1:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.enabled.mechanisms=SCRAM-SHA-256
# ACLs
allow.everyone.if.no.acl.found=false
super.users=User:admin
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
```
2.4 Restart ZooKeeper/Kafka

Restart the ZooKeeper and Kafka services. Every broker reads kafka_server_jaas.conf before connecting.

```bash
# Restart all ZooKeeper nodes
bin/zookeeper-server-stop.sh
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
# Restart all brokers
bin/kafka-server-stop.sh
bin/kafka-server-start.sh -daemon config/server.properties
```
3. Client Configuration

First, test with kafka-console-producer and kafka-console-consumer.
kafka-console-producer
1. Create the config/client-sasl.properties file:

```properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
```
2. Create the config/kafka_client_jaas_admin.conf file:

```
KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="adminpwd";
};
```
3. Modify the kafka-console-producer.sh script

Here we copy it to a new file and edit the copy:

```bash
cp bin/kafka-console-producer.sh bin/kafka-console-producer-admin.sh
vi bin/kafka-console-producer-admin.sh
```

In the copy, replace the exec line:

```bash
#exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=$(dirname $0)/../config/kafka_client_jaas_admin.conf kafka.tools.ConsoleProducer "$@"
```
4. Create the test topic (skip this if it was already created earlier):

```bash
bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --topic test
```
5. Test producing messages:

```bash
[wanghy@centos1 kafka]$ bin/kafka-console-producer-admin.sh --broker-list centos1:9092 --topic test --producer.config config/client-sasl.properties
>hello, I am admin
>
```
As you can see, the admin user can send messages without any ACLs being configured (it is listed in super.users).
6. Test the dbcUser user
Create a bin/kafka-console-producer-dbc.sh file:

```bash
cp bin/kafka-console-producer-admin.sh bin/kafka-console-producer-dbc.sh
```

In it, change the exec line to point at the dbcUser JAAS file:

```bash
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=$(dirname $0)/../config/kafka_client_jaas_dbc.conf kafka.tools.ConsoleProducer "$@"
```
Create the kafka_client_jaas_dbc.conf file:

```bash
cp config/kafka_client_jaas_admin.conf config/kafka_client_jaas_dbc.conf
```

with the following content:

```
KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="dbcUser"
    password="changeit";
};
```
Produce a message:

```bash
[wanghy@centos1 kafka]$ bin/kafka-console-producer-dbc.sh --broker-list centos1:9092 --topic stest --producer.config config/client-sasl.properties
>hello, I am dbcUser
[2019-03-15 09:47:28,483] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 1 : {stest=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2019-03-15 09:47:28,486] ERROR Error when sending message to topic stest with key: null, value: 19 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [stest]
>
```

An error is reported: dbcUser is not authorized to access the topic.
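To let dbcUser produce, a Write ACL would have to be granted on the topic. A minimal sketch using kafka-acls.sh, assuming the same ZooKeeper address and the stest topic from the test above:

```bash
# Grant dbcUser permission to write (produce) to the topic "stest"
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=centos1:2181 \
  --add --allow-principal User:dbcUser --operation Write --topic stest

# List the ACLs on the topic to verify
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=centos1:2181 --list --topic stest
```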
kafka-console-consumer
1. Create the config/consumer-dbc.properties file:

```properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
group.id=dbc-group
```
2. Create the bin/kafka-console-consumer-dbc.sh file:

```bash
cp bin/kafka-console-consumer.sh bin/kafka-console-consumer-dbc.sh
vi bin/kafka-console-consumer-dbc.sh
```

Change it as follows:

```bash
#exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=$(dirname $0)/../config/kafka_client_jaas_dbc.conf kafka.tools.ConsoleConsumer "$@"
```
3. Test the consumer:

```bash
bin/kafka-console-consumer-dbc.sh --bootstrap-server centos1:9092 --topic test --consumer.config config/consumer-dbc.properties --from-beginning
[2019-03-15 10:03:27,794] WARN [Consumer clientId=consumer-1, groupId=dbc-group] Error while fetching metadata with correlation id 2 : {test=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2019-03-15 10:03:27,796] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test]
Processed a total of 0 messages
```

Again, no permission.
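Likewise, before dbcUser can consume, it would need a Read ACL on the topic and a Read ACL on its consumer group. A minimal sketch, assuming the same ZooKeeper address, the test topic, and the dbc-group group.id configured above:

```bash
# Grant dbcUser permission to read (consume) from the topic "test"
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=centos1:2181 \
  --add --allow-principal User:dbcUser --operation Read --topic test

# Grant dbcUser permission to use the consumer group "dbc-group"
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=centos1:2181 \
  --add --allow-principal User:dbcUser --operation Read --group dbc-group
```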