
Two Ways to Configure a Kafka Consumer


1. YAML configuration file (simple configuration)

    spring:
      kafka:
        bootstrap-servers: ip:port
        consumer:
          group-id: group-test
          enable-auto-commit: true
          auto-commit-interval: 1000ms
          auto-offset-reset: latest
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
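
For reference, the Spring Boot keys above correspond to the native Kafka consumer property names that the class-based configuration in section 2 sets by hand. The following is a minimal sketch of that correspondence using only java.util.Properties; the class name YamlEquivalentProps is made up for illustration and the broker address is passed in by the caller.

```java
import java.util.Properties;

// Hypothetical helper (not part of Spring or Kafka): the same settings as the
// YAML file, written with the native Kafka consumer property names.
final class YamlEquivalentProps {
    private YamlEquivalentProps() {}

    static Properties consumerProps(String bootstrapServers) {
        Properties props = new Properties();
        // spring.kafka.bootstrap-servers
        props.setProperty("bootstrap.servers", bootstrapServers);
        // spring.kafka.consumer.group-id
        props.setProperty("group.id", "group-test");
        // spring.kafka.consumer.enable-auto-commit
        props.setProperty("enable.auto.commit", "true");
        // spring.kafka.consumer.auto-commit-interval (1000ms, expressed in milliseconds)
        props.setProperty("auto.commit.interval.ms", "1000");
        // spring.kafka.consumer.auto-offset-reset
        props.setProperty("auto.offset.reset", "latest");
        // spring.kafka.consumer.key-deserializer / value-deserializer
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```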

2. Configuration class (SASL/PLAIN authentication over the SASL_PLAINTEXT protocol)

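If the consumer logs an error such as [Consumer clientId=consumer-1, groupId=group1] Bootstrap broker xxx (id: -1 rack: null) disconnected and the broker requires authentication, use this approach, which adds the security settings to the consumer.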

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

    import java.util.HashMap;
    import java.util.Map;

    /**
     * @author 向振华
     * @date 2021/05/10 15:58
     */
    @Configuration
    @EnableKafka
    public class KafkaConsumerConfig {

        @Bean
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            // Number of listener containers (consumer threads) to run
            factory.setConcurrency(10);
            // Poll timeout in milliseconds
            factory.getContainerProperties().setPollTimeout(1500);
            return factory;
        }

        public ConsumerFactory<String, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }

        public Map<String, Object> consumerConfigs() {
            Map<String, Object> propsMap = new HashMap<>(16);
            // Broker address
            propsMap.put("bootstrap.servers", "ip:port");
            // Security protocol and SASL mechanism
            propsMap.put("security.protocol", "SASL_PLAINTEXT");
            propsMap.put("sasl.mechanism", "PLAIN");
            // Fill in the username and password used for authentication
            propsMap.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"usnm\" password=\"pwd\";");
            propsMap.put("group.id", "group-test");
            propsMap.put("enable.auto.commit", true);
            propsMap.put("auto.commit.interval.ms", 1000);
            // latest: start from the newest offset when the group has no committed offset
            propsMap.put("auto.offset.reset", "latest");
            // Deserializers for record keys and values
            propsMap.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            propsMap.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            return propsMap;
        }
    }
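
The sasl.jaas.config value in the class above is a hand-written string with escaped quotes, which is easy to get wrong (the trailing semicolon and the quoting both matter). As a sketch only, the string could be assembled by a small helper so that the username and password come from variables rather than literals. JaasConfigs and its plain method are hypothetical names, not part of Kafka or Spring, and rejecting quotes and backslashes is a conservative choice made here to avoid dealing with escaping rules.

```java
// Hypothetical helper: builds the "sasl.jaas.config" value for SASL/PLAIN.
final class JaasConfigs {
    private JaasConfigs() {}

    static String plain(String username, String password) {
        requireSafe(username, "username");
        requireSafe(password, "password");
        // Same layout as the literal in the configuration class:
        // login module, "required" flag, quoted options, terminating semicolon.
        return "org.apache.kafka.common.security.plain.PlainLoginModule required"
                + " username=\"" + username + "\""
                + " password=\"" + password + "\";";
    }

    // Conservative assumption for this sketch: refuse characters that would
    // need escaping inside a double-quoted JAAS option value.
    private static void requireSafe(String value, String name) {
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException(name + " must not be empty");
        }
        if (value.indexOf('"') >= 0 || value.indexOf('\\') >= 0) {
            throw new IllegalArgumentException(name + " must not contain quotes or backslashes");
        }
    }
}
```

With such a helper, the line in consumerConfigs() could become propsMap.put("sasl.jaas.config", JaasConfigs.plain(user, password)), where user and password are read from wherever the application keeps its secrets.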

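Then add a JAAS configuration file named sasl.jaas.config to the resources directory. Note that the KafkaServer section shown here is the one a broker reads; the consumer above already supplies its own credentials inline through the sasl.jaas.config property.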

    KafkaServer {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="kafkaadmin"
        password="kafkaadminpwd"
        user_kafkaadmin="kafkaadminpwd"
        user_kafkaclient1="kafkaclient1pwd"
        user_kafkaclient2="kafkaclient2pwd";
    };
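
In this file, username and password are the credentials the broker itself uses, while each user_<name>="<password>" option defines an account that clients may log in with. The sketch below imitates that naming convention with a plain Map to show which client credentials would match; it is an illustration only, not Kafka's actual implementation, and PlainUserTable is a made-up name.

```java
import java.util.Map;

// Hypothetical illustration of the "user_<name>" convention in a
// KafkaServer JAAS section for SASL/PLAIN.
final class PlainUserTable {
    private PlainUserTable() {}

    // Returns true when the options contain user_<username> with this password.
    static boolean accepts(Map<String, String> options, String username, String password) {
        String expected = options.get("user_" + username);
        return expected != null && expected.equals(password);
    }

    // The options from the file above, as key/value pairs.
    static Map<String, String> sampleOptions() {
        return Map.of(
                "username", "kafkaadmin",
                "password", "kafkaadminpwd",
                "user_kafkaadmin", "kafkaadminpwd",
                "user_kafkaclient1", "kafkaclient1pwd",
                "user_kafkaclient2", "kafkaclient2pwd");
    }
}
```

Under this reading, the placeholder credentials usnm / pwd from the configuration class would not match any entry, so they need to be replaced with one of the listed accounts, for example kafkaclient1 with its password.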

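Usage:

    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    @Slf4j
    @Component
    public class KafkaConsumer {

        @KafkaListener(topics = {"xxx"})
        public void listener(ConsumerRecord<?, ?> record) {
            // Parameterized logging avoids a NullPointerException when the record value is null
            log.info("Received message ---> {}", record.value());
        }
    }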

 
