赞
踩
接到一个任务,调查一下Kafka的权限机制。捣鼓了2天,终于弄出来了。期间走了不少的坑。还有一堆不靠谱的家伙的博客。
根据Kafka的官网文档可知,Kafka的权限认证主要有如下三种:
其中SSL会导致数据传输延迟,所以不做推荐。一般常用的是后两种,第三种最方便。
Demo Provider & Consumer代码在github上,地址如下:
https://github.com/SeanYanxml/bigdata
操作步骤主要包括3点:
有的博客ZooKeeper和Kafka的配置配置在一台机器上,ZooKeeper读取的是Kafka的zookeeper.properties的配置,需要明确,不要弄混了。
1 zoo.cfg文件配置
添加如下配置:
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
2 在conf/目录下创建文件名为zk_server_jaas.conf
的文件。
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret";
};
配置文件我命名为zk_server_jaas.conf,并放在部署目录的conf/下。文件中定义了身份认证类(org.apache.kafka.common.security.plain.PlainLoginModule),可以看到这个认证类是kafka命名空间,也就是需要加入kafka的插件,所以下面一步非常重要。
这个文件中定义了两个用户,一个是kafka,一个是producer(user_可以定义多个用户,等于的值就是用户密码),这些用user_配置出来的用户都可以提供给生产者程序和消费者程序认证使用。还有两个属性,username和password,其中username是配置Zookeeper节点之间内部认证的用户名,password是对应的密码。
3 拷贝第三方Jar包到ZooKeeper目录。创建一个在conf/目录下,创建sasl_jars目录,放置这些Jar包。
第三方的Jar包包括,可以从https://mvnrepository.com/ 查看需要的包依赖。
kafka-clients-1.0.0.jar
lz4-java-1.4.jar
org.osgi.core-4.3.0.jar
slf4j-api-1.7.25.jar
snappy-java-1.1.4.jar
4 更改脚本bin/zkEnv.sh
,让ZooKeeper启动时,加载Jar包和jaas.conf文件
for i in "$ZOOBINDIR"/../conf/sasl_jars/*.jar; do
CLASSPATH="$i:$CLASSPATH"
done
SERVER_JVMFLAGS=" -Djava.security.auth.login.config=$ZOOCFGDIR/zk_server_jaas.conf "
1 创建kafka_server_jaas.conf
文件
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret" user_admin="admin-secret" user_alice="alice"; }; KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice"; }; Client { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret"; };
2 配置server.properties
配置文件
# server.properties
listeners=SASL_PLAINTEXT://ip(127.0.0.1):port(9092)
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# default false | true to accept all the users to use it.
#allow.everyone.if.no.acl.found=true
super.users=User:admin;User:alice
3 最后需要为 Kafka 添加 java.security.auth.login.config 环境变量。在 bin/kafka-run-class.sh 中添加以下内容
KAFKA_SASL_OPTS='-Djava.security.auth.login.config=/opt/meituan/kafka_2.10-0.10.0.0/config/kafka_server_jaas.conf'
# Launch mode
if [ "x$DAEMON_MODE" = "xtrue" ]; then
nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
else
exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
fi
4 为你需要使用的用户授权
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.100.62:2181 --add --allow-principal User:alice --operation Read --operation Write --topic sean-security
查询已经授权的用户
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.100.62:2181 --list --topic sean-security
1 创建kafka_client_jaas.conf
文件
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="alice"
password="alice2";
};
2 让程序加载这个配置文件
System.setProperty("java.security.auth.login.config",
"/Users/Sean/Documents/Gitrep/bigdata/kafka/src/main/resources/kafka_client_jaas.conf"); // 环境变量添加,需要输入配置文件的路径
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
并且注意要把端口从9092改成你写的PLAIN后面的端口号。
[cpic@62rhel kafka_2.11-1.0.0]$ ./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.100.62:2181 --add --allow-principal User:alice --operation Read --operation Write --topic sean-security
Adding ACLs for resource `Topic:sean-security`:
User:alice has Allow permission for operations: Read from hosts: *
User:alice has Allow permission for operations: Write from hosts: *
Current ACLs for resource `Topic:sean-security`:
User:alice has Allow permission for operations: Read from hosts: *
User:alice has Allow permission for operations: Write from hosts: *
[cpic@62rhel kafka_2.11-1.0.0]$ ./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.100.62:2181 --list --topic sean-security
Current ACLs for resource `Topic:sean-security`:
User:alice has Allow permission for operations: Read from hosts: *
User:alice has Allow permission for operations: Write from hosts: *
ERROR Connection to node 0 failed authentication due to: Authen
解答: 权限控制没有配置 对,主要注意 zk_jass.conf kafka_jaas.conf kafka_client_jaas.conf 这三个文件的配置
Selector:189 - [Producer clientId=producer-1] Connection with 192.168.100.62/192.168.100.62 disconne
解答: 权限密码没有控制对,或者端口不是配置的端口。我之前写9092,配置PLAIN后面的端口为"9093",调试了半个多小时。
WARN
javax.security.auth.login.LoginException: No JAAS configuration section named ‘Client’ was found in specified JAAS configuration file: ‘/usr/cpic/apps/kafka_2.11-1.0.0/current/packages/kafka_2.11-1.0.0/config/kafka_server_jaas.conf’. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
解答:kafka_jaas.conf 文件没有client子项,写的很清楚了。
2018-03-14 16:42:57,206 [myid:] - ERROR [main:ZooKeeperServerMain@64] - Unexpected exception, exiting abnormally
java.io.IOException: Could not configure server because SASL configuration did not allow the ZooKeeper server to authenticate itself properly: javax.security.auth.login.LoginException: 无法找到 LoginModule 类: org.apache.kafka.common.security.plain.PlainLoginModule
at org.apache.zookeeper.server.ServerCnxnFactory.configureSaslLogin(ServerCnxnFactory.java:211)
at org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:82)
at org.apache.zookeeper.server.ZooKeeperServerMain.runFromConfig(ZooKeeperServerMain.java:117)
at org.apache.zookeeper.server.ZooKeeperServerMain.initializeAndRun(ZooKeeperServerMain.java:87)
at org.apache.zookeeper.server.ZooKeeperServerMain.main(ZooKeeperServerMain.java:53)
at org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:116)
at org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:78)
解答:zookeeper.out的报错 ZooKeeper的classpath没有需要的Jar包
org.I0Itec.zkclient.exception.ZkAuthFailedException: Authentication failure
Kafka启动失败异常,密码不是ZooKeeper内配置的密码。
org.apache.kafka.common.errors.IllegalSaslStateException: Unexpected handshake request with client m
密码不一致错误
[1] 集群Kafka配置SASL用户名密码认证
[2] Kafka 0.10.0 SASL/PLAIN身份认证及权限实现
[3] Kafka JAAS Plain SASL 安全认证配置
[4] kafka使用SASL验证(官方文档中文版 0.10.1)
[5] Kafka ACLs in Practice – User Authentication and Authorization
[6] (StackOverFlow :kafka-sasl-zookeeper-authentication)
[7] StackOverFlow : kafka-sasl-plain-setup-with-ssl
[8] Kafka JAAS 安全认证流程
# provider 连接不上 log 19:29:52,228 INFO ConsumerConfig:223 - ConsumerConfig values: auto.commit.interval.ms = 1000 auto.offset.reset = latest bootstrap.servers = [192.168.100.62:9093, 192.168.100.63:9093, 192.168.100.64:9093] check.crcs = true client.id = connections.max.idle.ms = 540000 enable.auto.commit = true exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = test heartbeat.interval.ms = 3000 interceptor.classes = null internal.leave.group.on.close = true isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 500 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 305000 retry.backoff.ms = 100 sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.mechanism = PLAIN security.protocol = SASL_PLAINTEXT send.buffer.bytes = 131072 session.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer 19:29:52,233 DEBUG KafkaConsumer:177 - [Consumer clientId=consumer-1, groupId=test] Initializing the Kafka consumer 19:29:52,467 DEBUG Metadata:270 - Updated cluster metadata version 1 to Cluster(id = null, nodes = [192.168.100.63:9093 (id: -2 rack: null), 192.168.100.64:9093 (id: -3 rack: null), 192.168.100.62:9093 (id: -1 rack: null)], partitions = []) 19:29:52,586 INFO AbstractLogin:53 - Successfully logged in. 19:29:52,602 DEBUG Metrics:404 - Added sensor with name fetch-throttle-time 19:29:52,615 DEBUG Metrics:404 - Added sensor with name connections-closed: 19:29:52,620 DEBUG Metrics:404 - Added sensor with name connections-created: 19:29:52,621 DEBUG Metrics:404 - Added sensor with name successful-authentication: 19:29:52,621 DEBUG Metrics:404 - Added sensor with name failed-authentication: 19:29:52,622 DEBUG Metrics:404 - Added sensor with name bytes-sent-received: 19:29:52,623 DEBUG Metrics:404 - Added sensor with name bytes-sent: 19:29:52,625 DEBUG Metrics:404 - Added sensor with name bytes-received: 19:29:52,626 DEBUG Metrics:404 - Added sensor with name select-time: 19:29:52,627 DEBUG Metrics:404 - Added sensor with name io-time: 19:29:52,658 DEBUG Metrics:404 - Added sensor with name heartbeat-latency 19:29:52,661 DEBUG Metrics:404 - Added sensor with name join-latency 19:29:52,667 DEBUG Metrics:404 - Added sensor with name sync-latency 19:29:52,671 DEBUG Metrics:404 - Added sensor with name commit-latency 19:29:52,677 DEBUG Metrics:404 - Added sensor with name bytes-fetched 19:29:52,678 DEBUG Metrics:404 - Added sensor with name records-fetched 19:29:52,678 DEBUG Metrics:404 - Added sensor with name fetch-latency 19:29:52,679 DEBUG Metrics:404 - Added sensor with name records-lag 19:29:52,682 INFO AppInfoParser:109 - Kafka version : 1.0.0 19:29:52,682 INFO AppInfoParser:110 - Kafka commitId : aaa7af6d4a11b29d 19:29:52,684 DEBUG KafkaConsumer:177 - [Consumer clientId=consumer-1, groupId=test] Kafka consumer initialized 19:29:52,684 DEBUG KafkaConsumer:183 - [Consumer clientId=consumer-1, groupId=test] Subscribed to topic(s): sean-security 19:29:52,685 DEBUG AbstractCoordinator:183 - [Consumer clientId=consumer-1, groupId=test] Sending GroupCoordinator request to broker 192.168.100.62:9093 (id: -1 rack: null) 19:29:52,799 DEBUG NetworkClient:183 - [Consumer clientId=consumer-1, groupId=test] Initiating connection to node 192.168.100.62:9093 (id: -1 rack: null) 19:29:52,881 DEBUG SaslClientAuthenticator:256 - Set SASL client state to SEND_APIVERSIONS_REQUEST 19:29:52,884 DEBUG SaslClientAuthenticator:150 - Creating SaslClient: client=null;service=kafka;serviceHostname=192.168.100.62;mechs=[PLAIN] 19:29:52,889 DEBUG Metrics:404 - Added sensor with name node--1.bytes-sent 19:29:52,890 DEBUG Metrics:404 - Added sensor with name node--1.bytes-received 19:29:52,890 DEBUG Metrics:404 - Added sensor with name node--1.latency 19:29:52,893 DEBUG Selector:195 - [Consumer clientId=consumer-1, groupId=test] Created socket with SO_RCVBUF = 66608, SO_SNDBUF = 131768, SO_TIMEOUT = 0 to node -1 19:29:52,898 DEBUG SaslClientAuthenticator:256 - Set SASL client state to RECEIVE_APIVERSIONS_RESPONSE 19:29:52,899 DEBUG NetworkClient:183 - [Consumer clientId=consumer-1, groupId=test] Completed connection to node -1. Fetching API versions. 19:29:52,902 DEBUG SaslClientAuthenticator:256 - Set SASL client state to SEND_HANDSHAKE_REQUEST 19:29:52,903 DEBUG SaslClientAuthenticator:256 - Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE 19:29:52,903 DEBUG NetworkClient:183 - [Consumer clientId=consumer-1, groupId=test] Initialize connection to node 192.168.100.63:9093 (id: -2 rack: null) for sending metadata request 19:29:52,904 DEBUG NetworkClient:183 - [Consumer clientId=consumer-1, groupId=test] Initiating connection to node 192.168.100.63:9093 (id: -2 rack: null) 19:29:52,905 DEBUG SaslClientAuthenticator:256 - Set SASL client state to SEND_APIVERSIONS_REQUEST 19:29:52,905 DEBUG SaslClientAuthenticator:150 - Creating SaslClient: client=null;service=kafka;serviceHostname=192.168.100.63;mechs=[PLAIN] 19:29:52,907 DEBUG SaslClientAuthenticator:256 - Set SASL client state to INITIAL 19:29:52,908 DEBUG SaslClientAuthenticator:256 - Set SASL client state to INTERMEDIATE 19:29:52,908 DEBUG Metrics:404 - Added sensor with name node--2.bytes-sent 19:29:52,909 DEBUG Metrics:404 - Added sensor with name node--2.bytes-received 19:29:52,911 DEBUG Metrics:404 - Added sensor with name node--2.latency 19:29:52,911 DEBUG Selector:195 - [Consumer clientId=consumer-1, groupId=test] Created socket with SO_RCVBUF = 66608, SO_SNDBUF = 131768, SO_TIMEOUT = 0 to node -2 19:29:52,912 DEBUG SaslClientAuthenticator:256 - Set SASL client state to RECEIVE_APIVERSIONS_RESPONSE 19:29:52,912 DEBUG NetworkClient:183 - [Consumer clientId=consumer-1, groupId=test] Completed connection to node -2. Fetching API versions. 19:29:52,913 DEBUG SaslClientAuthenticator:256 - Set SASL client state to FAILED 19:29:52,914 DEBUG Selector:189 - [Consumer clientId=consumer-1, groupId=test] Connection with 192.168.100.62/192.168.100.62 disconnected due to authentication exception org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed: Invalid username or password 19:29:52,922 DEBUG NetworkClient:183 - [Consumer clientId=consumer-1, groupId=test] Node -1 disconnected. 19:29:52,924 ERROR NetworkClient:296 - [Consumer clientId=consumer-1, groupId=test] Connection to node -1 failed authentication due to: Authentication failed: Invalid username or password Exception in thread "main" org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed: Invalid username or password
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。