当前位置:   article > 正文

Kerberos安全认证-连载12-Kafka Kerberos安全配置及访问

kafka kerberos

目录

1. Kafka配置Kerberos

2. 客户端操作Kafka

​​​​​​​3. Java API操作Kafka

4. StructuredStreaming操作Kafka

5. Flink 操作Kafka


技术连载系列,前面内容请参考前面连载11内容:​​​​​​​​​​​​​​Kerberos安全认证-连载11-HBase Kerberos安全配置及访问_IT贫道的博客-CSDN博客

1. Kafka配置Kerberos

Kafka也支持通过Kerberos进行认证,避免非法用户操作读取Kafka中的数据,对Kafka进行Kerberos认证可以按照如下步骤实现。

1) 创建Kafka服务Princial主体并写入到keytab文件

在kerberos服务端node1节点执行如下命令创建Kafka服务主体:

  1. [root@node1 ~]# kadmin.local -q "addprinc -pw 123456 kafka/node1"
  2. [root@node1 ~]# kadmin.local -q "addprinc -pw 123456 kafka/node2"
  3. [root@node1 ~]# kadmin.local -q "addprinc -pw 123456 kafka/node3"

在kerberos服务端node1节点执行如下命令将Kafka服务主体写入到keytab文件。

  1. #node1节点执行命令,将主体写入到keytab
  2. [root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /home/keytabs/kafka.service.keytab kafka/node1@EXAMPLE.COM"
  3. [root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /home/keytabs/kafka.service.keytab kafka/node2@EXAMPLE.COM"
  4. [root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /home/keytabs/kafka.service.keytab kafka/node3@EXAMPLE.COM"

以上命令执行完成后,在node1节点/home/keytabs目录下生成kafka.service.keytab文件,将该文件分发到各个节点并赋权,这里可以只发送到node1~node3 Kafka所在节点,为了保证各个大数据集群节点的keytabs一致,这里分发到所有节点。

  1. [root@node1 ~]# scp /home/keytabs/kafka.service.keytab node2:/home/keytabs/
  2. [root@node1 ~]# scp /home/keytabs/kafka.service.keytab node3:/home/keytabs/
  3. [root@node1 ~]# scp /home/keytabs/kafka.service.keytab node4:/home/keytabs/
  4. [root@node1 ~]# scp /home/keytabs/kafka.service.keytab node5:/home/keytabs/

分发完成后,在集群各个节点上执行如下命令,修改kafka.service.keytab密钥文件访问权限:

chmod 770 /home/keytabs/kafka.service.keytab

​​​​​​​​​​​​​​2) 修改配置server.properties文件

在Kafka各个节点KAFKA_HOME/config/server.properties文件中加入如下配置以支持Kerberos安全认证。

  1. #在node1~node3所有节点单独配置
  2. listeners=SASL_PLAINTEXT://:9092
  3. inter.broker.listener.name=SASL_PLAINTEXT
  4. sasl.mechanism.inter.broker.protocol=GSSAPI
  5. sasl.enabled.mechanisms=GSSAPI
  6. sasl.kerberos.service.name=kafka
  7. authorizer.class.name=kafka.security.authorizer.AclAuthorizer
  8. zookeeper.set.acl=false
  9. allow.everyone.if.no.acl.found=true

该配置需要在node1~node3所有节点配置,以上参数解释如下:

  • listeners=SASL_PLAINTEXT://:9092

指定kafka监听的协议和端口,SASL_PLAINTEXT表示使用SASL(Simple Authentication and Security Layer)机制进行认证和加密通信。

  • inter.broker.listener.name=SASL_PLAINTEXT

指定Kafka Broker 之间通信使用SASL_PLAINTEXT机制进行认证和加密通信。

  • sasl.mechanism.inter.broker.protocol=GSSAPI

指定broker之间通信使用SASL机制,GSSAPI是一种基于Kerberos的SASL机制,它使用GSS-API(Generic Security Services Application Programming Interface)进行认证。

  • sasl.enabled.mechanisms=GSSAPI

只启用了GSSAPI机制,表示Kafka只接受使用Kerberos进行认证的连接。

  • sasl.kerberos.service.name=kafka

指定Kafka在Kerberos中注册的服务名称,以便用于进行身份验证和授权。

  • authorizer.class.name=kafka.security.authorizer.AclAuthorizer

指定Kafka使用的授权器类。

  • zookeeper.set.acl=false

是否在ZooKeeper中设置ACL(Access Control List),这里设置为false,表示不对ZooKeeper节点设置ACL。

  • allow.everyone.if.no.acl.found=true

指定当没有匹配的ACL规则时,是否允许所有用户访问。

3) 准备kafka_jaas.conf文件

在node1~node3各个节点中准备kafka_jaas.conf文件,该文件配置kafka服务端和zookeeper客户端的身份验证和授权配置,由于zookeeper开启了Kerberos认证,所以这里需要进行zookeeper客户端的身份验证配置。

这里在各个kafka节点KAFKA_HOME/config/目录中创建kafka_jaas.conf文件,内容如下:

  1. KafkaServer {
  2. com.sun.security.auth.module.Krb5LoginModule required
  3. useKeyTab=true
  4. storeKey=true
  5. keyTab="/home/keytabs/kafka.service.keytab"
  6. serviceName="kafka"
  7. principal="kafka/node1@EXAMPLE.COM";
  8. };
  9. Client {
  10. com.sun.security.auth.module.Krb5LoginModule required
  11. useKeyTab=true
  12. storeKey=true
  13. serviceName="zookeeper"
  14. keyTab="/home/keytabs/zookeeper.service.keytab" principal="zookeeper/node3@EXAMPLE.COM";
  15. };

以上在可以在node1 kafka客户端配置完成后,分发到node2~node3节点中,如下:

  1. #在node1节点分发,分发后在node2、node3节点配置对应Server principal
  2. [root@node1 config]# scp /software/kafka_2.12-3.3.1/config/kafka_jaas.conf node2:/software/kafka_2.12-3.3.1/config/
  3. [root@node1 config]# scp /software/kafka_2.12-3.3.1/config/kafka_jaas.conf node3:/software/kafka_2.12-3.3.1/config/

注意:在node1~node3各个kafka节点中该文件中的KafkaServer对应的principal不同,为对应各个节点hostname。

4) 准备kafka_client_jaas.conf

在各个kafka节点配置kafka_client_jaas.conf配置文件,该文件作用主要是对kafka 客户端进行身份认证。这里在node1~node3节点KAFKA_HOME/config/中创建kafka_client_jaas.conf文件,内容如下:

  1. KafkaClient {
  2. com.sun.security.auth.module.Krb5LoginModule required
  3. useKeyTab=true
  4. storeKey=true
  5. keyTab="/home/keytabs/kafka.service.keytab"
  6. serviceName="kafka"
  7. principal="kafka/node1@EXAMPLE.COM";
  8. };

可以在node1节点配置该文件后分发到node2~node3节点中:

  1. #分发到node2、node3节点,需要在对应节点配置对应的principal
  2. [root@node1 config]# scp /software/kafka_2.12-3.3.1/config/kafka_client_jaas.conf node2:/software/kafka_2.12-3.3.1/config/
  3. [root@node1 config]# scp /software/kafka_2.12-3.3.1/config/kafka_client_jaas.conf node3:/software/kafka_2.12-3.3.1/config/

以上文件分发完成后,需要在对应节点修改配置对应的Principal信息为对应的hostname。

5) 修改启动脚本kafka-server-start.sh

在kafka各个节点中配置KAFKA_HOME/bin/kafka-server-start.sh启动脚本,在该脚本中加入kafka_jaas.conf配置,加入的内容如下:

  1. #在node1~node3各个节点都要配置kafka-server-start.sh
  2. export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/software/kafka_2.12-3.3.1/config/kafka_jaas.conf"

​​​​​​​​​​​​​​6) 修改kafka操作脚本

在当前集群中node1~node3节点是kafka服务端,同时如果在这3个节点上进行kafka 命令操作,这三个节点也是kafka客户端。在操作kafka时我们通常会操作KAFKA_HOME中的kafka-topic.sh、kafka-console-producer.sh、kafka-console-consumer.sh脚本,这些脚本需要进行kerberos认证,可以通过前面配置的kafka_client_jaas.conf文件进行Kerberos认证,所以这里在各个脚本中加入如下配置,避免在操作对应脚本时没有进行认证从而没有操作权限。

在node1~node3各个kafka 客户端配置以上脚本,可以先在node1节点进行配置各文件然后分发到其他kafka客户端,对应操作文件增加如下配置:

  1. #vim /software/kafka_2.12-3.3.1/bin/kafka-topics.sh
  2. export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/software/kafka_2.12-3.3.1/config/kafka_client_jaas.conf"
  3. #vim /software/kafka_2.12-3.3.1/bin/kafka-console-producer.sh
  4. export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/software/kafka_2.12-3.3.1/config/kafka_client_jaas.conf"
  5. #vim /software/kafka_2.12-3.3.1/bin/kafka-console-consumer.sh
  6. export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/software/kafka_2.12-3.3.1/config/kafka_client_jaas.conf"

在node1 kafka 客户端配置完成后,将配置后的各个脚本文件分发到其他kafka客户端节点上:

  1. [root@node1 ~]# cd /software/kafka_2.12-3.3.1/bin/
  2. [root@node1 bin]# scp ./kafka-topics.sh ./kafka-console-producer.sh ./kafka-console-consumer.sh node2:`pwd`
  3. [root@node1 bin]# scp ./kafka-topics.sh ./kafka-console-producer.sh ./kafka-console-consumer.sh node3:`pwd`

​​​​​​​​​​​​​​7) 准备client.properites

在node1~node3各个kafka 客户端中准备client.properties配置文件,该配置文件内容如下,这里将该文件创建在各个节点的/root目录下。

  1. security.protocol=SASL_PLAINTEXT
  2. sasl.mechanism=GSSAPI
  3. sasl.kerberos.service.name=kafka

当Kafka通过Kerberos认证后,在执行KAFKA_HOME/bin目录下的脚本时,需要使用正确的协议、SASL机制及kerberos服务,所以这里将以上信息配置到client.properties文件中,在执行各个脚本时需要通过参数指定该文件,这样客户端可以和服务端正常通信。

在node1配置完成/root/client.properties文件后,分发到node2~node3节点中:

  1. [root@node1 ~]# scp ./client.properties node2:`pwd`
  2. [root@node1 ~]# scp ./client.properties node3:`pwd`

​​​​​​​8) 启动kafka集群

启动kafka集群前需要先启动Zookeeper,然后在各个kafka服务节点启动kafka,完成kafka集群启动。操作如下:

  1. #node3~node5各节点启动zookeeper
  2. zkServer.sh start
  3. #node1~node3各节点启动kafka
  4. startKafka.sh

​​​​​​​2. 客户端操作Kafka

启动Kafka集群后,通过如下命令在Kafka集群查询、创建topic以及向topic中写入数据,以下命令可以执行在node1~node3各个kafka 客户端中。

  1. #创建kafka topic
  2. kafka-topics.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --create --topic test --partitions 3 --replication-factor 1 --command-config /root/client.properties
  3. #查看集群topic
  4. kafka-topics.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --list --command-config /root/client.properties
  5. #向kafka topic中写入数据
  6. [root@node1 ~]# kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic test --producer.config /root/client.properties
  7. >1
  8. >2
  9. >3
  10. >4
  11. >5
  12. #读取kafka topic中的数据
  13. [root@node1 ~]# kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic test --from-beginning --consumer.config /root/client.properties
  14. 1
  15. 2
  16. 3
  17. 4
  18. 5

​​​​​​​3. Java API操作Kafka

可以按照如下步骤实现Java API操作Kerberos认证的Kafka数据。

1) 准备krb5.conf文件

将node1 kerberos服务端/etc/krb5.conf文件存放在IDEA项目中的resources资源目录中或者本地Window固定的某个目录中,用于编写代码时指定访问Kerberos的Realm。

2) 准备用户keytab文件

在kerberos服务端node1节点上将生成的kafka.server.keytab文件存入到window路径中,这里放在项目resource资源目录下,后续需要该文件进行客户端认证。

3) 准备kafka_client_jaas.conf文件

将Kafka 中kafka_client_jaas.conf文件放在window中某个路径中,并修改该文件中keytab路径为window路径:

  1. KafkaClient {
  2. com.sun.security.auth.module.Krb5LoginModule required
  3. useKeyTab=true
  4. storeKey=true
  5. keyTab="D:/idea_space/KerberosAuth/KerberosAuthKafka/src/main/resources/kafka.service.keytab"
  6. serviceName="kafka"
  7. principal="kafka/node1@EXAMPLE.COM";
  8. };

特别需要注意的是该文件中指定window路径时使用“/”或“\\”隔开各目录,否则客户端认证时读取不到keytab文件。这里将修改后的kafka_client_jaas.conf文件存入到项目resource资源目录下。

4) 编写Java代码向Kafka topic中读写数据

编写代码前,需要在项目pom.xml中引入如下依赖:

  1. <!-- kafka client依赖包 -->
  2. <dependency>
  3. <groupId>org.apache.kafka</groupId>
  4. <artifactId>kafka-clients</artifactId>
  5. <version>3.3.1</version>
  6. </dependency>

Java API读写Kafka Topic代码如下:

  1. /**
  2. * Java API 操作Kerbros认证的Kafka
  3. * 使用 JAAS 来进行 Kerberos 认证
  4. * 注意:kafka_client_jaas.conf文件中的keytab文件路径需要使用双斜杠或者反单斜杠
  5. */
  6. public class OperateAuthKafka {
  7. public static void main(String[] args) {
  8. //准备JAAS配置文件路径
  9. String kafkaClientJaasFile = "D:\\idea_space\\KerberosAuth\\KerberosAuthKafka\\src\\main\\resources\\kafka_client_jaas.conf";
  10. // Kerberos配置文件路径
  11. String krb5FilePath = "D:\\idea_space\\KerberosAuth\\KerberosAuthKafka\\src\\main\\resources\\krb5.conf";
  12. System.setProperty("java.security.auth.login.config", kafkaClientJaasFile);
  13. System.setProperty("java.security.krb5.conf", krb5FilePath);
  14. Properties props = new Properties();
  15. props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
  16. props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  17. props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  18. //kerberos安全认证
  19. props.setProperty("security.protocol", "SASL_PLAINTEXT");
  20. props.setProperty("sasl.mechanism", "GSSAPI");
  21. props.setProperty("sasl.kerberos.service.name", "kafka");
  22. //向Kafka topic中发送消息
  23. KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
  24. kafkaProducer.send(new ProducerRecord<>("test", "100"));
  25. kafkaProducer.send(new ProducerRecord<>("test", "200"));
  26. kafkaProducer.send(new ProducerRecord<>("test", "300"));
  27. kafkaProducer.send(new ProducerRecord<>("test", "400"));
  28. kafkaProducer.send(new ProducerRecord<>("test", "500"));
  29. kafkaProducer.close();
  30. System.out.println("消息发送成功");
  31. /**
  32. * 从Kafka topic中消费消息
  33. */
  34. props.setProperty("group.id", "test"+ UUID.randomUUID());
  35. //设置消费的位置,earliest表示从头开始消费,latest表示从最新的位置开始消费
  36. props.setProperty("auto.offset.reset", "earliest");
  37. props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  38. props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  39. KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
  40. kafkaConsumer.subscribe(Arrays.asList("test"));
  41. while (true) {
  42. // 拉取数据
  43. ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(100));
  44. for (ConsumerRecord<String, String> record : consumerRecords) {
  45. // 获取数据对应的分区号
  46. int partition = record.partition();
  47. // 对应数据值
  48. String value = record.value();
  49. //对应数据的偏移量
  50. long lastoffset = record.offset();
  51. //对应数据发送的key
  52. String key = record.key();
  53. System.out.println("数据的key为:"+ key +
  54. ",数据的value为:" + value +
  55. ",数据的offset为:"+ lastoffset +
  56. ",数据的分区为:"+ partition);
  57. }
  58. }
  59. }
  60. }

4. StructuredStreaming操作Kafka

StructuredStreaming操作Kafka时同样需要准备krb5.conf、kafka.service.keytab、kafka_client_jaas.conf配置文件,步骤参考Java API操作Kafka部分。

编写代码前,需要在项目pom.xml中引入如下依赖:

  1. <!-- SparkSQL -->
  2. <dependency>
  3. <groupId>org.apache.spark</groupId>
  4. <artifactId>spark-sql_2.12</artifactId>
  5. <version>3.4.0</version>
  6. </dependency>
  7. <!-- Kafka 0.10+ Source For Structured Streaming-->
  8. <dependency>
  9. <groupId>org.apache.spark</groupId>
  10. <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
  11. <version>3.4.0</version>
  12. </dependency>

StructuredStreaming操作Kafka代码如下:

  1. /**
  2. * StructuredStreaming 读取Kerberos 认证的Kafka数据
  3. */
  4. public class StructuredStreamingReadAuthKafka {
  5. public static void main(String[] args) throws TimeoutException, StreamingQueryException {
  6. //准备JAAS配置文件路径
  7. String kafkaClientJaasFile = "D:\\idea_space\\KerberosAuth\\KerberosAuthKafka\\src\\main\\resources\\kafka_client_jaas.conf";
  8. // Kerberos配置文件路径
  9. String krb5FilePath = "D:\\idea_space\\KerberosAuth\\KerberosAuthKafka\\src\\main\\resources\\krb5.conf";
  10. System.setProperty("java.security.auth.login.config", kafkaClientJaasFile);
  11. System.setProperty("java.security.krb5.conf", krb5FilePath);
  12. //1.创建对象
  13. SparkSession spark = SparkSession.builder()
  14. .master("local")
  15. .appName("kafka source")
  16. .config("spark.sql.shuffle.partitions", 1)
  17. .getOrCreate();
  18. spark.sparkContext().setLogLevel("Error");
  19. //2.读取kafka 数据
  20. Dataset<Row> df = spark.readStream()
  21. .format("kafka")
  22. .option("kafka.bootstrap.servers", "node1:9092,node2:9092,node3:9092")
  23. .option("subscribe", "test")
  24. .option("startingOffsets", "earliest")
  25. //kerberos安全认证
  26. .option("kafka.security.protocol", "SASL_PLAINTEXT")
  27. .option("kafka.sasl.mechanism", "GSSAPI")
  28. .option("kafka.sasl.kerberos.service.name", "kafka")
  29. .load();
  30. Dataset<Row> result = df.selectExpr("cast (key as string)", "cast (value as string)");
  31. StreamingQuery query = result.writeStream()
  32. .format("console")
  33. .start();
  34. query.awaitTermination();
  35. }
  36. }

5. Flink 操作Kafka

Flink操作Kafka时同样需要准备krb5.conf、kafka.service.keytab、kafka_client_jaas.conf配置文件,步骤参考Java API操作Kafka部分。

编写代码前,需要在项目pom.xml中引入如下依赖:

  1. <!-- Flink批和流开发依赖包 -->
  2. <dependency>
  3. <groupId>org.apache.flink</groupId>
  4. <artifactId>flink-clients</artifactId>
  5. <version>1.16.0</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.apache.flink</groupId>
  9. <artifactId>flink-streaming-java</artifactId>
  10. <version>1.16.0</version>
  11. </dependency>
  12. <dependency>
  13. <groupId>org.apache.flink</groupId>
  14. <artifactId>flink-connector-kafka</artifactId>
  15. <version>1.16.0</version>
  16. </dependency>

Flink操作Kafka代码如下:

  1. /**
  2. * Flink 读取Kerberos 认证的Kafka数据
  3. */
  4. public class FlinkReadAuthKafka {
  5. public static void main(String[] args) throws Exception {
  6. //准备JAAS配置文件路径
  7. String kafkaClientJaasFile = "D:\\idea_space\\KerberosAuth\\KerberosAuthKafka\\src\\main\\resources\\kafka_client_jaas.conf";
  8. // Kerberos配置文件路径
  9. String krb5FilePath = "D:\\idea_space\\KerberosAuth\\KerberosAuthKafka\\src\\main\\resources\\krb5.conf";
  10. System.setProperty("java.security.auth.login.config", kafkaClientJaasFile);
  11. System.setProperty("java.security.krb5.conf", krb5FilePath);
  12. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  13. KafkaSource<Tuple2<String, String>> kafkaSource = KafkaSource.<Tuple2<String, String>>builder()
  14. .setBootstrapServers("node1:9092,node2:9092,node3:9092") //设置Kafka 集群节点
  15. .setTopics("test") //设置读取的topic
  16. .setGroupId("my-test-group") //设置消费者组
  17. //kerberos安全认证
  18. .setProperty("security.protocol", "SASL_PLAINTEXT")
  19. .setProperty("sasl.mechanism", "GSSAPI")
  20. .setProperty("sasl.kerberos.service.name", "kafka")
  21. .setStartingOffsets(OffsetsInitializer.earliest()) //设置读取数据位置
  22. .setDeserializer(new KafkaRecordDeserializationSchema<Tuple2<String, String>>() {
  23. //设置key ,value 数据获取后如何处理
  24. @Override
  25. public void deserialize(ConsumerRecord<byte[], byte[]> consumerRecord, Collector<Tuple2<String, String>> collector) throws IOException {
  26. String key = null;
  27. String value = null;
  28. if(consumerRecord.key() != null){
  29. key = new String(consumerRecord.key(), "UTF-8");
  30. }
  31. if(consumerRecord.value() != null){
  32. value = new String(consumerRecord.value(), "UTF-8");
  33. }
  34. collector.collect(Tuple2.of(key, value));
  35. }
  36. //设置置返回的二元组类型
  37. @Override
  38. public TypeInformation<Tuple2<String, String>> getProducedType() {
  39. return TypeInformation.of(new TypeHint<Tuple2<String, String>>() {
  40. });
  41. }
  42. })
  43. .build();
  44. DataStreamSource<Tuple2<String, String>> kafkaDS = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");
  45. kafkaDS.print();
  46. env.execute();
  47. }
  48. }

欢迎点赞、评论、收藏,关注IT贫道,获取IT技术知识!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/煮酒与君饮/article/detail/950987
推荐阅读
相关标签
  

闽ICP备14008679号