当前位置:   article > 正文

spring-kafka通过@KafkaListener实现消费者监听流程分析

kafkalistener

主流程

处理EnableKafka注解

首先通过@EnableKafka注解,注入KafkaBootstrapConfiguration类

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
// @Import是spring提供的注入类的方式
@Import(KafkaBootstrapConfiguration.class)
public @interface EnableKafka {
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

再注入KafkaListenerAnnotationBeanPostProcessor

@Configuration
public class KafkaBootstrapConfiguration {

	@SuppressWarnings("rawtypes")
	@Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public KafkaListenerAnnotationBeanPostProcessor kafkaListenerAnnotationProcessor() {
		// 注入KafkaListenerAnnotationBeanPostProcessor
		return new KafkaListenerAnnotationBeanPostProcessor();
	}

	@Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
	public KafkaListenerEndpointRegistry defaultKafkaListenerEndpointRegistry() {
		return new KafkaListenerEndpointRegistry();
	}

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

实现BeanPostProcessor接口

可以看到KafkaListenerAnnotationBeanPostProcessor实现了BeanPostProcessor,那自然存在postProcessBeforeInitialization和postProcessAfterInitialization两个关键方法

public class KafkaListenerAnnotationBeanPostProcessor<K, V>
		implements BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton {
		// 代码省略...
}
  • 1
  • 2
  • 3
  • 4

postProcessAfterInitialization扫描@KafkaListener

	@Override
	public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
		if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
			Class<?> targetClass = AopUtils.getTargetClass(bean);
			//收集注解方法
			Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
			final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
			final List<Method> multiMethods = new ArrayList<Method>();
			Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
					new MethodIntrospector.MetadataLookup<Set<KafkaListener>>() {

						@Override
						public Set<KafkaListener> inspect(Method method) {
							Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
							return (!listenerMethods.isEmpty() ? listenerMethods : null);
						}

					});
			if (hasClassLevelListeners) {
				Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
						new ReflectionUtils.MethodFilter() {

							@Override
							public boolean matches(Method method) {
								return AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null;
							}

						});
				multiMethods.addAll(methodsWithHandler);
			}
			if (annotatedMethods.isEmpty()) {
				this.nonAnnotatedClasses.add(bean.getClass());
				if (this.logger.isTraceEnabled()) {
					this.logger.trace("No @KafkaListener annotations found on bean type: " + bean.getClass());
				}
			}
			else {
				// Non-empty set of methods
				for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
					Method method = entry.getKey();
					for (KafkaListener listener : entry.getValue()) {
						//遍历处理
						processKafkaListener(listener, method, bean, beanName);
					}
				}
				if (this.logger.isDebugEnabled()) {
					this.logger.debug(annotatedMethods.size() + " @KafkaListener methods processed on bean '"
							+ beanName + "': " + annotatedMethods);
				}
			}
			if (hasClassLevelListeners) {
				processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
			}
		}
		return bean;
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {
		Method methodToUse = checkProxy(method, bean);
		MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<K, V>();
		endpoint.setMethod(methodToUse);
		endpoint.setBeanFactory(this.beanFactory);
		processListener(endpoint, kafkaListener, bean, methodToUse, beanName);
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener, Object bean,
			Object adminTarget, String beanName) {
		//把kafkaListener中的属性封装成对象
		endpoint.setBean(bean);
		endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
		endpoint.setId(getEndpointId(kafkaListener));
		endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
		endpoint.setTopics(resolveTopics(kafkaListener));
		endpoint.setTopicPattern(resolvePattern(kafkaListener));
		String group = kafkaListener.containerGroup();
		if (StringUtils.hasText(group)) {
			Object resolvedGroup = resolveExpression(group);
			if (resolvedGroup instanceof String) {
				endpoint.setGroup((String) resolvedGroup);
			}
		}

		KafkaListenerContainerFactory<?> factory = null;
		String containerFactoryBeanName = resolve(kafkaListener.containerFactory());
		if (StringUtils.hasText(containerFactoryBeanName)) {
			Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
			try {
				factory = this.beanFactory.getBean(containerFactoryBeanName, KafkaListenerContainerFactory.class);
			}
			catch (NoSuchBeanDefinitionException ex) {
				throw new BeanInitializationException("Could not register Kafka listener endpoint on [" + adminTarget
						+ "] for bean " + beanName + ", no " + KafkaListenerContainerFactory.class.getSimpleName()
						+ " with id '" + containerFactoryBeanName + "' was found in the application context", ex);
			}
		}
		//交给KafkaListenerEndpointRegistry处理
		this.registrar.registerEndpoint(endpoint, factory);
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
public void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
		Assert.notNull(endpoint, "Endpoint must be set");
		Assert.hasText(endpoint.getId(), "Endpoint id must be set");
		// Factory may be null, we defer the resolution right before actually creating the container
		KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
		synchronized (this.endpointDescriptors) {
			if (this.startImmediately) { // Register and start immediately
				this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
						resolveContainerFactory(descriptor), true);
			}
			else {
				this.endpointDescriptors.add(descriptor);
			}
		}
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

registerListenerContainer注册监听

public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
			boolean startImmediately) {
		Assert.notNull(endpoint, "Endpoint must not be null");
		Assert.notNull(factory, "Factory must not be null");

		String id = endpoint.getId();
		Assert.hasText(id, "Endpoint id must not be empty");
		synchronized (this.listenerContainers) {
			Assert.state(!this.listenerContainers.containsKey(id),
					"Another endpoint is already registered with id '" + id + "'");
			//创建ConcurrentMessageListenerContainer
			MessageListenerContainer container = createListenerContainer(endpoint, factory);
			this.listenerContainers.put(id, container);
			if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
				List<MessageListenerContainer> containerGroup;
				if (this.applicationContext.containsBean(endpoint.getGroup())) {
					containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
				}
				else {
					containerGroup = new ArrayList<MessageListenerContainer>();
					this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
				}
				containerGroup.add(container);
			}
			if (startImmediately) {
				//开启监听入口
				startIfNecessary(container);
			}
		}
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
private void startIfNecessary(MessageListenerContainer listenerContainer) {
	if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
		listenerContainer.start();
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
@Override
public final void start() {
	synchronized (this.lifecycleMonitor) {
		Assert.isTrue(
				this.containerProperties.getMessageListener() instanceof KafkaDataListener,
				"A " + KafkaDataListener.class.getName() + " implementation must be provided");
		doStart();
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

concurrency参数决定线程数

@Override
protected void doStart() {
	if (!isRunning()) {
		ContainerProperties containerProperties = getContainerProperties();
		TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
		if (topicPartitions != null
				&& this.concurrency > topicPartitions.length) {
			this.logger.warn("When specific partitions are provided, the concurrency must be less than or "
					+ "equal to the number of partitions; reduced from " + this.concurrency + " to "
					+ topicPartitions.length);
			this.concurrency = topicPartitions.length;
		}
		setRunning(true);
		
		//根据主题中的分区数和concurrency数计算,决定创建多少KafkaMessageListenerContainer,也就是开启多少线程
		for (int i = 0; i < this.concurrency; i++) {
			KafkaMessageListenerContainer<K, V> container;
			if (topicPartitions == null) {
				container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties);
			}
			else {
				container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties,
						partitionSubset(containerProperties, i));
			}
			if (getBeanName() != null) {
				container.setBeanName(getBeanName() + "-" + i);
			}
			if (getApplicationEventPublisher() != null) {
				container.setApplicationEventPublisher(getApplicationEventPublisher());
			}
			container.setClientIdSuffix("-" + i);
			//KafkaMessageListenerContainer的doStart()
			container.start();
			this.containers.add(container);
		}
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

开启ListenerConsumer线程

@Override
protected void doStart() {
	if (isRunning()) {
		return;
	}
	ContainerProperties containerProperties = getContainerProperties();

	if (!this.consumerFactory.isAutoCommit()) {
		AckMode ackMode = containerProperties.getAckMode();
		if (ackMode.equals(AckMode.COUNT) || ackMode.equals(AckMode.COUNT_TIME)) {
			Assert.state(containerProperties.getAckCount() > 0, "'ackCount' must be > 0");
		}
		if ((ackMode.equals(AckMode.TIME) || ackMode.equals(AckMode.COUNT_TIME))
				&& containerProperties.getAckTime() == 0) {
			containerProperties.setAckTime(5000);
		}
	}

	Object messageListener = containerProperties.getMessageListener();
	Assert.state(messageListener != null, "A MessageListener is required");
	if (messageListener instanceof GenericAcknowledgingMessageListener) {
		this.acknowledgingMessageListener = (GenericAcknowledgingMessageListener<?>) messageListener;
	}
	else if (messageListener instanceof GenericMessageListener) {
		this.listener = (GenericMessageListener<?>) messageListener;
	}
	else {
		throw new IllegalStateException("messageListener must be 'MessageListener' "
				+ "or 'AcknowledgingMessageListener', not " + messageListener.getClass().getName());
	}
	if (containerProperties.getConsumerTaskExecutor() == null) {
		SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
				(getBeanName() == null ? "" : getBeanName()) + "-C-");
		containerProperties.setConsumerTaskExecutor(consumerExecutor);
	}
	if (containerProperties.getListenerTaskExecutor() == null) {
		SimpleAsyncTaskExecutor listenerExecutor = new SimpleAsyncTaskExecutor(
				(getBeanName() == null ? "" : getBeanName()) + "-L-");
		containerProperties.setListenerTaskExecutor(listenerExecutor);
	}
	//创建ListenerConsumer
	this.listenerConsumer = new ListenerConsumer(this.listener, this.acknowledgingMessageListener);
	setRunning(true);
	//开启ListenerConsumer线程
	this.listenerConsumerFuture = containerProperties
			.getConsumerTaskExecutor()
			.submitListenable(this.listenerConsumer);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48

run方法调用kafka的poll方法拉取消息

@Override
public void run() {
	if (this.theListener instanceof ConsumerSeekAware) {
		((ConsumerSeekAware) this.theListener).registerSeekCallback(this);
	}
	this.count = 0;
	this.last = System.currentTimeMillis();
	if (isRunning() && this.definedPartitions != null) {
		initPartitionsIfNeeded();
		// we start the invoker here as there will be no rebalance calls to
		// trigger it, but only if the container is not set to autocommit
		// otherwise we will process records on a separate thread
		if (!this.autoCommit) {
			startInvoker();
		}
	}
	long lastReceive = System.currentTimeMillis();
	long lastAlertAt = lastReceive;
	while (isRunning()) {
		try {
			if (!this.autoCommit) {
				processCommits();
			}
			processSeeks();
			if (this.logger.isTraceEnabled()) {
				this.logger.trace("Polling (paused=" + this.paused + ")...");
			}
			//拉取消息
			ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
			if (records != null && this.logger.isDebugEnabled()) {
				this.logger.debug("Received: " + records.count() + " records");
			}
			if (records != null && records.count() > 0) {
				if (this.containerProperties.getIdleEventInterval() != null) {
					lastReceive = System.currentTimeMillis();
				}
				// if the container is set to auto-commit, then execute in the
				// same thread
				// otherwise send to the buffering queue
				if (this.autoCommit) {
					//自动提交直接反射调用@kafkaListener注解的方法
					invokeListener(records);
				}
				else {
					//非自动提交把消息放入队列中
					if (sendToListener(records)) {
						if (this.assignedPartitions != null) {
							// avoid group management rebalance due to a slow
							// consumer
							this.consumer.pause(this.assignedPartitions);
							this.paused = true;
							this.unsent = records;
						}
					}
				}
			}
			else {
				if (this.containerProperties.getIdleEventInterval() != null) {
					long now = System.currentTimeMillis();
					if (now > lastReceive + this.containerProperties.getIdleEventInterval()
							&& now > lastAlertAt + this.containerProperties.getIdleEventInterval()) {
						publishIdleContainerEvent(now - lastReceive);
						lastAlertAt = now;
						if (this.theListener instanceof ConsumerSeekAware) {
							seekPartitions(getAssignedPartitions(), true);
						}
					}
				}
			}
			this.unsent = checkPause(this.unsent);
		}
		catch (WakeupException e) {
			this.unsent = checkPause(this.unsent);
		}
		catch (Exception e) {
			if (this.containerProperties.getGenericErrorHandler() != null) {
				this.containerProperties.getGenericErrorHandler().handle(e, null);
			}
			else {
				this.logger.error("Container exception", e);
			}
		}
	}
	if (this.listenerInvokerFuture != null) {
		stopInvoker();
		commitManualAcks();
	}
	try {
		this.consumer.unsubscribe();
	}
	catch (WakeupException e) {
		// No-op. Continue process
	}
	this.consumer.close();
	if (this.logger.isInfoEnabled()) {
		this.logger.info("Consumer stopped");
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98

源码内容比较多,只截图主流程的业务,有兴趣可以了解过主流程后再细看。

流程图

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/483980
推荐阅读
相关标签
  

闽ICP备14008679号