赞
踩
java程序连接到一个需要Kerberos认证的kafka集群上,消费生产者生产的信息,kafka版本是2.10-0.10.0.1;
Java程序以maven构建,(怎么构建maven工程,可去问下度娘:“maven工程入门示例”)
先上pom.xml文件
4.0.0
com.ht
kafkaTest
1.0
org.apache.kafka
kafka-clients
0.10.0.1
org.apache.maven.plugins
maven-compiler-plugin
1.7
1.7
然后是Jave代码,先上图,一一解释图中标识:
注释:
1:可以将所需的配置文件加载到程序;(参见:度娘--“JDK 运行参数 JAVA -Dxxx与System.setProperty()的关系”)
2:新版本的Producter和Consumer都可以直接连接brocker,不用再配置zookeeper的相关信息,所以这里是要连接的kafka的主机ip和端口号
3:设置的topic的组Id
4:设置偏移量
5:设置认证配置
6:设置所要读取的主题Topic
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;public classConsumerTest {public static voidmain(String[] args) {//System.setProperty("java.security.auth.login.config", "/home/kafka/kafka_client_jaas.conf");//System.setProperty("java.security.krb5.conf", "/home/kafka/krb5.conf");//环境变量添加,需要输入配置文件的路径System.out.println("===================配置文件地址"+fsPath+"\\conf\\cons_client_jaas.conf");
Properties props = newProperties();
props.put("bootstrap.servers", "192.168.132.130:9092");
props.put("group.id", "group-1111");
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "GSSAPI");
props.put("sasl.kerberos.service.name", "kafka");
KafkaConsumer kafkaConsumer= new KafkaConsumer<>(props);
kafkaConsumer.subscribe(Arrays.asList("cust_info"));while (true) {
ConsumerRecords records = kafkaConsumer.poll(1);for (ConsumerRecordrecord : records)
System.out.println("Partition:" + record.partition() + "Offset:" + record.offset() + "Value:" + record.value() + "ThreadID:" +Thread.currentThread().getId());
}
}
}
以上就是所有配置,将工程通过导出为Runnable JAR file 导出为jar文件
直接运行 java -jar jar包名.jar 即可;
如果程序里没有设置1相关的配置文件,也可以运行下列命令:
java -Djava.security.auth.login.config=/home/kafka/kafka_client_jaas.conf -Djava.security.krb5.conf=/home/kafka/krb5.conf -jar jar包名.jar
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。