当前位置:   article > 正文

kerberos api java_Java Api Consumer 连接启用Kerberos认证的Kafka

jdk17 kerberos认证

java程序连接到一个需要Kerberos认证的kafka集群上,消费生产者生产的信息,kafka版本是2.10-0.10.0.1;

Java程序以maven构建,(怎么构建maven工程,可去问下度娘:“maven工程入门示例”)

先上pom.xml文件

4.0.0

com.ht

kafkaTest

1.0

org.apache.kafka

kafka-clients

0.10.0.1

org.apache.maven.plugins

maven-compiler-plugin

1.7

1.7

然后是Jave代码,先上图,一一解释图中标识:

1d2c7cb9e6c74e510c4ea93ab2526787.png

注释:

1:可以将所需的配置文件加载到程序;(参见:度娘--“JDK 运行参数 JAVA -Dxxx与System.setProperty()的关系”)

2:新版本的Producter和Consumer都可以直接连接brocker,不用再配置zookeeper的相关信息,所以这里是要连接的kafka的主机ip和端口号

3:设置的topic的组Id

4:设置偏移量

5:设置认证配置

6:设置所要读取的主题Topic

import java.util.Arrays;

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.KafkaConsumer;public classConsumerTest {public static voidmain(String[] args) {//System.setProperty("java.security.auth.login.config", "/home/kafka/kafka_client_jaas.conf");//System.setProperty("java.security.krb5.conf", "/home/kafka/krb5.conf");//环境变量添加,需要输入配置文件的路径System.out.println("===================配置文件地址"+fsPath+"\\conf\\cons_client_jaas.conf");

Properties props = newProperties();

props.put("bootstrap.servers", "192.168.132.130:9092");

props.put("group.id", "group-1111");

props.put("enable.auto.commit", "false");

props.put("auto.commit.interval.ms", "1000");

props.put("auto.offset.reset", "earliest");

props.put("session.timeout.ms", "30000");

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

props.put("security.protocol", "SASL_PLAINTEXT");

props.put("sasl.mechanism", "GSSAPI");

props.put("sasl.kerberos.service.name", "kafka");

KafkaConsumer kafkaConsumer= new KafkaConsumer<>(props);

kafkaConsumer.subscribe(Arrays.asList("cust_info"));while (true) {

ConsumerRecords records = kafkaConsumer.poll(1);for (ConsumerRecordrecord : records)

System.out.println("Partition:" + record.partition() + "Offset:" + record.offset() + "Value:" + record.value() + "ThreadID:" +Thread.currentThread().getId());

}

}

}

以上就是所有配置,将工程通过导出为Runnable JAR file 导出为jar文件

直接运行   java -jar jar包名.jar  即可;

如果程序里没有设置1相关的配置文件,也可以运行下列命令:

java -Djava.security.auth.login.config=/home/kafka/kafka_client_jaas.conf   -Djava.security.krb5.conf=/home/kafka/krb5.conf  -jar  jar包名.jar

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/酷酷是懒虫/article/detail/950979
推荐阅读
相关标签
  

闽ICP备14008679号