
Kafka SSL and ACL Configuration

It has been a long time since I wrote an article. I am writing this one so that others can avoid the detours I took: when I was doing this setup, I could not find a single decent write-up on Google or Baidu, and the official docs are not very clear either, which made the whole thing quite painful. In the end I got it working.


Kafka SSL Configuration
You can start by reading the official doc:

I found it rather misleading, to be honest.

First, let's look at the overall cluster architecture:

- kafka1 (192.168.56.100): ZooKeeper, Kafka broker 100
- kafka2 (192.168.56.101): ZooKeeper, Kafka broker 101
- kafka3 (192.168.56.102): ZooKeeper, Kafka broker 102

The cluster consists of the three nodes listed above.
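
The scripts and commands below address the nodes by hostname (kafka1, kafka2, kafka3), so every node must be able to resolve those names. A minimal sketch of the name resolution this assumes, e.g. via /etc/hosts (the mappings are taken from the layout above):

  # /etc/hosts on every node (and on any client machine)
  192.168.56.100 kafka1
  192.168.56.101 kafka2
  192.168.56.102 kafka3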


The configuration steps are as follows.
I won't cover installing and deploying ZooKeeper and Kafka here; this post focuses on the SSL configuration.

1. Configure the server.properties file on all three nodes

The file shown is kafka1's; on the other nodes, adjust broker.id, host.name, advertised.host.name, listeners, advertised.listeners, and the keystore paths accordingly.
  [root@kafka1 kafka-0.9.0.1]# cat config/server.properties
  broker.id=100
  #port=9020
  port=9093
  host.name=192.168.56.100
  advertised.host.name=192.168.56.100
  zookeeper.connect=192.168.56.100:2181,192.168.56.101:2181,192.168.56.102:2181/kafka91
  allow.everyone.if.no.acl.found=true
  #allow.everyone.if.no.acl.found=false
  #super.users=User:Bob;User:Alice
  super.users=User:CN=kafka1,OU=test,O=test,L=test,ST=test,C=test
  #listeners=PLAINTEXT://192.168.56.100:9020, SSL://192.168.56.100:9093
  #advertised.listeners=PLAINTEXT://192.168.56.100:9020, SSL://192.168.56.100:9093
  listeners=SSL://192.168.56.100:9093
  advertised.listeners=SSL://192.168.56.100:9093
  ssl.keystore.location=/root/kafka1/kafka.server.keystore.jks
  ssl.keystore.password=zbdba94
  ssl.key.password=zbdba94
  ssl.truststore.location=/root/kafka1/kafka.server.truststore.jks
  ssl.truststore.password=zbdba94
  ssl.client.auth=required
  ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
  ssl.keystore.type=JKS
  ssl.truststore.type=JKS
  security.inter.broker.protocol=SSL
  #zookeeper.set.acl=true
  authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
  principal.builder.class=org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
  # Replication configurations
  num.replica.fetchers=4
  replica.fetch.max.bytes=1048576
  replica.fetch.wait.max.ms=500
  replica.high.watermark.checkpoint.interval.ms=5000
  replica.socket.timeout.ms=30000
  replica.socket.receive.buffer.bytes=65536
  replica.lag.time.max.ms=10000
  controller.socket.timeout.ms=30000
  controller.message.queue.size=10
  default.replication.factor=3
  # Log configuration
  log.dir=/data1/kafka-0.9.0.1/data
  kafka.logs.dir=logs
  num.partitions=20
  message.max.bytes=1000000
  auto.create.topics.enable=true
  log.index.interval.bytes=4096
  log.index.size.max.bytes=10485760
  log.retention.hours=720
  log.flush.interval.ms=10000
  log.flush.interval.messages=20000
  log.flush.scheduler.interval.ms=2000
  log.roll.hours=168
  log.retention.check.interval.ms=300000
  log.segment.bytes=1073741824
  delete.topic.enable=true
  # ZK configuration
  zookeeper.connection.timeout.ms=6000
  zookeeper.sync.time.ms=2000
  # Socket server configuration
  num.io.threads=8
  num.network.threads=8
  socket.request.max.bytes=104857600
  socket.receive.buffer.bytes=1048576
  socket.send.buffer.bytes=1048576
  queued.max.requests=16
  fetch.purgatory.purge.interval.requests=100
  producer.purgatory.purge.interval.requests=100
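
A note on the super.users value: Kafka derives the SSL principal from the certificate's distinguished name, written as comma-separated RDNs with no spaces (as in the line above). If you are unsure what DN your broker certificate actually carries, you can read it back from the keystore, using the path and password from this post:

  keytool -list -v -keystore /root/kafka1/kafka.server.keystore.jks -storepass zbdba94 | grep 'Owner:'
  # Prints something like: Owner: CN=kafka1, OU=test, O=test, L=test, ST=test, C=test
  # Drop the spaces after the commas when pasting the DN into super.users.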
2. Generate the certificate and CA files on the kafka1 node

  #!/bin/bash
  name=$HOSTNAME
  folder=securityDemo
  cd /root
  rm -rf $folder
  mkdir $folder
  cd $folder
  printf "zbdba94\nzbdba94\nkafka1\ntest\ntest\ntest\ntest\ntest\nyes\n\n" | keytool -keystore kafka.server.keystore.jks -alias $name -validity 365 -genkey
  printf "te\ntest\ntest\ntest\ntest\nkafka1\nasdf@test.com\n" | openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 -passout pass:zbdba94
  echo "done"
  printf "zbdba94\nzbdba94\nyes\n" | keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert
  printf "zbdba94\nzbdba94\nyes\n" | keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-cert
  printf "zbdba94\n" | keytool -keystore kafka.server.keystore.jks -alias $name -certreq -file cert-file
  openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:zbdba94
  printf "zbdba94\nyes\n" | keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
  printf "zbdba94\nyes\n" | keytool -keystore kafka.server.keystore.jks -alias $name -import -file cert-signed
  #producer.properties
  rm -rf producer.properties
  printf $PWD
  echo "bootstrap.servers=$name:9093" >> producer.properties
  echo "security.protocol=SSL" >> producer.properties
  echo "ssl.truststore.location=$PWD/kafka.client.truststore.jks">> producer.properties
  echo "ssl.truststore.password=zbdba94">> producer.properties
  echo "ssl.keystore.location=$PWD/kafka.server.keystore.jks">> producer.properties
  echo "ssl.keystore.password=zbdba94">> producer.properties
  echo "ssl.key.password=zbdba94">> producer.properties
Note: replace kafka1 with your machine's hostname.
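
Before moving on, it is worth sanity-checking that the signed server certificate really chains back to the CA, using the files the script just produced:

  cd /root/securityDemo
  openssl verify -CAfile ca-cert cert-signed
  # Expected output: cert-signed: OK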

3. On the kafka2 machine, generate a client certificate and sign it with the CA files generated on kafka1
Run the following script:

client1.sh

  #!/bin/bash
  name=$HOSTNAME
  cd /root
  dirname=securityDemo
  rm -rf $dirname
  mkdir $dirname
  cd $dirname
  printf "zbdba94\nzbdba94\n$name\ntest\ntest\ntest\ntest\ntest\nyes\n\n" | keytool -keystore kafka.client.keystore.jks -alias $name -validity 36 -genkey
  printf "zbdba94\nzbdba94\nyes\n" | keytool -keystore kafka.client.keystore.jks -alias $name -certreq -file cert-file
  cp cert-file cert-file-$name

On the kafka2 node, copy over the files generated on kafka1:

cd /root/ && scp -r kafka1:/root/securityDemo /root/kafka1

Then run the following script:

client2.sh

  #!/bin/bash
  name=$HOSTNAME
  cd /root
  openssl x509 -req -CA /root/kafka1/ca-cert -CAkey /root/kafka1/ca-key -in /root/securityDemo/cert-file-$name -out /root/securityDemo/cert-signed-$name -days 36 -CAcreateserial -passin pass:zbdba94
Then run the following script:

client3.sh

  #!/bin/sh
  name=$HOSTNAME
  cd /root/securityDemo
  printf "zbdba94\nyes\n" | keytool -keystore kafka.client.keystore.jks -alias CARoot -import -file /root/kafka1/ca-cert
  printf "zbdba94\nyes\n" | keytool -keystore kafka.client.keystore.jks -alias $name -import -file /root/securityDemo/cert-signed-$name
  #producer.properties
  rm -rf producer.properties
  printf $PWD
  echo "bootstrap.servers=localhost:9093" >> producer.properties
  echo "security.protocol=SSL" >> producer.properties
  echo "ssl.truststore.location=$PWD/kafka.client.keystore.jks">> producer.properties
  echo "ssl.truststore.password=zbdba94">> producer.properties
  echo "ssl.keystore.location=$PWD/kafka.client.keystore.jks">> producer.properties
  echo "ssl.keystore.password=zbdba94">> producer.properties
  echo "ssl.key.password=zbdba94">> producer.properties
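
At this point the client keystore should contain both the CA and the signed client certificate; you can confirm with:

  keytool -list -keystore /root/securityDemo/kafka.client.keystore.jks -storepass zbdba94
  # Should show two entries, something like:
  #   caroot, <date>, trustedCertEntry
  #   kafka2, <date>, PrivateKeyEntry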
Configure the kafka3 node the same way as kafka2.


4. Start the cluster
When the cluster starts, the log files should print lines like the following, one per node:

  INFO Registered broker 100 at path /brokers/ids/100 with addresses: SSL -> EndPoint(192.168.56.100,9093,SSL) (kafka.utils.ZkUtils)
  INFO Registered broker 101 at path /brokers/ids/101 with addresses: SSL -> EndPoint(192.168.56.101,9093,SSL) (kafka.utils.ZkUtils)
  INFO Registered broker 102 at path /brokers/ids/102 with addresses: SSL -> EndPoint(192.168.56.102,9093,SSL) (kafka.utils.ZkUtils)
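
You can also confirm the registrations directly in ZooKeeper, assuming the bundled shell accepts a one-off command and using the chroot from zookeeper.connect above:

  bin/zookeeper-shell.sh 192.168.56.100:2181/kafka91 ls /brokers/ids
  # Should list all three broker ids: [100, 101, 102]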

Then verify the setup.

You can follow the quick check from the official docs:

To check quickly whether the server keystore and truststore are set up properly, run the following command:
openssl s_client -debug -connect localhost:9093 -tls1
(Note: TLSv1 should be listed under ssl.enabled.protocols.)
In the output of this command you should see the server's certificate:
  -----BEGIN CERTIFICATE-----
  {variable sized random bytes}
  -----END CERTIFICATE-----
  subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Sriharsha Chintalapani
  issuer=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=kafka/emailAddress=test@test.com


Now verify producing and consuming.
Produce messages on kafka1:

/data1/kafka-0.9.0.1/bin/kafka-console-producer.sh --broker-list kafka1:9093 --topic jingbotest5 --producer.config /root/securityDemo/producer.properties
Consume messages on kafka2 (the producer.properties file doubles as the consumer config here):

/data1/kafka-0.9.0.1/bin/kafka-console-consumer.sh --bootstrap-server kafka2:9093 --topic jingbotest5 --new-consumer --consumer.config /root/securityDemo/producer.properties

If the messages come through normally, everything is working.

Next, let's see how a Java client connects.
This is only a simple demo, mainly showing the connection setup:

  package com.jingbo.test;

  import java.util.Properties;

  import org.apache.kafka.clients.CommonClientConfigs;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerConfig;
  import org.apache.kafka.clients.producer.ProducerRecord;
  import org.apache.kafka.common.config.SslConfigs;

  public class ProducerZbdba {
      public static void main(String[] args) {
          Properties producerProps = new Properties();
          producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.102:9093");
          producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "myApiKey");
          // The client keystore holds both the client certificate and the imported CA,
          // so the same file serves as keystore and truststore here.
          producerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "C:/Users/zbdba/Downloads/kafka.client.keystore.jks");
          producerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "zbdba94");
          producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "C:/Users/zbdba/Downloads/kafka.client.keystore.jks");
          producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "zbdba94");
          producerProps.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
          producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
          producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
          producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

          KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
          for (int i = 0; i < 100; i++) {
              producer.send(new ProducerRecord<String, String>("jingbotest5", Integer.toString(i), Integer.toString(i)));
          }
          producer.close();
      }
  }

kafka.client.keystore.jks can be copied down from any of the nodes.

You can leave the consumer from earlier running; produce from the Java client and check whether it receives the messages. If it consumes them normally, SSL is configured successfully.

Kafka ACL Configuration

I originally meant to write a separate post for this, but it felt simple enough not to need one. So why mention it at all? Because there are still a few pitfalls.
You can start with the official doc:

Following its configuration, I ended up with the following error:

  [2016-09-05 06:32:35,144] ERROR [KafkaApi-100] error when handling request Name:UpdateMetadataRequest;Version:1;Controller:100;ControllerEpoch:39;CorrelationId:116;ClientId:100;AliveBrokers:102 : (EndPoint(192.168.56.102,9093,SSL)),101 : (EndPoint(192.168.56.101,9093,SSL)),100 : (EndPoint(192.168.56.100,9093,SSL));PartitionState:[jingbotest5,2] -> (LeaderAndIsrInfo:(Leader:102,ISR:102,LeaderEpoch:42,ControllerEpoch:39),ReplicationFactor:3),AllReplicas:102,101,100),[jingbotest5,5] -> (LeaderAndIsrInfo:(Leader:102,ISR:102,LeaderEpoch:42,ControllerEpoch:39),ReplicationFactor:3),AllReplicas:102,100,101),[jingbotest5,8] -> (LeaderAndIsrInfo:(Leader:102,ISR:102,LeaderEpoch:40,ControllerEpoch:39),ReplicationFactor:3),AllReplicas:102,101,100) (kafka.server.KafkaApis)
  kafka.common.ClusterAuthorizationException: Request Request(1,192.168.56.100:9093-192.168.56.100:43909,Session(User:CN=zbdba2,OU=test,O=test,L=test,ST=test,C=test,zbdba2/192.168.56.100),null,1473071555140,SSL) is not authorized.
  at kafka.server.KafkaApis.authorizeClusterAction(KafkaApis.scala:910)
  at kafka.server.KafkaApis.handleUpdateMetadataRequest(KafkaApis.scala:158)
  at kafka.server.KafkaApis.handle(KafkaApis.scala:74)
  at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
  at java.lang.Thread.run(Thread.java:744)
  [2016-09-05 06:32:35,310] ERROR [ReplicaFetcherThread-2-101], Error for partition [jingbotest5,4] to broker 101:org.apache.kafka.common.errors.AuthorizationException: Topic authorization failed. (kafka.server.ReplicaFetcherThread)

The official doc says that once you configure:
principal.builder.class=CustomizedPrincipalBuilderClass
ACLs can be enforced for SSL users. But CustomizedPrincipalBuilderClass is outdated; searching the docs shows it has been replaced by: class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
So I happily configured that, and the broker failed to start. The logs revealed that the "class " prefix must not be included; the value has to be just org.apache.kafka.common.security.auth.DefaultPrincipalBuilder.

So even people who love reading official docs are in for some pain with Kafka.

The broker finally came up, but the error above persisted. If you are reading this you won't hit that pit, because the fix is already in the configuration above:
super.users=User:CN=kafka1,OU=test,O=test,L=test,ST=test,C=test
That is, set the broker's SSL principal as a superuser directly.

With that in place, ACLs run normally.
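
From there, individual clients can be granted per-topic rights with the kafka-acls.sh tool. A sketch, assuming the client certificate DN produced by the scripts above (CN equal to the client hostname, kafka2 here) and a hypothetical consumer group name:

  # Allow the kafka2 client certificate to produce to jingbotest5
  bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.56.100:2181/kafka91 \
    --add --allow-principal "User:CN=kafka2,OU=test,O=test,L=test,ST=test,C=test" \
    --producer --topic jingbotest5

  # Allow it to consume from the topic as group test-group (hypothetical name)
  bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.56.100:2181/kafka91 \
    --add --allow-principal "User:CN=kafka2,OU=test,O=test,L=test,ST=test,C=test" \
    --consumer --topic jingbotest5 --group test-group

  # List the ACLs on the topic to confirm
  bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.56.100:2181/kafka91 \
    --list --topic jingbotest5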
Together, Kafka's SSL and ACL support look like they can form a complete access-control scheme, but I don't know what pitfalls show up in real operation. For the performance impact, you can refer to a presentation on SlideShare.

You can of course run your own load tests; according to that presentation, expect a performance loss of roughly 30%.



I also asked around at the major vendors: few of them are using this, though some are about to roll it out. We are considering it as well, since the business demand is significant.

I have cleaned up the scripts above and published them on GitHub:

https://github.com/zbdba/Kafka-SSL-config


References:

https://github.com/confluentinc/securing-kafka-blog (a fully automated setup using Vagrant)












