当前位置:   article > 正文

SpringBoot整合并简单使用Kerberos认证的Kafka_springboot kafka kerberos

springboot kafka kerberos

早餐一个白鸡蛋吃得俺打嗝  -_-|||


准备工作

1、启动需要Kerberos认证的kafka。

注:具体配置方式可详见https://blog.csdn.net/justry_deng/article/details/88386114

2、在本地将C:\Windows\System32\drivers\etc\hosts中指定kafka所在broker的ip地址以及其对应的主机名。

提示:本人的Java代码,测试时是写在Windows上的,而kafka是部署在虚拟机Linux上的。

3、配置客户端的kafka_client_jaas.conf认证文件。

创建一个kafka_client_jaas.conf文件,并令其内容为:

  1. KafkaClient {
  2. org.apache.kafka.common.security.plain.PlainLoginModule required
  3. username="bob"
  4. password="bob-pwd";
  5. };

注:本人放置客户端jaas认证文件的位置在:C:/Users/JustryDeng/Desktop/kerberos/kafka_client_jaas.conf。

注:这里指定了当前用户为bob。且此处的用户不能乱配置,只能使用(服务端)配置Kerberos认证时在
        kafka_server_jaas.conf认证文件中配置了的用户之一


SpringBoot整合并简单使用Kerberos认证的Kafka

第一步:在application.properties系统配置文件中配置Kafka相关参数。

  1. # -> -> -> -> -> -> -> -> -> -> -> -> kafka 基础配置
  2. # 指定kafka 代理地址,可以多个,以逗号分隔
  3. spring.kafka.bootstrap-servers=kafka-single:9095
  4. # -> -> -> -> -> -> -> -> -> -> -> -> kafka 消息生产者
  5. # 重试次数
  6. spring.kafka.producer.retries=0
  7. # -> -> -> -> -> -> -> -> -> -> -> -> kafka 消息消费者
  8. # 偏移量策略设置
  9. # earliest(当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费)
  10. # latest(当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据)
  11. # none(topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常)
  12. spring.kafka.consumer.auto-offset-reset=earliest
  13. # 允许自动提交消费者偏移量
  14. spring.kafka.consumer.enable-auto-commit=true
  15. # -> -> -> -> -> -> -> -> -> -> -> -> Kerberos
  16. # Kerberos基础配合配置
  17. spring.kafka.jaas.enabled=true
  18. spring.kafka.properties.sasl.mechanism = PLAIN
  19. spring.kafka.properties.security.protocol = SASL_PLAINTEXT

第二步:在pom.xml中引入相关依赖。

  1. <!-- 需要引入与所安装的kafka对应版本的依赖 -->
  2. <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
  3. <dependency>
  4. <groupId>org.springframework.kafka</groupId>
  5. <artifactId>spring-kafka</artifactId>
  6. <version>2.2.2.RELEASE</version>
  7. </dependency>

注:引入什么版本的依赖需要看安装的是什么版本的Kafka。

更多对应关系可详见http://spring.io/projects/spring-kafka

注:本人安装的Kafka版本是2.1.0的,所以会这里引入的依赖时2.2.x的spring-kafka。

这里给出完整的pom.xml:

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <parent>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-parent</artifactId>
  8. <version>2.1.2.RELEASE</version>
  9. <relativePath/> <!-- lookup parent from repository -->
  10. </parent>
  11. <groupId>com.aspire</groupId>
  12. <artifactId>kafka</artifactId>
  13. <version>0.0.1-SNAPSHOT</version>
  14. <name>kafka</name>
  15. <description>java使用kafka,简单示例</description>
  16. <properties>
  17. <java.version>1.8</java.version>
  18. </properties>
  19. <dependencies>
  20. <dependency>
  21. <groupId>org.springframework.boot</groupId>
  22. <artifactId>spring-boot-starter</artifactId>
  23. </dependency>
  24. <dependency>
  25. <groupId>org.springframework.boot</groupId>
  26. <artifactId>spring-boot-starter-test</artifactId>
  27. <scope>test</scope>
  28. </dependency>
  29. <dependency>
  30. <groupId>org.projectlombok</groupId>
  31. <artifactId>lombok</artifactId>
  32. <optional>true</optional>
  33. </dependency>
  34. <!-- 需要引入与所安装的kafka对应版本的依赖 -->
  35. <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
  36. <dependency>
  37. <groupId>org.springframework.kafka</groupId>
  38. <artifactId>spring-kafka</artifactId>
  39. <version>2.2.2.RELEASE</version>
  40. </dependency>
  41. </dependencies>
  42. <build>
  43. <plugins>
  44. <plugin>
  45. <groupId>org.springframework.boot</groupId>
  46. <artifactId>spring-boot-maven-plugin</artifactId>
  47. </plugin>
  48. </plugins>
  49. </build>
  50. </project>

第三步:编写消费者生产者等相关类

项目结构说明

注:从上往下分别是消费者、生产者、启动项目时会进行消息发送的Runner类、启动类、配置文件、pom.xml文件。

生产者(示例)

  1. package com.single;
  2. import org.springframework.kafka.core.KafkaTemplate;
  3. import org.springframework.stereotype.Component;
  4. /**
  5. * 生产者
  6. *
  7. * @author JustryDeng
  8. * @date 2019/2/15 9:44
  9. */
  10. @Component
  11. public class MyProducer {
  12. private final KafkaTemplate<String, String> kafkaTemplate;
  13. public MyProducer(KafkaTemplate<String, String> kafkaTemplate) {
  14. this.kafkaTemplate = kafkaTemplate;
  15. }
  16. /**
  17. * 发送消息
  18. * 注:kafkaTemplate.send(param...)是一个异步的方法,其发送结果可以通过Future的实现来获得。
  19. *
  20. * @author JustryDeng
  21. * @date 2019/2/15 12:01
  22. */
  23. public void sendMessage(String message){
  24. String topic = "topicOne";
  25. // 将消息发送到指定的主题下;这里可以是一个不存在的主题,会自动创建(注:自动创建的主题默认的分区数量是1个)
  26. kafkaTemplate.send(topic, message);
  27. }
  28. }

注:KafkaTemplate提供了多个发送消息的方法,上例所示只是其一。

消费者

  1. package com.single;
  2. import org.springframework.kafka.annotation.KafkaListener;
  3. import org.springframework.stereotype.Component;
  4. /**
  5. * 消费者
  6. *
  7. * @author JustryDeng
  8. * @date 2019/2/15 9:44
  9. */
  10. @Component
  11. public class MyConsumer {
  12. /**
  13. * 注:@KafkaListener注解的属性很多,包括 匹配topic、错误处理、分组等等,
  14. * 实际使用时根据项目情况灵活运用即可。
  15. *
  16. * @author JustryDeng
  17. * @date 2019/2/15 12:30
  18. */
  19. @KafkaListener(topics = {"topicOne", "topicB"}, groupId = "group-01")
  20. public void consumerOne(String message) {
  21. System.out.println("consumerOne消费了消息 -> " + message);
  22. }
  23. /**
  24. * 注:@KafkaListener注解的属性很多,包括 匹配topic、错误处理、分组等等,
  25. * 实际使用时根据项目情况灵活运用即可。
  26. *
  27. * @author JustryDeng
  28. * @date 2019/2/15 12:30
  29. */
  30. @KafkaListener(topics = {"topicOne", "topicB"}, groupId = "group-01")
  31. public void consumerTwo(String message) {
  32. System.out.println("consumerTwo消费了消息 -> " + message);
  33. }
  34. /**
  35. * 注:@KafkaListener注解的属性很多,包括 匹配topic、错误处理、分组等等,
  36. * 实际使用时根据项目情况灵活运用即可。
  37. *
  38. * @author JustryDeng
  39. * @date 2019/2/15 12:30
  40. */
  41. @KafkaListener(topics = {"topicOne", "topicB"}, groupId = "group-02")
  42. public void consumerThree(String message) {
  43. System.out.println("consumerThree消费了消息 -> " + message);
  44. }
  45. }

注:更多的时候,我们会使用ConsumerRecord<K, V>来接收消息,因为我们可以从该模型中获取到更多的信息,如:

ConsumerRecord<K, V>中封装的信息有:

启动类

  1. package com;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. /**
  5. * 启动类
  6. *
  7. * @author JustryDeng
  8. * @date 2019/2/15 9:43
  9. */
  10. @SpringBootApplication
  11. public class KafkaApplication {
  12. public static void main(String[] args) {
  13. systemPropertisConfig();
  14. SpringApplication.run(KafkaApplication.class, args);
  15. }
  16. /**
  17. * 系统环境属性 --- 设置
  18. *
  19. * 注:因为是系统参数,多出地方都要使用;所以直接写在启动类里面
  20. *
  21. * 注:设置系统环境属性 的 方式较多,这只是其中的一种
  22. *
  23. * @author JustryDeng
  24. * @date 2019/2/24 10:31
  25. */
  26. private static void systemPropertisConfig(){
  27. System.setProperty("java.security.auth.login.config",
  28. "C:/Users/JustryDeng/Desktop/kerberos/kafka_client_jaas.conf");
  29. }
  30. }

注:本人放置客户端jaas认证文件的位置在:C:/Users/JustryDeng/Desktop/kerberos/kafka_client_jaas.conf。

Runner测试类

  1. package com.single;
  2. import org.springframework.boot.ApplicationArguments;
  3. import org.springframework.boot.ApplicationRunner;
  4. import org.springframework.stereotype.Component;
  5. /**
  6. * 测试消息生产者与消息消费者
  7. *
  8. *
  9. * @author JustryDeng
  10. * @date 2019/2/15 12:35
  11. */
  12. @Component
  13. public class MyRunner implements ApplicationRunner {
  14. private final MyProducer myProducer;
  15. public MyRunner(MyProducer myProducer) {
  16. this.myProducer = myProducer;
  17. }
  18. @Override
  19. public void run(ApplicationArguments args) {
  20. int count = 10;
  21. for (int i = 0; i < count; i++) {
  22. myProducer.sendMessage("我是消息, 我被发送出去啦!" + i);
  23. }
  24. }
  25. }

测试一下

启动SpringBoot启动类,控制台输出:

 

注:【topicOne】主题是本人重新创建的只有一个分区的主题。所以同一个消费者组下,这批消息的消费只能
         有一个(consumerOne和consumerTwo属于同一个消费者组,consumerThree属于另一个消费者组。所以
         如上图所示,要么只有consumerOne和consumerThree,要么只有consumerOne和consumerThree)。

可见示例成功!

 

声明:本文为学习笔记,学习自51CTO,《Kafka消息中间件》,讲师李兴华。

^_^ 如有不当之处,欢迎指正

^_^ 学习视频:
              《Kafka消息中间件》,讲师李兴华

^_^ 测试代码托管链接:
              https://github.com/JustryDeng/CommonRepository

^_^ 本文已经被收录进《程序员成长笔记(四)》,笔者JustryDeng

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/天景科技苑/article/detail/951015
推荐阅读
相关标签
  

闽ICP备14008679号