当前位置:   article > 正文

笔记:配置多个kafka生产者和消费者_kafka配置多个消费组groupid

kafka配置多个消费组groupid

如果只需要连接一个 Kafka 集群,使用 Spring Boot 自带的 KafkaAutoConfiguration 自动配置类即可,它对应的属性类是 KafkaProperties,属性前缀为 spring.kafka.xxx。

本文记录配置多个kafka的情况,即在KafkaAutoConfiguration的基础上,自定义额外的kafka生产者和消费者。

适用场景:需要消费来自不同 Kafka 集群的消息,或者需要向不同的 Kafka 集群发送消息。

1、配置自定义Kafka Properties信息

  1. custom.kafka.test.bootstrap-servers = my-server1,my-server2
  2. custom.kafka.test.consumer.group-id = my-consumer
  3. custom.kafka.test.consumer.key-deserializer = org.apache.kafka.common.serialization.LongDeserializer
  4. custom.kafka.test.consumer.value-deserializer = org.apache.kafka.common.serialization.StringDeserializer
  5. custom.kafka.test.consumer.enable-auto-commit = false
  6. custom.kafka.test.consumer.auto-offset-reset = latest
  7. custom.kafka.test.producer.key-serializer = org.apache.kafka.common.serialization.LongSerializer
  8. custom.kafka.test.producer.value-serializer = org.springframework.kafka.support.serializer.JsonSerializer
  9. custom.kafka.test.listener.ack-mode = manual

2、代码定义生产者和消费者

当然也可以只定义生产者或者只定义消费者,按需进行,以下示例是同时定义生产者和消费者。

  1. import org.apache.commons.lang.StringUtils;
  2. import org.apache.kafka.clients.producer.ProducerConfig;
  3. import org.springframework.beans.factory.ObjectProvider;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.beans.factory.annotation.Qualifier;
  6. import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
  7. import org.springframework.boot.context.properties.ConfigurationProperties;
  8. import org.springframework.context.annotation.Bean;
  9. import org.springframework.context.annotation.Configuration;
  10. import org.springframework.context.annotation.Primary;
  11. import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
  12. import org.springframework.kafka.config.KafkaListenerContainerFactory;
  13. import org.springframework.kafka.core.ConsumerFactory;
  14. import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
  15. import org.springframework.kafka.core.DefaultKafkaProducerFactory;
  16. import org.springframework.kafka.core.KafkaTemplate;
  17. import org.springframework.kafka.core.ProducerFactory;
  18. import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
  19. import org.springframework.kafka.listener.ContainerProperties;
  20. import org.springframework.kafka.support.ProducerListener;
  21. import org.springframework.kafka.support.converter.RecordMessageConverter;
  22. import org.springframework.util.ObjectUtils;
  23. import java.util.Map;
  24. /**
  25. * 自定义kafka配置
  26. */
  27. @Configuration
  28. public class CustomKafkaConfig {
  29. /** 生产者 */
  30. @Bean("kafkaCustomTemplate")
  31. public KafkaTemplate<String, Object> kafkaCustomTemplate(
  32. @Autowired @Qualifier("customKafkaProperties") KafkaProperties customKafkaProperties) {
  33. return new KafkaTemplate<>(producerFactory(customKafkaProperties));
  34. }
  35. private ProducerFactory<String, Object> producerFactory(KafkaProperties kafkaProperties) {
  36. kafkaProperties.getProducer().setAcks(StringUtils.isBlank(kafkaProperties.getProducer().getAcks()) ? "all" : kafkaProperties.getProducer().getAcks());
  37. Map<String, Object> properties = kafkaProperties.buildProducerProperties();
  38. properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
  39. return new DefaultKafkaProducerFactory<>(properties);
  40. }
  41. /** 消费者 */
  42. @Bean("kafkaCustomContainerFactory")
  43. public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Long, String>> kafkaCustomContainerFactory(
  44. @Autowired @Qualifier("customKafkaProperties") KafkaProperties customKafkaProperties) {
  45. ConcurrentKafkaListenerContainerFactory<Long, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  46. factory.setConsumerFactory(consumerFactory(customKafkaProperties));
  47. factory.setConcurrency(ObjectUtils.isEmpty(customKafkaProperties.getListener().getConcurrency()) ?
  48. Runtime.getRuntime().availableProcessors() : customKafkaProperties.getListener().getConcurrency());
  49. factory.getContainerProperties().setAckMode(ObjectUtils.isEmpty(customKafkaProperties.getListener().getAckMode()) ?
  50. ContainerProperties.AckMode.MANUAL : customKafkaProperties.getListener().getAckMode());
  51. return factory;
  52. }
  53. private ConsumerFactory<Long, String> consumerFactory(KafkaProperties kafkaProperties) {
  54. return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
  55. }
  56. /** 配置文件*/
  57. @ConfigurationProperties(prefix = "custom.kafka.test")
  58. @Bean("customKafkaProperties")
  59. public KafkaProperties customKafkaProperties() {
  60. return new KafkaProperties();
  61. }
  62. // @Primary 要指定一下,不然启动会因为存在多个KafkaProperties,而导致kafka的自动装配不懂要选哪个而报错
  63. @Primary
  64. @ConfigurationProperties(prefix = "spring.kafka")
  65. @Bean
  66. public KafkaProperties kafkaProperties() {
  67. return new KafkaProperties();
  68. }
  69. @Primary
  70. @Bean
  71. public KafkaTemplate<?, ?> kafkaTemplate(@Autowired ProducerFactory<?, ?> kafkaProducerFactory, @Autowired KafkaProperties kafkaProperties,
  72. ProducerListener<Object, Object> kafkaProducerListener, ObjectProvider<RecordMessageConverter> messageConverter) {
  73. KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate(kafkaProducerFactory);
  74. messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
  75. kafkaTemplate.setProducerListener(kafkaProducerListener);
  76. kafkaTemplate.setDefaultTopic(kafkaProperties.getTemplate().getDefaultTopic());
  77. return kafkaTemplate;
  78. }
  79. }

Tips:

1)消费者

     需要实现自定义的KafkaListenerContainerFactory Bean

2)生产者

     需要实现自定义的KafkaTemplate Bean

3)@Primary

     @Autowired注解默认是根据类型Type来自动注入的,当有多个相同类型的bean时,使用@Primary来赋予bean更高的优先级。

3、应用

1)消费者

  1. @Component
  2. @Slf4j
  3. public class TestKafkaListener {
  4. @KafkaListener(
  5. topics = {"myTestTopic"},
  6. containerFactory = "kafkaCustomContainerFactory")
  7. public void testReceive(ConsumerRecord<Long, String> record, Acknowledgment ack) {
  8. // 业务代码 start
  9. // ...
  10. // 业务代码 end
  11. ack.acknowledge();
  12. }
  13. }

2)生产者

  1. import lombok.extern.slf4j.Slf4j;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.beans.factory.annotation.Qualifier;
  4. import org.springframework.cloud.context.config.annotation.RefreshScope;
  5. import org.springframework.kafka.core.KafkaTemplate;
  6. import org.springframework.stereotype.Service;
  7. @RefreshScope
  8. @Service
  9. @Transactional
  10. @Slf4j
  11. public class TestKafkaService implements TestKafkaServiceI {
  12. @Qualifier("kafkaCustomTemplate")
  13. @Autowired
  14. private KafkaTemplate<String, Object> kafkaCustomTemplate;
  15. @Override
  16. public void testSend(String jsonParam) {
  17. // 发送kafka消息
  18. TestKafkaEvent<String> event = new TestKafkaEvent<>(jsonParam);
  19. try {
  20. kafkaCustomTemplate.send(event.getProducerRecord());
  21. }
  22. catch (Exception e) {
  23. throw new RuntimeException("发送消息失败");
  24. }
  25. }
  26. }
  1. import lombok.extern.slf4j.Slf4j;
  2. import org.apache.kafka.clients.producer.ProducerRecord;
  3. import java.io.Serializable;
  4. @Slf4j
  5. public class TestKafkaEvent<T extends Serializable> {
  6. private T source;
  7. @Override
  8. public ProducerRecord<String, Object> getProducerRecord() {
  9. log.info("发送消息: {}", getSource());
  10. return new ProducerRecord<>("my-tes-topic", getSource());
  11. }
  12. private TestKafkaEvent(){}
  13. public TestKafkaEvent(T source) {
  14. this.source = source;
  15. }
  16. public T getSource() {
  17. return this.source;
  18. }
  19. public void setSource(T source) {
  20. this.source = source;
  21. }
  22. }

参考:聊聊在springboot项目中如何配置多个kafka消费者

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号