赞
踩
1.yml配置文件(简单配置)
spring:
  kafka:
    bootstrap-servers: ip:端口
    consumer:
      group-id: group-test
      enable-auto-commit: true
      auto-commit-interval: 1000ms
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
2.类配置方式(使用了SASL_PLAINTEXT安全协议、PLAIN认证机制)
如果出现 [Consumer clientId=consumer-1, groupId=group1] Bootstrap broker xxx (id: -1 rack: null) disconnected 这样的报错信息,通常说明 broker 开启了安全认证,这时就需要使用下面这种配置方式,加入安全认证信息。
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.kafka.annotation.EnableKafka;
- import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
- import org.springframework.kafka.config.KafkaListenerContainerFactory;
- import org.springframework.kafka.core.ConsumerFactory;
- import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
- import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
-
- import java.util.HashMap;
- import java.util.Map;
-
- /**
- * @author 向振华
- * @date 2021/05/10 15:58
- */
- @Configuration
- @EnableKafka
- public class KafkaConsumerConfig {
-
- @Bean
- public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
- ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
- factory.setConsumerFactory(consumerFactory());
- factory.setConcurrency(10);
- factory.getContainerProperties().setPollTimeout(1500);
- return factory;
- }
-
- public ConsumerFactory<String, String> consumerFactory() {
- return new DefaultKafkaConsumerFactory<>(consumerConfigs());
- }
-
- public Map<String, Object> consumerConfigs() {
- Map<String, Object> propsMap = new HashMap<>(16);
- // 服务地址
- propsMap.put("bootstrap.servers", "ip:端口");
- // 安全认证协议
- propsMap.put("security.protocol", "SASL_PLAINTEXT");
- propsMap.put("sasl.mechanism", "PLAIN");
- // 填充安全认证用户名和密码
- propsMap.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"usnm\" password=\"pwd\";");
- propsMap.put("group.id", "group-test");
- propsMap.put("enable.auto.commit", true);
- propsMap.put("auto.commit.interval.ms", 1000);
- // latest: 从最新的偏移量开始消费
- propsMap.put("auto.offset.reset", "latest");
- // 反序列化方式
- propsMap.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- propsMap.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- return propsMap;
- }
- }
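再在 resources 目录下添加一个 JAAS 配置文件 sasl.jaas.config(注意:下面的 KafkaServer 段是 broker 端使用的 JAAS 配置;消费者端的用户名和密码已经通过上面代码中的 sasl.jaas.config 属性提供):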
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="kafkaadmin"
    password="kafkaadminpwd"
    user_kafkaadmin="kafkaadminpwd"
    user_kafkaclient1="kafkaclient1pwd"
    user_kafkaclient2="kafkaclient2pwd";
};
使用:
- @Slf4j
- @Component
- public class KafkaConsumer {
-
- @KafkaListener(topics = {"xxx"})
- public void listener(ConsumerRecord<?, ?> record) {
- log.info("收到消息 ---> " + record.value().toString());
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。