当前位置:   article > 正文

RabbitMQ初步到精通-第十章-RabbitMQ之Spring客户端源码_the applicationcontext is closed and the connectio

the applicationcontext is closed and the connectionfactory can no longer cre

目录

第十章-RabbitMQ之Spring客户端源码

        1. 前言

2. 客户端消费代码

2.1 消费的实现方式

2.2 消费中注解解释

2.3 推测Spring实现过程

3.MQ消费源码分析

3.1 集成SpringBoot 启动过程

3.2 Broker投递消息给客户端过程

3.3 客户端消费过程

4. 总结


第十章-RabbitMQ之Spring客户端源码

1. 前言

经过前面前面的学习,我们已经掌握了rabbitmq的基本用法,高级用法延迟队列、死信队列等,已经研究过了amqp-client的java客户端源码,由于我们在使用的时候,一般还是以SpringBoot为主,那经过Spring封装后的客户端源码是是如何实现的呢?

同学们最好需要有研读过 Spring源码及SpringBoot 源码的经验,会更好衔接一下,不过关系也不大。

由于Spring 体系的庞大,封装的rabbit客户端实现的功能也很多,例 创建连接、生产者推送消息,事务,消费者消费等等内容,那我们这次只抽取rabbitmq消费的部分,进行研读。

集成starter

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

2. 客户端消费代码

2.1 消费的实现方式

如之前我们提到的集成SpringBoot后的使用方式:

  1. @RabbitHandler
  2. @RabbitListener(queues = "SolarWaterHeater")
  1. @RabbitHandler
  2. @RabbitListener(queuesToDeclare = @Queue("SolarWaterHeater"))
  1. @RabbitHandler
  2. @RabbitListener(bindings = @QueueBinding(
  3. value = @Queue("SolarWaterHeater-RedWine"),
  4. key = "REDWINE",
  5. exchange = @Exchange(value = "routing-exchange", type = ExchangeTypes.DIRECT, durable = "false")))

2.2 消费中注解解释

这里面出现了两个注解

第一个:RabbitHandler 看下它的解释:

* Annotation that marks a method to be the target of a Rabbit message
* listener within a class that is annotated with {@link RabbitListener}.

如果一天类上面的注解是RabbitListener,那RabbitHandler标注的方法,即是Rabbit的消息监听者。

@Target({ ElementType.METHOD, ElementType.ANNOTATION_TYPE })
这个注解只能标注到Method

第二个 RabbitListener

1. Annotation that marks a method to be the target of a Rabbit message listener 

标注的方法是一个消息监听者

2. When defined at the class level, a single message listener container is used to
* service all methods annotated with {@code @RabbitHandler}

如果标注到类上,那标注RabbitHandler的方法即是消息监听

链一个:@RabbitListener和@RabbitHandler的使用_sliver1836的博客-CSDN博客

2.3 推测Spring实现过程

所以,我们后续的源码分析即基于此两个注解开展。

在开始看代码之前,我们先想一想,我们之前的使用java amqp客户端开发消费逻辑的过程,

1、创建连接

2、创建Channel

3、声明队列、Exchange、绑定关系

4、监听方法实现 继承DefaultConumer

5、basic.Consume 注册到Broker

6、Broker消息推送,监听方法实现消费

那现在Spring就靠两个注解就帮我们实现了消息的消费,有没有很神奇。顿时感叹程序猿越来越幸福,写代码如此简单了呢?但有利就有弊,Spring帮我们封装的太多,而我们知道的底层却太少了。

闲话少说,到这,大家想一下,如果让你写个注解,就去实现上面6个步骤的内容,你该如何去做呢?

开发自定义注解大家都应该做过,大致的逻辑应该是不是可以,在系统启动的时候,我们就会抓取到标注注解的方法,有此类的方法时,我们认为需要使用mq,我们在后端服务中依次的去执行上面中的6个步骤。这样把注解的方法实现了监听,后续监听消息进行消费。

这里只是一个大概的推测,大家自己自行发挥想像。

3.MQ消费源码分析

从哪入手呢?首先点开 RabbitListener 的源码,然后Download源码。

到这个界面:

我们不再研读RabbitListener这个注解的功能了,大家自己看。

然后紧接着看到 RabbitListenerAnnotationBeanPostProcessor

这个类有什么特点呢?首先是处理RabbitListener 的处理类,然后呢是一个BeanPostProcessor继承了BeanPostProcessor 接口-读过Spring源码的同学,肯定就能得到最有效的信息了,这个类会在系统初始化的时候,执行postProcessAfterInitialization()这个方法。如果没读过Spring源码的话就先跟着节奏走吧。

从这开始了我们的切入。

3.1 集成SpringBoot 启动过程

接着上面的步骤呢,我们往上简单倒一下,

首先 这是一个SpringBoot 项目,通过SpringBoot 的启动类的Main 方法进行启动,然后开始扫描各个组件,初始化各种信息,这个不再细聊。【需要读SpringBoot源码】

其次呢,SpringBoot 只是对Spring 的封装,还是需要回到Spring 的类初始化的过程中去。【需要读Spring源码】

如下呢,即Spring 的核心初始化方法:无论Spring 再怎么升级,这几个核心方法基本不会怎么变化了,这里面我们找到 【registerBeanPostProcessors】,从这里面就会触发到我们上面所说的-

RabbitListenerAnnotationBeanPostProcessor

  1. @Override
  2. public void refresh() throws BeansException, IllegalStateException {
  3. synchronized (this.startupShutdownMonitor) {
  4. // Prepare this context for refreshing.
  5. prepareRefresh();
  6. // Tell the subclass to refresh the internal bean factory.
  7. ConfigurableListableBeanFactory beanFactory = obtainFreshBeanFactory();
  8. // Prepare the bean factory for use in this context.
  9. prepareBeanFactory(beanFactory);
  10. try {
  11. // Allows post-processing of the bean factory in context subclasses.
  12. postProcessBeanFactory(beanFactory);
  13. // Invoke factory processors registered as beans in the context.
  14. invokeBeanFactoryPostProcessors(beanFactory);
  15. // Register bean processors that intercept bean creation.
  16. registerBeanPostProcessors(beanFactory);
  17. // Initialize message source for this context.
  18. initMessageSource();
  19. // Initialize event multicaster for this context.
  20. initApplicationEventMulticaster();
  21. // Initialize other special beans in specific context subclasses.
  22. onRefresh();
  23. // Check for listener beans and register them.
  24. registerListeners();
  25. // Instantiate all remaining (non-lazy-init) singletons.
  26. finishBeanFactoryInitialization(beanFactory);
  27. // Last step: publish corresponding event.
  28. finishRefresh();
  29. }
  30. catch (BeansException ex) {
  31. if (logger.isWarnEnabled()) {
  32. logger.warn("Exception encountered during context initialization - " +
  33. "cancelling refresh attempt: " + ex);
  34. }
  35. // Destroy already created singletons to avoid dangling resources.
  36. destroyBeans();
  37. // Reset 'active' flag.
  38. cancelRefresh(ex);
  39. // Propagate exception to caller.
  40. throw ex;
  41. }
  42. finally {
  43. // Reset common introspection caches in Spring's core, since we
  44. // might not ever need metadata for singleton beans anymore...
  45. resetCommonCaches();
  46. }
  47. }
  48. }

随着Spring 的启动,开始触发到了RabbitListenerAnnotationBeanPostProcessor 中的 

postProcessAfterInitialization 方法。

代码:

这就很好解释了,bean 就是我们的消费类,

解析到了 标有注解的方法 @RabbitListener,然后进行处理。processAmqpListener

  1. @Override
  2. public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
  3. Class<?> targetClass = AopUtils.getTargetClass(bean);
  4. final TypeMetadata metadata = this.typeCache.computeIfAbsent(targetClass, this::buildMetadata);
  5. for (ListenerMethod lm : metadata.listenerMethods) {
  6. for (RabbitListener rabbitListener : lm.annotations) {
  7. processAmqpListener(rabbitListener, lm.method, bean, beanName);
  8. }
  9. }
  10. if (metadata.handlerMethods.length > 0) {
  11. processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName);
  12. }
  13. return bean;
  14. }
  1. protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
  2. // 对应的消费方法
  3. Method methodToUse = checkProxy(method, bean);
  4. //封装对象
  5. MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
  6. endpoint.setMethod(methodToUse);
  7. // 继续处理
  8. processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
  9. }

继续:

  1. protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,
  2. Object adminTarget, String beanName) {
  3. endpoint.setBean(bean);
  4. endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
  5. endpoint.setId(getEndpointId(rabbitListener));
  6. endpoint.setQueueNames(resolveQueues(rabbitListener));
  7. endpoint.setConcurrency(resolveExpressionAsStringOrInteger(rabbitListener.concurrency(), "concurrency"));
  8. endpoint.setBeanFactory(this.beanFactory);
  9. endpoint.setReturnExceptions(resolveExpressionAsBoolean(rabbitListener.returnExceptions()));
  10. Object errorHandler = resolveExpression(rabbitListener.errorHandler());
  11. if (errorHandler instanceof RabbitListenerErrorHandler) {
  12. endpoint.setErrorHandler((RabbitListenerErrorHandler) errorHandler);
  13. }
  14. else if (errorHandler instanceof String) {
  15. String errorHandlerBeanName = (String) errorHandler;
  16. if (StringUtils.hasText(errorHandlerBeanName)) {
  17. endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, RabbitListenerErrorHandler.class));
  18. }
  19. }
  20. else {
  21. throw new IllegalStateException("error handler mut be a bean name or RabbitListenerErrorHandler, not a "
  22. + errorHandler.getClass().toString());
  23. }
  24. String group = rabbitListener.group();
  25. if (StringUtils.hasText(group)) {
  26. Object resolvedGroup = resolveExpression(group);
  27. if (resolvedGroup instanceof String) {
  28. endpoint.setGroup((String) resolvedGroup);
  29. }
  30. }
  31. String autoStartup = rabbitListener.autoStartup();
  32. if (StringUtils.hasText(autoStartup)) {
  33. endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup));
  34. }
  35. endpoint.setExclusive(rabbitListener.exclusive());
  36. String priority = resolve(rabbitListener.priority());
  37. if (StringUtils.hasText(priority)) {
  38. try {
  39. endpoint.setPriority(Integer.valueOf(priority));
  40. }
  41. catch (NumberFormatException ex) {
  42. throw new BeanInitializationException("Invalid priority value for " +
  43. rabbitListener + " (must be an integer)", ex);
  44. }
  45. }
  46. // 以上 前面都完成了对 MethodRabbitListenerEndpoint 对象的封装,封装的也都是注解中的属性
  47. //此方法内部实际没执行 跳过
  48. resolveAdmin(endpoint, rabbitListener, adminTarget);
  49. //跳过
  50. RabbitListenerContainerFactory<?> factory = resolveContainerFactory(rabbitListener, adminTarget, beanName);
  51. // 属性填充 放入List ,不重要
  52. this.registrar.registerEndpoint(endpoint, factory);
  53. }

程序回转:

这里面来到一个

public void afterSingletonsInstantiated() 方法,这是由于实现了接口SmartInitializingSingleton, 后续得到了处理。

这里面会涉及到两个类:

1. RabbitListenerEndpointRegistrar

2. RabbitListenerEndpointRegistry

有没有长得很像,这里面是把 RabbitListenerEndpointRegistry 手工注册到了RabbitListenerEndpointRegistrar 里面,然后进行了一系列初始化,

这里面不再详细展开了,但这个RabbitListenerEndpointRegistry 很重要,后面还会涉及到它

 RabbitListenerEndpointRegistry 实现了一个Lifecycle接口,后续会调用到它的实现start()

将对应的消费Class 做好了封装 ,返回,继续Spring的初始化过程。

来到Spring核心流程 

finishRefresh();
  1. /**
  2. * Finish the refresh of this context, invoking the LifecycleProcessor's
  3. * onRefresh() method and publishing the
  4. * {@link org.springframework.context.event.ContextRefreshedEvent}.
  5. */
  6. protected void finishRefresh() {
  7. // Clear context-level resource caches (such as ASM metadata from scanning).
  8. clearResourceCaches();
  9. // Initialize lifecycle processor for this context.
  10. initLifecycleProcessor();
  11. // Propagate refresh to lifecycle processor first.
  12. getLifecycleProcessor().onRefresh();
  13. // Publish the final event.
  14. publishEvent(new ContextRefreshedEvent(this));
  15. // Participate in LiveBeansView MBean, if active.
  16. LiveBeansView.registerApplicationContext(this);
  17. }

其中第三个方法

getLifecycleProcessor().onRefresh();

这个方法是获取 lifecycle的处理器,进行lifecycle接口实现类的处理,这就呼应到了上面的 RabbitListenerEndpointRegistry ,他实现了lifecycle的接口。

最终一番流转终于到了 这个Registry处理逻辑中:

  1. @Override
  2. public void start() {
  3. for (MessageListenerContainer listenerContainer : getListenerContainers()) {
  4. startIfNecessary(listenerContainer);
  5. }
  6. }
  1. /**
  2. * Start the specified {@link MessageListenerContainer} if it should be started
  3. * on startup or when start is called explicitly after startup.
  4. * @param listenerContainer the container.
  5. * @see MessageListenerContainer#isAutoStartup()
  6. */
  7. private void startIfNecessary(MessageListenerContainer listenerContainer) {
  8. if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
  9. listenerContainer.start();
  10. }
  11. }
MessageListenerContainer 也是在上面afterSingletonsInstantiated 处理好的,现在要启动这个监听者容器。

来到了 AbstractMessageListenerContainer 中的启动方法:

  1. /**
  2. * Start this container.
  3. * @see #doStart
  4. */
  5. @Override
  6. public void start() {
  7. if (isRunning()) {
  8. return;
  9. }
  10. if (!this.initialized) {
  11. synchronized (this.lifecycleMonitor) {
  12. if (!this.initialized) {
  13. afterPropertiesSet();
  14. }
  15. }
  16. }
  17. try {
  18. logger.debug("Starting Rabbit listener container.");
  19. configureAdminIfNeeded();
  20. checkMismatchedQueues();
  21. doStart();
  22. }
  23. catch (Exception ex) {
  24. throw convertRabbitAccessException(ex);
  25. }
  26. finally {
  27. this.lazyLoad = false;
  28. }
  29. }
configureAdminIfNeeded() 获取RabbitAdmin 
checkMismatchedQueues() 这个方法就很关键了,运行到此时打开我们的抓包工具,这里面开始创建Connection了。
  1. protected void checkMismatchedQueues() {
  2. if (this.mismatchedQueuesFatal && this.amqpAdmin != null) {
  3. try {
  4. this.amqpAdmin.initialize();
  5. }
  6. catch (AmqpConnectException e) {
  7. logger.info("Broker not available; cannot check queue declarations");
  8. }
  9. catch (AmqpIOException e) {
  10. if (RabbitUtils.isMismatchedQueueArgs(e)) {
  11. throw new FatalListenerStartupException("Mismatched queues", e);
  12. }
  13. else {
  14. logger.info("Failed to get connection during start(): " + e);
  15. }
  16. }
  17. }
  18. else {
  19. try {
  20. // 创建连接方法
  21. Connection connection = getConnectionFactory().createConnection(); // NOSONAR
  22. if (connection != null) {
  23. connection.close();
  24. }
  25. }
  26. catch (Exception e) {
  27. logger.info("Broker not available; cannot force queue declarations during start: " + e.getMessage());
  28. }
  29. }
  30. }

有没有很熟悉

Connection connection = getConnectionFactory().createConnection(); 
  1. @Override
  2. public final Connection createConnection() throws AmqpException {
  3. if (this.stopped) {
  4. throw new AmqpApplicationContextClosedException(
  5. "The ApplicationContext is closed and the ConnectionFactory can no longer create connections.");
  6. }
  7. synchronized (this.connectionMonitor) {
  8. if (this.cacheMode == CacheMode.CHANNEL) {
  9. if (this.connection.target == null) {
  10. this.connection.target = super.createBareConnection();
  11. // invoke the listener *after* this.connection is assigned
  12. if (!this.checkoutPermits.containsKey(this.connection)) {
  13. this.checkoutPermits.put(this.connection, new Semaphore(this.channelCacheSize));
  14. }
  15. this.connection.closeNotified.set(false);
  16. getConnectionListener().onCreate(this.connection);
  17. }
  18. return this.connection;
  19. }
  20. else if (this.cacheMode == CacheMode.CONNECTION) {
  21. return connectionFromCache();
  22. }
  23. }
  24. return null; // NOSONAR - never reach here - exceptions
  25. }

运行完此步,如上的代码中,两个重要的点:

1. 此步直接就创建了Connection、

this.connection.target = super.createBareConnection();

看下抓包:

2. 继续这一步也很关键,创建完连接后,会把接下来的 Exchange、Queue、绑定关系根据注解配置中的内容,该创建的都创建一遍。

getConnectionListener().onCreate(this.connection);

直接运行到了

RabbitAdmin.initialize()

看方法头上的注释也很清晰

  1. /**
  2. * Declares all the exchanges, queues and bindings in the enclosing application context, if any. It should be safe
  3. * (but unnecessary) to call this method more than once.
  4. */
  5. @Override // NOSONAR complexity
  6. public void initialize() {
  7. if (this.applicationContext == null) {
  8. this.logger.debug("no ApplicationContext has been set, cannot auto-declare Exchanges, Queues, and Bindings");
  9. return;
  10. }
  11. this.logger.debug("Initializing declarations");
  12. Collection<Exchange> contextExchanges = new LinkedList<Exchange>(
  13. this.applicationContext.getBeansOfType(Exchange.class).values());
  14. Collection<Queue> contextQueues = new LinkedList<Queue>(
  15. this.applicationContext.getBeansOfType(Queue.class).values());
  16. Collection<Binding> contextBindings = new LinkedList<Binding>(
  17. this.applicationContext.getBeansOfType(Binding.class).values());
  18. processLegacyCollections(contextExchanges, contextQueues, contextBindings);
  19. processDeclarables(contextExchanges, contextQueues, contextBindings);
  20. final Collection<Exchange> exchanges = filterDeclarables(contextExchanges);
  21. final Collection<Queue> queues = filterDeclarables(contextQueues);
  22. final Collection<Binding> bindings = filterDeclarables(contextBindings);
  23. for (Exchange exchange : exchanges) {
  24. if ((!exchange.isDurable() || exchange.isAutoDelete()) && this.logger.isInfoEnabled()) {
  25. this.logger.info("Auto-declaring a non-durable or auto-delete Exchange ("
  26. + exchange.getName()
  27. + ") durable:" + exchange.isDurable() + ", auto-delete:" + exchange.isAutoDelete() + ". "
  28. + "It will be deleted by the broker if it shuts down, and can be redeclared by closing and "
  29. + "reopening the connection.");
  30. }
  31. }
  32. for (Queue queue : queues) {
  33. if ((!queue.isDurable() || queue.isAutoDelete() || queue.isExclusive()) && this.logger.isInfoEnabled()) {
  34. this.logger.info("Auto-declaring a non-durable, auto-delete, or exclusive Queue ("
  35. + queue.getName()
  36. + ") durable:" + queue.isDurable() + ", auto-delete:" + queue.isAutoDelete() + ", exclusive:"
  37. + queue.isExclusive() + ". "
  38. + "It will be redeclared if the broker stops and is restarted while the connection factory is "
  39. + "alive, but all messages will be lost.");
  40. }
  41. }
  42. if (exchanges.size() == 0 && queues.size() == 0 && bindings.size() == 0) {
  43. this.logger.debug("Nothing to declare");
  44. return;
  45. }
  46. this.rabbitTemplate.execute(channel -> {
  47. declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
  48. declareQueues(channel, queues.toArray(new Queue[queues.size()]));
  49. declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
  50. return null;
  51. });
  52. this.logger.debug("Declarations finished");
  53. }

由于我们只创建了Queue,使用默认的Exchange,代码不贴太多了,只贴声明Queue的内容:

  1. DeclareOk declareOk = channel.queueDeclare(queue.getName(), queue.isDurable(),
  2. queue.isExclusive(), queue.isAutoDelete(), queue.getArguments());

我们看下抓包情况:

 到此呢,Queue也声明好了。下面呢,下面就该basic.Consume 了吧,把消费者注册到Broker中去。

好,我们继续:

继续代码又倒回去,倒到:

  1. /**
  2. * Start this container.
  3. * @see #doStart
  4. */
  5. @Override
  6. public void start() {
  7. if (isRunning()) {
  8. return;
  9. }
  10. if (!this.initialized) {
  11. synchronized (this.lifecycleMonitor) {
  12. if (!this.initialized) {
  13. afterPropertiesSet();
  14. }
  15. }
  16. }
  17. try {
  18. logger.debug("Starting Rabbit listener container.");
  19. configureAdminIfNeeded();
  20. checkMismatchedQueues();
  21. doStart();
  22. }
  23. catch (Exception ex) {
  24. throw convertRabbitAccessException(ex);
  25. }
  26. finally {
  27. this.lazyLoad = false;
  28. }
  29. }
doStart(); 

一看doxxx,那一定是要干实际的事情的,很重要对吧,

我们进入到 

SimpleMessageListenerContainer

中的实现方法中:

  1. /**
  2. * Re-initializes this container's Rabbit message consumers, if not initialized already. Then submits each consumer
  3. * to this container's task executor.
  4. */
  5. @Override
  6. protected void doStart() {
  7. checkListenerContainerAware();
  8. super.doStart();
  9. synchronized (this.consumersMonitor) {
  10. if (this.consumers != null) {
  11. throw new IllegalStateException("A stopped container should not have consumers");
  12. }
  13. int newConsumers = initializeConsumers();
  14. if (this.consumers == null) {
  15. logger.info("Consumers were initialized and then cleared " +
  16. "(presumably the container was stopped concurrently)");
  17. return;
  18. }
  19. if (newConsumers <= 0) {
  20. if (logger.isInfoEnabled()) {
  21. logger.info("Consumers are already running");
  22. }
  23. return;
  24. }
  25. Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
  26. for (BlockingQueueConsumer consumer : this.consumers) {
  27. AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
  28. processors.add(processor);
  29. getTaskExecutor().execute(processor);
  30. if (getApplicationEventPublisher() != null) {
  31. getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
  32. }
  33. }
  34. waitForConsumersToStart(processors);
  35. }
  36. }

前面几步意义不大,走到

int newConsumers = initializeConsumers();
  1. protected int initializeConsumers() {
  2. int count = 0;
  3. synchronized (this.consumersMonitor) {
  4. if (this.consumers == null) {
  5. this.cancellationLock.reset();
  6. this.consumers = new HashSet<BlockingQueueConsumer>(this.concurrentConsumers);
  7. for (int i = 0; i < this.concurrentConsumers; i++) {
  8. BlockingQueueConsumer consumer = createBlockingQueueConsumer();
  9. this.consumers.add(consumer);
  10. count++;
  11. }
  12. }
  13. }
  14. return count;
  15. }

重点来咯,

BlockingQueueConsumer consumer = createBlockingQueueConsumer();

这里把BlockingQueueConsumer做了一个初始化,相关的不再展开。

BlockingQueueConsumer -这将是后续我们非常重要的一个类

继续重点内容,回到我们上面代码块中的内容:

  1. for (BlockingQueueConsumer consumer : this.consumers) {
  2. AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
  3. processors.add(processor);
  4. getTaskExecutor().execute(processor);
  5. if (getApplicationEventPublisher() != null) {
  6. getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
  7. }
  8. }

这个for循环很重要了,由于我们是一个消费者,循环一次。

初始化一个

AsyncMessageProcessingConsumer

对象。这个对象点进去,大家看下这是个实现了Runnable接口的线程对象。哦哦,真正的核心哦。使用 SimpleAsyncTaskExecutor   来new的线程,这个执行器可不是线程池哦,来一个线程就会New一个,大家自行研究。

这里面我们可以得到一个结论,就是一个消费者,就会开启一个线程进行监听。

从此开启了新线程,【打断点记得Thread模式】

看线程的实现:

  1. @Override // NOSONAR - complexity - many catch blocks
  2. public void run() { // NOSONAR - line count
  3. if (!isActive()) {
  4. return;
  5. }
  6. boolean aborted = false;
  7. this.consumer.setLocallyTransacted(isChannelLocallyTransacted());
  8. String routingLookupKey = getRoutingLookupKey();
  9. if (routingLookupKey != null) {
  10. SimpleResourceHolder.bind(getRoutingConnectionFactory(), routingLookupKey); // NOSONAR both never null
  11. }
  12. if (this.consumer.getQueueCount() < 1) {
  13. if (logger.isDebugEnabled()) {
  14. logger.debug("Consumer stopping; no queues for " + this.consumer);
  15. }
  16. SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer);
  17. if (getApplicationEventPublisher() != null) {
  18. getApplicationEventPublisher().publishEvent(
  19. new AsyncConsumerStoppedEvent(SimpleMessageListenerContainer.this, this.consumer));
  20. }
  21. this.start.countDown();
  22. return;
  23. }
  24. try {
  25. initialize();
  26. while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
  27. mainLoop();
  28. }
  29. }

摘出核心点:

1、initialize();

  1. private void initialize() throws Throwable { // NOSONAR
  2. try {
  3. redeclareElementsIfNecessary();
  4. this.consumer.start();
  5. this.start.countDown();
  6. }

初始化内容,

1.  redeclareElementsIfNecessary - 这个是再进行检查进行Exchange 、Queue、Binding的声明与前面声明的方法实现的共用。

2.this.consumer.start();  

  1. public void start() throws AmqpException {
  2. if (logger.isDebugEnabled()) {
  3. logger.debug("Starting consumer " + this);
  4. }
  5. this.thread = Thread.currentThread();
  6. try {
  7. this.resourceHolder = ConnectionFactoryUtils.getTransactionalResourceHolder(this.connectionFactory,
  8. this.transactional);
  9. this.channel = this.resourceHolder.getChannel();
  10. ClosingRecoveryListener.addRecoveryListenerIfNecessary(this.channel); // NOSONAR never null here
  11. }
  12. catch (AmqpAuthenticationException e) {
  13. throw new FatalListenerStartupException("Authentication failure", e);
  14. }
  15. this.deliveryTags.clear();
  16. this.activeObjectCounter.add(this);
  17. passiveDeclarations();
  18. setQosAndreateConsumers();
  19. }
这里面我们看这个方法就行
setQosAndreateConsumers();

Qos是设定消费时每次抓取的数量

并CreadConsumers

  1. private void setQosAndreateConsumers() {
  2. if (!this.acknowledgeMode.isAutoAck() && !cancelled()) {
  3. // Set basicQos before calling basicConsume (otherwise if we are not acking the broker
  4. // will send blocks of 100 messages)
  5. try {
  6. this.channel.basicQos(this.prefetchCount);
  7. }
  8. catch (IOException e) {
  9. this.activeObjectCounter.release(this);
  10. throw new AmqpIOException(e);
  11. }
  12. }
  13. try {
  14. if (!cancelled()) {
  15. for (String queueName : this.queues) {
  16. if (!this.missingQueues.contains(queueName)) {
  17. consumeFromQueue(queueName);
  18. }
  19. }
  20. }
  21. }
  22. catch (IOException e) {
  23. throw RabbitExceptionTranslator.convertRabbitAccessException(e);
  24. }
  25. }

有没有很熟悉:

this.channel.basicQos(this.prefetchCount);

抓包:

继续:

consumeFromQueue(queueName);
  1. private void consumeFromQueue(String queue) throws IOException {
  2. InternalConsumer consumer = new InternalConsumer(this.channel, queue);
  3. String consumerTag = this.channel.basicConsume(queue, this.acknowledgeMode.isAutoAck(),
  4. (this.tagStrategy != null ? this.tagStrategy.createConsumerTag(queue) : ""), this.noLocal,
  5. this.exclusive, this.consumerArgs,
  6. consumer);
  7. if (consumerTag != null) {
  8. this.consumers.put(queue, consumer);
  9. if (logger.isDebugEnabled()) {
  10. logger.debug("Started on queue '" + queue + "' with tag " + consumerTag + ": " + this);
  11. }
  12. }
  13. else {
  14. logger.error("Null consumer tag received for queue " + queue);
  15. }
  16. }

 有没有很熟悉:

String consumerTag = this.channel.basicConsume(queue, this.acknowledgeMode.isAutoAck(),
      (this.tagStrategy != null ? this.tagStrategy.createConsumerTag(queue) : ""), this.noLocal,
      this.exclusive, this.consumerArgs,
      consumer);

那这里有有一个核心的类出现了。InternalConsumer

这里转向 3.2 Broker投递消息给客户端  解释

到这里呢,我们把消费者注册到了Broker中去了,看下抓包情况:

 到这呢,所以Broker也就能给我们投递消息了。

2、mainLoop();

  1. initialize();
  2. while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
  3. mainLoop();
  4. }

这里也有个mainLoop ,于是想到了,java 的amqp客户端也存在呢mainLoop ,这里的逻辑难道也和他的逻辑契合的?我们转向 3.3 客户端消费过程继续。

3.2 Broker投递消息给客户端过程

上面说到了,已经将消费者注册到了Broker中去了,但一定注意哦,注册到Broker 中的,可不是我们使用注解 RabbitListener 标注的实际消费方法哦,而是新创建了一个内部的消费者:InternalConsumer

我们看下他的一个实现

  1. private final class InternalConsumer extends DefaultConsumer {
  2. private final String queueName;
  3. boolean canceled;
  4. InternalConsumer(Channel channel, String queue) {
  5. super(channel);
  6. this.queueName = queue;
  7. }
  8. @Override
  9. public void handleConsumeOk(String consumerTag) {
  10. super.handleConsumeOk(consumerTag);
  11. if (logger.isDebugEnabled()) {
  12. logger.debug("ConsumeOK: " + BlockingQueueConsumer.this);
  13. }
  14. if (BlockingQueueConsumer.this.applicationEventPublisher != null) {
  15. BlockingQueueConsumer.this.applicationEventPublisher
  16. .publishEvent(new ConsumeOkEvent(this, this.queueName, consumerTag));
  17. }
  18. }
  19. @Override
  20. public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
  21. if (logger.isDebugEnabled()) {
  22. if (RabbitUtils.isNormalShutdown(sig)) {
  23. logger.debug("Received shutdown signal for consumer tag=" + consumerTag + ": " + sig.getMessage());
  24. }
  25. else {
  26. logger.debug("Received shutdown signal for consumer tag=" + consumerTag, sig);
  27. }
  28. }
  29. BlockingQueueConsumer.this.shutdown = sig;
  30. // The delivery tags will be invalid if the channel shuts down
  31. BlockingQueueConsumer.this.deliveryTags.clear();
  32. BlockingQueueConsumer.this.activeObjectCounter.release(BlockingQueueConsumer.this);
  33. }
  34. @Override
  35. public void handleCancel(String consumerTag) throws IOException {
  36. if (logger.isWarnEnabled()) {
  37. logger.warn("Cancel received for " + consumerTag + " ("
  38. + this.queueName
  39. + "); " + BlockingQueueConsumer.this);
  40. }
  41. BlockingQueueConsumer.this.consumers.remove(this.queueName);
  42. if (!BlockingQueueConsumer.this.consumers.isEmpty()) {
  43. basicCancel(false);
  44. }
  45. else {
  46. BlockingQueueConsumer.this.cancelled.set(true);
  47. }
  48. }
  49. @Override
  50. public void handleCancelOk(String consumerTag) {
  51. if (logger.isDebugEnabled()) {
  52. logger.debug("Received cancelOk for tag " + consumerTag + " ("
  53. + this.queueName
  54. + "); " + BlockingQueueConsumer.this);
  55. }
  56. this.canceled = true;
  57. }
  58. @Override
  59. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
  60. byte[] body) {
  61. if (logger.isDebugEnabled()) {
  62. logger.debug("Storing delivery for consumerTag: '"
  63. + consumerTag + "' with deliveryTag: '" + envelope.getDeliveryTag() + "' in "
  64. + BlockingQueueConsumer.this);
  65. }
  66. try {
  67. if (BlockingQueueConsumer.this.abortStarted > 0) {
  68. if (!BlockingQueueConsumer.this.queue.offer(
  69. new Delivery(consumerTag, envelope, properties, body, this.queueName),
  70. BlockingQueueConsumer.this.shutdownTimeout, TimeUnit.MILLISECONDS)) {
  71. Channel channelToClose = super.getChannel();
  72. RabbitUtils.setPhysicalCloseRequired(channelToClose, true);
  73. // Defensive - should never happen
  74. BlockingQueueConsumer.this.queue.clear();
  75. if (!this.canceled) {
  76. getChannel().basicCancel(consumerTag);
  77. }
  78. try {
  79. channelToClose.close();
  80. }
  81. catch (@SuppressWarnings("unused") TimeoutException e) {
  82. // no-op
  83. }
  84. }
  85. }
  86. else {
  87. BlockingQueueConsumer.this.queue
  88. .put(new Delivery(consumerTag, envelope, properties, body, this.queueName));
  89. }
  90. }
  91. catch (@SuppressWarnings("unused") InterruptedException e) {
  92. Thread.currentThread().interrupt();
  93. }
  94. catch (Exception e) {
  95. BlockingQueueConsumer.logger.warn("Unexpected exception during delivery", e);
  96. }
  97. }
  98. @Override
  99. public String toString() {
  100. return "InternalConsumer{" + "queue='" + this.queueName + '\'' +
  101. ", consumerTag='" + getConsumerTag() + '\'' +
  102. '}';
  103. }
  104. }

哇,内部类,而且继承了 DefaultConsumer ,这和我们前面学习Rabbitmq工作模式的过程中,自己手动开发的代码一样了吧,那我找到 投递方法:

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,

好亲切有木有,所以到这里真相大白咯。Broker将消息投递到了这里,我们看看他接收到消息搞什么动作?

  1. BlockingQueueConsumer.this.queue
  2. .put(new Delivery(consumerTag, envelope, properties, body, this.queueName));

很明显,和java amqp client 实现一样,他这也用到了Queue,去存储了,

this.queue = new LinkedBlockingQueue<Delivery>(prefetchCount);

也是个阻塞Queue哦,看来spring搞了一通,从客户端那边的queue里拿来,又放了一次queue。

那放进去了,就等着取呗,看谁来取咯。

3.3 客户端消费过程

接续上面的 mainLoop(),既然消息又存到了本地的queue中,那mainLoop 的目的岂不是很明确了,那就是死循环的去取消息消息,然后再转调到我们实际的 加入@RabbitListener 的方法中去呢。究竟是不是呢,验证下:

  1. private void mainLoop() throws Exception { // NOSONAR Exception
  2. try {
  3. boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
  4. if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
  5. checkAdjust(receivedOk);
  6. }
  7. long idleEventInterval = getIdleEventInterval();
  8. if (idleEventInterval > 0) {
  9. if (receivedOk) {
  10. updateLastReceive();
  11. }
  12. else {
  13. long now = System.currentTimeMillis();
  14. long lastAlertAt = SimpleMessageListenerContainer.this.lastNoMessageAlert.get();
  15. long lastReceive = getLastReceive();
  16. if (now > lastReceive + idleEventInterval
  17. && now > lastAlertAt + idleEventInterval
  18. && SimpleMessageListenerContainer.this.lastNoMessageAlert
  19. .compareAndSet(lastAlertAt, now)) {
  20. publishIdleContainerEvent(now - lastReceive);
  21. }
  22. }
  23. }
  24. }
  25. catch (ListenerExecutionFailedException ex) {
  26. // Continue to process, otherwise re-throw
  27. if (ex.getCause() instanceof NoSuchMethodException) {
  28. throw new FatalListenerExecutionException("Invalid listener", ex);
  29. }
  30. }
  31. catch (AmqpRejectAndDontRequeueException rejectEx) {
  32. /*
  33. * These will normally be wrapped by an LEFE if thrown by the
  34. * listener, but we will also honor it if thrown by an
  35. * error handler.
  36. */
  37. }
  38. }

看下重点方法:

boolean receivedOk = receiveAndExecute(this.consumer); 
  1. private boolean receiveAndExecute(final BlockingQueueConsumer consumer) throws Exception { // NOSONAR
  2. PlatformTransactionManager transactionManager = getTransactionManager();
  3. if (transactionManager != null) {
  4. try {
  5. if (this.transactionTemplate == null) {
  6. this.transactionTemplate =
  7. new TransactionTemplate(transactionManager, getTransactionAttribute());
  8. }
  9. return this.transactionTemplate
  10. .execute(status -> { // NOSONAR null never returned
  11. RabbitResourceHolder resourceHolder = ConnectionFactoryUtils.bindResourceToTransaction(
  12. new RabbitResourceHolder(consumer.getChannel(), false),
  13. getConnectionFactory(), true);
  14. // unbound in ResourceHolderSynchronization.beforeCompletion()
  15. try {
  16. return doReceiveAndExecute(consumer);
  17. }
  18. catch (RuntimeException e1) {
  19. prepareHolderForRollback(resourceHolder, e1);
  20. throw e1;
  21. }
  22. catch (Exception e2) {
  23. throw new WrappedTransactionException(e2);
  24. }
  25. });
  26. }
  27. catch (WrappedTransactionException e) { // NOSONAR exception flow control
  28. throw (Exception) e.getCause();
  29. }
  30. }
  31. return doReceiveAndExecute(consumer);
  32. }

抛开事务,我们不关注。

return doReceiveAndExecute(consumer);
  1. private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Exception { //NOSONAR
  2. Channel channel = consumer.getChannel();
  3. for (int i = 0; i < this.txSize; i++) {
  4. logger.trace("Waiting for message from consumer.");
  5. Message message = consumer.nextMessage(this.receiveTimeout);
  6. if (message == null) {
  7. break;
  8. }
  9. try {
  10. executeListener(channel, message);
  11. }

重点哦:

			Message message = consumer.nextMessage(this.receiveTimeout);

从内部消费者取消息咯

  1. public Message nextMessage(long timeout) throws InterruptedException, ShutdownSignalException {
  2. if (logger.isTraceEnabled()) {
  3. logger.trace("Retrieving delivery for " + this);
  4. }
  5. checkShutdown();
  6. if (this.missingQueues.size() > 0) {
  7. checkMissingQueues();
  8. }
  9. Message message = handle(this.queue.poll(timeout, TimeUnit.MILLISECONDS));
  10. if (message == null && this.cancelled.get()) {
  11. throw new ConsumerCancelledException();
  12. }
  13. return message;
  14. }

看到poll 我们就放心了,把消息取出来,包装成Message对象。

快调头回来,继续看:

  1. try {
  2. executeListener(channel, message);
  3. }

这就要真正处理这个消息了

  1. protected void executeListener(Channel channel, Message messageIn) {
  2. if (!isRunning()) {
  3. if (logger.isWarnEnabled()) {
  4. logger.warn("Rejecting received message because the listener container has been stopped: " + messageIn);
  5. }
  6. throw new MessageRejectedWhileStoppingException();
  7. }
  8. try {
  9. doExecuteListener(channel, messageIn);
  10. }
  11. catch (RuntimeException ex) {
  12. if (messageIn.getMessageProperties().isFinalRetryForMessageWithNoId()) {
  13. if (this.statefulRetryFatalWithNullMessageId) {
  14. throw new FatalListenerExecutionException(
  15. "Illegal null id in message. Failed to manage retry for message: " + messageIn, ex);
  16. }
  17. else {
  18. throw new ListenerExecutionFailedException("Cannot retry message more than once without an ID",
  19. new AmqpRejectAndDontRequeueException("Not retryable; rejecting and not requeuing", ex),
  20. messageIn);
  21. }
  22. }
  23. handleListenerException(ex);
  24. throw ex;
  25. }
  26. }

代码不往下贴了,继续追就可以,最终还是找到了,打标@RabbitListener的那个方法上,得到了执行。真正让业务逻辑执行到了MQ推送过来的消息,

太不容易了,消息从发送-> Exchange->Queue -> java amqp client  ->spring client - >consume 最终得到了消费。

4. 总结

小结一下,我们从注解RabbitHandler RabbitListener 入手,一步步追踪到 与Broker链接的创建,Queue的声明,接着,启动新线程 注册一个内部的消费者到Broker中,Broker有消息的时候会推送到本地的BlockingQueue中去。

使用MainLoop 消费本地Blockinqueue的内容

贴个小图:

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/559824
推荐阅读
相关标签
  

闽ICP备14008679号