赞
踩
项目中如果使用了不同实例的kafka就需要多配置,单个实例的话使用springboot的yaml配置自动装配即可。
# Single Kafka instance: Spring Boot auto-configures everything under spring.kafka.
spring:
  kafka:
    bootstrap-servers: server
    consumer:
      group-id: 消费者你的groupId
      enable-auto-commit: false
      auto-offset-reset: earliest
      # SASL settings for a password-protected broker; comment this section out if the broker has no auth
      properties:
        sasl.mechanism: PLAIN
        security.protocol: SASL_PLAINTEXT
        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="用户" password="密码";
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    # add a producer section here if the application also publishes
    # producer:
    listener:
      ack-mode: manual
      concurrency: 1
# Two Kafka instances: spring.kafka is auto-configured by Spring Boot; the second
# instance lives under a custom key and is wired up manually in KafkaConfiguration.
# NOTE: the key is "kafka2" (not "kafka1") so it matches the
# @Value("${spring.kafka2.*}") placeholders in the Java configuration class.
spring:
  kafka:
    bootstrap-servers: server
    consumer:
      group-id: 消费者你的groupId
      enable-auto-commit: false
      auto-offset-reset: earliest
      # SASL settings for a password-protected broker; comment this section out if the broker has no auth
      properties:
        sasl.mechanism: PLAIN
        security.protocol: SASL_PLAINTEXT
        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="用户" password="密码";
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    # add a producer section here if the application also publishes
    # producer:
    listener:
      ack-mode: manual
      concurrency: 1
  kafka2:
    bootstrap-servers: server1
    consumer:
      group-id: 消费者你的groupId
      enable-auto-commit: false
      auto-offset-reset: earliest
      # SASL settings for a password-protected broker; comment this section out if the broker has no auth
      properties:
        sasl.mechanism: PLAIN
        security.protocol: SASL_PLAINTEXT
        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="server1的用户" password="密码";
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    # add a producer section here if the application also publishes
    # producer:
    listener:
      ack-mode: manual
      concurrency: 1
此处第一个kafka(spring.kafka)会被Spring Boot自动装配;第二个实例使用自定义前缀(如spring.kafka2),需要手动编写配置类。注意该前缀必须与Java配置类中@Value引用的键完全一致,否则属性无法注入。
注意最好手动指定数据源factory的bean名称,在消费者端会用到。
package com.cmbchina.archman.mprogramcore.config;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.util.StringUtils;

import java.util.HashMap;
import java.util.Map;

/**
 * Manual consumer configuration for the second Kafka instance ({@code spring.kafka2.*}).
 * The first instance ({@code spring.kafka.*}) is auto-configured by Spring Boot and
 * needs no code here.
 */
@Configuration
public class KafkaConfiguration {

    @Value("${spring.kafka2.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka2.consumer.group-id}")
    private String groupId;

    // SASL settings. Empty defaults ("...:}") let the application start when the
    // broker has no authentication and these keys are commented out of the yaml.
    @Value("${spring.kafka2.consumer.properties.security.protocol:}")
    private String kafkaSecurityProtocol;

    @Value("${spring.kafka2.consumer.properties.sasl.mechanism:}")
    private String kafkaSASLMechanism;

    @Value("${spring.kafka2.consumer.properties.sasl.jaas.config:}")
    private String kafkaConsumerSASLJaasConfig;

    /**
     * Listener container factory for the second Kafka instance. Give the bean an
     * explicit name: consumers reference it via
     * {@code @KafkaListener(containerFactory = "myKafkaListenerContainerFactory")}.
     *
     * @return a concurrent container factory backed by {@link #consumerConfigs()}
     */
    @Bean("myKafkaListenerContainerFactory")
    public KafkaListenerContainerFactory<?> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        // Auto-commit is disabled, so the listener acknowledges records itself via
        // Acknowledgment. The yaml spring.kafka.listener.ack-mode setting does NOT
        // apply to a hand-built factory, so the ack mode must be set here explicitly.
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        // NOTE: batch listening is intentionally NOT enabled — the listener method
        // below consumes a single ConsumerRecord, not a List of records.
        return factory;
    }

    /**
     * Raw consumer properties for the second Kafka instance.
     *
     * @return mutable property map consumed by {@link DefaultKafkaConsumerFactory}
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> config = new HashMap<>();
        // broker address of the second instance
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // consumer group id
        config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // offsets are committed manually in the listener, never auto-committed
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Attach SASL settings only when all three values are configured;
        // hasText replaces the deprecated StringUtils.isEmpty.
        if (StringUtils.hasText(kafkaSecurityProtocol)
                && StringUtils.hasText(kafkaSASLMechanism)
                && StringUtils.hasText(kafkaConsumerSASLJaasConfig)) {
            config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafkaSecurityProtocol);
            config.put(SaslConfigs.SASL_MECHANISM, kafkaSASLMechanism);
            config.put(SaslConfigs.SASL_JAAS_CONFIG, kafkaConsumerSASLJaasConfig);
        }
        return config;
    }
}
使用@KafkaListener注解即可完成消费者监听,此处声明刚才定义的factory名称即可。
/**
 * Consumes creation messages from the second Kafka instance. Uses the
 * manually-defined container factory, so the record must be acknowledged
 * explicitly (manual ack mode).
 *
 * @param msg the consumed record; value may be null (tombstone), hence the Optional guard
 * @param ack manual acknowledgment handle supplied by the container
 */
@KafkaListener(topics = {CREATE_MPRO_TOPIC}, containerFactory = "myKafkaListenerContainerFactory")
public void createMp(ConsumerRecord<String, String> msg, Acknowledgment ack) {
    try {
        Optional<String> value = Optional.ofNullable(msg.value());
        // parameterized logging avoids eager string concatenation
        log.info("接收消息: {}", msg);
        if (value.isPresent()) {
            //业务代码
        }
    } catch (Exception e) {
        // pass the throwable as the last argument so the full stack trace is logged,
        // instead of concatenating e (which keeps only e.toString())
        log.error("消息异常", e);
    } finally {
        // 签收消息 — with manual ack mode this call was missing in the original,
        // so offsets were never committed and records were redelivered on restart.
        // Acknowledging in finally means failed records are also skipped; move this
        // into try{} if failed messages should instead be retried.
        ack.acknowledge();
    }
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。