当前位置:   article > 正文

深入理解 spring-kafka 监听器创建与运行以及消息处理流程_springboot kafka动态创建监听容器

springboot kafka动态创建监听容器

1. 前言

好久没有写博客了,正好最近在工作的时候,使用 spring-kafka 消费消息时候遇到一个关于批量消息处理的问题,通过阅读 spring-kafka 源码,才理解产生问题的原因,以及解决方法。

2. 背景

最近在开发一个需求中,有一个场景是需要接受第三方公司数据回调,我们系统需要提供一个接口接受和处理数据。这个接口处理的业务逻辑比较复杂,耗时比较长,为了不让调用方等待接口调用过长时间,考虑采用将数据发送到 kafka,然后异步监听消费这个数据。这样接口只负责将数据发送到kafka,然后就迅速返回响应。

这里我们使用的是 spring-kafka 框架,消费者和生产者的 key、value 序列化都是 string 类型。

3. 问题

在代码开发完毕,进行测试的时候发现,当并发的调用接收接口时,会出现 kafka 监听器消费数据不完整的情况。

可以理解为调用一次接口,发送一条数据到 kafka,然后kafka 监听器方法消费一条消息时,会对应创建一条业务数据;调用接口是一次一次的有间隔的调用,即非并发下是没有问题的,消费数据产生的业务数据正常;但是并发调用接口的情况下,业务数据和 kafka 消息数量不一致。

4. 分析

先贴上 kafka 配置以及相关代码(代码和日志都经过了脱敏处理)。

项目使用 springboot 搭建,spring-kafka 的版本是 2.3.9.RELEASE。

application.properties 中的 kafka 配置参数代码

  1. ##=============== provider 生产者 =======================
  2. spring.kafka.producer.retries=0
  3. # 每次批量发送消息的数量
  4. spring.kafka.producer.batch-size=16384
  5. spring.kafka.producer.buffer-memory=33554432
  6. # 指定消息key和消息体的编解码方式
  7. spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
  8. spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
  9. #=============== consumer 消费者 =======================
  10. spring.kafka.consumer.group-id=ezr_group
  11. spring.kafka.consumer.auto-offset-reset=latest
  12. #是否开启自动提交
  13. spring.kafka.consumer.enable-auto-commit=true
  14. #批量消费一次最大拉取的数据量
  15. spring.kafka.consumer.max-poll-records=100
  16. #自动提交的间隔时间ms
  17. spring.kafka.consumer.auto-commit-interval=5000
  18. #连接超时时间
  19. spring.kafka.consumer.session-timeout=6000
  20. #是否开启批量消费,true表示批量消费
  21. spring.kafka.listener.batch-listener=true
  22. #设置消费的线程数
  23. spring.kafka.listener.concurrency=3
  24. #如果消息队列中没有消息,等待timeout毫秒后,调用poll()方法。
  25. #如果队列中有消息,立即消费消息,每次消费的消息的多少可以通过max.poll.records配置。
  26. spring.kafka.listener.poll-timeout=1500
  27. # 指定消息key和消息体的编解码方式
  28. spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
  29. spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

kafka 的生产者和消费者的生产和消费类型为批量消费,key 和 value 的序列化与反序列化类型为 String。

kafka 消息消费监听器接口

  1. @KafkaListener(topics = "${xxx.xxx.xxx}", groupId = "xxx-group")
  2. public void onXXXMessage(String message) {
  3. log.info("[onXXXMessage]\t[message:{}]", message);
  4. // 解析 String 消息为 Bean
  5. XXXBean xxxBean = JacksonUtil.toObject(message, XXXBean.class);
  6. // 保存请求对象
  7. saveXXXBean
  8. // 其他业务
  9. doSomething(xxxBean);
  10. }

JacksonUtil 类

  1. @Slf4j
  2. public class JacksonUtil {
  3. private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
  4. static {
  5. //设置转换时的配置,具体视情况配置,如果通过此处配置,使用全局。也可以通过在bean的属性上添加注解配置对应的bean
  6. // 统一返回数据的输出风格
  7. OBJECT_MAPPER.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
  8. // 时区
  9. OBJECT_MAPPER.setTimeZone(TimeZone.getTimeZone("GMT+8"));
  10. //序列化的时候序列对象的所有属性  
  11. OBJECT_MAPPER.setSerializationInclusion(JsonInclude.Include.ALWAYS);
  12. //反序列化的时候如果多了其他属性,不抛出异常  
  13. OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
  14. //如果是空对象的时候,不抛异常  
  15. OBJECT_MAPPER.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
  16. //取消时间的转化格式,默认是时间戳,可以取消,同时需要设置要表现的时间格式  
  17. OBJECT_MAPPER.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
  18. OBJECT_MAPPER.setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
  19. // Enum用index输出
  20. OBJECT_MAPPER.configure(SerializationFeature.WRITE_ENUMS_USING_INDEX, true);
  21. }
  22. public static <T> T toObject(String json, Class<T> clazz) {
  23. try {
  24. T data = OBJECT_MAPPER.readValue(json, clazz);
  25. return data;
  26. } catch (IOException e) {
  27. log.error("[JacksonUtil toObject fail]", e);
  28. return null;
  29. }
  30. }
  31. }

一次一次的调用接口情况下的消息消费的日志内容如下:

[onCouponPushMessage]	[message:{"BatchId":"3","CoupGrpId":1200043,"Brand":"LOOK_test","IsDiscountLimit":false,"GenDate":"2020-07-28 20:12:27"}]

并发下情况调用接口情况下的消息消费的日志内容如下:

[onCouponPushMessage]	[message:{"BatchId":"1","CoupGrpId":1200043,"Brand":"LOOK_test","IsDiscountLimit":false,"GenDate":"2020-07-28 20:12:25"},{"BatchId":"2","CoupGrpId":1200043,"Brand":"LOOK_test","IsDiscountLimit":false,"GenDate":"2020-07-28 20:12:26"}]

Kafka 的生产者和消费者类型都是批量消费,kafka 监听器接口的入参是 String 类型,两种操作接口的方式,返回的数据不一样的。

通过查看 API 接口请求日志,以及 kafka 消费数据的日志发现,kafka 消费时,并发情况下调用接口情况下,消费数据被合并到一起,用逗号分割连接到一起来了,这样其实是相当于两条数据了,但是经过 JacksonUtil 解析为 XXXBean 之后,生成了一个对象,进行保存,很明显这样就导致到发送的数据与转换的数据不一致的问题。

5. 解决方案

既然知道了问题,那么我们就可以很容易的解决了。

这里有两种方案。

  • 第一种方案是将消息进行解析,分析消息是否为用逗号拼接而成的 json 数据,然后进行根据逗号分割再转化成 XXXBean 数据 list。
    • 这种方案没什么优点,只有缺点,就是是不好进行解析,它的是格式不像是 JsonArray 那样可以直接解析,而是由 Json 对象字符串用逗号拼接而来,需要特殊处理,需要自定义的解析器,利用栈来匹配 json 对象中的括号进行解析,编写代码比较麻烦。
  • 第二种方案是从源头上处理。即监听器方法入参处将参数类型由 String 改为 List<ConsumerRecord> 类型,这样获得的数据就是一个list,然后遍历 list 处理每条数据。
    • 这种方案比较简单,选择它。

选择第二种方法解决问题。

6. 源码深入分析

上面的问题解决了,但是我们不知道会出现这样的问题,即在消费者批量处理消费的模式下,监听器方法的入参参数类型是 String 的时候,为什么会出现数据通过逗号拼接在一起的问题呢?

要解决这个问题,必须要看源码了!好了废话不多说,我们直接开干,撸源码!

我们其实是要找的代码逻辑,是监听器方法入参的参数值设置相关的代码逻辑。

6.1 @KafkaListener 注解

我们先从 @KafkaListener 这个注解先看起,进入到它的类上,说它是用标记指定的 topic 从而被生成一个 kafka 监听器容器,然后监听器容器就负责拉取 kafka 消息并且转换处理以及绑定消息到监听器指定的目标方法,然后执行目标方法,最后根据结果以及异常情况作相应的处理,包括提交、回滚、重试、异常处理等等操作。

接着我们开始看到如何构建一个监听器容器类的。

6.2 KafkaListenerEndpointRegistrar 监听器端点登记员

KafkaListenerEndpointRegistrar 这个类是一个实现了 InitializingBean 接口的类,它在 KafkaListenerAnnotationBeanPostProcessor 类中以实例变量被创建、设置属性、注册所有的监听器。

它的注册所有监听器方法的时序图如下所示。

KafkaListenerEndpointRegistrar

  1. sequenceDiagram
  2. KafkaListenerEndpointRegistrar->>KafkaListenerEndpointRegistrar:afterPropertiesSet() 初始化 bean
  3. KafkaListenerEndpointRegistrar->>KafkaListenerEndpointRegistrar:registerAllEndpoints() 注册所有端点
  4. loop endpointDescriptors 端点描述器
  5. KafkaListenerEndpointRegistrar->>KafkaListenerEndpointRegistry:registerListenerContainer() 注册监听器容器
  6. KafkaListenerEndpointRegistry->>KafkaListenerEndpointRegistry:createListenerContainer() 创建监听器容器
  7. end
  8. KafkaListenerEndpointRegistrar->>KafkaListenerEndpointRegistrar:this.startImmediately = true 设置立即启动标识

6.3 KafkaListenerEndpointRegistry 监听器端点注册表

这个类是一个监听器端点注册表,它用来创建监听器容器,同时管理监听器容器的生命周期。

它的创建监听器容器的方法执行时序图如下

KafkaListenerEndpointRegistry

  1. sequenceDiagram
  2. # 创建与注册监听器容器
  3. KafkaListenerEndpointRegistry->>KafkaListenerEndpointRegistry:createListenerContainer() 创建监听器容器
  4. KafkaListenerEndpointRegistry->>ConcurrentKafkaListenerContainerFactory:createListenerContainer() 创建监听器容器
  5. ConcurrentKafkaListenerContainerFactory->>ConcurrentKafkaListenerContainerFactory:createContainerInstance() 创建监听器容器
  6. ConcurrentKafkaListenerContainerFactory->>ConcurrentMessageListenerContainer:new 创建并发消息监听器容器实例
  7. ConcurrentMessageListenerContainer-->>ConcurrentKafkaListenerContainerFactory:返回实例
  8. # 设置消息监听器容器
  9. ConcurrentKafkaListenerContainerFactory->>MethodKafkaListenerEndpoint:setupListenerContainer() 设置监听器容器
  10. ConcurrentKafkaListenerContainerFactory->>ConcurrentKafkaListenerContainerFactory:initializeContainer() 初始化容器
  11. ConcurrentKafkaListenerContainerFactory->>ConcurrentKafkaListenerContainerFactory:customizeContainer() 自定义配置容器
  12. ConcurrentKafkaListenerContainerFactory-->>KafkaListenerEndpointRegistry:返回批量消息监听器适配对象

6.4 MethodKafkaListenerEndpoint 监听器端点设置监听器容器

这个类是主要用于设置监听器容器的消息转换器。

它的设置监听器容器 setupListenerContainer 方法执行时序如下图。

MethodKafkaListenerEndpoint

  1. sequenceDiagram
  2. MethodKafkaListenerEndpoint->>MethodKafkaListenerEndpoint:setupMessageListener() 设置消息监听器
  3. # 消息监听器
  4. MethodKafkaListenerEndpoint->>MethodKafkaListenerEndpoint:createMessageListener() 创建消息监听器
  5. MethodKafkaListenerEndpoint->>MethodKafkaListenerEndpoint:createMessageListenerInstance() 创建消息监听器适配器
  6. MethodKafkaListenerEndpoint->>BatchMessagingMessageListenerAdapter:new 创建批量消息监听器、或者非批量消息监听器适配器
  7. BatchMessagingMessageListenerAdapter->>BatchMessagingMessageListenerAdapter:setBatchToRecordAdapter() 设置批量接收消息适配器
  8. BatchMessagingMessageListenerAdapter->>BatchMessagingMessageListenerAdapter:setBatchMessageConverter() 设置批量消息转换器适配器
  9. BatchMessagingMessageListenerAdapter-->>MethodKafkaListenerEndpoint:返回消息监听器适配器
  10. # 配置监听器处理器适配器
  11. MethodKafkaListenerEndpoint->>MethodKafkaListenerEndpoint:configureListenerAdapter() 配置监听器处理器适配器
  12. # 执行器方法
  13. BatchMessagingMessageListenerAdapter->>BatchMessagingMessageListenerAdapter:setHandlerMethod() 设置执行器方法(它用来处理设置监听器所在类方法的入参)
  14. BatchMessagingMessageListenerAdapter-->>MethodKafkaListenerEndpoint:返回消息监听器适配器对象
  15. MethodKafkaListenerEndpoint->>ConcurrentMessageListenerContainer:setupMessageListener() 设置消息监听器
  16. ConcurrentMessageListenerContainer->>ContainerProperties:setMessageListener() 设置消息转换器

6.5 MethodKafkaListenerEndpoint 设置执行器器方法

这个类是实际的监听器端点实现类,它创建了处理器适配器、执行处理器方法,执行处理器方法是用于执行目标方法的处理器,用于通过解析参数,执行底层的目标业务方法。

它的 configureListenerAdapter 方法时序图如下。

MethodKafkaListenerEndpoint 设置执行器器方法

  1. sequenceDiagram
  2. MethodKafkaListenerEndpoint->>MethodKafkaListenerEndpoint:configureListenerAdapter() 处理器适配器
  3. MethodKafkaListenerEndpoint->>KafkaListenerAnnotationBeanPostProcessor:createInvocableHandlerMethod() 创建执行处理器方法
  4. KafkaListenerAnnotationBeanPostProcessor->>KafkaListenerAnnotationBeanPostProcessor:getHandlerMethodFactory() 创建执行处理器方法工厂
  5. # 执行处理器方法工厂
  6. KafkaListenerAnnotationBeanPostProcessor->>KafkaListenerAnnotationBeanPostProcessor:createDefaultMessageHandlerMethodFactory() 创建默认的执行处理器方法工厂
  7. KafkaListenerAnnotationBeanPostProcessor->>DefaultMessageHandlerMethodFactory:new 创建默认执行处理器方法工厂
  8. KafkaListenerAnnotationBeanPostProcessor->>DefaultMessageHandlerMethodFactory:设置验证器、bean 工厂、格式化转换器 转化服务
  9. KafkaListenerAnnotationBeanPostProcessor->>DefaultMessageHandlerMethodFactory:设置消息转化器 GenericMessageConverter
  10. KafkaListenerAnnotationBeanPostProcessor->>DefaultMessageHandlerMethodFactory:设置 KafkaNullAwarePayloadArgumentResolver 等参数解析器
  11. DefaultMessageHandlerMethodFactory-->>KafkaListenerAnnotationBeanPostProcessor:返回默认执行处理器方法工厂
  12. DefaultMessageHandlerMethodFactory->>DefaultMessageHandlerMethodFactory:createInvocableHandlerMethod() 真正创建执行处理器方法
  13. DefaultMessageHandlerMethodFactory->>InvocableHandlerMethod:new 创建执行处理器方法
  14. InvocableHandlerMethod->>InvocableHandlerMethod:setMessageMethodArgumentResolvers() 设置消息方法参数解析器
  15. InvocableHandlerMethod-->>DefaultMessageHandlerMethodFactory:返回 执行处理器方法
  16. DefaultMessageHandlerMethodFactory-->>KafkaListenerAnnotationBeanPostProcessor:返回执行处理器方法
  17. DefaultMessageHandlerMethodFactory-->>MethodKafkaListenerEndpoint:返回执行处理器方法
  18. # HandlerAdapter 执行适配器
  19. MethodKafkaListenerEndpoint->>HandlerAdapter:new 创建执行器适配器
  20. HandlerAdapter-->>MethodKafkaListenerEndpoint:返回对象

6.6 KafkaListenerAnnotationBeanPostProcessor 监听器注解后置处理器

这个类是监听器注解 bean 后置处理器,它是实现了 BeanPostProcessor 这个接口,用来在 bean 初始化完成之后执行相应的操作。比如这个类它负责获取所有的 bean 上使用 @KafkaListener 注解标记的方法,来通过 kafka 监听器容器工厂根据注解参数值来创建一个 kafka 监听器容器,用它来进行消费kafka 消息。

它的 postProcessAfterInitialization 方法的执行时序图如下。

KafkaListenerAnnotationBeanPostProcessor 监听器注解后置处理器

  1. sequenceDiagram
  2. # 后置处理器处理
  3. KafkaListenerAnnotationBeanPostProcessor->>KafkaListenerAnnotationBeanPostProcessor:postProcessAfterInitialization() 后置处理器
  4. KafkaListenerAnnotationBeanPostProcessor->>MethodIntrospector:selectMethods() 寻找标记有 @KafkaListener 注解的方法
  5. MethodIntrospector-->>KafkaListenerAnnotationBeanPostProcessor:记有 @KafkaListener 注解的方法信息
  6. # 处理器监听器
  7. KafkaListenerAnnotationBeanPostProcessor->>KafkaListenerAnnotationBeanPostProcessor:processKafkaListener() 执行处理监听器
  8. KafkaListenerAnnotationBeanPostProcessor->>KafkaListenerAnnotationBeanPostProcessor:processListener() 主要的处理消费者监听
  9. # 设置端点属性
  10. KafkaListenerAnnotationBeanPostProcessor->>MethodKafkaListenerEndpoint:new 创建容器端点
  11. KafkaListenerAnnotationBeanPostProcessor->>MethodKafkaListenerEndpoint:设置端点属性(bean、MessageHandlerMethodFactory、groupID、topic、concurrency 等等)
  12. # 注册端点
  13. KafkaListenerAnnotationBeanPostProcessor->>KafkaListenerEndpointRegistrar:registerEndpoint() 注册端点
  14. KafkaListenerEndpointRegistrar->KafkaListenerEndpointRegistry:registerListenerContainer() 注册监听器容器
  15. # 创建监听器容器
  16. KafkaListenerEndpointRegistry-->>KafkaListenerEndpointRegistry:createListenerContainer() 创建监听器容器
  17. KafkaListenerEndpointRegistry-->>KafkaListenerEndpointRegistry:返回并发消息监听器容器 ConcurrentMessageListenerContainer
  18. # 启动容器
  19. KafkaListenerEndpointRegistry->>KafkaListenerEndpointRegistry:startIfNecessary() 立即开始

6.7 KafkaListenerEndpointRegistry 监听器端点注册表启动容器方法执行时序

它的 startIfNecessary 启动容器方法的实行时序如下所示。

KafkaListenerEndpointRegistry 监听器端点注册表启动容器方法执行时序

  1. sequenceDiagram
  2. # 启动容器
  3. KafkaListenerEndpointRegistry->>KafkaListenerEndpointRegistry:startIfNecessary() 立即开始
  4. # 这里进行启动容器
  5. KafkaListenerEndpointRegistry->>ConcurrentMessageListenerContainer:start() 启动监听器容器
  6. ConcurrentMessageListenerContainer->>ConcurrentMessageListenerContainer:doStart() 启动
  7. loop concurrency 并发量
  8. # 根据并发量设置 KafkaMessageListenerContainer 容器实例
  9. ConcurrentMessageListenerContainer->>ConcurrentMessageListenerContainer:constructContainer() 创建容器
  10. KafkaMessageListenerContainer-->>ConcurrentMessageListenerContainer:返回 KafkaMessageListenerContainer 容器实例
  11. KafkaMessageListenerContainer->>KafkaMessageListenerContainer:设置属性(bean、应用程序上下文、异常处理器、回滚处理器、各种消息前后置拦截器)
  12. KafkaMessageListenerContainer->>KafkaMessageListenerContainer:start() 启动容器
  13. KafkaMessageListenerContainer->>KafkaMessageListenerContainer:doStart() 启动容器
  14. KafkaMessageListenerContainer->>KafkaMessageListenerContainer:检查topics、检查 ack 模式、消费者线程执行器、消息监听器、创建监听器消费者任务
  15. KafkaMessageListenerContainer->>KafkaMessageListenerContainer:提交一个监听器消费者任务 ListenerConsumer
  16. end

6.8 监听器消费者任务 ListenerConsumer run 方法执行时序

这个类主要是负责拉取和消费消息的任务。

监听器消费者任务 ListenerConsumer run 方法执行时序

  1. sequenceDiagram
  2. ListenerConsumer->>ListenerConsumer:publishConsumerStartingEvent() 推送消费者启动中事件
  3. ListenerConsumer->>ListenerConsumer:initAssignedPartitions() 初始化指派的分区
  4. ListenerConsumer->>ListenerConsumer:publishConsumerStartedEvent() 发布消费者已启动事件
  5. loop isRunning()
  6. ListenerConsumer->>ListenerConsumer:pollAndInvoke() 拉取消息并处理
  7. ListenerConsumer->>ListenerConsumer:processCommits() 执行提交
  8. ListenerConsumer->>ListenerConsumer:invokeListener() 真正的执行消息消费
  9. ListenerConsumer->>ListenerConsumer:doInvokeBatchListener() 执行批量消息处理,这里有判断是否需要满的记录消息
  10. ListenerConsumer->>ListenerConsumer:invokeBatchOnMessage()
  11. ListenerConsumer->>ListenerConsumer:invokeBatchOnMessageWithRecordsOrList()
  12. # 转换 kafka 的消息为 spring-message 消息类型
  13. ListenerConsumer->>ListenerConsumer:doInvokeBatchOnMessage()
  14. # 这里处理消息
  15. ListenerConsumer->>BatchMessagingMessageListenerAdapter:onMessage() 处理消息
  16. end

6.9 BatchMessagingMessageListenerAdapter 批量消息处理器适配器的接收处理消息时序

这个类时批量消息监听器适配器,它实现了 BatchAcknowledgingConsumerAwareMessageListener 接口,用于接收处理批量消息,它的 onMessage 处理消息的方法时序图如下。

BatchMessagingMessageListenerAdapter 批量消息处理器适配器的接收处理消息时序

  1. sequenceDiagram
  2. ListenerConsumer->>BatchMessagingMessageListenerAdapter:onMessage() 处理消息
  3. # 转换消息为 Message<T> 类型
  4. BatchMessagingMessageListenerAdapter->>BatchMessagingMessageListenerAdapter:!isConsumerRecordList() 判断监听器方法参数是否为 List<ConsumerRecord> 类型
  5. BatchMessagingMessageListenerAdapter->>BatchMessagingMessageListenerAdapter:toMessagingMessage() 从 kafka 的原始 List<ConsumerRecord<K, V>> records 中提取消息,转化为 Message 消息,其中 payload 为 List<Object>
  6. # 执行消息转换
  7. # 将 message 消息绑定到监听器方法的入参上
  8. BatchMessagingMessageListenerAdapter->>BatchMessagingMessageListenerAdapter:invoke()
  9. BatchMessagingMessageListenerAdapter->>BatchMessagingMessageListenerAdapter:invokeHandler() 执行消息转换
  10. # 使用执行器方法执行
  11. BatchMessagingMessageListenerAdapter->>HandlerAdapter:invoke() 执行器适配器处理,绑定消息参数
  12. HandlerAdapter->>InvocableHandlerMethod:invoke() 执行器处理器方法执行
  13. # 解析与绑定方法入参参数
  14. InvocableHandlerMethod->>InvocableHandlerMethod:getMethodArgumentValues() 解析获取方法参数值
  15. InvocableHandlerMethod->>HandlerMethodArgumentResolverComposite:resolveArgument() 解析参数
  16. HandlerMethodArgumentResolverComposite->>HandlerMethodArgumentResolver:resolveArgument() 解析参数
  17. # 真正的解析参数
  18. HandlerMethodArgumentResolver->>KafkaNullAwarePayloadArgumentResolver:resolveArgument() 真正的解析参数
  19. HandlerMethodArgumentResolver-->>HandlerMethodArgumentResolverComposite:返回解析好的参数
  20. HandlerMethodArgumentResolverComposite->>InvocableHandlerMethod:返回解析好的参数
  21. # 执行监听器方法消费消息(处理业务逻辑)
  22. InvocableHandlerMethod->>InvocableHandlerMethod:doInvoke() 执行监听器方法,通过反射执行目标方法
  23. # 处理目标方法执行结果
  24. BatchMessagingMessageListenerAdapter->>BatchMessagingMessageListenerAdapter:handleResult() 处理执行结果
  25. # 异常处理
  26. BatchMessagingMessageListenerAdapter->>KafkaListenerErrorHandler:handleError() 异常处理

6.10 KafkaNullAwarePayloadArgumentResolver 参数解析器

这个类就是真正的参数解析器类,它继承了 PayloadMethodArgumentResolver 类,进行了参数解析,它的参数解析方法的时序图如下。

KafkaNullAwarePayloadArgumentResolver 参数解析器

  1. sequenceDiagram
  2. # 真正的解析参数
  3. HandlerMethodArgumentResolver->>KafkaNullAwarePayloadArgumentResolver:resolveArgument()
  4. KafkaNullAwarePayloadArgumentResolver->>PayloadMethodArgumentResolver:resolveArgument() 真正的解析参数
  5. PayloadMethodArgumentResolver->>Message:getPayload() 获取消息
  6. PayloadMethodArgumentResolver->>PayloadMethodArgumentResolver:resolveTargetClass() 解析目标类型与参数类型是否相同,如果相同则直接返回 payload
  7. PayloadMethodArgumentResolver->>GenericMessageConverter:fromMessage() 转换消息
  8. GenericMessageConverter->>GenericMessageConverter:getConverter() 根据目标参数类型与payload 类型,获取消息转换器
  9. GenericMessageConverter->>GenericConverter:convert() 转换
  10. GenericConverter-->GenericMessageConverter:返回转换之后的参数
  11. GenericMessageConverter-->>PayloadMethodArgumentResolver:返回转换之后的参数
  12. PayloadMethodArgumentResolver-->>KafkaNullAwarePayloadArgumentResolver:返回转换之后的参数
  13. KafkaNullAwarePayloadArgumentResolver->>HandlerMethodArgumentResolver:返回转换之后的参数
  14. HandlerMethodArgumentResolver-->>HandlerMethodArgumentResolverComposite:返回解析好的参数
  15. HandlerMethodArgumentResolverComposite->>InvocableHandlerMethod:返回解析好的参数

7. 源码流程总结

通过上面的几个类的方法执行时序图,我们知道了 spring-kafka 的以下几点流程:

  1. 监听器容器是由 KafkaListenerAnnotationBeanPostProcessor 后置处理器类,通过负责获取所有的被注册的 bean 中使用 @KafkaListener 注解标记的方法,来通过 kafka 监听器容器工厂根据注解参数值来创建一个 kafka 监听器容器,并且执行注册与启动的流程;
  2. 监听器容器 KafkaMessageListenerContainer 运行流程,通过创建一个 ListenerConsumer 监听器消费者任务来执行消息拉取与处理工作;
  3. 监听器容器运行时,通过 BatchMessagingMessageListenerAdapter 批量消息监听器适配器,来对消息进行解析、转化、以及参数绑定、的流程。

8. 问题分析

那么回顾下我们上面做项目时遇到的问题——为什么spring-kafka 消费者批量模式下,使用 @KafkaListener 注解标记的方法入参处,参数类型为 String 时,会出现多条消息通过逗号进行拼接成字符串的问题?

很明显,通过上面源码的分析,我们可以看出,这个问题是出现在消息 BatchMessagingMessageListenerAdapter 消息解析、转换以及参数绑定的流程上。它是由 KafkaNullAwarePayloadArgumentResolver 类进行参数解析, 我们继续深入源码分析下,到底是怎么出现的这个问题的?

继续看 KafkaNullAwarePayloadArgumentResolver 参数解析器这个类,

这个类就是真正的参数解析器类,它继承了 PayloadMethodArgumentResolver 类,进行了参数解析

它源码为:

  1. /**
  2. * 这个参数解析器,是处理监听器方法入参的主要处理类,通过继承 PayloadMethodArgumentResolver 类来进行参数解析
  3. */
  4. private static class KafkaNullAwarePayloadArgumentResolver extends PayloadMethodArgumentResolver {
  5. KafkaNullAwarePayloadArgumentResolver(MessageConverter messageConverter, Validator validator) {
  6. super(messageConverter, validator);
  7. }
  8. @Override
  9. public Object resolveArgument(MethodParameter parameter, Message<?> message) throws Exception { // NOSONAR
  10. // 解析方法参数值,将 message 消息设置到参数上面去
  11. // 先使用 PayloadMethodArgumentResolver 类型进行解析参数值
  12. Object resolved = super.resolveArgument(parameter, message);
  13. /*
  14. * Replace KafkaNull list elements with null.
  15. */
  16. if (resolved instanceof List) {
  17. List<?> list = ((List<?>) resolved);
  18. for (int i = 0; i < list.size(); i++) {
  19. if (list.get(i) instanceof KafkaNull) {
  20. list.set(i, null);
  21. }
  22. }
  23. }
  24. return resolved;
  25. }
  26. @Override
  27. protected boolean isEmptyPayload(Object payload) {
  28. return payload == null || payload instanceof KafkaNull;
  29. }
  30. }

可以看出,它主要是执行了 PayloadMethodArgumentResolver 类型的参数解析。

  1. PayloadMethodArgumentResolver 类的参数解析方法
  2. ...
  3. @Override
  4. @Nullable
  5. public Object resolveArgument(MethodParameter parameter, Message<?> message) throws Exception {
  6. Payload ann = parameter.getParameterAnnotation(Payload.class);
  7. if (ann != null && StringUtils.hasText(ann.expression())) {
  8. throw new IllegalStateException("@Payload SpEL expressions not supported by this resolver");
  9. }
  10. // 从 Message 中获取 payload,这里的 payload,如果消费者是批量消费,
  11. // 并且监听器方法参数类型。payload 是 List<String> 类型的。
  12. Object payload = message.getPayload();
  13. // 校验
  14. if (isEmptyPayload(payload)) {
  15. if (ann == null || ann.required()) {
  16. String paramName = getParameterName(parameter);
  17. BindingResult bindingResult = new BeanPropertyBindingResult(payload, paramName);
  18. bindingResult.addError(new ObjectError(paramName, "Payload value must not be empty"));
  19. throw new MethodArgumentNotValidException(message, parameter, bindingResult);
  20. }
  21. else {
  22. return null;
  23. }
  24. }
  25. // 解析目标参数类型
  26. Class<?> targetClass = resolveTargetClass(parameter, message);
  27. // payload 类型
  28. Class<?> payloadClass = payload.getClass();
  29. if (ClassUtils.isAssignable(targetClass, payloadClass)) {
  30. validate(message, parameter, payload);
  31. // 如果目标参数类型与payload 参数类型相等则直接返回
  32. return payload;
  33. }
  34. else {
  35. if (this.converter instanceof SmartMessageConverter) {
  36. SmartMessageConverter smartConverter = (SmartMessageConverter) this.converter;
  37. payload = smartConverter.fromMessage(message, targetClass, parameter);
  38. }
  39. else {
  40. // 这里使用 GenericMessageConverter 类型进行转换
  41. payload = this.converter.fromMessage(message, targetClass);
  42. }
  43. if (payload == null) {
  44. throw new MessageConversionException(message, "Cannot convert from [" +
  45. payloadClass.getName() + "] to [" + targetClass.getName() + "] for " + message);
  46. }
  47. validate(message, parameter, payload);
  48. return payload;
  49. }
  50. }
  51. ...

PayloadMethodArgumentResolver 类的 resolveArgument 方法中,主要使用了 GenericMessageConverter 进行消息转换。

  1. GenericMessageConverter类
  2. ...
  3. @Override
  4. @Nullable
  5. public Object fromMessage(Message<?> message, Class<?> targetClass) {
  6. Object payload = message.getPayload();
  7. if (this.conversionService.canConvert(payload.getClass(), targetClass)) {
  8. try {
  9. return this.conversionService.convert(payload, targetClass);
  10. }
  11. catch (ConversionException ex) {
  12. throw new MessageConversionException(message, "Failed to convert message payload '" +
  13. payload + "' to '" + targetClass.getName() + "'", ex);
  14. }
  15. }
  16. return (ClassUtils.isAssignableValue(targetClass, payload) ? payload : null);
  17. }
  18. ...

GenericMessageConverter 其中由包含了 ConversionService 转换器服务类,再看它的源码。

  1. public static void addCollectionConverters(ConverterRegistry converterRegistry) {
  2. ConversionService conversionService = (ConversionService) converterRegistry;
  3. converterRegistry.addConverter(new ArrayToCollectionConverter(conversionService));
  4. converterRegistry.addConverter(new CollectionToArrayConverter(conversionService));
  5. converterRegistry.addConverter(new ArrayToArrayConverter(conversionService));
  6. converterRegistry.addConverter(new CollectionToCollectionConverter(conversionService));
  7. converterRegistry.addConverter(new MapToMapConverter(conversionService));
  8. converterRegistry.addConverter(new ArrayToStringConverter(conversionService));
  9. converterRegistry.addConverter(new StringToArrayConverter(conversionService));
  10. converterRegistry.addConverter(new ArrayToObjectConverter(conversionService));
  11. converterRegistry.addConverter(new ObjectToArrayConverter(conversionService));
  12. # 看到这里的 Collection to String 的转换器
  13. converterRegistry.addConverter(new CollectionToStringConverter(conversionService));
  14. converterRegistry.addConverter(new StringToCollectionConverter(conversionService));
  15. converterRegistry.addConverter(new CollectionToObjectConverter(conversionService));
  16. converterRegistry.addConverter(new ObjectToCollectionConverter(conversionService));
  17. converterRegistry.addConverter(new StreamConverter(conversionService));
  18. }

可以看到,真正进行参数解析的转换器就是这个类 CollectionToStringConverter,它是负责把 Collection 集合转成 String 类型的参数的,我们看下它的转换方法。

  1. private static final String DELIMITER = ",";
  2. @Override
  3. @Nullable
  4. public Object convert(@Nullable Object source, TypeDescriptor sourceType, TypeDescriptor targetType) {
  5. if (source == null) {
  6. return null;
  7. }
  8. // 获取源类型
  9. Collection<?> sourceCollection = (Collection<?>) source;
  10. if (sourceCollection.isEmpty()) {
  11. return "";
  12. }
  13. // 创建通过逗号分割的字符串拼接器
  14. StringJoiner sj = new StringJoiner(DELIMITER);
  15. // 遍历每个源类型数据,用逗号进行拼接
  16. for (Object sourceElement : sourceCollection) {
  17. Object targetElement = this.conversionService.convert(
  18. sourceElement, sourceType.elementTypeDescriptor(sourceElement), targetType);
  19. sj.add(String.valueOf(targetElement));
  20. }
  21. return sj.toString();
  22. }

现在,我们就知道了为什么spring-kafka 消费者批量模式下,使用 @KafkaListener 注解标记的方法入参处,参数类型为 String 时,会出现多条消息通过逗号进行拼接成字符串的问题。

好了,既然知道了问题是怎么产生的,那如何解决也就很容易了,修改 @KafkaListener 注解标记的方法入参参数类型为 List<ConsumerRecord> 就可以解决问题了!

9. 总结

通过这次的源码分析总结,我感觉收获很多。首先从发现问题,带着疑问去阅读了相关问题框架的源码最后定位问题、解决问题、再总结。

以前阅读源码,都是简单的看下,不会有特殊的标记,那样做是不能做到深入理解和加深印象的。正确有效的做法是,首先我们需要下载框架源代码,然后需要带着一个问题去阅读,抓住主要流程,比如容器创建、启动、注册、运行,这些关键的流程,不然会面对成千上万行代码,因为没有目标而迷失自我;其次,在阅读源码的过程中,尝试着阅读英文文档,不要直接全文翻译,那样很不准确,只翻译不懂的单词然后尝试理解整个句子的含义,初期可能会感觉到很难,速度很慢,但是当你的词汇量积累到一定程度,慢慢的就会离开词典,阅读和理解英文文档会变得很快速,耐着性子坚持做下去,一段时间后会有非常大的收获;然后就是阅读过程中需要在关键的源码上写上注释,就相当于我们看书做的笔记一样,目的是为了加深印象,和快速标记方便下次查找;最后当阅读完一段源码或者一个核心流程,一定要将其方法的执行流程,通过时序图给画出来,这样会非常的直观。

从看源码到写完博客,花的时间还是挺长的,周末干了两个晚上写博客,虽然比较辛苦但是却感到很快乐充实。在前进的道路上,还需要继续努力!

10. 参考链接

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/一键难忘520/article/detail/906327
推荐阅读
相关标签
  

闽ICP备14008679号