赞
踩
在分布式服务的架构演进中,消息队列作为核心组件之一,承载着解耦、异步、削峰填谷等关键职责。Apache Kafka 作为业界广泛使用的分布式流处理平台,因其高吞吐、低延迟的特性被大量应用在各类大数据场景中。然而,随着业务的复杂度不断提升,如何在 SpringBoot 中高效地整合并管理多个 Kafka 数据源,成为了一个值得探讨的问题。
在过去的一段时间里,我们通过系列文章详细阐述了如何在 SpringBoot 中以零代码或极低的代码侵入方式,实现多 Kafka 数据源的整合。从基础的配置到高级特性如 protobuf 支持、Aware 模式以及亿级消息生产者的优化,我们希望通过这些内容帮助开发者更加便捷地应对复杂的业务场景。
今天,我们将这些内容凝练成一个全新的 SpringBoot 插件——MultiKafkaStarter,旨在进一步降低开发者整合多 Kafka 数据源的门槛,提升系统的可维护性和扩展性。
核心特点
国籍惯例,先上源码:Github源码
SpringBoot 零代码方式整合多个kafka数据源,支持任意kafka集群,已封装为一个小模块,集成所有kafka配置,让注意力重新回归业务本身。
1、引入最新依赖包,如果找不到依赖包,请到工程目录mvn clean package install
执行一下命令。
<dependency>
<groupId>io.github.vipjoey</groupId>
<artifactId>multi-kafka-starter</artifactId>
<version>2.2</version>
</dependency>
2、添加kafka地址等相关配置。
## json消息生产者
spring.kafka.four.enabled=true
spring.kafka.four.producer.count=1 ## 生产者数量,默认为1个
spring.kafka.four.producer.name=fourKafkaSender ## 设置bean的名称,方便后续引用。如果没有设置,默认值为xxxKafkaSender
spring.kafka.four.producer.bootstrap-servers=${spring.embedded.kafka.brokers} ## 必须设置
spring.kafka.four.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.four.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
## pb 消息生产者
spring.kafka.five.enabled=true
spring.kafka.five.producer.name=fiveKafkaSender
spring.kafka.five.producer.bootstrap-servers=${spring.embedded.kafka.brokers}
spring.kafka.five.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.five.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer
3、根据名称注入生产者MmcKafkaMultiSender
,就可以发送kafka消息。
@Resource(name = "fourKafkaSender") private MmcKafkaMultiSender mmcKafkaMultiSender; @Resource(name = "fiveKafkaSender") private MmcKafkaMultiSender mmcKafkaMultiSender; @Resource private MmcKafkaOutputContainer mmcKafkaOutputContainer; // 方式一 void produceMessage() { for (int i = 0; i < 10; i++) { DemoAwareMsg msg = new DemoAwareMsg(); msg.setRoutekey("routekey" + i); msg.setName("name" + i); msg.setTimestamp(System.currentTimeMillis()); String json = JsonUtil.toJsonStr(msg); mmcKafkaMultiSender.sendStringMessage(topicOne, "aaa", json); } } // 方式二 void produceMessage() { MmcKafkaSender sender = mmcKafkaOutputContainer.getOutputs().get("xxxKafkaSender"); sender.sendStringMessage(topic, sku.getRoutekey(), message); }
2、添加kafka地址等相关配置。
## topic1的kafka配置 spring.kafka.one.enabled=true spring.kafka.one.consumer.bootstrapServers=${spring.embedded.kafka.brokers} spring.kafka.one.topic=mmc-topic-one spring.kafka.one.group-id=group-consumer-one spring.kafka.one.processor=你的处理类bean名称(例如:oneProcessor) spring.kafka.one.dupicate=true ## 如果为true表示对批次内的kafka消息去重,需要实现MmcKafkaMsg接口,默认为false spring.kafka.one.consumer.auto-offset-reset=latest spring.kafka.one.consumer.max-poll-records=10 spring.kafka.one.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.one.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer ## topic2的kafka配置 spring.kafka.two.enabled=true spring.kafka.two.consumer.bootstrapServers=${spring.embedded.kafka.brokers} spring.kafka.two.topic=mmc-topic-two spring.kafka.two.group-id=group-consumer-two spring.kafka.two.processor=你的处理类bean名称 spring.kafka.two.consumer.auto-offset-reset=latest spring.kafka.two.consumer.max-poll-records=10 spring.kafka.two.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.two.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer ## protobuf类型的消息的kafka配置 spring.kafka.pb.enabled=true spring.kafka.pb.consumer.bootstrapServers=${spring.embedded.kafka.brokers} spring.kafka.pb.topic=mmc-topic-pb spring.kafka.pb.group-id=group-consumer-pb spring.kafka.pb.processor=pbProcessor spring.kafka.pb.consumer.auto-offset-reset=latest spring.kafka.pb.consumer.max-poll-records=10 spring.kafka.pb.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.pb.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
3、新建kafka消息对应的实体类,可以选择实现MmcMsgDistinctAware
接口,例如
@Data class DemoMsg implements MmcMsgDistinctAware { private String routekey; private String name; private Long timestamp; } 如果你配置了spring.kafka.xxx.duplicate=fale,则不需要实现MmcMsgDistinctAware接口。 PS:如果实现MmcMsgDistinctAware接口,就自动具备了消息去重能力
4、新建kafka消息处理类,要求继承MmcKafkaKafkaAbastrctProcessor
,然后就可以愉快地编写你的业务逻辑了。
@Slf4j @Service("oneProcessor") // 你的处理类bean名称,如果没有定义bean名称,那么默认就是首字母缩写的类名称 public class OneProcessor extends MmcKafkaKafkaAbastrctProcessor<DemoMsg> { @Override protected void dealMessage(List<DemoMsg> datas) { // 下面开始编写你的业务代码 } } @Slf4j @Service("pbProcessor") public class PbProcessor extends MmcKafkaKafkaAbastrctProcessor<DemoMsg> { @Override protected Stream<DemoMsg> doParseProtobuf(byte[] record) { try { DemoPb.PbMsg msg = DemoPb.PbMsg.parseFrom(record); DemoMsg demo = new DemoMsg(); BeanUtils.copyProperties(msg, demo); return Stream.of(demo); } catch (InvalidProtocolBufferException e) { log.error("parssPbError", e); return Stream.empty(); } } @Override protected void dealMessage(List<DemoMsg> datas) { System.out.println("PBdatas: " + datas); } }
1、支持单次拉取kafka的batch消息里去重,需要实现MmcMsgDistinctAware
的getRoutekey()和getTimestamp()方法;如果为false,则不要实现MmcMsgDistinctAware
接口。
spring.kafka.xxx.duplicate=true
2、支持字符串kafka消息,json是驼峰或者下划线
# 默认为支持驼峰的kafka消息,为ture则支持下划线的消息
spring.kafka.xxx.snakeCase=false
3、支持pb的kafka消息,需要自行重写父类的doParseProtobuf
方法;
@Override protected Stream<DemoMsg> doParseProtobuf(byte[] record) { try { DemoMsg msg = new DemoMsg(); DemoPb.PbMsg pb = DemoPb.PbMsg.parseFrom(record); BeanUtils.copyProperties(pb, msg); return Stream.of(msg); } catch (InvalidProtocolBufferException e) { log.error("doParseProtobuf error: {}", e.getMessage()); return Stream.empty(); } }
4、支持获取kafka的topic、offset属性,注入到实体类中,需要实现MmcMsgKafkaAware
接口
@Data class DemoAwareMsg implements MmcKafkaAware { private String routekey; private String name; private Long timestamp; private String topic; private long offset; }
加我加群一起交流学习!更多干货下载、项目源码和大厂内推等着你
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。