It has been a long time since I wrote an article. I am writing this one so that others can avoid the detours I took: when I set this up, neither Google nor Baidu turned up a single decent article on the subject, and the official documentation is unclear and, I feel, somewhat misleading. It was painful, but I eventually got it working.
First, let's look at the architecture of the whole cluster:
Host      IP                Services
kafka1    192.168.56.100    ZooKeeper, Kafka broker 100
kafka2    192.168.56.101    ZooKeeper, Kafka broker 101
kafka3    192.168.56.102    ZooKeeper, Kafka broker 102
The cluster consists of the three nodes shown above.
2. Generate the certificate and CA files on the kafka1 node

For reference, here is the broker configuration used on kafka1:
[root@kafka1 kafka-0.9.0.1]# cat config/server.properties
broker.id=100
#port=9020
port=9093
host.name=192.168.56.100
advertised.host.name=192.168.56.100
zookeeper.connect=192.168.56.100:2181,192.168.56.101:2181,192.168.56.102:2181/kafka91
allow.everyone.if.no.acl.found=true
#allow.everyone.if.no.acl.found=false
#super.users=User:Bob;User:Alice
super.users=User:CN=kafka1,OU=test,O=test,L=test,ST=test,C=test
#listeners=PLAINTEXT://192.168.56.100:9020, SSL://192.168.56.100:9093
#advertised.listeners=PLAINTEXT://192.168.56.100:9020, SSL://192.168.56.100:9093
listeners=SSL://192.168.56.100:9093
advertised.listeners=SSL://192.168.56.100:9093
ssl.keystore.location=/root/kafka1/kafka.server.keystore.jks
ssl.keystore.password=zbdba94
ssl.key.password=zbdba94
ssl.truststore.location=/root/kafka1/kafka.server.truststore.jks
ssl.truststore.password=zbdba94
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
security.inter.broker.protocol=SSL
#zookeeper.set.acl=true
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
principal.builder.class=org.apache.kafka.common.security.auth.DefaultPrincipalBuilder

# Replication configurations
num.replica.fetchers=4
replica.fetch.max.bytes=1048576
replica.fetch.wait.max.ms=500
replica.high.watermark.checkpoint.interval.ms=5000
replica.socket.timeout.ms=30000
replica.socket.receive.buffer.bytes=65536
replica.lag.time.max.ms=10000
controller.socket.timeout.ms=30000
controller.message.queue.size=10
default.replication.factor=3

# Log configuration
log.dir=/data1/kafka-0.9.0.1/data
kafka.logs.dir=logs
num.partitions=20
message.max.bytes=1000000
auto.create.topics.enable=true
log.index.interval.bytes=4096
log.index.size.max.bytes=10485760
log.retention.hours=720
log.flush.interval.ms=10000
log.flush.interval.messages=20000
log.flush.scheduler.interval.ms=2000
log.roll.hours=168
log.retention.check.interval.ms=300000
log.segment.bytes=1073741824
delete.topic.enable=true

# ZK configuration
zookeeper.connection.timeout.ms=6000
zookeeper.sync.time.ms=2000

# Socket server configuration
num.io.threads=8
num.network.threads=8
socket.request.max.bytes=104857600
socket.receive.buffer.bytes=1048576
socket.send.buffer.bytes=1048576
queued.max.requests=16
fetch.purgatory.purge.interval.requests=100
producer.purgatory.purge.interval.requests=100
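With allow.everyone.if.no.acl.found=true, SimpleAclAuthorizer still lets every request through even though no ACLs exist yet. If you later set it to false, each client principal needs explicit ACLs. A minimal sketch, assuming the topic jingbotest5 used later in this post and a client certificate with CN=kafka2 (adjust the DN to whatever your certificate actually contains):

# Grant produce rights on topic jingbotest5 to the kafka2 client principal
/data1/kafka-0.9.0.1/bin/kafka-acls.sh \
  --authorizer kafka.security.auth.SimpleAclAuthorizer \
  --authorizer-properties zookeeper.connect=192.168.56.100:2181/kafka91 \
  --add --allow-principal "User:CN=kafka2,OU=test,O=test,L=test,ST=test,C=test" \
  --producer --topic jingbotest5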
Note: replace kafka1 below with your own machine's hostname.
#!/bin/bash
name=$HOSTNAME
folder=securityDemo
cd /root
rm -rf $folder
mkdir $folder
cd $folder

# Generate the broker key pair in a new keystore (CN is the hostname)
printf "zbdba94\nzbdba94\n$name\ntest\ntest\ntest\ntest\ntest\nyes\n\n" | keytool -keystore kafka.server.keystore.jks -alias $name -validity 365 -genkey

# Generate the CA key and self-signed CA certificate
printf "te\ntest\ntest\ntest\ntest\n$name\nasdf@test.com\n" | openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 -passout pass:zbdba94
echo "done"

# Import the CA certificate into the server and client truststores
printf "zbdba94\nzbdba94\nyes\n" | keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert
printf "zbdba94\nzbdba94\nyes\n" | keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-cert

# Export a signing request from the server keystore and sign it with the CA
printf "zbdba94\n" | keytool -keystore kafka.server.keystore.jks -alias $name -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:zbdba94

# Import the CA certificate and the signed certificate back into the server keystore
printf "zbdba94\nyes\n" | keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
printf "zbdba94\nyes\n" | keytool -keystore kafka.server.keystore.jks -alias $name -import -file cert-signed

# producer.properties
rm -rf producer.properties
printf $PWD
echo "bootstrap.servers=$name:9093" >> producer.properties
echo "security.protocol=SSL" >> producer.properties
echo "ssl.truststore.location=$PWD/kafka.client.truststore.jks" >> producer.properties
echo "ssl.truststore.password=zbdba94" >> producer.properties
echo "ssl.keystore.location=$PWD/kafka.server.keystore.jks" >> producer.properties
echo "ssl.keystore.password=zbdba94" >> producer.properties
echo "ssl.key.password=zbdba94" >> producer.properties
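Note that server.properties points at /root/kafka1/ while this script works in /root/securityDemo, so presumably the generated files are copied over afterwards. A sketch of that step (an assumption based on the paths in server.properties), plus a sanity check that the keystore now holds both the CARoot entry and the signed broker certificate:

# Assumption: copy the generated files to the directory referenced by server.properties
mkdir -p /root/kafka1
cp /root/securityDemo/kafka.server.keystore.jks /root/securityDemo/kafka.server.truststore.jks \
   /root/securityDemo/kafka.client.truststore.jks /root/securityDemo/ca-cert /root/securityDemo/ca-key /root/kafka1/

# The keystore should list two entries: CARoot and the signed broker certificate
keytool -list -v -keystore /root/kafka1/kafka.server.keystore.jks -storepass zbdba94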
Next, generate the client certificate on the kafka2 node. client1.sh creates the client keystore and a certificate signing request (CSR):
#!/bin/bash
name=$HOSTNAME
cd /root
dirname=securityDemo
rm -rf $dirname
mkdir $dirname
cd $dirname

# Generate the client key pair and export a signing request
printf "zbdba94\nzbdba94\n$name\ntest\ntest\ntest\ntest\ntest\nyes\n\n" | keytool -keystore kafka.client.keystore.jks -alias $name -validity 36 -genkey
printf "zbdba94\nzbdba94\nyes\n" | keytool -keystore kafka.client.keystore.jks -alias $name -certreq -file cert-file

cp cert-file cert-file-$name

Then sign the CSR with the CA files generated on kafka1:

#!/bin/bash
name=$HOSTNAME
cd /root
# Sign the client CSR with the CA certificate and key
openssl x509 -req -CA /root/kafka1/ca-cert -CAkey /root/kafka1/ca-key -in /root/securityDemo/cert-file-$name -out /root/securityDemo/cert-signed-$name -days 36 -CAcreateserial -passin pass:zbdba94
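The signing script expects the CA files under /root/kafka1, which so far exist only on the kafka1 node. A sketch of the copy step, which the original scripts imply but do not show (run on kafka2/kafka3 before signing):

# Assumption: fetch ca-cert and ca-key from kafka1 into the path the signing script uses
mkdir -p /root/kafka1
scp kafka1:/root/securityDemo/ca-cert kafka1:/root/securityDemo/ca-key /root/kafka1/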
Configure the kafka3 node in the same way as kafka2.
Finally, import the CA certificate and the signed client certificate into the client keystore, and generate a producer.properties for the console clients:

#!/bin/sh
name=$HOSTNAME
cd /root/securityDemo

# Import the CA certificate and the signed client certificate
printf "zbdba94\nyes\n" | keytool -keystore kafka.client.keystore.jks -alias CARoot -import -file /root/kafka1/ca-cert
printf "zbdba94\nyes\n" | keytool -keystore kafka.client.keystore.jks -alias $name -import -file /root/securityDemo/cert-signed-$name

# producer.properties
rm -rf producer.properties
printf $PWD
echo "bootstrap.servers=localhost:9093" >> producer.properties
echo "security.protocol=SSL" >> producer.properties
echo "ssl.truststore.location=$PWD/kafka.client.keystore.jks" >> producer.properties
echo "ssl.truststore.password=zbdba94" >> producer.properties
echo "ssl.keystore.location=$PWD/kafka.client.keystore.jks" >> producer.properties
echo "ssl.keystore.password=zbdba94" >> producer.properties
echo "ssl.key.password=zbdba94" >> producer.properties
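As on the broker side, a quick keytool listing confirms that both entries landed in the client keystore:

keytool -list -keystore /root/securityDemo/kafka.client.keystore.jks -storepass zbdba94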
After restarting the brokers, the server logs should show all three brokers registering their SSL endpoints:

INFO Registered broker 100 at path /brokers/ids/100 with addresses: SSL -> EndPoint(192.168.56.100,9093,SSL) (kafka.utils.ZkUtils)
INFO Registered broker 101 at path /brokers/ids/101 with addresses: SSL -> EndPoint(192.168.56.101,9093,SSL) (kafka.utils.ZkUtils)
INFO Registered broker 102 at path /brokers/ids/102 with addresses: SSL -> EndPoint(192.168.56.102,9093,SSL) (kafka.utils.ZkUtils)
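You can also confirm that a broker is really terminating TLS on port 9093 with openssl s_client (since ssl.client.auth=required the full handshake will want a client certificate, but the server certificate chain is printed regardless):

openssl s_client -connect 192.168.56.100:9093 -tls1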
Produce messages on kafka1:

/data1/kafka-0.9.0.1/bin/kafka-console-producer.sh --broker-list kafka1:9093 --topic jingbotest5 --producer.config /root/securityDemo/producer.properties
Consume the messages on kafka2 (the producer.properties generated above doubles as the consumer's SSL config):
/data1/kafka-0.9.0.1/bin/kafka-console-consumer.sh --bootstrap-server kafka2:9093 --topic jingbotest5 --new-consumer --consumer.config /root/securityDemo/producer.properties
You can also produce over SSL from the Java API:

package com.jingbo.test;

import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SslConfigs;

public class ProducerZbdba {
    public static void main(String[] args) {
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.102:9093");
        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "myApiKey");
        // The client keystore serves as both keystore and truststore here,
        // since it also contains the CARoot certificate
        producerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "C:/Users/zbdba/Downloads/kafka.client.keystore.jks");
        producerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "zbdba94");
        producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "C:/Users/zbdba/Downloads/kafka.client.keystore.jks");
        producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "zbdba94");
        producerProps.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
        producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>("jingbotest5", Integer.toString(i), Integer.toString(i)));
        }
        System.out.println("test");
        producer.close();
    }
}
When I followed this configuration, I eventually ran into the following error:
[2016-09-05 06:32:35,144] ERROR [KafkaApi-100] error when handling request Name:UpdateMetadataRequest;Version:1;Controller:100;ControllerEpoch:39;CorrelationId:116;ClientId:100;AliveBrokers:102 : (EndPoint(192.168.56.102,9093,SSL)),101 : (EndPoint(192.168.56.101,9093,SSL)),100 : (EndPoint(192.168.56.100,9093,SSL));PartitionState:[jingbotest5,2] -> (LeaderAndIsrInfo:(Leader:102,ISR:102,LeaderEpoch:42,ControllerEpoch:39),ReplicationFactor:3),AllReplicas:102,101,100),[jingbotest5,5] -> (LeaderAndIsrInfo:(Leader:102,ISR:102,LeaderEpoch:42,ControllerEpoch:39),ReplicationFactor:3),AllReplicas:102,100,101),[jingbotest5,8] -> (LeaderAndIsrInfo:(Leader:102,ISR:102,LeaderEpoch:40,ControllerEpoch:39),ReplicationFactor:3),AllReplicas:102,101,100) (kafka.server.KafkaApis)
kafka.common.ClusterAuthorizationException: Request Request(1,192.168.56.100:9093-192.168.56.100:43909,Session(User:CN=zbdba2,OU=test,O=test,L=test,ST=test,C=test,zbdba2/192.168.56.100),null,1473071555140,SSL) is not authorized.
        at kafka.server.KafkaApis.authorizeClusterAction(KafkaApis.scala:910)
        at kafka.server.KafkaApis.handleUpdateMetadataRequest(KafkaApis.scala:158)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:74)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
        at java.lang.Thread.run(Thread.java:744)
[2016-09-05 06:32:35,310] ERROR [ReplicaFetcherThread-2-101], Error for partition [jingbotest5,4] to broker 101:org.apache.kafka.common.errors.AuthorizationException: Topic authorization failed. (kafka.server.ReplicaFetcherThread)
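The ClusterAuthorizationException reads like an ACL problem rather than an SSL one: the inter-broker request arrives as principal User:CN=zbdba2,..., which is not covered by the super.users line shown earlier (User:CN=kafka1,...), so SimpleAclAuthorizer rejects it. A plausible fix, assuming broker certificates whose CNs are zbdba1, zbdba2 and zbdba3 (adjust the DNs to whatever your broker certificates actually contain), is to list every broker principal in server.properties:

super.users=User:CN=zbdba1,OU=test,O=test,L=test,ST=test,C=test;User:CN=zbdba2,OU=test,O=test,L=test,ST=test,C=test;User:CN=zbdba3,OU=test,O=test,L=test,ST=test,C=test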
I also asked around at the major companies: few of them run Kafka with SSL today, though some are preparing to adopt it. We are weighing whether to roll it out ourselves, since the business demand is substantial.
I have tidied up the scripts above and published them on GitHub:
https://github.com/zbdba/Kafka-SSL-config
References:
https://github.com/confluentinc/securing-kafka-blog (a fully automated setup of a secured Kafka cluster using Vagrant)