赞
踩
首先,@EnableKafka 注解通过 @Import 引入 KafkaBootstrapConfiguration 配置类:
/**
 * Marker annotation that imports {@link KafkaBootstrapConfiguration} into the
 * application context of the class it is placed on.
 *
 * <p>It may only be applied to types and is retained at runtime.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
// @Import is the Spring mechanism used here to register the configuration class
@Import(KafkaBootstrapConfiguration.class)
public @interface EnableKafka {
}
KafkaBootstrapConfiguration 再注入 KafkaListenerAnnotationBeanPostProcessor(以及 KafkaListenerEndpointRegistry):
@Configuration public class KafkaBootstrapConfiguration { @SuppressWarnings("rawtypes") @Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public KafkaListenerAnnotationBeanPostProcessor kafkaListenerAnnotationProcessor() { // 注入KafkaListenerAnnotationBeanPostProcessor return new KafkaListenerAnnotationBeanPostProcessor(); } @Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME) public KafkaListenerEndpointRegistry defaultKafkaListenerEndpointRegistry() { return new KafkaListenerEndpointRegistry(); } }
可以看到 KafkaListenerAnnotationBeanPostProcessor 实现了 BeanPostProcessor,那自然存在 postProcessBeforeInitialization 和 postProcessAfterInitialization 两个关键方法,下面重点看 postProcessAfterInitialization:
/**
 * Bean post-processor for Kafka listener annotations. Only the class
 * declaration is shown in this excerpt; the body is elided.
 *
 * @param <K> NOTE(review): presumably the record key type - confirm
 * @param <V> NOTE(review): presumably the record value type - confirm
 */
public class KafkaListenerAnnotationBeanPostProcessor<K, V>
implements BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton {
// code omitted...
}
@Override public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException { if (!this.nonAnnotatedClasses.contains(bean.getClass())) { Class<?> targetClass = AopUtils.getTargetClass(bean); //收集注解方法 Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass); final boolean hasClassLevelListeners = classLevelListeners.size() > 0; final List<Method> multiMethods = new ArrayList<Method>(); Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass, new MethodIntrospector.MetadataLookup<Set<KafkaListener>>() { @Override public Set<KafkaListener> inspect(Method method) { Set<KafkaListener> listenerMethods = findListenerAnnotations(method); return (!listenerMethods.isEmpty() ? listenerMethods : null); } }); if (hasClassLevelListeners) { Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass, new ReflectionUtils.MethodFilter() { @Override public boolean matches(Method method) { return AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null; } }); multiMethods.addAll(methodsWithHandler); } if (annotatedMethods.isEmpty()) { this.nonAnnotatedClasses.add(bean.getClass()); if (this.logger.isTraceEnabled()) { this.logger.trace("No @KafkaListener annotations found on bean type: " + bean.getClass()); } } else { // Non-empty set of methods for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) { Method method = entry.getKey(); for (KafkaListener listener : entry.getValue()) { //遍历处理 processKafkaListener(listener, method, bean, beanName); } } if (this.logger.isDebugEnabled()) { this.logger.debug(annotatedMethods.size() + " @KafkaListener methods processed on bean '" + beanName + "': " + annotatedMethods); } } if (hasClassLevelListeners) { processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName); } } return bean; }
/**
 * Builds a {@code MethodKafkaListenerEndpoint} for one listener-annotated
 * method and forwards it to {@code processListener}.
 *
 * @param kafkaListener the listener annotation found on the method
 * @param method the annotated method
 * @param bean the bean that declares the method
 * @param beanName the name of the bean
 */
protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {
    // checkProxy decides which Method object is actually used for this bean
    final Method resolvedMethod = checkProxy(method, bean);

    final MethodKafkaListenerEndpoint<K, V> listenerEndpoint = new MethodKafkaListenerEndpoint<>();
    listenerEndpoint.setMethod(resolvedMethod);
    listenerEndpoint.setBeanFactory(this.beanFactory);

    processListener(listenerEndpoint, kafkaListener, bean, resolvedMethod, beanName);
}
protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener, Object bean, Object adminTarget, String beanName) { //把kafkaListener中的属性封装成对象 endpoint.setBean(bean); endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory); endpoint.setId(getEndpointId(kafkaListener)); endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener)); endpoint.setTopics(resolveTopics(kafkaListener)); endpoint.setTopicPattern(resolvePattern(kafkaListener)); String group = kafkaListener.containerGroup(); if (StringUtils.hasText(group)) { Object resolvedGroup = resolveExpression(group); if (resolvedGroup instanceof String) { endpoint.setGroup((String) resolvedGroup); } } KafkaListenerContainerFactory<?> factory = null; String containerFactoryBeanName = resolve(kafkaListener.containerFactory()); if (StringUtils.hasText(containerFactoryBeanName)) { Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name"); try { factory = this.beanFactory.getBean(containerFactoryBeanName, KafkaListenerContainerFactory.class); } catch (NoSuchBeanDefinitionException ex) { throw new BeanInitializationException("Could not register Kafka listener endpoint on [" + adminTarget + "] for bean " + beanName + ", no " + KafkaListenerContainerFactory.class.getSimpleName() + " with id '" + containerFactoryBeanName + "' was found in the application context", ex); } } //交给KafkaListenerEndpointRegistry处理 this.registrar.registerEndpoint(endpoint, factory); }
/**
 * Registers a listener endpoint together with the factory that will create
 * its container.
 *
 * <p>When {@code startImmediately} is set, the container is registered with
 * the endpoint registry right away; otherwise the endpoint descriptor is
 * stored in {@code endpointDescriptors}.
 *
 * @param endpoint the endpoint to register; must not be null and must have a non-empty id
 * @param factory the container factory to use; may be null
 */
public void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
    Assert.notNull(endpoint, "Endpoint must be set");
    Assert.hasText(endpoint.getId(), "Endpoint id must be set");

    // A null factory is accepted here; it is resolved just before the container is created
    final KafkaListenerEndpointDescriptor pending = new KafkaListenerEndpointDescriptor(endpoint, factory);

    synchronized (this.endpointDescriptors) {
        if (!this.startImmediately) {
            this.endpointDescriptors.add(pending);
            return;
        }
        // Register and start immediately
        this.endpointRegistry.registerListenerContainer(
                pending.endpoint, resolveContainerFactory(pending), true);
    }
}
public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory, boolean startImmediately) { Assert.notNull(endpoint, "Endpoint must not be null"); Assert.notNull(factory, "Factory must not be null"); String id = endpoint.getId(); Assert.hasText(id, "Endpoint id must not be empty"); synchronized (this.listenerContainers) { Assert.state(!this.listenerContainers.containsKey(id), "Another endpoint is already registered with id '" + id + "'"); //创建ConcurrentMessageListenerContainer MessageListenerContainer container = createListenerContainer(endpoint, factory); this.listenerContainers.put(id, container); if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) { List<MessageListenerContainer> containerGroup; if (this.applicationContext.containsBean(endpoint.getGroup())) { containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class); } else { containerGroup = new ArrayList<MessageListenerContainer>(); this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup); } containerGroup.add(container); } if (startImmediately) { //开启监听入口 startIfNecessary(container); } } }
/**
 * Starts the container when the context has already been refreshed or the
 * container is flagged for auto-startup; otherwise does nothing.
 *
 * @param listenerContainer the container to start conditionally
 */
private void startIfNecessary(MessageListenerContainer listenerContainer) {
    final boolean shouldStart = this.contextRefreshed || listenerContainer.isAutoStartup();
    if (!shouldStart) {
        return;
    }
    listenerContainer.start();
}
/**
 * Checks that the configured message listener is a {@code KafkaDataListener}
 * and then delegates to {@code doStart()}, all while holding the lifecycle
 * monitor.
 */
@Override
public final void start() {
    synchronized (this.lifecycleMonitor) {
        final Object configuredListener = this.containerProperties.getMessageListener();
        final boolean isDataListener = configuredListener instanceof KafkaDataListener;
        final String requirement =
                "A " + KafkaDataListener.class.getName() + " implementation must be provided";
        Assert.isTrue(isDataListener, requirement);
        doStart();
    }
}
@Override protected void doStart() { if (!isRunning()) { ContainerProperties containerProperties = getContainerProperties(); TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions(); if (topicPartitions != null && this.concurrency > topicPartitions.length) { this.logger.warn("When specific partitions are provided, the concurrency must be less than or " + "equal to the number of partitions; reduced from " + this.concurrency + " to " + topicPartitions.length); this.concurrency = topicPartitions.length; } setRunning(true); //根据主题中的分区数和concurrency数计算,决定创建多少KafkaMessageListenerContainer,也就是开启多少线程 for (int i = 0; i < this.concurrency; i++) { KafkaMessageListenerContainer<K, V> container; if (topicPartitions == null) { container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties); } else { container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties, partitionSubset(containerProperties, i)); } if (getBeanName() != null) { container.setBeanName(getBeanName() + "-" + i); } if (getApplicationEventPublisher() != null) { container.setApplicationEventPublisher(getApplicationEventPublisher()); } container.setClientIdSuffix("-" + i); //KafkaMessageListenerContainer的doStart() container.start(); this.containers.add(container); } } }
@Override protected void doStart() { if (isRunning()) { return; } ContainerProperties containerProperties = getContainerProperties(); if (!this.consumerFactory.isAutoCommit()) { AckMode ackMode = containerProperties.getAckMode(); if (ackMode.equals(AckMode.COUNT) || ackMode.equals(AckMode.COUNT_TIME)) { Assert.state(containerProperties.getAckCount() > 0, "'ackCount' must be > 0"); } if ((ackMode.equals(AckMode.TIME) || ackMode.equals(AckMode.COUNT_TIME)) && containerProperties.getAckTime() == 0) { containerProperties.setAckTime(5000); } } Object messageListener = containerProperties.getMessageListener(); Assert.state(messageListener != null, "A MessageListener is required"); if (messageListener instanceof GenericAcknowledgingMessageListener) { this.acknowledgingMessageListener = (GenericAcknowledgingMessageListener<?>) messageListener; } else if (messageListener instanceof GenericMessageListener) { this.listener = (GenericMessageListener<?>) messageListener; } else { throw new IllegalStateException("messageListener must be 'MessageListener' " + "or 'AcknowledgingMessageListener', not " + messageListener.getClass().getName()); } if (containerProperties.getConsumerTaskExecutor() == null) { SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor( (getBeanName() == null ? "" : getBeanName()) + "-C-"); containerProperties.setConsumerTaskExecutor(consumerExecutor); } if (containerProperties.getListenerTaskExecutor() == null) { SimpleAsyncTaskExecutor listenerExecutor = new SimpleAsyncTaskExecutor( (getBeanName() == null ? "" : getBeanName()) + "-L-"); containerProperties.setListenerTaskExecutor(listenerExecutor); } //创建ListenerConsumer this.listenerConsumer = new ListenerConsumer(this.listener, this.acknowledgingMessageListener); setRunning(true); //开启ListenerConsumer线程 this.listenerConsumerFuture = containerProperties .getConsumerTaskExecutor() .submitListenable(this.listenerConsumer); }
@Override public void run() { if (this.theListener instanceof ConsumerSeekAware) { ((ConsumerSeekAware) this.theListener).registerSeekCallback(this); } this.count = 0; this.last = System.currentTimeMillis(); if (isRunning() && this.definedPartitions != null) { initPartitionsIfNeeded(); // we start the invoker here as there will be no rebalance calls to // trigger it, but only if the container is not set to autocommit // otherwise we will process records on a separate thread if (!this.autoCommit) { startInvoker(); } } long lastReceive = System.currentTimeMillis(); long lastAlertAt = lastReceive; while (isRunning()) { try { if (!this.autoCommit) { processCommits(); } processSeeks(); if (this.logger.isTraceEnabled()) { this.logger.trace("Polling (paused=" + this.paused + ")..."); } //拉取消息 ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout()); if (records != null && this.logger.isDebugEnabled()) { this.logger.debug("Received: " + records.count() + " records"); } if (records != null && records.count() > 0) { if (this.containerProperties.getIdleEventInterval() != null) { lastReceive = System.currentTimeMillis(); } // if the container is set to auto-commit, then execute in the // same thread // otherwise send to the buffering queue if (this.autoCommit) { //自动提交直接反射调用@kafkaListener注解的方法 invokeListener(records); } else { //非自动提交把消息放入队列中 if (sendToListener(records)) { if (this.assignedPartitions != null) { // avoid group management rebalance due to a slow // consumer this.consumer.pause(this.assignedPartitions); this.paused = true; this.unsent = records; } } } } else { if (this.containerProperties.getIdleEventInterval() != null) { long now = System.currentTimeMillis(); if (now > lastReceive + this.containerProperties.getIdleEventInterval() && now > lastAlertAt + this.containerProperties.getIdleEventInterval()) { publishIdleContainerEvent(now - lastReceive); lastAlertAt = now; if (this.theListener instanceof ConsumerSeekAware) { 
seekPartitions(getAssignedPartitions(), true); } } } } this.unsent = checkPause(this.unsent); } catch (WakeupException e) { this.unsent = checkPause(this.unsent); } catch (Exception e) { if (this.containerProperties.getGenericErrorHandler() != null) { this.containerProperties.getGenericErrorHandler().handle(e, null); } else { this.logger.error("Container exception", e); } } } if (this.listenerInvokerFuture != null) { stopInvoker(); commitManualAcks(); } try { this.consumer.unsubscribe(); } catch (WakeupException e) { // No-op. Continue process } this.consumer.close(); if (this.logger.isInfoEnabled()) { this.logger.info("Consumer stopped"); } }
源码内容比较多,这里只截取了主流程相关的代码,有兴趣的读者可以在了解主流程之后再细看其余部分。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。