
[Kafka] SASL-Authenticated Kafka Client Code Examples (spring-kafka and the Native Client)

spring-kafka

Add the dependency:

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.6.3</version>
        </dependency>

Add the spring-kafka configuration:

#=============== Cluster common configuration ================
spring.kafka.bootstrap-servers=kafka-dyskevxt-headless.kafka-uat.svc.xke.test.xdf.cn:29092
spring.kafka.security.protocol=SASL_PLAINTEXT
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="zhurunhua" password="pwd";

#=============== producer  =================
spring.kafka.producer.retries=5
# Maximum size of a record batch, in bytes
spring.kafka.producer.batch-size=1000
spring.kafka.producer.buffer-memory=1000000
# Serializers for the message key and value
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer  ==================
spring.kafka.consumer.group-id=zhurunhua-test-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

Consumer:

@Component
public class TestPlainConsumer {

    @KafkaListener(topics = {"zhurunhua-test-topic"})
    public void consumer(ConsumerRecord<String, String> record) {
        System.out.println(record.value());
    }

}

The topic can also be read from the configuration file; in the code it is referenced with a ${} placeholder:

@Component
public class TestPlainConsumer {

    @KafkaListener(topics = {"${kafka.subscribe.topic}"})
    public void consumer(ConsumerRecord<String, String> record) {
        System.out.println(record.value());
    }

}
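For example, the placeholder above can be backed by an entry in application.properties; the property name kafka.subscribe.topic and the topic value below are simply the ones used in this article:

kafka.subscribe.topic=zhurunhua-test-topic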

Producer:

@Component
public class TestPlainProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${kafka.subscribe.topic}")
    private String topic;

    public void send(String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                System.err.println(throwable);
            }

            @Override
            public void onSuccess(SendResult<String, String> sendResult) {
                System.out.println("Send result: " + sendResult);
            }
        });
    }
}
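A minimal way to exercise this producer, assuming it runs inside a Spring Boot application as configured above (the runner class below is only an illustration added here, not part of the original example):

@Component
public class TestPlainProducerRunner implements CommandLineRunner {

    @Autowired
    private TestPlainProducer producer;

    @Override
    public void run(String... args) {
        // Send one test message on startup; the callback inside TestPlainProducer logs the result
        producer.send("hello from spring-kafka with SASL/PLAIN");
    }
}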

Native client

Add the dependency:

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.6.0</version>
      <scope>compile</scope>
    </dependency>

Client code

public class TestSaslClient {

    private final static String TOPIC = "zhurunhua-test-topic";

    private final static String BROKERS = "kafka-dyskevxt-headless.kafka-uat.svc.xke.test.xdf.cn:29092";

    private static KafkaConsumer<String, String> consumer;

    private static KafkaProducer<String, String> producer;

    static {
        Properties c = initConfig();
        consumer = new KafkaConsumer<>(c);
        producer = new KafkaProducer<>(c);
    }

    /**
     * Initialize the client configuration
     *
     * @return java.util.Properties
     * @date 2023/04/17 5:51 PM
     */
    public static Properties initConfig() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "zhurunhua-test-1");
        // The same Properties object is also used for the producer, so serializers must be set as well
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // SASL/PLAIN authentication settings
        // security.protocol expects a String, so use the enum's name rather than the enum constant itself
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
        props.put(SaslConfigs.SASL_MECHANISM, PlainSaslServer.PLAIN_MECHANISM);
        props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"zhurunhua\" password=\"pwd\";");

        // Optional settings
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        // Auto-commit offsets every 1 second
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        return props;
    }

    /**
     * Consumer example
     *
     * @date 2023/04/17 5:51 PM
     */
    public void subscribe() {
        consumer.subscribe(Collections.singleton(TOPIC));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record);
                }
            }
        } catch (Exception e) {
            System.out.println("consumer error : " + e);
        } finally {
            consumer.close();
        }
    }

    /**
     * Producer example
     *
     * @date 2023/04/17 5:52 PM
     */
    public void send() {
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, 0, "zhurunhua", "test message");
        producer.send(record, (recordMetadata, e) -> {
            if (Objects.nonNull(e)) {
                System.out.println("send error: " + e);
            } else {
                System.out.println(recordMetadata);
            }
        });
    }
}
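A minimal sketch of how the class above could be driven, assuming the topic already exists with at least one partition; the main method below (added inside TestSaslClient) is an illustration, not part of the original:

    public static void main(String[] args) {
        TestSaslClient client = new TestSaslClient();
        // Produce one test record, then start the blocking poll loop
        client.send();
        client.subscribe();
    }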

Tips

  1. The SecurityProtocol values are defined in the enum org.apache.kafka.common.security.auth.SecurityProtocol:
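    For reference, the constants defined by that enum (as of kafka-clients 2.6) are:

    PLAINTEXT       // un-authenticated, non-encrypted channel
    SSL             // SSL channel
    SASL_PLAINTEXT  // SASL authenticated, non-encrypted channel
    SASL_SSL        // SASL authenticated, SSL channel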

  2. The available sasl.mechanism options are:
    1. GSSAPI
    2. PLAIN
    3. SCRAM-SHA-256
    4. SCRAM-SHA-512
    5. OAUTHBEARER

For more on security and authentication, see: https://kafka.apache.org/documentation/#security_overview

  3. If SSL is used, an example configuration:

    spring.kafka.ssl.trust-store-type=JKS
    spring.kafka.ssl.key-store-type=JKS
    spring.kafka.ssl.protocol=SSL
    spring.kafka.ssl.key-store-password=rxu4G5kPyAqTkERk
    spring.kafka.ssl.trust-store-password=rxu4G5kPyAqTkERk
    spring.kafka.ssl.key-store-location=/kafka-jks/keystore.jks
    spring.kafka.ssl.trust-store-location=/kafka-jks/keystore.jks
    
  4. When switching sasl.mechanism, note that the LoginModule named in sasl.jaas.config changes as well (see the example after this list):

    1. PLAIN uses: PlainLoginModule
    2. The SCRAM mechanisms use: ScramLoginModule
  5. Kafka's authentication parameters are fairly intricate; make sure you understand what each one means, because a single wrong value makes the connection fail. The client configuration is printed in the startup log, which is useful when troubleshooting.
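As an illustration of point 4, a SCRAM-SHA-256 setup would look roughly like this (assuming the broker is configured for SCRAM; the credentials are the same placeholders used earlier in this article):

    spring.kafka.properties.sasl.mechanism=SCRAM-SHA-256
    spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="zhurunhua" password="pwd";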
