当前位置:   article > 正文

【问题排查】通过一次创建队列失败了解Rabbitmq自动创建队列原理_java springboot项目启动连接rabbitmq时无法自动添加队列,提示reply-cod

java springboot项目启动连接rabbitmq时无法自动添加队列,提示reply-code=404, re

问题描述

同事说有个项目不能自动创建新队列,每次新增队列都要先在rabbitmq后台手动创建,并且只有生产环境是这样,测试环境没有这个问题。

我一听还有这种事情,这怎么能忍,本着我不入地狱谁入地狱的精神开始排查这个问题,顺便在深入了解下springboot怎么实现rabbitmq队列的自动创建。


查找原因

正常来说,消费者使用@RabbitListener注解声明了queue、exchange之后会自动创建不存在的队列,并声明队列和exchange的绑定关系。在这个项目中也是这么用的,但是为什么没有创建队列呢。

首先通过观察启动日志发现如下报错信息片段

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'test_queue' in vhost 'test_vhost', class-id=50, method-id=10)
  • 1

通过报错信息能很明确知道是因为队列在vhost中不存在,channel异常关闭。通过官网也能说明这一点

rabbitmq官网对channel错误信息描述在这里插入图片描述但这还是因为没有创建队列导致的结果,还要继续找没有自动创建的原因。继续查看日志发现还有一条报错信息

Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'auto_delete' for exchange 'dome_exchange' in vhost 'test_vhost': received 'false' but current is 'true', class-id=40, method-id=10)
  • 1

因为这条报错信息和当前队列并没有直接关系,所以被同事忽略了。但我看到channel.close关键字心想难道就是因为这个报错导致链接关闭没有创建队列?带着这个疑问继续排查问题。

查看源码

首先确认报错原因,通过rabbitmq后台查看deme_exchange,发现auto_delete参数确实为true

rabbitmq后台查看参数在这里插入图片描述

消费者代码
在这里插入图片描述

消费者代码中并没有声明auto_delete参数,通过查看@Exchange注解发现auto_delete默认为false

	/**
	 * @return true if the exchange is to be declared as auto-delete.
	 */
	String autoDelete() default "false";
  • 1
  • 2
  • 3
  • 4

这就导致应用启动时代码中声明的exchange和远程已经存在的参数不一致,导致报406错误。通过上面官网截图也可看到对406报错的说明能够印证这一点。至于不一致的原因,后来问同事他应该是在后台手动创建的exchange,并没有注意这些参数细节。所以一定要规范操作啊,不然各种挖坑。

找到406的报错原因后,继续排查为什么队列没有自动创建。通过查看@RabbitListener注解源码,发现对queues参数的说明如下(不同springboot版本说明可能不一样)

	/**
	 * The queues for this listener.
	 * The entries can be 'queue name', 'property-placeholder keys' or 'expressions'.
	 * Expression must be resolved to the queue name or {@code Queue} object.
	 * The queue(s) must exist, or be otherwise defined elsewhere as a bean(s) with
	 * a {@link org.springframework.amqp.rabbit.core.RabbitAdmin} in the application
	 * context.
	 * Mutually exclusive with {@link #bindings()} and {@link #queuesToDeclare()}.
	 * @return the queue names or expressions (SpEL) to listen to from target
	 * @see org.springframework.amqp.rabbit.listener.MessageListenerContainer
	 */
	String[] queues() default {};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

其中说到要么队列已经存在,或者在其他地方定义了RabbitAdmin这个bean。

继续查看RabbitAdmin源码发现几个关键信息,该类实现InitializingBean接口,该接口在Bean初始化完成后会执行指定逻辑,在InitializingBean接口下的afterPropertiesSet()方法中实现,RabbitAdmin重写的afterPropertiesSet()方法中发现会调用initialize()方法,该方法源码如下

/**
	* Declares all the exchanges, queues and bindings in the enclosing application context, if any. It should be safe
	 * (but unnecessary) to call this method more than once.
	 */
	@Override // NOSONAR complexity
	public void initialize() {

		if (this.applicationContext == null) {
			this.logger.debug("no ApplicationContext has been set, cannot auto-declare Exchanges, Queues, and Bindings");
			return;
		}

		this.logger.debug("Initializing declarations");
		Collection<Exchange> contextExchanges = new LinkedList<Exchange>(
				this.applicationContext.getBeansOfType(Exchange.class).values());
		Collection<Queue> contextQueues = new LinkedList<Queue>(
				this.applicationContext.getBeansOfType(Queue.class).values());
		Collection<Binding> contextBindings = new LinkedList<Binding>(
				this.applicationContext.getBeansOfType(Binding.class).values());
		Collection<DeclarableCustomizer> customizers =
				this.applicationContext.getBeansOfType(DeclarableCustomizer.class).values();

		processDeclarables(contextExchanges, contextQueues, contextBindings);

		final Collection<Exchange> exchanges = filterDeclarables(contextExchanges, customizers);
		final Collection<Queue> queues = filterDeclarables(contextQueues, customizers);
		final Collection<Binding> bindings = filterDeclarables(contextBindings, customizers);

		for (Exchange exchange : exchanges) {
			if ((!exchange.isDurable() || exchange.isAutoDelete())  && this.logger.isInfoEnabled()) {
				this.logger.info("Auto-declaring a non-durable or auto-delete Exchange ("
						+ exchange.getName()
						+ ") durable:" + exchange.isDurable() + ", auto-delete:" + exchange.isAutoDelete() + ". "
						+ "It will be deleted by the broker if it shuts down, and can be redeclared by closing and "
						+ "reopening the connection.");
			}
		}

		for (Queue queue : queues) {
			if ((!queue.isDurable() || queue.isAutoDelete() || queue.isExclusive()) && this.logger.isInfoEnabled()) {
				this.logger.info("Auto-declaring a non-durable, auto-delete, or exclusive Queue ("
						+ queue.getName()
						+ ") durable:" + queue.isDurable() + ", auto-delete:" + queue.isAutoDelete() + ", exclusive:"
						+ queue.isExclusive() + ". "
						+ "It will be redeclared if the broker stops and is restarted while the connection factory is "
						+ "alive, but all messages will be lost.");
			}
		}

		if (exchanges.size() == 0 && queues.size() == 0 && bindings.size() == 0) {
			this.logger.debug("Nothing to declare");
			return;
		}
		this.rabbitTemplate.execute(channel -> {
			declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
			declareQueues(channel, queues.toArray(new Queue[queues.size()]));
			declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
			return null;
		});
		this.logger.debug("Declarations finished");

	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62

可以看到initialize()方法中会从spring容器中获取所有的Exchange、Queue和Binding等bean,并在declareExchanges()、declareQueues()、declareBindings()等方法中创建不存在的exchange、queues和绑定关系。这也能解释为什么要在消费者中创建相应的exchange、queue和Bingding的bean,并且自动创建队列。

当declareExchanges()方法有异常返回406时,并不会继续执行下面的declareQueues()和declareBindings()方法,所以新队列并没有自动创建。至此就找到了队列没有创建的原因以及自动创建exchange和queues相关的源码。而测试环境没有报错的原因是因为不存在exchange参数不一致的情况。

RabbitAdmin是如何自动创建的

进一步思考发现RabbitAdmin这个bean并没有在代码中创建,那么它是什么时候注入到容器中的呢。

继续追查源码发现RabbitAdmin在RabbitAutoConfiguration类中被创建并注入到容器中

		@Bean
		@ConditionalOnSingleCandidate(ConnectionFactory.class)
		@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true)
		@ConditionalOnMissingBean
		public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
			return new RabbitAdmin(connectionFactory);
		}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

RabbitAutoConfiguration是rabbitmq的配置类,通过注解可以发现当存在RabbitTemplate类时会启用

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {
  • 1
  • 2
  • 3
  • 4
  • 5

RabbitTemplate是对Rabbitmq封装的操作类,在项目中有被显式声明,所以RabbitAutoConfiguration也会被创建,至此rabbitmq自动创建队列的大致流程就被串起来了。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/寸_铁/article/detail/944879
推荐阅读
相关标签
  

闽ICP备14008679号