赞
踩
目录
Jaas(Java Authentication and Authorization Service )文件
spring-cloud-starter-bus-kafka 集成
下载统一版本Kafka服务包至三台不同的服务器上
文章使用版本为 kafka_2.13-3.5.0.tgz 下载地址
jdk版本为 Adopt JDK-17 OpenJDK17U-jdk_x64_linux_hotspot_17.0.7_7.tar.gz 下载地址
在kafka包解压目录下的 config 目录下新建zookeeper认证所需jaas文件,文件名随意,以 .conf 结尾即可
文件内容如下
user_{username}为固定写法 {username} 为用户名 密码为双引号内容
注意,这里zookeeper的jaas有三个用户名和密码分别对应着三台kafka broker去认证时使用的用户名和密码,每一台上的zookeeper的jaas文件内容建议完全相同
- Server {
- org.apache.zookeeper.server.auth.DigestLoginModule required
- user_super="super-sec"
- user_kafkabroker1="kafkabroker1-sec"
- user_kafkabroker2="kafkabroker2-sec"
- user_kafkabroker3="kafkabroker3-sec";
- };
在相同目录下建立kafka所需认证jaas文件
以下是三台服务器中其中一台的kafka jaas认证文件内容,Client内容为本台机器上的broker认证本台机器上的zookeeper的用户名和密码 ( 注意最后一行和倒数第二行需要有分号!! ) KafkaServer端有一对 username="kbroker1" password="kbroker1-sec" 是内部brokers之间进行认证所用账号密码但是本文内部broker配置为ssl链接,去掉应该也没事若不同则加一下
当然每个broker的KafkaServer段也需要有定义这个用户名和密码( 对应 user_kbroker1="kbroker1-sec" ) user_client="client-sec" 为外部客户端认证时所需用户名密码
这里为了方便,全部brokers共享一个账号,客户端user_client(也就是连接Kakfa时的producer、consumer或者编程语言SDK读取或配置客户端jaas文件时)也为统一用户名密码
- KafkaServer {
- org.apache.kafka.common.security.plain.PlainLoginModule required
- username="kbroker1"
- password="kbroker1-sec"
- user_kbroker1="kbroker1-sec"
- user_client="client-sec";
- };
-
- Client {
- org.apache.zookeeper.server.auth.DigestLoginModule required
- username="kafkabroker3"
- password="kafkabroker3-sec";
- };
另外两台用户名密码如下
- KafkaServer {
- org.apache.kafka.common.security.plain.PlainLoginModule required
- username="kbroker1"
- password="kbroker1-sec"
- user_kbroker1="kbroker1-sec"
- user_client="client-sec";
- };
-
- Client {
- org.apache.zookeeper.server.auth.DigestLoginModule required
- username="kafkabroker1"
- password="kafkabroker1-sec";
- };
- KafkaServer {
- org.apache.kafka.common.security.plain.PlainLoginModule required
- username="kbroker1"
- password="kbroker1-sec"
- user_kbroker1="kbroker1-sec"
- user_client="client-sec";
- };
-
- Client {
- org.apache.zookeeper.server.auth.DigestLoginModule required
- username="kafkabroker2"
- password="kafkabroker2-sec";
- };
同样是在config目录下编辑zookeeper.properties文件
- # Licensed to the Apache Software Foundation (ASF) under one or more
- # contributor license agreements. See the NOTICE file distributed with
- # this work for additional information regarding copyright ownership.
- # The ASF licenses this file to You under the Apache License, Version 2.0
- # (the "License"); you may not use this file except in compliance with
- # the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- # the directory where the snapshot is stored.
- dataDir=/opt/kafka/zookeeper-dir
- dataLogDir=/opt/kafka/zookeeper-log
- # the port at which the clients will connect
- clientPort=2181
- # disable the per-ip limit on the number of connections since this is a non-production config
- # 一个ip最多可以对这个zookeeper服务进行连接的数量
- maxClientCnxns=5
- # Disable the adminserver by default to avoid port conflicts.
- # Set the port to something non-conflicting if choosing to enable this
- admin.enableServer=false
- # admin.serverPort=8080
- tickTime=2000
- initLimit=5
- syncLimit=2
- server.1=1.1.1.1:2182:1999
- server.2=2.2.2.2:2182:1999
- server.3=3.3.3.3:2182:1999
-
- # security
- # 开启zookeeper sasl认证必须配置
- authProvider.sasl=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
- maxClientCnxns=5
- # 这里可以设置为false 同样设置zookeeper jaas认证也无效了
- sessionRequireClientSASLAuth=true
- jaasLoginRenew=360000000
注意 clientPort与 <外网ip>:<内部互联连端口>:<选举专用端口> 这些端口要区分开来 不然zookeeper服务启动会报错,三台配置基本一直
注意 server.<int>=<外网ip> 若是连接本机有问题,可以将<外网ip>换成0.0.0.0
zookeeper启动脚本如下
- #!/bin/bash
-
- export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/kafka-server/config/zk_jaas.conf"
- nohup /opt/kafka/kafka-server/bin/zookeeper-server-start.sh /opt/kafka/kafka-server/config/zookeeper.properties > kafka-zookeeper-start.log 2>&1 &
使用export 导出KAFKA_OPTS中的变量,让zookeeper启动时加载jaas认证文件,参数key为
-Djava.security.auth.login.config
nohup 可以让zookeeper在后台运行,不占用终端,滚动日志可以在 kafka-zookeeper-start.log 文件查看,若想滚动查看日志可以用
tail -f kafka-zookeeper-start.log
这个地方卡住很久,遇到了些bug都是关于自签证书的问题,文章用下图说明SSL证书在kafka中的使用流程 每个keystore包含数字签名和证书
每台机器上都应该有自己的keystore与truststore文件,证书可以每台上都使用openssl生成一张,但是,要把每台机器上的证书必须互相导入到其他borkers的truststore与keystore中,而且每台机器上的keystore还需要多次导入所有证书签名之后生成的证书与数字签名。不然在brokers互相创建SSL隧道时会有各种问题,例如下图,将broker1机器上生成的kafka.client.truststore.jks直接 scp 传输到到broker2 后使用,broker1 与 broker2建立SSL隧道时,kafka config 目录下log4j.properties修改TRACE级别日志记录如下
参考
很多网站上面只是做单台机器或者单个证书的全部生成过程,这里记录下自己的创建流程
注意,全局只生成了一次CA ,仅包含一个 ca-cert 与 ca-key
参考如下
部署SSL 创建密钥与证书,创建自签名的颁发机构,证书签名
文件含义
keystore 可以存储私钥、证书和对称密钥的存储库。
引用stackoverflow的回答
ca-cert 具体证书
ca-key 证书私钥
ca-password 颁发机构密钥
cert-file 导出未签名的证书文件
cert-signed 带有数字签名的证书
首先在每个机器上面都要创建keystore密钥库
keytool命令无效可以去JAVA_HOME/bin目录下找
SSL hostname校验可通过两种方式配置
在kafka的配置文件中添加以下配置取消校验
ssl.endpoint.identification.algorithm=
或配置CN与SAN分别为hostname与FQDN什么是FQDN
文章采用前者,忽略SSL对hostname的认证并按照SAN格式创建keystore
keytool -keystore kafka.server.keystore.jks -alias localhost -keyalg RSA -validity {validity} -genkey -storepass {keystore-pass} -keypass {key-pass} -dname {distinguished-name} -ext SAN=DNS:{hostname}
-alias 后面用 hostname,localhost与hostname都可以
{validity} 为过期时间 自签可以长一点 例 9999
{keystore-pass} 与 {key-pass} 为密码,建议设为同一个值
-ext SAN=DNS:{hostname} 注意,必须为hostname (终端 键入hostname查看)
这里my-host-name可用localhost代替 或者用VPS云服务器专属的hostname
kafka.ser.keystore.jks生成结束
创建CA证书
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
CN也要写hostname或直接用localhost
此时一共生成三个文件
注意!copy ca-cert 与 ca-key 文件到所有kafka broker机器上,(若是想在其他机器上连接也要把这两个文件拷贝过去,例如本地开发集成spring boot时),并放在固定位置
ca-cert 与 ca-key 代表一张CA
导入 ca-cert 到所有brokers的kafka.server.truststore.jks中,终端交互输入 yes
keytool -keystore kafka.server.truststore.jks -alias CARoot -importcert -file ca-cert
原文中多一部将kafka.server.keystore.jks密钥库的证书导出才签名
keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file
证书签名
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:123456
此时一共生成六个文件
向密钥库 kafka.server.keystore.jks 导入证书与数字签名
- keytool -keystore kafka.server.keystore.jks -alias CARoot -importcert -file ca-cert
- keytool -keystore kafka.server.keystore.jks -alias localhost -importcert -file cert-signed
查看kafka.server.keystore.jks包含的内容
keytool --list -v -keystore kafka.server.keystore.jks
全部命令如下,思路就是全局生成一张CA (包含ca-cert ca-key)
- keytool -keystore kafka.server.keystore.jks -alias localhost -keyalg RSA -validity {validity} -genkey
- openssl req -new -x509 -keyout ca-key -out ca-cert -days {validity}
- keytool -keystore kafka.client.truststore.jks -alias CARoot -importcert -file ca-cert
- keytool -keystore kafka.server.truststore.jks -alias CARoot -importcert -file ca-cert
- keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file
- openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days {validity} -CAcreateserial -passin pass:{ca-password}
- keytool -keystore kafka.server.keystore.jks -alias CARoot -importcert -file ca-cert
- keytool -keystore kafka.server.keystore.jks -alias localhost -importcert -file cert-signed
现在全部文件如下(忽略 auto-create-kafka-ssl-keys.sh 这个是自动生成证书的脚本github连接 与 备份压缩包demo.tar)
编辑kafka服务配置文件 server.properties
advertised.listeners要和listeners对应,
advertised.listeners概述
advertised.listeners不要绑定0.0.0.0端口,这里配置SSL为内部访问所以使用云服务器的hostname, brokers之间如何根据彼此的hostname来寻找呢?Linux可编辑/etc/hosts文件
末尾加上 <ip>:<port>添加dns映射 windows在C:\Windows\System32\drivers\etc目录下找hosts文件
-
- broker.id=1
-
- ############################# Socket Server Settings #############################
-
- listeners=SSL://:9093,SASL_SSL://:9094
- # 注意 这里SSL是做内部brokers通信用的,外部暴露方式为SASL_SSL
- advertised.listeners=SSL://Your-Host-name:9093,SASL_SSL://:9094
-
-
- log.retention.check.interval.ms=300000
-
- ############################# Zookeeper #############################
- zookeeper.connect=1.1.1.1:2182,2.2.2.2:2182,3.3.3.3:2182
- # Timeout in ms for connecting to zookeeper
- zookeeper.connection.timeout.ms=18000
- ############################# Kafka Security ###########################
- ssl.endpoint.identification.algorithm=
- security.inter.broker.protocol=SSL
- ssl.client.auth=required
- # ssl加密协议选择
- ssl.enabled.protocols=TLSv1.3,TLSv1.1,TLSv1
- # Broker security settings
- sasl.enabled.mechanisms=PLAIN
- #ssl.truststore.password=123456
- ssl.truststore.password=123456
- ssl.truststore.location=/opt/kafka/crkeys/kafka.server.truststore.jks
- ssl.keystore.location=/opt/kafka/crkeys/kafka.server.keystore.jks
- ssl.keystore.password=123456
- ssl.key.password=123456
- ############################# Group Coordinator Settings #############################
-
- group.initial.rebalance.delay.ms=0
编辑一个脚本分别启动每一个broker上的zookeeper
kafka-zookeeper-quick-start.sh*
- #!/bin/bash
- # 让jaas文件被zookeeper加载到运行时环境
- # KAFKA_OPTS为固定用法
- export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/kafka-server/config/zk_jaas.conf"
- nohup /opt/kafka/kafka-server/bin/zookeeper-server-start.sh /opt/kafka/kafka-server/config/zookeeper.properties > kafka-zookeeper-start.log 2>&1 &
zookeeper集群启动结束
编写一个脚本启动每一个broker 优先启动的broker会作为主节点
kafka-server-quick-start.sh
- #!/bin/bash
- export KAFKA_HEAP_OPTS="-Xmx512m -Xms512m"
- export KAFKA_OPTS=-Djava.security.auth.login.config=/opt/kafka/kafka-server/config/kafka_server_jaas.conf
- nohup /opt/kafka/kafka-server/bin/kafka-server-start.sh /opt/kafka/kafka-server/config/server.properties > kafka-server-start.log 2>&1 &
- #/opt/kafka/kafka-server/bin/kafka-server-start.sh /opt/kafka/kafka-server/config/server.properties
任意报错可以修改config目录下的log4j.properties 将所有logger设置成trace查看
注意 设为trace之后 非kafka主节点会疯狂滚动一个controller就绪日志
以下为主节点
什么是kafka controller
全局只有一个broker节点的controller会生效,暂不深究
ssl 生效测试
本文Kafka配置为 TLSv1.3,TLSv1.1,TLSv1 可加入 TLSv1.2
具体协议版本会与jdk版本有关
openssl s_client --debug -connect <ip>:<port> -tls1 次处 Verify return code: 0 代表最低版本tls协议生效
openssl s_client --debug -connect <ip>:<port> -tls1_1
openssl s_client --debug -connect <ip>:<port> -tls1_2
openssl s_client --debug -connect <ip>:<port> -tls1_3
随便登上一台机器或者在开发本地在一个固定目录下创建公用配置文件
client_security.properties
- ssl.endpoint.identification.algorithm=
- #security.protocol=SSL
- security.protocol=SASL_SSL
- ssl.truststore.location=/opt/kafka/crkeys/kafka.server.truststore.jks
- ssl.truststore.password=123456
- ssl.keystore.location=/opt/kafka/crkeys/kafka.server.keystore.jks
- ssl.keystore.password=123456
- sasl.mechanism=PLAIN
- sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
- username="client" \
- password="client-sec";
编写一个脚本创建topic
脚本内容
./bin/kafka-topics.sh --create --topic demo-topic-1 --command-config /opt/kafka/kafka-server/config/client_security.properties --partitions 3 --replication-factor 3 --bootstreap-server your-hostname-1:9094,your-hostname-2:9094,your-hostname-3:9094
编写一个脚本测试producer和consumer
脚本内容
/opt/kafka/kafka-server/bin/kafka-console-producer.sh --bootstrap-server hostname-1:9094,hostname-2:9094,hostname-3:9094 --topic demo-topic-1 --producer.config /opt/kafka/kafka-server/config/client_security.properties
启动consumer
脚本内容
- #!/bin/bash
- /opt/kafka/kafka-server/bin/kafka-console-consumer.sh --bootstrap-server hostname-1:9094,hostname-2:9094,hostname-3:9094 --topic demo-topic-1 --consumer.config /opt/kafka/kafka-server/config/client_security.properties --from-beginning
这里consumer 添加了 --from-beginning 选项,会从头读取producer写入的数据
使用spring-cloud-dependencies-2021.0.8 版本 spring-boot-dependencies-2.7.13
spring-cloud-starter-bus-kafka 包含两个依赖
注意,本地生成keystore与truststore步骤与上面的生成步骤一致,需要把全局唯一的ca-cert ca-key拷贝到本地来生成keystore与truststore
- <dependencies>
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-stream-kafka</artifactId>
- <version>3.2.4</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-bus</artifactId>
- <version>3.1.2</version>
- <scope>compile</scope>
- </dependency>
- </dependencies>
application配置
- server.port=13001
- server.servlet.context-path=/liquid/configs-dev
-
- # Kafka
- spring.kafka.bootstrap-servers=1.1.1.1:9094,2.2.2.2:9094,3.3.3.3:9094
- spring.kafka.security.protocol=SASL_SSL
- spring.kafka.ssl.key-store-location=kafka.server.keystore.jks
- spring.kafka.ssl.key-store-password=123456
- spring.kafka.ssl.key-store-type=jks
- spring.kafka.ssl.trust-store-location=kafka.server.truststore.jks
- spring.kafka.ssl.trust-store-password=123456
- spring.kafka.ssl.trust-store-type=jks
- spring.kafka.retry.topic.attempts=3
-
-
- # Kafka stream
- spring.cloud.stream.kafka.binder.configuration.sasl.mechanism=PLAIN
- spring.cloud.stream.kafka.binder.configuration.ssl.endpoint.identification.algorithm=
- spring.cloud.stream.kafka.binder.configuration.ssl.keystore.location=classpath:kafka.server.keystore.jks
- spring.cloud.stream.kafka.binder.configuration.ssl.keystore.password=123456
- spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location=classpath:kafka.server.keystore.jks
- spring.cloud.stream.kafka.binder.configuration.ssl.ssl.truststore.password=123456
- spring.cloud.stream.kafka.binder.brokers=1.1.1.1:9094,2.2.2.2:9094,3.3.3.3:9094
- spring.kafka.streams.replication-factor=1
- spring.cloud.stream.kafka.binder.replication-factor=1
- spring.cloud.stream.kafka.binder.auto-create-topics=false
- # spring cloud config
- spring.cloud.config.server.git.uri=https://github.com/spring-cloud-samples/config-repo
创建一个Java Base Configuration
- package com.liquid.config.center.configs;
-
- import lombok.extern.slf4j.Slf4j;
- import org.apache.kafka.clients.CommonClientConfigs;
- import org.apache.kafka.clients.admin.AdminClientConfig;
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.common.config.SaslConfigs;
- import org.apache.kafka.common.security.plain.PlainLoginModule;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.cloud.stream.binder.kafka.properties.JaasLoginModuleConfiguration;
- import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.context.annotation.Primary;
- import org.springframework.kafka.core.*;
- import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
- import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
- import org.springframework.kafka.support.serializer.JsonDeserializer;
- import org.springframework.kafka.support.serializer.JsonSerializer;
-
- import java.io.IOException;
- import java.util.HashMap;
- import java.util.Map;
-
-
- @Slf4j
- @Configuration
- public class LiquidKafkaConfiguration {
-
-
- @Value("${spring.kafka.bootstrap-servers}")
- public String bootstrapServers;
-
- @Bean
- public KafkaAdmin kafkaAdmin() {
- Map<String, Object> configs = new HashMap<>();
- configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-
- configs.put("security.protocol", "SASL_SSL");
- configs.put("sasl.mechanism", "PLAIN");
- configs.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required " +
- "username=client" +
- "password=client-sec;");
- log.info(">>> Loading Kafka Admin With Jaas String end");
- return new KafkaAdmin(configs);
- }
-
- @Bean
- public ProducerFactory<Object, Object> producerFactory() {
- Map<String, Object> props = new HashMap<>();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
-
- props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
- props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
- props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
- "%s required username=\"%s\" " + "password=\"%s\";", PlainLoginModule.class.getName(), "client", "client-sec"
- ));
-
- return new DefaultKafkaProducerFactory<>(props);
- }
-
- @Bean
- public ConsumerFactory<Object, Object> consumerFactory() {
- Map<String, Object> props = new HashMap<>();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
- props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
- props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
- props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
-
- props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
- props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
- props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
- "%s required username=\"%s\" " + "password=\"%s\";", PlainLoginModule.class.getName(), "client", "client-sec"
- ));
-
- return new DefaultKafkaConsumerFactory<>(props);
- }
-
- @Bean("JaasLoginModuleConfiguration")
- public JaasLoginModuleConfiguration creatStreamJaasLoginModule() {
- Map<String, String> configs = new HashMap<>();
- configs.put("security.protocol", "SASL_SSL");
- configs.put("sasl.mechanism", "PLAIN");
- configs.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required " +
- "username=client" +
- "password=client-sec;");
- log.info(">>> Loading Kafka Admin with jaas string end");
- JaasLoginModuleConfiguration jaasLoginModuleConfiguration = new JaasLoginModuleConfiguration();
- jaasLoginModuleConfiguration.setOptions(configs);
- return jaasLoginModuleConfiguration;
- }
-
- @Bean
- @Primary
- public KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties(KafkaBinderConfigurationProperties properties) {
- String saslJaasConfigString = String.format("%s required username=\"%s\" " + "password=\"%s\";", PlainLoginModule.class.getName(), "client", "client-sec");
- Map<String, String> configMap = properties.getConfiguration();
- configMap.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfigString);
- return properties;
- }
-
- }
最终启动后 日志
SSL handshake completed successfully with peerHost
至此 kafka内部ssl 客户端SASL_SSL认证成功
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。