当前位置:   article > 正文

RabbitMQ初步到精通-第十章-RabbitMQ之Spring客户端源码_the applicationcontext is closed and the connectio

the applicationcontext is closed and the connectionfactory can no longer cre



        1. 前言

2. 客户端消费代码

2.1 消费的实现方式

2.2 消费中注解解释

2.3 推测Spring实现过程


3.1 集成SpringBoot 启动过程

3.2 Broker投递消息给客户端过程

3.3 客户端消费过程

4. 总结


1. 前言


同学们最好需要有研读过 Spring源码及SpringBoot 源码的经验,会更好衔接一下,不过关系也不大。

由于Spring 体系的庞大,封装的rabbit客户端实现的功能也很多,例 创建连接、生产者推送消息,事务,消费者消费等等内容,那我们这次只抽取rabbitmq消费的部分,进行研读。


  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

2. 客户端消费代码

2.1 消费的实现方式


  1. @RabbitHandler
  2. @RabbitListener(queues = "SolarWaterHeater")
  1. @RabbitHandler
  2. @RabbitListener(queuesToDeclare = @Queue("SolarWaterHeater"))
  1. @RabbitHandler
  2. @RabbitListener(bindings = @QueueBinding(
  3. value = @Queue("SolarWaterHeater-RedWine"),
  4. key = "REDWINE",
  5. exchange = @Exchange(value = "routing-exchange", type = ExchangeTypes.DIRECT, durable = "false")))

2.2 消费中注解解释


第一个:RabbitHandler 看下它的解释:

* Annotation that marks a method to be the target of a Rabbit message
* listener within a class that is annotated with {@link RabbitListener}.


@Target({ ElementType.METHOD, ElementType.ANNOTATION_TYPE })

第二个 RabbitListener

1. Annotation that marks a method to be the target of a Rabbit message listener 


2. When defined at the class level, a single message listener container is used to
* service all methods annotated with {@code @RabbitHandler}



2.3 推测Spring实现过程


在开始看代码之前,我们先想一想,我们之前的使用java amqp客户端开发消费逻辑的过程,




4、监听方法实现 继承DefaultConumer

5、basic.Consume 注册到Broker







从哪入手呢?首先点开 RabbitListener 的源码,然后Download源码。



然后紧接着看到 RabbitListenerAnnotationBeanPostProcessor

这个类有什么特点呢?首先是处理RabbitListener 的处理类,然后呢是一个BeanPostProcessor继承了BeanPostProcessor 接口-读过Spring源码的同学,肯定就能得到最有效的信息了,这个类会在系统初始化的时候,执行postProcessAfterInitialization()这个方法。如果没读过Spring源码的话就先跟着节奏走吧。


3.1 集成SpringBoot 启动过程


首先 这是一个SpringBoot 项目,通过SpringBoot 的启动类的Main 方法进行启动,然后开始扫描各个组件,初始化各种信息,这个不再细聊。【需要读SpringBoot源码】

其次呢,SpringBoot 只是对Spring 的封装,还是需要回到Spring 的类初始化的过程中去。【需要读Spring源码】

如下呢,即Spring 的核心初始化方法:无论Spring 再怎么升级,这几个核心方法基本不会怎么变化了,这里面我们找到 【registerBeanPostProcessors】,从这里面就会触发到我们上面所说的-


  1. @Override
  2. public void refresh() throws BeansException, IllegalStateException {
  3. synchronized (this.startupShutdownMonitor) {
  4. // Prepare this context for refreshing.
  5. prepareRefresh();
  6. // Tell the subclass to refresh the internal bean factory.
  7. ConfigurableListableBeanFactory beanFactory = obtainFreshBeanFactory();
  8. // Prepare the bean factory for use in this context.
  9. prepareBeanFactory(beanFactory);
  10. try {
  11. // Allows post-processing of the bean factory in context subclasses.
  12. postProcessBeanFactory(beanFactory);
  13. // Invoke factory processors registered as beans in the context.
  14. invokeBeanFactoryPostProcessors(beanFactory);
  15. // Register bean processors that intercept bean creation.
  16. registerBeanPostProcessors(beanFactory);
  17. // Initialize message source for this context.
  18. initMessageSource();
  19. // Initialize event multicaster for this context.
  20. initApplicationEventMulticaster();
  21. // Initialize other special beans in specific context subclasses.
  22. onRefresh();
  23. // Check for listener beans and register them.
  24. registerListeners();
  25. // Instantiate all remaining (non-lazy-init) singletons.
  26. finishBeanFactoryInitialization(beanFactory);
  27. // Last step: publish corresponding event.
  28. finishRefresh();
  29. }
  30. catch (BeansException ex) {
  31. if (logger.isWarnEnabled()) {
  32. logger.warn("Exception encountered during context initialization - " +
  33. "cancelling refresh attempt: " + ex);
  34. }
  35. // Destroy already created singletons to avoid dangling resources.
  36. destroyBeans();
  37. // Reset 'active' flag.
  38. cancelRefresh(ex);
  39. // Propagate exception to caller.
  40. throw ex;
  41. }
  42. finally {
  43. // Reset common introspection caches in Spring's core, since we
  44. // might not ever need metadata for singleton beans anymore...
  45. resetCommonCaches();
  46. }
  47. }
  48. }

随着Spring 的启动,开始触发到了RabbitListenerAnnotationBeanPostProcessor 中的 

postProcessAfterInitialization 方法。


这就很好解释了,bean 就是我们的消费类,

解析到了 标有注解的方法 @RabbitListener,然后进行处理。processAmqpListener

  1. @Override
  2. public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
  3. Class<?> targetClass = AopUtils.getTargetClass(bean);
  4. final TypeMetadata metadata = this.typeCache.computeIfAbsent(targetClass, this::buildMetadata);
  5. for (ListenerMethod lm : metadata.listenerMethods) {
  6. for (RabbitListener rabbitListener : lm.annotations) {
  7. processAmqpListener(rabbitListener, lm.method, bean, beanName);
  8. }
  9. }
  10. if (metadata.handlerMethods.length > 0) {
  11. processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName);
  12. }
  13. return bean;
  14. }
  1. protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
  2. // 对应的消费方法
  3. Method methodToUse = checkProxy(method, bean);
  4. //封装对象
  5. MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
  6. endpoint.setMethod(methodToUse);
  7. // 继续处理
  8. processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
  9. }


  1. protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,
  2. Object adminTarget, String beanName) {
  3. endpoint.setBean(bean);
  4. endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
  5. endpoint.setId(getEndpointId(rabbitListener));
  6. endpoint.setQueueNames(resolveQueues(rabbitListener));
  7. endpoint.setConcurrency(resolveExpressionAsStringOrInteger(rabbitListener.concurrency(), "concurrency"));
  8. endpoint.setBeanFactory(this.beanFactory);
  9. endpoint.setReturnExceptions(resolveExpressionAsBoolean(rabbitListener.returnExceptions()));
  10. Object errorHandler = resolveExpression(rabbitListener.errorHandler());
  11. if (errorHandler instanceof RabbitListenerErrorHandler) {
  12. endpoint.setErrorHandler((RabbitListenerErrorHandler) errorHandler);
  13. }
  14. else if (errorHandler instanceof String) {
  15. String errorHandlerBeanName = (String) errorHandler;
  16. if (StringUtils.hasText(errorHandlerBeanName)) {
  17. endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, RabbitListenerErrorHandler.class));
  18. }
  19. }
  20. else {
  21. throw new IllegalStateException("error handler mut be a bean name or RabbitListenerErrorHandler, not a "
  22. + errorHandler.getClass().toString());
  23. }
  24. String group = rabbitListener.group();
  25. if (StringUtils.hasText(group)) {
  26. Object resolvedGroup = resolveExpression(group);
  27. if (resolvedGroup instanceof String) {
  28. endpoint.setGroup((String) resolvedGroup);
  29. }
  30. }
  31. String autoStartup = rabbitListener.autoStartup();
  32. if (StringUtils.hasText(autoStartup)) {
  33. endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup));
  34. }
  35. endpoint.setExclusive(rabbitListener.exclusive());
  36. String priority = resolve(rabbitListener.priority());
  37. if (StringUtils.hasText(priority)) {
  38. try {
  39. endpoint.setPriority(Integer.valueOf(priority));
  40. }
  41. catch (NumberFormatException ex) {
  42. throw new BeanInitializationException("Invalid priority value for " +
  43. rabbitListener + " (must be an integer)", ex);
  44. }
  45. }
  46. // 以上 前面都完成了对 MethodRabbitListenerEndpoint 对象的封装,封装的也都是注解中的属性
  47. //此方法内部实际没执行 跳过
  48. resolveAdmin(endpoint, rabbitListener, adminTarget);
  49. //跳过
  50. RabbitListenerContainerFactory<?> factory = resolveContainerFactory(rabbitListener, adminTarget, beanName);
  51. // 属性填充 放入List ,不重要
  52. this.registrar.registerEndpoint(endpoint, factory);
  53. }



public void afterSingletonsInstantiated() 方法,这是由于实现了接口SmartInitializingSingleton, 后续得到了处理。


1. RabbitListenerEndpointRegistrar

2. RabbitListenerEndpointRegistry

有没有长得很像,这里面是把 RabbitListenerEndpointRegistry 手工注册到了RabbitListenerEndpointRegistrar 里面,然后进行了一系列初始化,

这里面不再详细展开了,但这个RabbitListenerEndpointRegistry 很重要,后面还会涉及到它

 RabbitListenerEndpointRegistry 实现了一个Lifecycle接口,后续会调用到它的实现start()

将对应的消费Class 做好了封装 ,返回,继续Spring的初始化过程。


  1. /**
  2. * Finish the refresh of this context, invoking the LifecycleProcessor's
  3. * onRefresh() method and publishing the
  4. * {@link org.springframework.context.event.ContextRefreshedEvent}.
  5. */
  6. protected void finishRefresh() {
  7. // Clear context-level resource caches (such as ASM metadata from scanning).
  8. clearResourceCaches();
  9. // Initialize lifecycle processor for this context.
  10. initLifecycleProcessor();
  11. // Propagate refresh to lifecycle processor first.
  12. getLifecycleProcessor().onRefresh();
  13. // Publish the final event.
  14. publishEvent(new ContextRefreshedEvent(this));
  15. // Participate in LiveBeansView MBean, if active.
  16. LiveBeansView.registerApplicationContext(this);
  17. }



这个方法是获取 lifecycle的处理器,进行lifecycle接口实现类的处理,这就呼应到了上面的 RabbitListenerEndpointRegistry ,他实现了lifecycle的接口。

最终一番流转终于到了 这个Registry处理逻辑中:

  1. @Override
  2. public void start() {
  3. for (MessageListenerContainer listenerContainer : getListenerContainers()) {
  4. startIfNecessary(listenerContainer);
  5. }
  6. }
  1. /**
  2. * Start the specified {@link MessageListenerContainer} if it should be started
  3. * on startup or when start is called explicitly after startup.
  4. * @param listenerContainer the container.
  5. * @see MessageListenerContainer#isAutoStartup()
  6. */
  7. private void startIfNecessary(MessageListenerContainer listenerContainer) {
  8. if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
  9. listenerContainer.start();
  10. }
  11. }
MessageListenerContainer 也是在上面afterSingletonsInstantiated 处理好的,现在要启动这个监听者容器。

来到了 AbstractMessageListenerContainer 中的启动方法:

  1. /**
  2. * Start this container.
  3. * @see #doStart
  4. */
  5. @Override
  6. public void start() {
  7. if (isRunning()) {
  8. return;
  9. }
  10. if (!this.initialized) {
  11. synchronized (this.lifecycleMonitor) {
  12. if (!this.initialized) {
  13. afterPropertiesSet();
  14. }
  15. }
  16. }
  17. try {
  18. logger.debug("Starting Rabbit listener container.");
  19. configureAdminIfNeeded();
  20. checkMismatchedQueues();
  21. doStart();
  22. }
  23. catch (Exception ex) {
  24. throw convertRabbitAccessException(ex);
  25. }
  26. finally {
  27. this.lazyLoad = false;
  28. }
  29. }
configureAdminIfNeeded() 获取RabbitAdmin 
checkMismatchedQueues() 这个方法就很关键了,运行到此时打开我们的抓包工具,这里面开始创建Connection了。
  1. protected void checkMismatchedQueues() {
  2. if (this.mismatchedQueuesFatal && this.amqpAdmin != null) {
  3. try {
  4. this.amqpAdmin.initialize();
  5. }
  6. catch (AmqpConnectException e) {
  7. logger.info("Broker not available; cannot check queue declarations");
  8. }
  9. catch (AmqpIOException e) {
  10. if (RabbitUtils.isMismatchedQueueArgs(e)) {
  11. throw new FatalListenerStartupException("Mismatched queues", e);
  12. }
  13. else {
  14. logger.info("Failed to get connection during start(): " + e);
  15. }
  16. }
  17. }
  18. else {
  19. try {
  20. // 创建连接方法
  21. Connection connection = getConnectionFactory().createConnection(); // NOSONAR
  22. if (connection != null) {
  23. connection.close();
  24. }
  25. }
  26. catch (Exception e) {
  27. logger.info("Broker not available; cannot force queue declarations during start: " + e.getMessage());
  28. }
  29. }
  30. }


Connection connection = getConnectionFactory().createConnection(); 
  1. @Override
  2. public final Connection createConnection() throws AmqpException {
  3. if (this.stopped) {
  4. throw new AmqpApplicationContextClosedException(
  5. "The ApplicationContext is closed and the ConnectionFactory can no longer create connections.");
  6. }
  7. synchronized (this.connectionMonitor) {
  8. if (this.cacheMode == CacheMode.CHANNEL) {
  9. if (this.connection.target == null) {
  10. this.connection.target = super.createBareConnection();
  11. // invoke the listener *after* this.connection is assigned
  12. if (!this.checkoutPermits.containsKey(this.connection)) {
  13. this.checkoutPermits.put(this.connection, new Semaphore(this.channelCacheSize));
  14. }
  15. this.connection.closeNotified.set(false);
  16. getConnectionListener().onCreate(this.connection);
  17. }
  18. return this.connection;
  19. }
  20. else if (this.cacheMode == CacheMode.CONNECTION) {
  21. return connectionFromCache();
  22. }
  23. }
  24. return null; // NOSONAR - never reach here - exceptions
  25. }


1. 此步直接就创建了Connection、

this.connection.target = super.createBareConnection();


2. 继续这一步也很关键,创建完连接后,会把接下来的 Exchange、Queue、绑定关系根据注解配置中的内容,该创建的都创建一遍。





  1. /**
  2. * Declares all the exchanges, queues and bindings in the enclosing application context, if any. It should be safe
  3. * (but unnecessary) to call this method more than once.
  4. */
  5. @Override // NOSONAR complexity
  6. public void initialize() {
  7. if (this.applicationContext == null) {
  8. this.logger.debug("no ApplicationContext has been set, cannot auto-declare Exchanges, Queues, and Bindings");
  9. return;
  10. }
  11. this.logger.debug("Initializing declarations");
  12. Collection<Exchange> contextExchanges = new LinkedList<Exchange>(
  13. this.applicationContext.getBeansOfType(Exchange.class).values());
  14. Collection<Queue> contextQueues = new LinkedList<Queue>(
  15. this.applicationContext.getBeansOfType(Queue.class).values());
  16. Collection<Binding> contextBindings = new LinkedList<Binding>(
  17. this.applicationContext.getBeansOfType(Binding.class).values());
  18. processLegacyCollections(contextExchanges, contextQueues, contextBindings);
  19. processDeclarables(contextExchanges, contextQueues, contextBindings);
  20. final Collection<Exchange> exchanges = filterDeclarables(contextExchanges);
  21. final Collection<Queue> queues = filterDeclarables(contextQueues);
  22. final Collection<Binding> bindings = filterDeclarables(contextBindings);
  23. for (Exchange exchange : exchanges) {
  24. if ((!exchange.isDurable() || exchange.isAutoDelete()) && this.logger.isInfoEnabled()) {
  25. this.logger.info("Auto-declaring a non-durable or auto-delete Exchange ("
  26. + exchange.getName()
  27. + ") durable:" + exchange.isDurable() + ", auto-delete:" + exchange.isAutoDelete() + ". "
  28. + "It will be deleted by the broker if it shuts down, and can be redeclared by closing and "
  29. + "reopening the connection.");
  30. }
  31. }
  32. for (Queue queue : queues) {
  33. if ((!queue.isDurable() || queue.isAutoDelete() || queue.isExclusive()) && this.logger.isInfoEnabled()) {
  34. this.logger.info("Auto-declaring a non-durable, auto-delete, or exclusive Queue ("
  35. + queue.getName()
  36. + ") durable:" + queue.isDurable() + ", auto-delete:" + queue.isAutoDelete() + ", exclusive:"
  37. + queue.isExclusive() + ". "
  38. + "It will be redeclared if the broker stops and is restarted while the connection factory is "
  39. + "alive, but all messages will be lost.");
  40. }
  41. }
  42. if (exchanges.size() == 0 && queues.size() == 0 && bindings.size() == 0) {
  43. this.logger.debug("Nothing to declare");
  44. return;
  45. }
  46. this.rabbitTemplate.execute(channel -> {
  47. declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
  48. declareQueues(channel, queues.toArray(new Queue[queues.size()]));
  49. declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
  50. return null;
  51. });
  52. this.logger.debug("Declarations finished");
  53. }


  1. DeclareOk declareOk = channel.queueDeclare(queue.getName(), queue.isDurable(),
  2. queue.isExclusive(), queue.isAutoDelete(), queue.getArguments());


 到此呢,Queue也声明好了。下面呢,下面就该basic.Consume 了吧,把消费者注册到Broker中去。



  1. /**
  2. * Start this container.
  3. * @see #doStart
  4. */
  5. @Override
  6. public void start() {
  7. if (isRunning()) {
  8. return;
  9. }
  10. if (!this.initialized) {
  11. synchronized (this.lifecycleMonitor) {
  12. if (!this.initialized) {
  13. afterPropertiesSet();
  14. }
  15. }
  16. }
  17. try {
  18. logger.debug("Starting Rabbit listener container.");
  19. configureAdminIfNeeded();
  20. checkMismatchedQueues();
  21. doStart();
  22. }
  23. catch (Exception ex) {
  24. throw convertRabbitAccessException(ex);
  25. }
  26. finally {
  27. this.lazyLoad = false;
  28. }
  29. }





  1. /**
  2. * Re-initializes this container's Rabbit message consumers, if not initialized already. Then submits each consumer
  3. * to this container's task executor.
  4. */
  5. @Override
  6. protected void doStart() {
  7. checkListenerContainerAware();
  8. super.doStart();
  9. synchronized (this.consumersMonitor) {
  10. if (this.consumers != null) {
  11. throw new IllegalStateException("A stopped container should not have consumers");
  12. }
  13. int newConsumers = initializeConsumers();
  14. if (this.consumers == null) {
  15. logger.info("Consumers were initialized and then cleared " +
  16. "(presumably the container was stopped concurrently)");
  17. return;
  18. }
  19. if (newConsumers <= 0) {
  20. if (logger.isInfoEnabled()) {
  21. logger.info("Consumers are already running");
  22. }
  23. return;
  24. }
  25. Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
  26. for (BlockingQueueConsumer consumer : this.consumers) {
  27. AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
  28. processors.add(processor);
  29. getTaskExecutor().execute(processor);
  30. if (getApplicationEventPublisher() != null) {
  31. getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
  32. }
  33. }
  34. waitForConsumersToStart(processors);
  35. }
  36. }


int newConsumers = initializeConsumers();
  1. protected int initializeConsumers() {
  2. int count = 0;
  3. synchronized (this.consumersMonitor) {
  4. if (this.consumers == null) {
  5. this.cancellationLock.reset();
  6. this.consumers = new HashSet<BlockingQueueConsumer>(this.concurrentConsumers);
  7. for (int i = 0; i < this.concurrentConsumers; i++) {
  8. BlockingQueueConsumer consumer = createBlockingQueueConsumer();
  9. this.consumers.add(consumer);
  10. count++;
  11. }
  12. }
  13. }
  14. return count;
  15. }


BlockingQueueConsumer consumer = createBlockingQueueConsumer();


BlockingQueueConsumer -这将是后续我们非常重要的一个类


  1. for (BlockingQueueConsumer consumer : this.consumers) {
  2. AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
  3. processors.add(processor);
  4. getTaskExecutor().execute(processor);
  5. if (getApplicationEventPublisher() != null) {
  6. getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
  7. }
  8. }




对象。这个对象点进去,大家看下这是个实现了Runnable接口的线程对象。哦哦,真正的核心哦。使用 SimpleAsyncTaskExecutor   来new的线程,这个执行器可不是线程池哦,来一个线程就会New一个,大家自行研究。




  1. @Override // NOSONAR - complexity - many catch blocks
  2. public void run() { // NOSONAR - line count
  3. if (!isActive()) {
  4. return;
  5. }
  6. boolean aborted = false;
  7. this.consumer.setLocallyTransacted(isChannelLocallyTransacted());
  8. String routingLookupKey = getRoutingLookupKey();
  9. if (routingLookupKey != null) {
  10. SimpleResourceHolder.bind(getRoutingConnectionFactory(), routingLookupKey); // NOSONAR both never null
  11. }
  12. if (this.consumer.getQueueCount() < 1) {
  13. if (logger.isDebugEnabled()) {
  14. logger.debug("Consumer stopping; no queues for " + this.consumer);
  15. }
  16. SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer);
  17. if (getApplicationEventPublisher() != null) {
  18. getApplicationEventPublisher().publishEvent(
  19. new AsyncConsumerStoppedEvent(SimpleMessageListenerContainer.this, this.consumer));
  20. }
  21. this.start.countDown();
  22. return;
  23. }
  24. try {
  25. initialize();
  26. while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
  27. mainLoop();
  28. }
  29. }



  1. private void initialize() throws Throwable { // NOSONAR
  2. try {
  3. redeclareElementsIfNecessary();
  4. this.consumer.start();
  5. this.start.countDown();
  6. }


1.  redeclareElementsIfNecessary - 这个是再进行检查进行Exchange 、Queue、Binding的声明与前面声明的方法实现的共用。


  1. public void start() throws AmqpException {
  2. if (logger.isDebugEnabled()) {
  3. logger.debug("Starting consumer " + this);
  4. }
  5. this.thread = Thread.currentThread();
  6. try {
  7. this.resourceHolder = ConnectionFactoryUtils.getTransactionalResourceHolder(this.connectionFactory,
  8. this.transactional);
  9. this.channel = this.resourceHolder.getChannel();
  10. ClosingRecoveryListener.addRecoveryListenerIfNecessary(this.channel); // NOSONAR never null here
  11. }
  12. catch (AmqpAuthenticationException e) {
  13. throw new FatalListenerStartupException("Authentication failure", e);
  14. }
  15. this.deliveryTags.clear();
  16. this.activeObjectCounter.add(this);
  17. passiveDeclarations();
  18. setQosAndreateConsumers();
  19. }



  1. private void setQosAndreateConsumers() {
  2. if (!this.acknowledgeMode.isAutoAck() && !cancelled()) {
  3. // Set basicQos before calling basicConsume (otherwise if we are not acking the broker
  4. // will send blocks of 100 messages)
  5. try {
  6. this.channel.basicQos(this.prefetchCount);
  7. }
  8. catch (IOException e) {
  9. this.activeObjectCounter.release(this);
  10. throw new AmqpIOException(e);
  11. }
  12. }
  13. try {
  14. if (!cancelled()) {
  15. for (String queueName : this.queues) {
  16. if (!this.missingQueues.contains(queueName)) {
  17. consumeFromQueue(queueName);
  18. }
  19. }
  20. }
  21. }
  22. catch (IOException e) {
  23. throw RabbitExceptionTranslator.convertRabbitAccessException(e);
  24. }
  25. }





  1. private void consumeFromQueue(String queue) throws IOException {
  2. InternalConsumer consumer = new InternalConsumer(this.channel, queue);
  3. String consumerTag = this.channel.basicConsume(queue, this.acknowledgeMode.isAutoAck(),
  4. (this.tagStrategy != null ? this.tagStrategy.createConsumerTag(queue) : ""), this.noLocal,
  5. this.exclusive, this.consumerArgs,
  6. consumer);
  7. if (consumerTag != null) {
  8. this.consumers.put(queue, consumer);
  9. if (logger.isDebugEnabled()) {
  10. logger.debug("Started on queue '" + queue + "' with tag " + consumerTag + ": " + this);
  11. }
  12. }
  13. else {
  14. logger.error("Null consumer tag received for queue " + queue);
  15. }
  16. }


String consumerTag = this.channel.basicConsume(queue, this.acknowledgeMode.isAutoAck(),
      (this.tagStrategy != null ? this.tagStrategy.createConsumerTag(queue) : ""), this.noLocal,
      this.exclusive, this.consumerArgs,


这里转向 3.2 Broker投递消息给客户端  解释




  1. initialize();
  2. while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
  3. mainLoop();
  4. }

这里也有个mainLoop ,于是想到了,java 的amqp客户端也存在呢mainLoop ,这里的逻辑难道也和他的逻辑契合的?我们转向 3.3 客户端消费过程继续。

3.2 Broker投递消息给客户端过程

上面说到了,已经将消费者注册到了Broker中去了,但一定注意哦,注册到Broker 中的,可不是我们使用注解 RabbitListener 标注的实际消费方法哦,而是新创建了一个内部的消费者:InternalConsumer


  1. private final class InternalConsumer extends DefaultConsumer {
  2. private final String queueName;
  3. boolean canceled;
  4. InternalConsumer(Channel channel, String queue) {
  5. super(channel);
  6. this.queueName = queue;
  7. }
  8. @Override
  9. public void handleConsumeOk(String consumerTag) {
  10. super.handleConsumeOk(consumerTag);
  11. if (logger.isDebugEnabled()) {
  12. logger.debug("ConsumeOK: " + BlockingQueueConsumer.this);
  13. }
  14. if (BlockingQueueConsumer.this.applicationEventPublisher != null) {
  15. BlockingQueueConsumer.this.applicationEventPublisher
  16. .publishEvent(new ConsumeOkEvent(this, this.queueName, consumerTag));
  17. }
  18. }
  19. @Override
  20. public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
  21. if (logger.isDebugEnabled()) {
  22. if (RabbitUtils.isNormalShutdown(sig)) {
  23. logger.debug("Received shutdown signal for consumer tag=" + consumerTag + ": " + sig.getMessage());
  24. }
  25. else {
  26. logger.debug("Received shutdown signal for consumer tag=" + consumerTag, sig);
  27. }
  28. }
  29. BlockingQueueConsumer.this.shutdown = sig;
  30. // The delivery tags will be invalid if the channel shuts down
  31. BlockingQueueConsumer.this.deliveryTags.clear();
  32. BlockingQueueConsumer.this.activeObjectCounter.release(BlockingQueueConsumer.this);
  33. }
  34. @Override
  35. public void handleCancel(String consumerTag) throws IOException {
  36. if (logger.isWarnEnabled()) {
  37. logger.warn("Cancel received for " + consumerTag + " ("
  38. + this.queueName
  39. + "); " + BlockingQueueConsumer.this);
  40. }
  41. BlockingQueueConsumer.this.consumers.remove(this.queueName);
  42. if (!BlockingQueueConsumer.this.consumers.isEmpty()) {
  43. basicCancel(false);
  44. }
  45. else {
  46. BlockingQueueConsumer.this.cancelled.set(true);
  47. }
  48. }
  49. @Override
  50. public void handleCancelOk(String consumerTag) {
  51. if (logger.isDebugEnabled()) {
  52. logger.debug("Received cancelOk for tag " + consumerTag + " ("
  53. + this.queueName
  54. + "); " + BlockingQueueConsumer.this);
  55. }
  56. this.canceled = true;
  57. }
  58. @Override
  59. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
  60. byte[] body) {
  61. if (logger.isDebugEnabled()) {
  62. logger.debug("Storing delivery for consumerTag: '"
  63. + consumerTag + "' with deliveryTag: '" + envelope.getDeliveryTag() + "' in "
  64. + BlockingQueueConsumer.this);
  65. }
  66. try {
  67. if (BlockingQueueConsumer.this.abortStarted > 0) {
  68. if (!BlockingQueueConsumer.this.queue.offer(
  69. new Delivery(consumerTag, envelope, properties, body, this.queueName),
  70. BlockingQueueConsumer.this.shutdownTimeout, TimeUnit.MILLISECONDS)) {
  71. Channel channelToClose = super.getChannel();
  72. RabbitUtils.setPhysicalCloseRequired(channelToClose, true);
  73. // Defensive - should never happen
  74. BlockingQueueConsumer.this.queue.clear();
  75. if (!this.canceled) {
  76. getChannel().basicCancel(consumerTag);
  77. }
  78. try {
  79. channelToClose.close();
  80. }
  81. catch (@SuppressWarnings("unused") TimeoutException e) {
  82. // no-op
  83. }
  84. }
  85. }
  86. else {
  87. BlockingQueueConsumer.this.queue
  88. .put(new Delivery(consumerTag, envelope, properties, body, this.queueName));
  89. }
  90. }
  91. catch (@SuppressWarnings("unused") InterruptedException e) {
  92. Thread.currentThread().interrupt();
  93. }
  94. catch (Exception e) {
  95. BlockingQueueConsumer.logger.warn("Unexpected exception during delivery", e);
  96. }
  97. }
  98. @Override
  99. public String toString() {
  100. return "InternalConsumer{" + "queue='" + this.queueName + '\'' +
  101. ", consumerTag='" + getConsumerTag() + '\'' +
  102. '}';
  103. }
  104. }

哇,内部类,而且继承了 DefaultConsumer ,这和我们前面学习Rabbitmq工作模式的过程中,自己手动开发的代码一样了吧,那我找到 投递方法:

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,


  1. BlockingQueueConsumer.this.queue
  2. .put(new Delivery(consumerTag, envelope, properties, body, this.queueName));

很明显,和java amqp client 实现一样,他这也用到了Queue,去存储了,

this.queue = new LinkedBlockingQueue<Delivery>(prefetchCount);



3.3 客户端消费过程

接续上面的 mainLoop(),既然消息又存到了本地的queue中,那mainLoop 的目的岂不是很明确了,那就是死循环的去取消息消息,然后再转调到我们实际的 加入@RabbitListener 的方法中去呢。究竟是不是呢,验证下:

  1. private void mainLoop() throws Exception { // NOSONAR Exception
  2. try {
  3. boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
  4. if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
  5. checkAdjust(receivedOk);
  6. }
  7. long idleEventInterval = getIdleEventInterval();
  8. if (idleEventInterval > 0) {
  9. if (receivedOk) {
  10. updateLastReceive();
  11. }
  12. else {
  13. long now = System.currentTimeMillis();
  14. long lastAlertAt = SimpleMessageListenerContainer.this.lastNoMessageAlert.get();
  15. long lastReceive = getLastReceive();
  16. if (now > lastReceive + idleEventInterval
  17. && now > lastAlertAt + idleEventInterval
  18. && SimpleMessageListenerContainer.this.lastNoMessageAlert
  19. .compareAndSet(lastAlertAt, now)) {
  20. publishIdleContainerEvent(now - lastReceive);
  21. }
  22. }
  23. }
  24. }
  25. catch (ListenerExecutionFailedException ex) {
  26. // Continue to process, otherwise re-throw
  27. if (ex.getCause() instanceof NoSuchMethodException) {
  28. throw new FatalListenerExecutionException("Invalid listener", ex);
  29. }
  30. }
  31. catch (AmqpRejectAndDontRequeueException rejectEx) {
  32. /*
  33. * These will normally be wrapped by an LEFE if thrown by the
  34. * listener, but we will also honor it if thrown by an
  35. * error handler.
  36. */
  37. }
  38. }


boolean receivedOk = receiveAndExecute(this.consumer); 
  1. private boolean receiveAndExecute(final BlockingQueueConsumer consumer) throws Exception { // NOSONAR
  2. PlatformTransactionManager transactionManager = getTransactionManager();
  3. if (transactionManager != null) {
  4. try {
  5. if (this.transactionTemplate == null) {
  6. this.transactionTemplate =
  7. new TransactionTemplate(transactionManager, getTransactionAttribute());
  8. }
  9. return this.transactionTemplate
  10. .execute(status -> { // NOSONAR null never returned
  11. RabbitResourceHolder resourceHolder = ConnectionFactoryUtils.bindResourceToTransaction(
  12. new RabbitResourceHolder(consumer.getChannel(), false),
  13. getConnectionFactory(), true);
  14. // unbound in ResourceHolderSynchronization.beforeCompletion()
  15. try {
  16. return doReceiveAndExecute(consumer);
  17. }
  18. catch (RuntimeException e1) {
  19. prepareHolderForRollback(resourceHolder, e1);
  20. throw e1;
  21. }
  22. catch (Exception e2) {
  23. throw new WrappedTransactionException(e2);
  24. }
  25. });
  26. }
  27. catch (WrappedTransactionException e) { // NOSONAR exception flow control
  28. throw (Exception) e.getCause();
  29. }
  30. }
  31. return doReceiveAndExecute(consumer);
  32. }


return doReceiveAndExecute(consumer);
  1. private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Exception { //NOSONAR
  2. Channel channel = consumer.getChannel();
  3. for (int i = 0; i < this.txSize; i++) {
  4. logger.trace("Waiting for message from consumer.");
  5. Message message = consumer.nextMessage(this.receiveTimeout);
  6. if (message == null) {
  7. break;
  8. }
  9. try {
  10. executeListener(channel, message);
  11. }


			Message message = consumer.nextMessage(this.receiveTimeout);


  1. public Message nextMessage(long timeout) throws InterruptedException, ShutdownSignalException {
  2. if (logger.isTraceEnabled()) {
  3. logger.trace("Retrieving delivery for " + this);
  4. }
  5. checkShutdown();
  6. if (this.missingQueues.size() > 0) {
  7. checkMissingQueues();
  8. }
  9. Message message = handle(this.queue.poll(timeout, TimeUnit.MILLISECONDS));
  10. if (message == null && this.cancelled.get()) {
  11. throw new ConsumerCancelledException();
  12. }
  13. return message;
  14. }

看到poll 我们就放心了,把消息取出来,包装成Message对象。


  1. try {
  2. executeListener(channel, message);
  3. }


  1. protected void executeListener(Channel channel, Message messageIn) {
  2. if (!isRunning()) {
  3. if (logger.isWarnEnabled()) {
  4. logger.warn("Rejecting received message because the listener container has been stopped: " + messageIn);
  5. }
  6. throw new MessageRejectedWhileStoppingException();
  7. }
  8. try {
  9. doExecuteListener(channel, messageIn);
  10. }
  11. catch (RuntimeException ex) {
  12. if (messageIn.getMessageProperties().isFinalRetryForMessageWithNoId()) {
  13. if (this.statefulRetryFatalWithNullMessageId) {
  14. throw new FatalListenerExecutionException(
  15. "Illegal null id in message. Failed to manage retry for message: " + messageIn, ex);
  16. }
  17. else {
  18. throw new ListenerExecutionFailedException("Cannot retry message more than once without an ID",
  19. new AmqpRejectAndDontRequeueException("Not retryable; rejecting and not requeuing", ex),
  20. messageIn);
  21. }
  22. }
  23. handleListenerException(ex);
  24. throw ex;
  25. }
  26. }


太不容易了,消息从发送-> Exchange->Queue -> java amqp client  ->spring client - >consume 最终得到了消费。

4. 总结

小结一下,我们从注解RabbitHandler RabbitListener 入手,一步步追踪到 与Broker链接的创建,Queue的声明,接着,启动新线程 注册一个内部的消费者到Broker中,Broker有消息的时候会推送到本地的BlockingQueue中去。

使用MainLoop 消费本地Blockinqueue的内容


