赞
踩
如果只有一个kafka,那么使用自带的KafkaAutoConfiguration配置类即可,对应已有属性类KafkaProperties,属性前缀为spring.kafka.xxx;
本文记录配置多个kafka的情况,即在KafkaAutoConfiguration的基础上,自定义额外的kafka生产者和消费者。
适用场景:需要消费来源于不同kafka的消息、需要在不同的kafka生产消息。
- custom.kafka.test.bootstrap-servers = my-server1,my-server2
- custom.kafka.test.consumer.group-id = my-consumer
- custom.kafka.test.consumer.key-deserializer = org.apache.kafka.common.serialization.LongDeserializer
- custom.kafka.test.consumer.value-deserializer = org.apache.kafka.common.serialization.StringDeserializer
- custom.kafka.test.consumer.enable-auto-commit = false
- custom.kafka.test.consumer.auto-offset-reset = latest
- custom.kafka.test.producer.key-serializer = org.apache.kafka.common.serialization.LongSerializer
- custom.kafka.test.producer.value-serializer = org.springframework.kafka.support.serializer.JsonSerializer
- custom.kafka.test.listener.ack-mode = manual
当然也可以只定义生产者或者只定义消费者,按需进行,以下示例是同时定义生产者和消费者。
- import org.apache.commons.lang.StringUtils;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.springframework.beans.factory.ObjectProvider;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
- import org.springframework.boot.context.properties.ConfigurationProperties;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.context.annotation.Primary;
- import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
- import org.springframework.kafka.config.KafkaListenerContainerFactory;
- import org.springframework.kafka.core.ConsumerFactory;
- import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
- import org.springframework.kafka.core.DefaultKafkaProducerFactory;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.kafka.core.ProducerFactory;
- import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
- import org.springframework.kafka.listener.ContainerProperties;
- import org.springframework.kafka.support.ProducerListener;
- import org.springframework.kafka.support.converter.RecordMessageConverter;
- import org.springframework.util.ObjectUtils;
-
- import java.util.Map;
-
- /**
- * 自定义kafka配置
- */
- @Configuration
- public class CustomKafkaConfig {
-
- /** 生产者 */
- @Bean("kafkaCustomTemplate")
- public KafkaTemplate<String, Object> kafkaCustomTemplate(
- @Autowired @Qualifier("customKafkaProperties") KafkaProperties customKafkaProperties) {
- return new KafkaTemplate<>(producerFactory(customKafkaProperties));
- }
-
- private ProducerFactory<String, Object> producerFactory(KafkaProperties kafkaProperties) {
- kafkaProperties.getProducer().setAcks(StringUtils.isBlank(kafkaProperties.getProducer().getAcks()) ? "all" : kafkaProperties.getProducer().getAcks());
- Map<String, Object> properties = kafkaProperties.buildProducerProperties();
- properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
- return new DefaultKafkaProducerFactory<>(properties);
- }
-
- /** 消费者 */
- @Bean("kafkaCustomContainerFactory")
- public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Long, String>> kafkaCustomContainerFactory(
- @Autowired @Qualifier("customKafkaProperties") KafkaProperties customKafkaProperties) {
- ConcurrentKafkaListenerContainerFactory<Long, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
- factory.setConsumerFactory(consumerFactory(customKafkaProperties));
- factory.setConcurrency(ObjectUtils.isEmpty(customKafkaProperties.getListener().getConcurrency()) ?
- Runtime.getRuntime().availableProcessors() : customKafkaProperties.getListener().getConcurrency());
- factory.getContainerProperties().setAckMode(ObjectUtils.isEmpty(customKafkaProperties.getListener().getAckMode()) ?
- ContainerProperties.AckMode.MANUAL : customKafkaProperties.getListener().getAckMode());
- return factory;
- }
-
- private ConsumerFactory<Long, String> consumerFactory(KafkaProperties kafkaProperties) {
- return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
- }
-
- /** 配置文件*/
- @ConfigurationProperties(prefix = "custom.kafka.test")
- @Bean("customKafkaProperties")
- public KafkaProperties customKafkaProperties() {
- return new KafkaProperties();
- }
-
- // @Primary 要指定一下,不然启动会因为存在多个KafkaProperties,而导致kafka的自动装配不懂要选哪个而报错
- @Primary
- @ConfigurationProperties(prefix = "spring.kafka")
- @Bean
- public KafkaProperties kafkaProperties() {
- return new KafkaProperties();
- }
-
- @Primary
- @Bean
- public KafkaTemplate<?, ?> kafkaTemplate(@Autowired ProducerFactory<?, ?> kafkaProducerFactory, @Autowired KafkaProperties kafkaProperties,
- ProducerListener<Object, Object> kafkaProducerListener, ObjectProvider<RecordMessageConverter> messageConverter) {
- KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate(kafkaProducerFactory);
- messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
- kafkaTemplate.setProducerListener(kafkaProducerListener);
- kafkaTemplate.setDefaultTopic(kafkaProperties.getTemplate().getDefaultTopic());
- return kafkaTemplate;
- }
-
- }

Tips:
1)消费者
需要实现自定义的KafkaListenerContainerFactory Bean
2)生产者
需要实现自定义的KafkaTemplate Bean
3)@Primary
@Autowired注解默认是根据类型Type来自动注入的,当有多个相同类型的bean时,使用@Primary来赋予bean更高的优先级。
1)消费者
- @Component
- @Slf4j
- public class TestKafkaListener {
-
- @KafkaListener(
- topics = {"myTestTopic"},
- containerFactory = "kafkaCustomContainerFactory")
- public void testReceive(ConsumerRecord<Long, String> record, Acknowledgment ack) {
- // 业务代码 start
- // ...
- // 业务代码 end
- ack.acknowledge();
- }
-
- }
2)生产者
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.cloud.context.config.annotation.RefreshScope;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.stereotype.Service;
-
-
- @RefreshScope
- @Service
- @Transactional
- @Slf4j
- public class TestKafkaService implements TestKafkaServiceI {
-
-
- @Qualifier("kafkaCustomTemplate")
- @Autowired
- private KafkaTemplate<String, Object> kafkaCustomTemplate;
-
- @Override
- public void testSend(String jsonParam) {
- // 发送kafka消息
- TestKafkaEvent<String> event = new TestKafkaEvent<>(jsonParam);
- try {
- kafkaCustomTemplate.send(event.getProducerRecord());
- }
- catch (Exception e) {
- throw new RuntimeException("发送消息失败");
- }
- }
- }

-
- import lombok.extern.slf4j.Slf4j;
- import org.apache.kafka.clients.producer.ProducerRecord;
-
- import java.io.Serializable;
-
- @Slf4j
- public class TestKafkaEvent<T extends Serializable> {
-
- private T source;
-
- @Override
- public ProducerRecord<String, Object> getProducerRecord() {
- log.info("发送消息: {}", getSource());
- return new ProducerRecord<>("my-tes-topic", getSource());
- }
- private TestKafkaEvent(){}
-
- public TestKafkaEvent(T source) {
- this.source = source;
- }
-
- public T getSource() {
- return this.source;
- }
-
- public void setSource(T source) {
- this.source = source;
- }
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。