
Configuring Multiple Kafka Instances with Spring: multiple kafka sasl.jaas.config settings

1. How multi-instance configuration differs from single-instance configuration

If a project connects to Kafka clusters from different instances, extra configuration is required; for a single instance, Spring Boot's YAML-based auto-configuration is enough.

1.1 Single instance

spring:
  kafka:
    bootstrap-servers: server
    consumer:
      group-id: your-consumer-group-id
      enable-auto-commit: false
      auto-offset-reset: earliest
      # Settings for connecting to a SASL-secured Kafka; comment out if the cluster has no authentication
      properties:
        sasl.mechanism: PLAIN
        security.protocol: SASL_PLAINTEXT
        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="password";
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    # Add a producer block here if the application also produces messages
    #producer:
    listener:
      ack-mode: manual
      concurrency: 1

1.2 Multiple Kafka instances

  • YAML configuration
spring:
  kafka:
    bootstrap-servers: server
    consumer:
      group-id: your-consumer-group-id
      enable-auto-commit: false
      auto-offset-reset: earliest
      # Settings for connecting to a SASL-secured Kafka; comment out if the cluster has no authentication
      properties:
        sasl.mechanism: PLAIN
        security.protocol: SASL_PLAINTEXT
        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="password";
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    # Add a producer block here if the application also produces messages
    #producer:
    listener:
      ack-mode: manual
      concurrency: 1

  # Second Kafka instance; the key "kafka2" matches the spring.kafka2.* properties read by the Java configuration below
  kafka2:
    bootstrap-servers: server1
    consumer:
      group-id: your-consumer-group-id
      enable-auto-commit: false
      auto-offset-reset: earliest
      # Settings for connecting to a SASL-secured Kafka; comment out if the cluster has no authentication
      properties:
        sasl.mechanism: PLAIN
        security.protocol: SASL_PLAINTEXT
        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="server1-user" password="password";
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    # Add a producer block here if the application also produces messages
    #producer:
    listener:
      ack-mode: manual
      concurrency: 1

The first block (spring.kafka) is auto-configured by Spring Boot, so only the second instance (spring.kafka2) needs to be configured by hand.

  • Java configuration

Note: it is best to name the listener container factory bean explicitly, because the consumer side will reference it.

package com.cmbchina.archman.mprogramcore.config;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.util.StringUtils;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfiguration {
    @Value("${spring.kafka2.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka2.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka2.consumer.properties.security.protocol}")
    private String kafkaSecurityProtocol;
    @Value("${spring.kafka2.consumer.properties.sasl.mechanism}")
    private String kafkaSASLMechanism;
    @Value("${spring.kafka2.consumer.properties.sasl.jaas.config}")
    private String kafkaConsumerSASLJaasConfig;
	
    // Give the factory bean an explicit name; the consumer side references it in @KafkaListener(containerFactory = ...)
    @Bean("myKafkaListenerContainerFactory")
    public KafkaListenerContainerFactory<?> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        // Manual ack mode: enable-auto-commit is false and the listener method takes an Acknowledgment parameter
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        // factory.setBatchListener(true); // enable only if the listener method consumes a List<ConsumerRecord>
        return factory;
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> config = new HashMap<>();
        // Kafka bootstrap servers of the second cluster
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // consumer group id
        config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        if (!StringUtils.isEmpty(kafkaSecurityProtocol) && !StringUtils.isEmpty(kafkaSASLMechanism)
                && !StringUtils.isEmpty(kafkaConsumerSASLJaasConfig)) {
            config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafkaSecurityProtocol);
            config.put(SaslConfigs.SASL_MECHANISM, kafkaSASLMechanism);
            config.put("sasl.jaas.config", kafkaConsumerSASLJaasConfig);
        }
        return config;
    }

}
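
The YAML comments above note that a producer block can be added as needed, but only the consumer side is wired up here. If messages also need to be sent to the second cluster, a matching ProducerFactory and KafkaTemplate can be built from the same spring.kafka2.* properties. The sketch below is illustrative and not part of the original project: the class name, bean names, and empty-string property defaults are assumptions.

package com.cmbchina.archman.mprogramcore.config; // same package as above, for illustration only

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class Kafka2ProducerConfiguration {

    @Value("${spring.kafka2.bootstrap-servers}")
    private String bootstrapServers;

    // Reuse the consumer's SASL settings; empty defaults let the beans start without them
    @Value("${spring.kafka2.consumer.properties.security.protocol:}")
    private String securityProtocol;
    @Value("${spring.kafka2.consumer.properties.sasl.mechanism:}")
    private String saslMechanism;
    @Value("${spring.kafka2.consumer.properties.sasl.jaas.config:}")
    private String saslJaasConfig;

    // Producer factory pointed at the second cluster
    @Bean
    public ProducerFactory<String, String> kafka2ProducerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        if (!securityProtocol.isEmpty() && !saslMechanism.isEmpty() && !saslJaasConfig.isEmpty()) {
            config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
            config.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
            config.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfig);
        }
        return new DefaultKafkaProducerFactory<>(config);
    }

    // Inject with @Qualifier("kafka2Template") to send to the second cluster
    @Bean("kafka2Template")
    public KafkaTemplate<String, String> kafka2Template() {
        return new KafkaTemplate<>(kafka2ProducerFactory());
    }
}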

  • Consumer listener

Consumer listening is done with the @KafkaListener annotation; just point its containerFactory attribute at the factory bean defined above.

@KafkaListener(topics = {CREATE_MPRO_TOPIC}, containerFactory = "myKafkaListenerContainerFactory")
public void createMp(ConsumerRecord<String, String> msg, Acknowledgment ack) {
    try {
        Optional<?> value = Optional.ofNullable(msg.value());
        log.info("Received message: " + msg);
        if (value.isPresent()) {
            // business logic goes here
        }
    } catch (Exception e) {
        log.error("Error while consuming message", e);
    } finally {
        // acknowledge the message
        ack.acknowledge();
    }
}
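
For comparison, a listener on the first, auto-configured cluster does not need a containerFactory attribute: Spring Boot registers a default factory named kafkaListenerContainerFactory from the spring.kafka.* properties, and the ack-mode: manual setting in that listener block makes the Acknowledgment parameter work. A short sketch; the topic name and method are placeholders, not from the original article:

// Listener bound to the first (auto-configured) cluster; the default
// "kafkaListenerContainerFactory" built from spring.kafka.* is used.
// "example-topic" is a placeholder topic name.
@KafkaListener(topics = "example-topic")
public void consumeFromDefaultCluster(ConsumerRecord<String, String> msg, Acknowledgment ack) {
    log.info("Received from default cluster: " + msg.value());
    ack.acknowledge(); // manual ack, matching spring.kafka.listener.ack-mode: manual
}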