赞
踩
好久没有写博客了,正好最近在工作的时候,使用 spring-kafka 消费消息时候遇到一个关于批量消息处理的问题,通过阅读 spring-kafka 源码,才理解产生问题的原因,以及解决方法。
最近在开发一个需求中,有一个场景是需要接受第三方公司数据回调,我们系统需要提供一个接口接受和处理数据。这个接口处理的业务逻辑比较复杂,耗时比较长,为了不让调用方等待接口调用过长时间,考虑采用将数据发送到 kafka,然后异步监听消费这个数据。这样接口只负责将数据发送到kafka,然后就迅速返回响应。
这里我们使用的是 spring-kafka 框架,消费者和生产者的 key、value 序列化都是 string 类型。
在代码开发完毕,进行测试的时候发现,当并发的调用接收接口时,会出现 kafka 监听器消费数据不完整的情况。
可以理解为调用一次接口,发送一条数据到 kafka,然后kafka 监听器方法消费一条消息时,会对应创建一条业务数据;调用接口是一次一次的有间隔的调用,即非并发下是没有问题的,消费数据产生的业务数据正常;但是并发调用接口的情况下,业务数据和 kafka 消息数量不一致。
先贴上 kafka 配置以及相关代码(代码和日志都经过了脱敏处理)。
项目使用 springboot 搭建,spring-kafka 的版本是 2.3.9.RELEASE。
application.properties 中的 kafka 配置参数代码
- ##=============== provider 生产者 =======================
- spring.kafka.producer.retries=0
- # 每次批量发送消息的数量
- spring.kafka.producer.batch-size=16384
- spring.kafka.producer.buffer-memory=33554432
- # 指定消息key和消息体的编解码方式
- spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
- spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
-
- #=============== consumer 消费者 =======================
- spring.kafka.consumer.group-id=ezr_group
- spring.kafka.consumer.auto-offset-reset=latest
- #是否开启自动提交
- spring.kafka.consumer.enable-auto-commit=true
- #批量消费一次最大拉取的数据量
- spring.kafka.consumer.max-poll-records=100
- #自动提交的间隔时间ms
- spring.kafka.consumer.auto-commit-interval=5000
- #连接超时时间
- spring.kafka.consumer.session-timeout=6000
- #是否开启批量消费,true表示批量消费
- spring.kafka.listener.batch-listener=true
- #设置消费的线程数
- spring.kafka.listener.concurrency=3
- #如果消息队列中没有消息,等待timeout毫秒后,调用poll()方法。
- #如果队列中有消息,立即消费消息,每次消费的消息的多少可以通过max.poll.records配置。
- spring.kafka.listener.poll-timeout=1500
- # 指定消息key和消息体的编解码方式
- spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
- spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

kafka 的生产者和消费者的生产和消费类型为批量消费,key 和 value 的序列化与反序列化类型为 String。
kafka 消息消费监听器接口
- @KafkaListener(topics = "${xxx.xxx.xxx}", groupId = "xxx-group")
- public void onXXXMessage(String message) {
- log.info("[onXXXMessage]\t[message:{}]", message);
- // 解析 String 消息为 Bean
- XXXBean xxxBean = JacksonUtil.toObject(message, XXXBean.class);
- // 保存请求对象
- saveXXXBean
- // 其他业务
- doSomething(xxxBean);
- }
JacksonUtil 类
- @Slf4j
- public class JacksonUtil {
-
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
- static {
- //设置转换时的配置,具体视情况配置,如果通过此处配置,使用全局。也可以通过在bean的属性上添加注解配置对应的bean
- // 统一返回数据的输出风格
- OBJECT_MAPPER.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
- // 时区
- OBJECT_MAPPER.setTimeZone(TimeZone.getTimeZone("GMT+8"));
- //序列化的时候序列对象的所有属性
- OBJECT_MAPPER.setSerializationInclusion(JsonInclude.Include.ALWAYS);
- //反序列化的时候如果多了其他属性,不抛出异常
- OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
- //如果是空对象的时候,不抛异常
- OBJECT_MAPPER.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
- //取消时间的转化格式,默认是时间戳,可以取消,同时需要设置要表现的时间格式
- OBJECT_MAPPER.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
- OBJECT_MAPPER.setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
- // Enum用index输出
- OBJECT_MAPPER.configure(SerializationFeature.WRITE_ENUMS_USING_INDEX, true);
- }
-
- public static <T> T toObject(String json, Class<T> clazz) {
- try {
- T data = OBJECT_MAPPER.readValue(json, clazz);
- return data;
- } catch (IOException e) {
- log.error("[JacksonUtil toObject fail]", e);
- return null;
- }
- }
-
- }

一次一次的调用接口情况下的消息消费的日志内容如下:
[onCouponPushMessage] [message:{"BatchId":"3","CoupGrpId":1200043,"Brand":"LOOK_test","IsDiscountLimit":false,"GenDate":"2020-07-28 20:12:27"}]
并发下情况调用接口情况下的消息消费的日志内容如下:
[onCouponPushMessage] [message:{"BatchId":"1","CoupGrpId":1200043,"Brand":"LOOK_test","IsDiscountLimit":false,"GenDate":"2020-07-28 20:12:25"},{"BatchId":"2","CoupGrpId":1200043,"Brand":"LOOK_test","IsDiscountLimit":false,"GenDate":"2020-07-28 20:12:26"}]
Kafka 的生产者和消费者类型都是批量消费,kafka 监听器接口的入参是 String 类型,两种操作接口的方式,返回的数据不一样的。
通过查看 API 接口请求日志,以及 kafka 消费数据的日志发现,kafka 消费时,并发情况下调用接口情况下,消费数据被合并到一起,用逗号分割连接到一起来了,这样其实是相当于两条数据了,但是经过 JacksonUtil 解析为 XXXBean 之后,生成了一个对象,进行保存,很明显这样就导致到发送的数据与转换的数据不一致的问题。
既然知道了问题,那么我们就可以很容易的解决了。
这里有两种方案。
选择第二种方法解决问题。
上面的问题解决了,但是我们不知道会出现这样的问题,即在消费者批量处理消费的模式下,监听器方法的入参参数类型是 String 的时候,为什么会出现数据通过逗号拼接在一起的问题呢?
要解决这个问题,必须要看源码了!好了废话不多说,我们直接开干,撸源码!
我们其实是要找的代码逻辑,是监听器方法入参的参数值设置相关的代码逻辑。
我们先从 @KafkaListener 这个注解先看起,进入到它的类上,说它是用标记指定的 topic 从而被生成一个 kafka 监听器容器,然后监听器容器就负责拉取 kafka 消息并且转换处理以及绑定消息到监听器指定的目标方法,然后执行目标方法,最后根据结果以及异常情况作相应的处理,包括提交、回滚、重试、异常处理等等操作。
接着我们开始看到如何构建一个监听器容器类的。
KafkaListenerEndpointRegistrar 这个类是一个实现了 InitializingBean 接口的类,它在 KafkaListenerAnnotationBeanPostProcessor 类中以实例变量被创建、设置属性、注册所有的监听器。
它的注册所有监听器方法的时序图如下所示。
- sequenceDiagram
- KafkaListenerEndpointRegistrar->>KafkaListenerEndpointRegistrar:afterPropertiesSet() 初始化 bean
- KafkaListenerEndpointRegistrar->>KafkaListenerEndpointRegistrar:registerAllEndpoints() 注册所有端点
- loop endpointDescriptors 端点描述器
- KafkaListenerEndpointRegistrar->>KafkaListenerEndpointRegistry:registerListenerContainer() 注册监听器容器
- KafkaListenerEndpointRegistry->>KafkaListenerEndpointRegistry:createListenerContainer() 创建监听器容器
- end
- KafkaListenerEndpointRegistrar->>KafkaListenerEndpointRegistrar:this.startImmediately = true 设置立即启动标识
这个类是一个监听器端点注册表,它用来创建监听器容器,同时管理监听器容器的生命周期。
它的创建监听器容器的方法执行时序图如下
- sequenceDiagram
-
- # 创建与注册监听器容器
- KafkaListenerEndpointRegistry->>KafkaListenerEndpointRegistry:createListenerContainer() 创建监听器容器
- KafkaListenerEndpointRegistry->>ConcurrentKafkaListenerContainerFactory:createListenerContainer() 创建监听器容器
- ConcurrentKafkaListenerContainerFactory->>ConcurrentKafkaListenerContainerFactory:createContainerInstance() 创建监听器容器
- ConcurrentKafkaListenerContainerFactory->>ConcurrentMessageListenerContainer:new 创建并发消息监听器容器实例
- ConcurrentMessageListenerContainer-->>ConcurrentKafkaListenerContainerFactory:返回实例
-
- # 设置消息监听器容器
- ConcurrentKafkaListenerContainerFactory->>MethodKafkaListenerEndpoint:setupListenerContainer() 设置监听器容器
-
- ConcurrentKafkaListenerContainerFactory->>ConcurrentKafkaListenerContainerFactory:initializeContainer() 初始化容器
- ConcurrentKafkaListenerContainerFactory->>ConcurrentKafkaListenerContainerFactory:customizeContainer() 自定义配置容器
-
- ConcurrentKafkaListenerContainerFactory-->>KafkaListenerEndpointRegistry:返回批量消息监听器适配对象

这个类是主要用于设置监听器容器的消息转换器。
它的设置监听器容器 setupListenerContainer 方法执行时序如下图。
- sequenceDiagram
- MethodKafkaListenerEndpoint->>MethodKafkaListenerEndpoint:setupMessageListener() 设置消息监听器
-
- # 消息监听器
- MethodKafkaListenerEndpoint->>MethodKafkaListenerEndpoint:createMessageListener() 创建消息监听器
- MethodKafkaListenerEndpoint->>MethodKafkaListenerEndpoint:createMessageListenerInstance() 创建消息监听器适配器
- MethodKafkaListenerEndpoint->>BatchMessagingMessageListenerAdapter:new 创建批量消息监听器、或者非批量消息监听器适配器
- BatchMessagingMessageListenerAdapter->>BatchMessagingMessageListenerAdapter:setBatchToRecordAdapter() 设置批量接收消息适配器
- BatchMessagingMessageListenerAdapter->>BatchMessagingMessageListenerAdapter:setBatchMessageConverter() 设置批量消息转换器适配器
- BatchMessagingMessageListenerAdapter-->>MethodKafkaListenerEndpoint:返回消息监听器适配器
-
- # 配置监听器处理器适配器
- MethodKafkaListenerEndpoint->>MethodKafkaListenerEndpoint:configureListenerAdapter() 配置监听器处理器适配器
-
- # 执行器方法
- BatchMessagingMessageListenerAdapter->>BatchMessagingMessageListenerAdapter:setHandlerMethod() 设置执行器方法(它用来处理设置监听器所在类方法的入参)
-
- BatchMessagingMessageListenerAdapter-->>MethodKafkaListenerEndpoint:返回消息监听器适配器对象
- MethodKafkaListenerEndpoint->>ConcurrentMessageListenerContainer:setupMessageListener() 设置消息监听器
- ConcurrentMessageListenerContainer->>ContainerProperties:setMessageListener() 设置消息转换器

这个类是实际的监听器端点实现类,它创建了处理器适配器、执行处理器方法,执行处理器方法是用于执行目标方法的处理器,用于通过解析参数,执行底层的目标业务方法。
它的 configureListenerAdapter 方法时序图如下。
- sequenceDiagram
- MethodKafkaListenerEndpoint->>MethodKafkaListenerEndpoint:configureListenerAdapter() 处理器适配器
- MethodKafkaListenerEndpoint->>KafkaListenerAnnotationBeanPostProcessor:createInvocableHandlerMethod() 创建执行处理器方法
- KafkaListenerAnnotationBeanPostProcessor->>KafkaListenerAnnotationBeanPostProcessor:getHandlerMethodFactory() 创建执行处理器方法工厂
-
- # 执行处理器方法工厂
- KafkaListenerAnnotationBeanPostProcessor->>KafkaListenerAnnotationBeanPostProcessor:createDefaultMessageHandlerMethodFactory() 创建默认的执行处理器方法工厂
- KafkaListenerAnnotationBeanPostProcessor->>DefaultMessageHandlerMethodFactory:new 创建默认执行处理器方法工厂
- KafkaListenerAnnotationBeanPostProcessor->>DefaultMessageHandlerMethodFactory:设置验证器、bean 工厂、格式化转换器 转化服务
- KafkaListenerAnnotationBeanPostProcessor->>DefaultMessageHandlerMethodFactory:设置消息转化器 GenericMessageConverter
- KafkaListenerAnnotationBeanPostProcessor->>DefaultMessageHandlerMethodFactory:设置 KafkaNullAwarePayloadArgumentResolver 等参数解析器
-
- DefaultMessageHandlerMethodFactory-->>KafkaListenerAnnotationBeanPostProcessor:返回默认执行处理器方法工厂
-
- DefaultMessageHandlerMethodFactory->>DefaultMessageHandlerMethodFactory:createInvocableHandlerMethod() 真正创建执行处理器方法
- DefaultMessageHandlerMethodFactory->>InvocableHandlerMethod:new 创建执行处理器方法
- InvocableHandlerMethod->>InvocableHandlerMethod:setMessageMethodArgumentResolvers() 设置消息方法参数解析器
- InvocableHandlerMethod-->>DefaultMessageHandlerMethodFactory:返回 执行处理器方法
- DefaultMessageHandlerMethodFactory-->>KafkaListenerAnnotationBeanPostProcessor:返回执行处理器方法
- DefaultMessageHandlerMethodFactory-->>MethodKafkaListenerEndpoint:返回执行处理器方法
-
- # HandlerAdapter 执行适配器
- MethodKafkaListenerEndpoint->>HandlerAdapter:new 创建执行器适配器
- HandlerAdapter-->>MethodKafkaListenerEndpoint:返回对象

这个类是监听器注解 bean 后置处理器,它是实现了 BeanPostProcessor 这个接口,用来在 bean 初始化完成之后执行相应的操作。比如这个类它负责获取所有的 bean 上使用 @KafkaListener 注解标记的方法,来通过 kafka 监听器容器工厂根据注解参数值来创建一个 kafka 监听器容器,用它来进行消费kafka 消息。
它的 postProcessAfterInitialization 方法的执行时序图如下。
- sequenceDiagram
- # 后置处理器处理
- KafkaListenerAnnotationBeanPostProcessor->>KafkaListenerAnnotationBeanPostProcessor:postProcessAfterInitialization() 后置处理器
- KafkaListenerAnnotationBeanPostProcessor->>MethodIntrospector:selectMethods() 寻找标记有 @KafkaListener 注解的方法
- MethodIntrospector-->>KafkaListenerAnnotationBeanPostProcessor:记有 @KafkaListener 注解的方法信息
-
- # 处理器监听器
- KafkaListenerAnnotationBeanPostProcessor->>KafkaListenerAnnotationBeanPostProcessor:processKafkaListener() 执行处理监听器
- KafkaListenerAnnotationBeanPostProcessor->>KafkaListenerAnnotationBeanPostProcessor:processListener() 主要的处理消费者监听
-
- # 设置端点属性
- KafkaListenerAnnotationBeanPostProcessor->>MethodKafkaListenerEndpoint:new 创建容器端点
- KafkaListenerAnnotationBeanPostProcessor->>MethodKafkaListenerEndpoint:设置端点属性(bean、MessageHandlerMethodFactory、groupID、topic、concurrency 等等)
-
- # 注册端点
- KafkaListenerAnnotationBeanPostProcessor->>KafkaListenerEndpointRegistrar:registerEndpoint() 注册端点
- KafkaListenerEndpointRegistrar->KafkaListenerEndpointRegistry:registerListenerContainer() 注册监听器容器
-
- # 创建监听器容器
- KafkaListenerEndpointRegistry-->>KafkaListenerEndpointRegistry:createListenerContainer() 创建监听器容器
- KafkaListenerEndpointRegistry-->>KafkaListenerEndpointRegistry:返回并发消息监听器容器 ConcurrentMessageListenerContainer
-
- # 启动容器
- KafkaListenerEndpointRegistry->>KafkaListenerEndpointRegistry:startIfNecessary() 立即开始

它的 startIfNecessary 启动容器方法的实行时序如下所示。
- sequenceDiagram
- # 启动容器
- KafkaListenerEndpointRegistry->>KafkaListenerEndpointRegistry:startIfNecessary() 立即开始
-
- # 这里进行启动容器
- KafkaListenerEndpointRegistry->>ConcurrentMessageListenerContainer:start() 启动监听器容器
- ConcurrentMessageListenerContainer->>ConcurrentMessageListenerContainer:doStart() 启动
-
- loop concurrency 并发量
- # 根据并发量设置 KafkaMessageListenerContainer 容器实例
- ConcurrentMessageListenerContainer->>ConcurrentMessageListenerContainer:constructContainer() 创建容器
- KafkaMessageListenerContainer-->>ConcurrentMessageListenerContainer:返回 KafkaMessageListenerContainer 容器实例
- KafkaMessageListenerContainer->>KafkaMessageListenerContainer:设置属性(bean、应用程序上下文、异常处理器、回滚处理器、各种消息前后置拦截器)
- KafkaMessageListenerContainer->>KafkaMessageListenerContainer:start() 启动容器
- KafkaMessageListenerContainer->>KafkaMessageListenerContainer:doStart() 启动容器
- KafkaMessageListenerContainer->>KafkaMessageListenerContainer:检查topics、检查 ack 模式、消费者线程执行器、消息监听器、创建监听器消费者任务
- KafkaMessageListenerContainer->>KafkaMessageListenerContainer:提交一个监听器消费者任务 ListenerConsumer
- end

这个类主要是负责拉取和消费消息的任务。
- sequenceDiagram
- ListenerConsumer->>ListenerConsumer:publishConsumerStartingEvent() 推送消费者启动中事件
- ListenerConsumer->>ListenerConsumer:initAssignedPartitions() 初始化指派的分区
- ListenerConsumer->>ListenerConsumer:publishConsumerStartedEvent() 发布消费者已启动事件
- loop isRunning()
- ListenerConsumer->>ListenerConsumer:pollAndInvoke() 拉取消息并处理
- ListenerConsumer->>ListenerConsumer:processCommits() 执行提交
- ListenerConsumer->>ListenerConsumer:invokeListener() 真正的执行消息消费
- ListenerConsumer->>ListenerConsumer:doInvokeBatchListener() 执行批量消息处理,这里有判断是否需要满的记录消息
- ListenerConsumer->>ListenerConsumer:invokeBatchOnMessage()
- ListenerConsumer->>ListenerConsumer:invokeBatchOnMessageWithRecordsOrList()
-
- # 转换 kafka 的消息为 spring-message 消息类型
- ListenerConsumer->>ListenerConsumer:doInvokeBatchOnMessage()
-
- # 这里处理消息
- ListenerConsumer->>BatchMessagingMessageListenerAdapter:onMessage() 处理消息
- end

这个类时批量消息监听器适配器,它实现了 BatchAcknowledgingConsumerAwareMessageListener 接口,用于接收处理批量消息,它的 onMessage 处理消息的方法时序图如下。
- sequenceDiagram
- ListenerConsumer->>BatchMessagingMessageListenerAdapter:onMessage() 处理消息
-
- # 转换消息为 Message<T> 类型
- BatchMessagingMessageListenerAdapter->>BatchMessagingMessageListenerAdapter:!isConsumerRecordList() 判断监听器方法参数是否为 List<ConsumerRecord> 类型
- BatchMessagingMessageListenerAdapter->>BatchMessagingMessageListenerAdapter:toMessagingMessage() 从 kafka 的原始 List<ConsumerRecord<K, V>> records 中提取消息,转化为 Message 消息,其中 payload 为 List<Object>
-
- # 执行消息转换
- # 将 message 消息绑定到监听器方法的入参上
- BatchMessagingMessageListenerAdapter->>BatchMessagingMessageListenerAdapter:invoke()
- BatchMessagingMessageListenerAdapter->>BatchMessagingMessageListenerAdapter:invokeHandler() 执行消息转换
-
- # 使用执行器方法执行
- BatchMessagingMessageListenerAdapter->>HandlerAdapter:invoke() 执行器适配器处理,绑定消息参数
- HandlerAdapter->>InvocableHandlerMethod:invoke() 执行器处理器方法执行
-
- # 解析与绑定方法入参参数
- InvocableHandlerMethod->>InvocableHandlerMethod:getMethodArgumentValues() 解析获取方法参数值
- InvocableHandlerMethod->>HandlerMethodArgumentResolverComposite:resolveArgument() 解析参数
- HandlerMethodArgumentResolverComposite->>HandlerMethodArgumentResolver:resolveArgument() 解析参数
-
- # 真正的解析参数
- HandlerMethodArgumentResolver->>KafkaNullAwarePayloadArgumentResolver:resolveArgument() 真正的解析参数
-
- HandlerMethodArgumentResolver-->>HandlerMethodArgumentResolverComposite:返回解析好的参数
- HandlerMethodArgumentResolverComposite->>InvocableHandlerMethod:返回解析好的参数
-
- # 执行监听器方法消费消息(处理业务逻辑)
- InvocableHandlerMethod->>InvocableHandlerMethod:doInvoke() 执行监听器方法,通过反射执行目标方法
- # 处理目标方法执行结果
- BatchMessagingMessageListenerAdapter->>BatchMessagingMessageListenerAdapter:handleResult() 处理执行结果
-
- # 异常处理
- BatchMessagingMessageListenerAdapter->>KafkaListenerErrorHandler:handleError() 异常处理

这个类就是真正的参数解析器类,它继承了 PayloadMethodArgumentResolver 类,进行了参数解析,它的参数解析方法的时序图如下。
- sequenceDiagram
- # 真正的解析参数
- HandlerMethodArgumentResolver->>KafkaNullAwarePayloadArgumentResolver:resolveArgument()
- KafkaNullAwarePayloadArgumentResolver->>PayloadMethodArgumentResolver:resolveArgument() 真正的解析参数
-
- PayloadMethodArgumentResolver->>Message:getPayload() 获取消息
- PayloadMethodArgumentResolver->>PayloadMethodArgumentResolver:resolveTargetClass() 解析目标类型与参数类型是否相同,如果相同则直接返回 payload
-
- PayloadMethodArgumentResolver->>GenericMessageConverter:fromMessage() 转换消息
- GenericMessageConverter->>GenericMessageConverter:getConverter() 根据目标参数类型与payload 类型,获取消息转换器
- GenericMessageConverter->>GenericConverter:convert() 转换
- GenericConverter-->GenericMessageConverter:返回转换之后的参数
- GenericMessageConverter-->>PayloadMethodArgumentResolver:返回转换之后的参数
- PayloadMethodArgumentResolver-->>KafkaNullAwarePayloadArgumentResolver:返回转换之后的参数
- KafkaNullAwarePayloadArgumentResolver->>HandlerMethodArgumentResolver:返回转换之后的参数
-
- HandlerMethodArgumentResolver-->>HandlerMethodArgumentResolverComposite:返回解析好的参数
- HandlerMethodArgumentResolverComposite->>InvocableHandlerMethod:返回解析好的参数

通过上面的几个类的方法执行时序图,我们知道了 spring-kafka 的以下几点流程:
那么回顾下我们上面做项目时遇到的问题——为什么spring-kafka 消费者批量模式下,使用 @KafkaListener 注解标记的方法入参处,参数类型为 String 时,会出现多条消息通过逗号进行拼接成字符串的问题?
很明显,通过上面源码的分析,我们可以看出,这个问题是出现在消息 BatchMessagingMessageListenerAdapter 消息解析、转换以及参数绑定的流程上。它是由 KafkaNullAwarePayloadArgumentResolver 类进行参数解析, 我们继续深入源码分析下,到底是怎么出现的这个问题的?
继续看 KafkaNullAwarePayloadArgumentResolver 参数解析器这个类,
这个类就是真正的参数解析器类,它继承了 PayloadMethodArgumentResolver 类,进行了参数解析
它源码为:
- /**
- * 这个参数解析器,是处理监听器方法入参的主要处理类,通过继承 PayloadMethodArgumentResolver 类来进行参数解析
- */
- private static class KafkaNullAwarePayloadArgumentResolver extends PayloadMethodArgumentResolver {
-
- KafkaNullAwarePayloadArgumentResolver(MessageConverter messageConverter, Validator validator) {
- super(messageConverter, validator);
- }
-
- @Override
- public Object resolveArgument(MethodParameter parameter, Message<?> message) throws Exception { // NOSONAR
- // 解析方法参数值,将 message 消息设置到参数上面去
- // 先使用 PayloadMethodArgumentResolver 类型进行解析参数值
- Object resolved = super.resolveArgument(parameter, message);
- /*
- * Replace KafkaNull list elements with null.
- */
- if (resolved instanceof List) {
- List<?> list = ((List<?>) resolved);
- for (int i = 0; i < list.size(); i++) {
- if (list.get(i) instanceof KafkaNull) {
- list.set(i, null);
- }
- }
- }
- return resolved;
- }
-
- @Override
- protected boolean isEmptyPayload(Object payload) {
- return payload == null || payload instanceof KafkaNull;
- }
-
- }

可以看出,它主要是执行了 PayloadMethodArgumentResolver 类型的参数解析。
- PayloadMethodArgumentResolver 类的参数解析方法
- ...
-
- @Override
- @Nullable
- public Object resolveArgument(MethodParameter parameter, Message<?> message) throws Exception {
- Payload ann = parameter.getParameterAnnotation(Payload.class);
- if (ann != null && StringUtils.hasText(ann.expression())) {
- throw new IllegalStateException("@Payload SpEL expressions not supported by this resolver");
- }
- // 从 Message 中获取 payload,这里的 payload,如果消费者是批量消费,
- // 并且监听器方法参数类型。payload 是 List<String> 类型的。
- Object payload = message.getPayload();
-
- // 校验
- if (isEmptyPayload(payload)) {
- if (ann == null || ann.required()) {
- String paramName = getParameterName(parameter);
- BindingResult bindingResult = new BeanPropertyBindingResult(payload, paramName);
- bindingResult.addError(new ObjectError(paramName, "Payload value must not be empty"));
- throw new MethodArgumentNotValidException(message, parameter, bindingResult);
- }
- else {
- return null;
- }
- }
-
- // 解析目标参数类型
- Class<?> targetClass = resolveTargetClass(parameter, message);
- // payload 类型
- Class<?> payloadClass = payload.getClass();
- if (ClassUtils.isAssignable(targetClass, payloadClass)) {
- validate(message, parameter, payload);
- // 如果目标参数类型与payload 参数类型相等则直接返回
- return payload;
- }
- else {
- if (this.converter instanceof SmartMessageConverter) {
- SmartMessageConverter smartConverter = (SmartMessageConverter) this.converter;
- payload = smartConverter.fromMessage(message, targetClass, parameter);
- }
- else {
- // 这里使用 GenericMessageConverter 类型进行转换
- payload = this.converter.fromMessage(message, targetClass);
- }
- if (payload == null) {
- throw new MessageConversionException(message, "Cannot convert from [" +
- payloadClass.getName() + "] to [" + targetClass.getName() + "] for " + message);
- }
- validate(message, parameter, payload);
- return payload;
- }
- }
-
- ...

PayloadMethodArgumentResolver 类的 resolveArgument 方法中,主要使用了 GenericMessageConverter 进行消息转换。
- GenericMessageConverter类
- ...
- @Override
- @Nullable
- public Object fromMessage(Message<?> message, Class<?> targetClass) {
- Object payload = message.getPayload();
- if (this.conversionService.canConvert(payload.getClass(), targetClass)) {
- try {
- return this.conversionService.convert(payload, targetClass);
- }
- catch (ConversionException ex) {
- throw new MessageConversionException(message, "Failed to convert message payload '" +
- payload + "' to '" + targetClass.getName() + "'", ex);
- }
- }
- return (ClassUtils.isAssignableValue(targetClass, payload) ? payload : null);
- }
- ...

GenericMessageConverter 其中由包含了 ConversionService 转换器服务类,再看它的源码。
- public static void addCollectionConverters(ConverterRegistry converterRegistry) {
- ConversionService conversionService = (ConversionService) converterRegistry;
-
- converterRegistry.addConverter(new ArrayToCollectionConverter(conversionService));
- converterRegistry.addConverter(new CollectionToArrayConverter(conversionService));
-
- converterRegistry.addConverter(new ArrayToArrayConverter(conversionService));
- converterRegistry.addConverter(new CollectionToCollectionConverter(conversionService));
- converterRegistry.addConverter(new MapToMapConverter(conversionService));
-
- converterRegistry.addConverter(new ArrayToStringConverter(conversionService));
- converterRegistry.addConverter(new StringToArrayConverter(conversionService));
-
- converterRegistry.addConverter(new ArrayToObjectConverter(conversionService));
- converterRegistry.addConverter(new ObjectToArrayConverter(conversionService));
- # 看到这里的 Collection to String 的转换器
- converterRegistry.addConverter(new CollectionToStringConverter(conversionService));
- converterRegistry.addConverter(new StringToCollectionConverter(conversionService));
-
- converterRegistry.addConverter(new CollectionToObjectConverter(conversionService));
- converterRegistry.addConverter(new ObjectToCollectionConverter(conversionService));
-
- converterRegistry.addConverter(new StreamConverter(conversionService));
- }

可以看到,真正进行参数解析的转换器就是这个类 CollectionToStringConverter,它是负责把 Collection 集合转成 String 类型的参数的,我们看下它的转换方法。
- private static final String DELIMITER = ",";
-
- @Override
- @Nullable
- public Object convert(@Nullable Object source, TypeDescriptor sourceType, TypeDescriptor targetType) {
- if (source == null) {
- return null;
- }
- // 获取源类型
- Collection<?> sourceCollection = (Collection<?>) source;
- if (sourceCollection.isEmpty()) {
- return "";
- }
- // 创建通过逗号分割的字符串拼接器
- StringJoiner sj = new StringJoiner(DELIMITER);
- // 遍历每个源类型数据,用逗号进行拼接
- for (Object sourceElement : sourceCollection) {
- Object targetElement = this.conversionService.convert(
- sourceElement, sourceType.elementTypeDescriptor(sourceElement), targetType);
- sj.add(String.valueOf(targetElement));
- }
- return sj.toString();
- }

现在,我们就知道了为什么spring-kafka 消费者批量模式下,使用 @KafkaListener 注解标记的方法入参处,参数类型为 String 时,会出现多条消息通过逗号进行拼接成字符串的问题。
好了,既然知道了问题是怎么产生的,那如何解决也就很容易了,修改 @KafkaListener 注解标记的方法入参参数类型为 List<ConsumerRecord> 就可以解决问题了!
通过这次的源码分析总结,我感觉收获很多。首先从发现问题,带着疑问去阅读了相关问题框架的源码最后定位问题、解决问题、再总结。
以前阅读源码,都是简单的看下,不会有特殊的标记,那样做是不能做到深入理解和加深印象的。正确有效的做法是,首先我们需要下载框架源代码,然后需要带着一个问题去阅读,抓住主要流程,比如容器创建、启动、注册、运行,这些关键的流程,不然会面对成千上万行代码,因为没有目标而迷失自我;其次,在阅读源码的过程中,尝试着阅读英文文档,不要直接全文翻译,那样很不准确,只翻译不懂的单词然后尝试理解整个句子的含义,初期可能会感觉到很难,速度很慢,但是当你的词汇量积累到一定程度,慢慢的就会离开词典,阅读和理解英文文档会变得很快速,耐着性子坚持做下去,一段时间后会有非常大的收获;然后就是阅读过程中需要在关键的源码上写上注释,就相当于我们看书做的笔记一样,目的是为了加深印象,和快速标记方便下次查找;最后当阅读完一段源码或者一个核心流程,一定要将其方法的执行流程,通过时序图给画出来,这样会非常的直观。
从看源码到写完博客,花的时间还是挺长的,周末干了两个晚上写博客,虽然比较辛苦但是却感到很快乐充实。在前进的道路上,还需要继续努力!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。