
Kafka SASL_SSL Configuration

1. Switch to the directory for storing certificates

I created an ssl folder in my home directory:

mkdir ssl && cd ssl

2. Generate the server keystore

keytool -keystore server.keystore.jks -alias localhost -validity 365 -genkey
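keytool will prompt for a keystore password and the certificate fields; the CN should match the hostname clients will connect to (localhost here). When -keyalg is omitted, keytool falls back to its legacy default key algorithm (DSA on older JDKs), which some TLS clients reject, so a variant with -keyalg RSA, matching the client keystore command in step 6, may be safer (an adjustment not in the original):

keytool -keystore server.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA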

Verify the certificate:

keytool -list -v -keystore server.keystore.jks

3. Create a CA and import it into the truststores

Generate the CA:

openssl req -new -x509 -keyout ca-key -out ca-cert -days 365

Import the CA into the server and client truststores:

keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
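As with the server keystore in step 2, the imports can be verified (an optional check, not in the original):

keytool -list -v -keystore server.truststore.jks
keytool -list -v -keystore client.truststore.jks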

4. Export the certificate from the keystore and sign it with the CA generated above

Export the certificate:

keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file

Sign it:

openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:ka123456
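The -passin pass:ka123456 argument supplies the passphrase set for ca-key when the CA was generated in step 3 and must match whatever you entered there. The signed certificate can be inspected with (an optional check, not in the original):

openssl x509 -in cert-signed -noout -text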

5. Import the certificates into the server keystore

Import the CA certificate first, then the signed server certificate:

keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed

6. Generate the client keystore

keytool -keystore client.keystore.jks -alias localhost -validity 365 -keyalg RSA -genkey
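The Java clients in step 9 authenticate via SASL and configure only a truststore, so this client keystore matters only if the broker enforces TLS client authentication (ssl.client.auth=required). In that case the client certificate would go through the same request/sign/import cycle as the server certificate in steps 4 and 5; a sketch with illustrative file names:

keytool -keystore client.keystore.jks -alias localhost -certreq -file client-cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in client-cert-file -out client-cert-signed -days 365 -CAcreateserial -passin pass:ka123456
keytool -keystore client.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore client.keystore.jks -alias localhost -import -file client-cert-signed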

7. Configure ZooKeeper authentication

1) ZooKeeper was installed together with Kafka by Homebrew, and the config files live in /usr/local/opt/kafka/libexec/config/. Create a zookeeper-server-jaas.conf file in that directory. It defines two users, in the format user_<username>="<password>"; the Kafka broker authenticates to ZooKeeper as one of them (see the Client section in step 8). Contents:

Server {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    user_admin="qaz!@#QAZ"
    user_kafka="qaz123!@#QAZ";
};

2) Add the SASL authentication settings to the ZooKeeper config file zookeeper.properties:

dataDir=/usr/local/var/lib/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

3) Add a JVM parameter to the /usr/local/opt/zookeeper/libexec/bin/zkEnv.sh script so that ZooKeeper loads the JAAS config file at startup:

export SERVER_JVMFLAGS="-Xmx${ZK_SERVER_HEAP}m $SERVER_JVMFLAGS -Djava.security.auth.login.config=/usr/local/opt/kafka/libexec/config/zookeeper-server-jaas.conf"
# default heap for zookeeper client
ZK_CLIENT_HEAP="${ZK_CLIENT_HEAP:-256}"
export CLIENT_JVMFLAGS="-Xmx${ZK_CLIENT_HEAP}m $CLIENT_JVMFLAGS"

8. Configure Kafka security authentication

1) In the /usr/local/opt/kafka/libexec/config directory, create the kafka-server-jaas.conf and kafka-client-jaas.conf files with the following contents:

kafka-server-jaas.conf:

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="qazQAZ123!@#"
    user_admin="qazQAZ123!@#"
    user_kafka="qaz!@#123";
};
Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="kafka"
    password="qaz123!@#QAZ";
};

kafka-client-jaas.conf:

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="kafka"
    password="qaz!@#123";
};
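A note on how these sections are used: in KafkaServer, username/password are the credentials the broker itself presents for inter-broker connections, while each user_<name>="<password>" entry defines an account that clients may log in with. The Client section is what the broker uses to authenticate to ZooKeeper, so its username/password must match one of the user_ entries in zookeeper-server-jaas.conf from step 7 (here, user_kafka).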

2) In the Kafka startup script (/usr/local/opt/kafka/libexec/bin/kafka-server-start.sh), configure an environment variable that points the JVM at the JAAS file, adding the following flag:

-Djava.security.auth.login.config=/usr/local/Cellar/kafka/3.2.0/libexec/config/kafka-server-jaas.conf
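One common way to wire this in (an assumption; adapt to your script version) is to export it through KAFKA_OPTS near the top of kafka-server-start.sh, since kafka-run-class.sh passes KAFKA_OPTS to the JVM:

export KAFKA_OPTS="$KAFKA_OPTS -Djava.security.auth.login.config=/usr/local/Cellar/kafka/3.2.0/libexec/config/kafka-server-jaas.conf"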

3) Modify the Kafka config file /usr/local/opt/kafka/libexec/config/server.properties to enable the SASL_SSL listener.
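A minimal sketch of the relevant server.properties settings, assuming a single SASL_SSL listener on port 9093 and the keystores from the steps above (the paths and passwords are placeholders, not values from the original):

listeners=SASL_SSL://127.0.0.1:9093
advertised.listeners=SASL_SSL://127.0.0.1:9093
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
# Placeholder paths and passwords; use the keystore password you chose in step 2
ssl.keystore.location=/Users/<your-user>/ssl/server.keystore.jks
ssl.keystore.password=<keystore-password>
ssl.key.password=<key-password>
ssl.truststore.location=/Users/<your-user>/ssl/server.truststore.jks
ssl.truststore.password=<truststore-password>
# Match the Java clients below, which disable hostname verification and send no client certificate
ssl.endpoint.identification.algorithm=
ssl.client.auth=none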

9. Java client code

Copy the client truststore client.truststore.jks generated in step 3 into the Java project so that the client can trust this CA.

Producer:

import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;

// 1. Create the Kafka producer configuration object
Properties props = new Properties();
// 2. Add configuration: bootstrap servers and serializers
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9093");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// SASL_SSL settings
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username='kafka' password='qaz!@#123';");
// Disable hostname verification and point at the client truststore;
// DEFAULT_BASE_CONF_PATH is a constant defined elsewhere in the project
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, DEFAULT_BASE_CONF_PATH + "client.truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "ka123456");
// 3. Create the Kafka producer
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
System.out.println("Starting to send data");
String msg = "Hello World";
// 4. Call send() to publish the message
producer.send(new ProducerRecord<String, String>("topic1", "value ", msg));
// 5. Release resources
producer.close();
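Note that send() is asynchronous and only enqueues the record; close() flushes anything still buffered before shutting down, which is why the single message above is delivered even without an explicit flush().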

Consumer:

import java.time.Duration;
import java.util.ArrayList;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;

// "file" is the directory that contains the conf/ folder holding the truststore
String filePath = file.getAbsolutePath();
// 1. Create the consumer configuration object
Properties props = new Properties();
// 2. Add consumer parameters
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9093");
// Deserializers (required)
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Consumer group (any name, required)
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group1");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // disable auto-commit
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
// SASL_SSL settings
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username='kafka' password='qaz!@#123';");
// Disable hostname verification and point at the client truststore
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, filePath + "/conf/client.truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "ka123456");
// Create the consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// Subscribe to the topics to consume (more than one is allowed)
ArrayList<String> topics = new ArrayList<>();
topics.add("topic1");
consumer.subscribe(topics);
System.out.println("Ready to receive data.........");
// Poll and print records
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
    for (ConsumerRecord<String, String> record : records) {
        String value = record.value();
        System.out.println(value);
    }
    if (records.count() > 0) {
        // Commit offsets asynchronously: the current thread is not blocked
        // and can continue with the rest of the logic
        consumer.commitAsync(new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                if (exception != null) {
                    System.err.println("Commit failed for offsets " + offsets);
                    exception.printStackTrace();
                }
            }
        });
    }
}
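The asynchronous commit gives at-least-once delivery: if the process dies between poll() and the commit callback, the last record may be consumed again on restart. Together with MAX_POLL_RECORDS_CONFIG = 1, each poll handles one record at a time, which narrows the duplicate window at the cost of throughput.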
