I. Switch to the directory where the certificates are stored
- Here I created an ssl folder in my home directory
- mkdir ssl && cd ssl
II. Generate the server keystore
keytool -keystore server.keystore.jks -alias localhost -validity 365 -keyalg RSA -genkey
Verify the certificate:
keytool -list -v -keystore server.keystore.jks
III. Create a CA and import it into the truststores
- Generate the CA:
- openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
-
- Import the CA into the server and client truststores:
- keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
- keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
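IV. Export a certificate signing request from the keystore and sign it with the CA created in the previous step:
- Export the certificate request: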
- keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
-
- Sign:
- openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:ka123456
V. Import the CA certificate and the signed certificate into the keystore
- keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
- keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed
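The result of the two imports above can also be checked from Java rather than with keytool -list. The following is a minimal sketch using only the JDK's java.security.KeyStore API; the class name KeystoreEntriesSketch and the method describe are hypothetical names chosen for this illustration, and the password is whatever was entered at the keytool prompts.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of the original walkthrough.
final class KeystoreEntriesSketch {

    /** Returns alias -> entry kind for every entry in the keystore file. */
    static Map<String, String> describe(Path keystore, char[] password)
            throws IOException, GeneralSecurityException {
        KeyStore ks = KeyStore.getInstance("JKS");
        try (InputStream in = Files.newInputStream(keystore)) {
            // Fails with an IOException if the password is wrong or the file is damaged.
            ks.load(in, password);
        }
        Map<String, String> kinds = new TreeMap<>();
        for (String alias : Collections.list(ks.aliases())) {
            // A key entry holds a private key plus its certificate chain;
            // anything else in these stores is a trusted certificate.
            kinds.put(alias, ks.isKeyEntry(alias) ? "key entry" : "trusted certificate");
        }
        return kinds;
    }
}
```

Calling describe on server.keystore.jks after step V would be expected to report the CA alias as a trusted certificate and the localhost alias as a key entry, since the signed certificate is imported as a reply for the existing key pair.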
VI. Generate the client keystore
keytool -keystore client.keystore.jks -alias localhost -validity 365 -keyalg RSA -genkey
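VII. Configure ZooKeeper authentication:
1. ZooKeeper was installed together with Kafka through Homebrew, and its configuration files are in /usr/local/opt/kafka/libexec/config/. In this directory, create the configuration file zookeeper-server-jaas.conf. It defines two users that clients connecting to ZooKeeper can authenticate as, in the format user_<username>="<password>". The contents are as follows: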
- Server {
- org.apache.zookeeper.server.auth.DigestLoginModule required
- user_admin="qaz!@#QAZ"
- user_kafka="qaz123!@#QAZ";
- };
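To make the user_<username>="<password>" convention concrete, here is a minimal sketch that renders a Server section of this shape from a map of users. The class ZkJaasSketch and the method renderServerSection are hypothetical names for this illustration only; the sketch does not escape quotes or backslashes in passwords, so it assumes passwords without those characters.

```java
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not part of the original walkthrough.
final class ZkJaasSketch {

    /** Renders a JAAS Server section with one user_<name>="<password>" option per user. */
    static String renderServerSection(Map<String, String> users) {
        StringJoiner entries = new StringJoiner("\n");
        for (Map.Entry<String, String> user : users.entrySet()) {
            // Assumption: the password contains no double quote or backslash.
            entries.add("    user_" + user.getKey() + "=\"" + user.getValue() + "\"");
        }
        // All options belong to one login module entry, so only the last one
        // is followed by a semicolon.
        return "Server {\n"
                + "    org.apache.zookeeper.server.auth.DigestLoginModule required\n"
                + entries + ";\n"
                + "};\n";
    }
}
```

Passing a LinkedHashMap keeps the users in insertion order, which makes the generated file easier to compare with a hand-written one.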
2. Add the SASL authentication settings to the ZooKeeper configuration file zookeeper.properties, as follows:
- dataDir=/usr/local/var/lib/zookeeper
- # the port at which the clients will connect
- clientPort=2181
- # disable the per-ip limit on the number of connections since this is a non-production config
- maxClientCnxns=0
- # Disable the adminserver by default to avoid port conflicts.
- # Set the port to something non-conflicting if choosing to enable this
- admin.enableServer=false
- # admin.serverPort=8080
- authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
- requireClientAuthScheme=sasl
- jaasLoginRenew=3600000
3. Add a JVM option to the /usr/local/opt/zookeeper/libexec/bin/zkEnv.sh script so that ZooKeeper loads the JAAS configuration file at startup
- export SERVER_JVMFLAGS="-Xmx${ZK_SERVER_HEAP}m $SERVER_JVMFLAGS -Djava.security.auth.login.config=/usr/local/opt/kafka/libexec/config/zookeeper-server-jaas.conf"
-
- # default heap for zookeeper client
- ZK_CLIENT_HEAP="${ZK_CLIENT_HEAP:-256}"
- export CLIENT_JVMFLAGS="-Xmx${ZK_CLIENT_HEAP}m $CLIENT_JVMFLAGS"
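VIII. Configure Kafka security authentication:
1. In the /usr/local/opt/kafka/libexec/config directory, create the configuration files kafka-server-jaas.conf and kafka-client-jaas.conf with the following contents: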
- kafka-server-jaas.conf:
- KafkaServer {
- org.apache.kafka.common.security.plain.PlainLoginModule required
- username="admin"
- password="qazQAZ123!@#"
- user_admin="qazQAZ123!@#"
- user_kafka="qaz!@#123";
- };
-
- Client {
- org.apache.kafka.common.security.plain.PlainLoginModule required
- username="kafka"
- password="qaz123!@#QAZ";
- };
-
-
-
- kafka-client-jaas.conf:
- KafkaClient {
- org.apache.kafka.common.security.plain.PlainLoginModule required
- username="kafka"
- password="qaz!@#123";
- };
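2. In the Kafka startup script (/usr/local/opt/kafka/libexec/bin/kafka-server-start.sh), configure the environment so that the broker loads the jaas.conf file by adding the following JVM option (for example through KAFKA_OPTS):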
-Djava.security.auth.login.config=/usr/local/Cellar/kafka/3.2.0/libexec/config/kafka-server-jaas.conf
3. Modify the Kafka configuration file: /usr/local/opt/kafka/libexec/config/server.properties
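The post does not list the actual changes to server.properties. As a rough sketch only, the broker settings that a SASL_SSL listener with the PLAIN mechanism usually involves can be written down as follows. Everything here is an assumption rather than the author's configuration: the class BrokerSecuritySketch, the sslDir and storePassword parameters, and the choice of values. Port 9093 is taken from the client code in this post, and the keys are standard Kafka broker configuration names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration; the values are assumptions, not the author's settings.
final class BrokerSecuritySketch {

    /** Collects typical broker settings for a SASL_SSL listener using PLAIN. */
    static Map<String, String> brokerSecuritySettings(String sslDir, String storePassword) {
        Map<String, String> settings = new LinkedHashMap<>();
        settings.put("listeners", "SASL_SSL://:9093");
        settings.put("advertised.listeners", "SASL_SSL://127.0.0.1:9093");
        settings.put("security.inter.broker.protocol", "SASL_SSL");
        settings.put("sasl.enabled.mechanisms", "PLAIN");
        settings.put("sasl.mechanism.inter.broker.protocol", "PLAIN");
        settings.put("ssl.keystore.location", sslDir + "/server.keystore.jks");
        settings.put("ssl.keystore.password", storePassword);
        settings.put("ssl.key.password", storePassword);
        settings.put("ssl.truststore.location", sslDir + "/server.truststore.jks");
        settings.put("ssl.truststore.password", storePassword);
        // Empty value disables hostname verification, matching the clients in this post.
        settings.put("ssl.endpoint.identification.algorithm", "");
        return settings;
    }

    /** Renders the settings as key=value lines in insertion order. */
    static String render(Map<String, String> settings) {
        StringBuilder out = new StringBuilder();
        settings.forEach((key, value) -> out.append(key).append('=').append(value).append('\n'));
        return out.toString();
    }
}
```

The rendered lines are meant to be compared against an existing server.properties by hand; the keystore and truststore paths must point at the files created in steps II to V.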
IX. Java code
Copy the client truststore client.truststore.jks generated in step III into the Java project so that the client can trust this CA
Producer:
- // 1. Create the configuration object for the Kafka producer
- Properties props = new Properties();
-
- // 2. Add settings to the configuration object: bootstrap.servers, serializers, and SASL_SSL security
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9093");
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
- props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
- props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username='kafka' password='qaz!@#123';");
- // An empty value disables hostname verification of the broker certificate
- props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
- props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, DEFAULT_BASE_CONF_PATH + "client.truststore.jks");
- props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "ka123456");
- // 3. Create the Kafka producer object
- KafkaProducer<String, String> producer = new KafkaProducer<>(props);
- System.out.println("Start sending data");
-
- String msg = "Hello World";
- // 4. Call send to publish the message; the arguments are (topic, key, value)
- producer.send(new ProducerRecord<>("topic1", "value ", msg));
- // 5. Release resources
- producer.close();
Consumer:
- // file is a File object defined elsewhere in the project
- String filePath = file.getAbsolutePath();
- // 1. Create the configuration object for the consumer
- Properties props = new Properties();
- // 2. Add settings to the consumer configuration object
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9093");
- // Deserializers (required)
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-
- // Consumer group (required; the group name can be chosen freely)
- props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group1");
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // turn off auto commit
- props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
- props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
- props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
- props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username='kafka' password='qaz!@#123';");
-
- // An empty value disables hostname verification of the broker certificate
- props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
- props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, filePath + "/conf/client.truststore.jks");
- props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "ka123456");
-
- // Create the consumer object
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
- // Subscribe to the topics to consume (more than one topic is allowed)
- ArrayList<String> topics = new ArrayList<>();
- topics.add("topic1");
- consumer.subscribe(topics);
- System.out.println("Ready to receive data.........");
- // Poll for records and print them
- while (true) {
-     ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
-     for (ConsumerRecord<String, String> record : records) {
-         String value = record.value();
-         System.out.println(value);
-     }
-     if (records.count() > 0) {
-         // Commit offsets manually and asynchronously: the commit does not block the current thread, so processing can continue
-         consumer.commitAsync(new OffsetCommitCallback() {
-             @Override
-             public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
-                 if (exception != null) {
-                     System.err.println("Commit failed for " + offsets);
-                     exception.printStackTrace();
-                 }
-             }
-         });
-     }
- }
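The producer and the consumer above repeat the same six security settings. One possible way to keep them in a single place is sketched below, using the plain string keys that the Kafka config constants resolve to, so the helper itself needs no Kafka dependency. The class ClientSecuritySketch and the method saslSslProps are hypothetical names for this illustration, and the helper assumes the username and password contain no double quotes.

```java
import java.util.Properties;

// Hypothetical helper, not part of the original walkthrough.
final class ClientSecuritySketch {

    /** Builds the SASL_SSL / PLAIN settings shared by producers and consumers. */
    static Properties saslSslProps(String username, String password,
                                   String truststorePath, String truststorePassword) {
        Properties props = new Properties();
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        // Assumption: neither value contains a double quote.
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required"
                        + " username=\"" + username + "\" password=\"" + password + "\";");
        // Empty value disables hostname verification of the broker certificate.
        props.put("ssl.endpoint.identification.algorithm", "");
        props.put("ssl.truststore.location", truststorePath);
        props.put("ssl.truststore.password", truststorePassword);
        return props;
    }
}
```

A client would then call props.putAll(ClientSecuritySketch.saslSslProps(...)) on its own Properties object before adding the bootstrap servers and the serializer or deserializer settings.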