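In day-to-day development, the topics you need to consume often live in different Kafka clusters, so a single application has to be configured with multiple Kafka data sources. How can this be done cleanly?
You can also refer to the latest version of this article.
Continuing from the previous articles, Kafka is already integrated into our project. This article shows how to integrate multiple Kafka data sources cleanly. See also the complete table of contents for the series.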
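1. Use the earlier articles to create the project quickly.
"Building Large-Scale Distributed Services (12): Setting Up a Development Environment with Docker and Installing Kafka and ZooKeeper"
"Building Large-Scale Distributed Services (13): Integrating Kafka with SpringBoot"
"Building Large-Scale Distributed Services (18): A Custom Maven Project Archetype"
2. Create the Ticket project.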
mvn archetype:generate -DgroupId="com.mmc.lesson" -DartifactId="ticket" -Dversion=1.0-SNAPSHOT -Dpackage="com.mmc.lesson" -DarchetypeArtifactId=member-archetype -DarchetypeGroupId=com.mmc.lesson -DarchetypeVersion=1.0.0-SNAPSHOT -B
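1. Write KafkaPropertiesConfiguration.java, which receives the two Kafka configurations.

import java.util.HashMap;
import java.util.Map;

import lombok.Data;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Consumer;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaPropertiesConfiguration {

    /**
     * Kafka configuration for data source "one", bound from spring.one.kafka.*.
     */
    @Bean("oneKafkaProperties")
    @ConfigurationProperties("spring.one.kafka")
    public CustomKafkaProperties oneKafkaProperties() {
        return new CustomKafkaProperties();
    }

    /**
     * Kafka configuration for data source "two", bound from spring.two.kafka.*.
     */
    @Bean("twoKafkaProperties")
    @ConfigurationProperties("spring.two.kafka")
    public CustomKafkaProperties twoKafkaProperties() {
        return new CustomKafkaProperties();
    }

    @Data
    public static class CustomKafkaProperties {

        // Reuses Spring Boot's consumer property holder, so keys under
        // <prefix>.consumer.* bind the same way as spring.kafka.consumer.*.
        private final Consumer consumer = new Consumer();

        /**
         * Create an initial map of consumer properties from the state of this instance.
         * <p>
         * This allows you to add additional properties, if necessary, and override the
         * default kafkaConsumerFactory bean.
         *
         * @return the consumer properties initialized with the customizations defined on this
         *         instance
         */
        public Map<String, Object> buildConsumerProperties() {
            return new HashMap<>(this.consumer.buildProperties());
        }
    }
}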
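2. Write KafkaConsumerConfiguration.java, which defines the two listener container factories.

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
// CustomKafkaProperties is the nested class KafkaPropertiesConfiguration.CustomKafkaProperties;
// import it from the package where you placed it.

@Configuration
@AutoConfigureAfter(KafkaPropertiesConfiguration.class)
public class KafkaConsumerConfiguration {

    @Value("${spring.kafka.listener.type:batch}")
    private String listenerType;

    /**
     * Container factory for consuming from data source "one".
     */
    @Bean("oneContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<Object, Object> oneContainerFactory(
            @Qualifier("oneKafkaProperties") CustomKafkaProperties kafkaProperties) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> container =
                new ConcurrentKafkaListenerContainerFactory<>();
        container.setConsumerFactory(
                new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties()));
        // Concurrency should be less than or equal to the topic's partition count.
        // container.setConcurrency(5);
        // Enable batch listening only when spring.kafka.listener.type is "batch".
        container.setBatchListener("batch".equalsIgnoreCase(listenerType));
        return container;
    }

    /**
     * Container factory for consuming from data source "two".
     */
    @Bean("twoContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<Object, Object> twoContainerFactory(
            @Qualifier("twoKafkaProperties") CustomKafkaProperties kafkaProperties) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> container =
                new ConcurrentKafkaListenerContainerFactory<>();
        container.setConsumerFactory(
                new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties()));
        // Concurrency should be less than or equal to the topic's partition count.
        // container.setConcurrency(5);
        // Enable batch listening only when spring.kafka.listener.type is "batch".
        container.setBatchListener("batch".equalsIgnoreCase(listenerType));
        return container;
    }
}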
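The comment about keeping concurrency at or below the partition count can be illustrated with a small sketch. It assumes a simple round-robin spread, which is not Kafka's actual assignor, and `PartitionSpread` is a hypothetical helper rather than anything from spring-kafka. The point is only that each partition goes to one consumer in a group, so any consumer threads beyond the partition count receive nothing.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of spring-kafka): spreads partitions over consumer threads. */
final class PartitionSpread {

    /** Assigns partitions 0..partitions-1 to `concurrency` consumers in round-robin order. */
    static List<List<Integer>> assign(int partitions, int concurrency) {
        List<List<Integer>> consumers = new ArrayList<>();
        for (int c = 0; c < concurrency; c++) {
            consumers.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            consumers.get(p % concurrency).add(p);
        }
        return consumers;
    }

    /** Counts consumers that received no partition and would therefore sit idle. */
    static long idleConsumers(int partitions, int concurrency) {
        return assign(partitions, concurrency).stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        System.out.println(assign(3, 5));        // [[0], [1], [2], [], []]
        System.out.println(idleConsumers(3, 5)); // 2
    }
}
```

With 3 partitions and a concurrency of 5, two consumer threads hold no partition, which is why a value above the partition count adds threads without adding throughput.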
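3. Write KafkaReceiver.java, which consumes data from the two topics.

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
// Const is the project's own constants class; import it from your package.

@Service
@Slf4j
public class KafkaReceiver {

    // Listens on the "one" cluster through oneContainerFactory.
    @KafkaListener(id = "kafka-one-demo", topics = Const.KAFKA_ONE_DEMO_TOPIC,
            groupId = "oneGroup", containerFactory = "oneContainerFactory")
    public void receiveOne(ConsumerRecord<String, String> record) {
        if (null == record || !StringUtils.hasText(record.value())) {
            log.warn("KafkaReceiver record is null or record.value is empty.");
            return;
        }
        String reqJson = record.value();
        log.info("one KafkaReceiver {}", reqJson);
    }

    // Listens on the "two" cluster through twoContainerFactory.
    @KafkaListener(id = "kafka-two-demo", topics = Const.KAFKA_TWO_DEMO_TOPIC,
            groupId = "twoGroup", containerFactory = "twoContainerFactory")
    public void receiveTwo(ConsumerRecord<String, String> record) {
        if (null == record || !StringUtils.hasText(record.value())) {
            log.warn("KafkaReceiver record is null or record.value is empty.");
            return;
        }
        String reqJson = record.value();
        log.info("two KafkaReceiver {}", reqJson);
    }
}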
4. Edit application-dev.properties to add the two Kafka configurations. Other environments work the same way (the settings can also be managed in Apollo).

#################### KAFKA ####################
## Consumer settings
spring.kafka.listener.type=single
spring.kafka.listener.missing-topics-fatal=false

spring.one.kafka.consumer.bootstrapServers=127.0.0.1:9092
spring.one.kafka.consumer.auto-offset-reset=latest
spring.one.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.one.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.one.kafka.consumer.max-poll-records=50

spring.two.kafka.consumer.bootstrapServers=127.0.0.1:9092
spring.two.kafka.consumer.auto-offset-reset=latest
spring.two.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.two.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.two.kafka.consumer.max-poll-records=50

## Producer settings used by the unit test
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.buffer-memory=524288
spring.kafka.producer.batch-size=65536
spring.kafka.producer.bootstrap-servers=127.0.0.1:9092
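To make the effect of the two prefixes concrete, here is a rough plain-Java sketch of splitting one flat property set into per-data-source maps. It is not how Spring's binder is implemented; `PrefixedSettings` is a hypothetical helper, and the second broker address (port 9093) is an assumed value chosen only so the two maps differ.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

/** Hypothetical helper: illustrates prefix-scoped settings, not Spring's actual binder. */
final class PrefixedSettings {

    /** Returns the entries whose key starts with `prefix`, with the prefix stripped. */
    static Map<String, String> under(Properties all, String prefix) {
        Map<String, String> result = new TreeMap<>();
        for (String key : all.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                result.put(key.substring(prefix.length()), all.getProperty(key));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Properties all = new Properties();
        all.setProperty("spring.one.kafka.consumer.bootstrapServers", "127.0.0.1:9092");
        all.setProperty("spring.one.kafka.consumer.max-poll-records", "50");
        // Assumed address for a second cluster; the article itself uses 9092 for both.
        all.setProperty("spring.two.kafka.consumer.bootstrapServers", "127.0.0.1:9093");
        all.setProperty("spring.two.kafka.consumer.max-poll-records", "50");

        // Each prefix yields an independent set of consumer settings.
        System.out.println(under(all, "spring.one.kafka.consumer."));
        System.out.println(under(all, "spring.two.kafka.consumer."));
    }
}
```

Because each data source reads only its own prefix, changing a key under spring.one.kafka never affects the consumer built for spring.two.kafka.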
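1. Modify and run the unit test.

import java.io.IOException;
import javax.annotation.Resource;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit.jupiter.SpringExtension;
// KafkaSender and Const come from the project built in the earlier articles.

@Slf4j
@ActiveProfiles("dev")
@ExtendWith(SpringExtension.class)
@SpringBootTest
class KafkaSenderTest {

    @Resource
    private KafkaSender kafkaSender;

    @Test
    void sendMessage() throws IOException {
        String json = "hello";
        for (int i = 0; i < 1; i++) {
            // Send the same payload to the topic of each data source.
            kafkaSender.sendMessage(Const.KAFKA_ONE_DEMO_TOPIC, json);
            kafkaSender.sendMessage(Const.KAFKA_TWO_DEMO_TOPIC, json);
        }
        // Block so the listeners have time to consume; the test ends once input arrives on stdin.
        System.in.read();
    }
}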
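Blocking on System.in.read() keeps the test alive but needs manual input. One common alternative is to wait on a CountDownLatch with a timeout. The sketch below shows only that waiting pattern in plain Java: the "listener" is a stand-in thread, and `LatchWaitDemo` and its parameters are hypothetical names. In a real test the countDown() call would sit inside the @KafkaListener methods, which is an assumption about how you would wire it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo of waiting for a fixed number of messages with a timeout. */
final class LatchWaitDemo {

    /**
     * Simulates `delivered` messages arriving on a listener thread and waits until
     * `expected` messages have been seen or the timeout elapses.
     *
     * @return true if all expected messages arrived in time, false otherwise
     */
    static boolean awaitMessages(int delivered, int expected, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(expected);
        Thread listener = new Thread(() -> {
            for (int i = 0; i < delivered; i++) {
                latch.countDown(); // stand-in for "a record was consumed"
            }
        });
        listener.setDaemon(true);
        listener.start();
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitMessages(2, 2, 1000)); // true: both messages arrived
        System.out.println(awaitMessages(1, 2, 200));  // false: one message never arrived
    }
}
```

The timeout turns a missing message into a failed assertion instead of a test that hangs forever.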
2. The test passes.
[2021-08-19 17:09:02.203] [kafka-two-demo-0-C-1] [INFO] [o.s.kafka.listener.KafkaMessageListenerContainer:292] - twoGroup: partitions assigned: [kafka-single-demo-topic-0]
[2021-08-19 17:09:02.203] [kafka-one-demo-0-C-1] [INFO] [o.s.kafka.listener.KafkaMessageListenerContainer:292] - oneGroup: partitions assigned: [kafka-single-demo-topic-0]
[2021-08-19 17:09:09.721] [kafka-two-demo-0-C-1] [INFO] [com.mmc.lesson.kafka.KafkaReceiver:?] - two KafkaReceiver hello
[2021-08-19 17:09:12.736] [kafka-one-demo-0-C-1] [INFO] [com.mmc.lesson.kafka.KafkaReceiver:?] - one KafkaReceiver hello
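With that, we have cleanly integrated multiple Kafka consumer data sources. As an exercise, try configuring multiple producers yourself. Next article: "Building Large-Scale Distributed Services (23): How Does SpringBoot Integrate Caffeine, Which Performs Many Times Better than GuavaCache, and Set Different Expiration Times by Name?"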
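As a starting point for the producer exercise, the sketch below builds one property map per cluster using the standard Kafka producer keys. It is a plain-Java illustration, not the author's implementation: `ProducerSettings` is a hypothetical helper and the second broker address is an assumed value. In a Spring project, each map could then be passed to its own DefaultKafkaProducerFactory and wrapped in a separate KafkaTemplate bean.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: builds producer settings for one Kafka cluster. */
final class ProducerSettings {

    /** Returns producer properties that send String keys and values to the given brokers. */
    static Map<String, Object> forCluster(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> one = forCluster("127.0.0.1:9092");
        // Assumed address for a second cluster, for illustration only.
        Map<String, Object> two = forCluster("127.0.0.1:9093");
        System.out.println(one.get("bootstrap.servers"));
        System.out.println(two.get("bootstrap.servers"));
    }
}
```

Keeping one map per cluster mirrors the consumer side of this article, where each data source has its own properties bean and its own factory.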