RabbitMQ version:
RabbitMQ 3.6.8, Erlang 19.0.4
Jar version:
spring-boot-starter-amqp 2.0.4.RELEASE
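Background:
Our production service had previously stopped consuming queue messages after an MQ restart; the symptom was that the queues had no consumers. We had no logs from that incident, and out of trust in middleware like RabbitMQ
we did not dig any deeper, writing it off as an extremely rare event. This morning, however, the RabbitMQ instance behind one production service restarted for an unknown reason, and the same symptom appeared.
The service currently listens on four queues, referred to here as Queue1, Queue2, Queue3, and Queue4.
After MQ recovered, only Queue2 and Queue3 were consumed normally, while messages piled up on the other two queues.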
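Investigation:
First we checked the consumers of Queue1 and Queue4. The consumer count was 0, meaning those queues had no consumers at all, while Queue2 and Queue3 were fine. In the service code, the four queues differ only in their consumption logic;
everything else is identical and only calls the interface methods Spring provides.
At this point our initial guess was that Spring's recovery of the RabbitMQ consumer threads had gone wrong, so some consumer threads were never started again.
So we read the source code that restarts consumer threads:
RabbitListener queue listening is implemented by SimpleMessageListenerContainer. When the worker thread, AsyncMessageProcessingConsumer, hits an exception during execution, the container starts a new one. On the exception path it executes: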
- {
- // The consumer restart logic is controlled by the aborted flag
- if (!isActive(this.consumer) || aborted) {
- // remaining code omitted
- this.consumer.stop();
- // remaining code omitted
- } else {
- logger.info("Restarting " + this.consumer);
- // The restart is done by starting a new thread; internally:
- // getTaskExecutor().execute(new AsyncMessageProcessingConsumer(consumer));
- restart(this.consumer);
- }
- }
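The branch above can be modelled in a few lines of plain Java. This is only a sketch: `RestartSketch`, `decide`, and `simulate` are hypothetical names, not Spring AMQP API, and the thread names merely imitate the `SimpleAsyncTaskExecutor-N` pattern. It shows that a consumer which is still active and not aborted is always handed to a fresh thread.

```java
import java.util.ArrayList;
import java.util.List;

final class RestartSketch {
    enum Action { STOP, RESTART }

    // Same condition as the quoted branch: stop if inactive or aborted, else restart.
    static Action decide(boolean active, boolean aborted) {
        return (!active || aborted) ? Action.STOP : Action.RESTART;
    }

    // Simulates `failedAttempts` failing consumer runs followed by one that succeeds.
    // Every run gets its own thread; returns the thread names in order.
    static List<String> simulate(int failedAttempts) {
        List<String> threadNames = new ArrayList<>();
        int attempt = 0;
        Action next = Action.RESTART;
        while (next == Action.RESTART) {
            attempt++;
            Thread worker = new Thread(
                    () -> threadNames.add(Thread.currentThread().getName()),
                    "SimpleAsyncTaskExecutor-" + attempt);
            worker.start();
            try {
                worker.join(); // wait for this run to end before deciding
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            boolean failed = attempt <= failedAttempts;
            // A failed run on an active, non-aborted consumer is restarted;
            // the simulation ends once a run succeeds.
            next = failed ? decide(true, false) : Action.STOP;
        }
        return threadNames;
    }

    public static void main(String[] args) {
        System.out.println(decide(true, true)); // STOP
        System.out.println(simulate(2));        // three thread names, one per run
    }
}
```

Once `aborted` is true, `decide` returns `STOP` regardless of whether the container is active, so no new thread is ever created for that consumer.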
Exception log:
- [2021-03-04 09:47:35.986][,][INFO][SimpleAsyncTaskExecutor-1][SimpleMessageListenerContainer$AsyncMessageProcessingConsumer:run:1212] Restarting Consumer@504a9352: tags=[{amq.ctag-WWlW_DAeVfp570-uQvvXDg=queue1}], channel=Cached Rabbit Channel: AMQChannel(amqp://root@xxx.xx.xx.xx,4), conn: Proxy@3920f51 Shared Rabbit Connection: null, acknowledgeMode=MANUAL local queue size=0
- [2021-03-04 09:47:35.987][,][INFO][SimpleAsyncTaskExecutor-2][AbstractConnectionFactory:createBareConnection:463] Attempting to connect to: xxx.xx.xx.xx
- [2021-03-04 09:47:35.989][,][ERROR][SimpleAsyncTaskExecutor-2][AbstractMessageListenerContainer:redeclareElementsIfNecessary:1618] Failed to check/redeclare auto-delete queue(s).
- org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused (Connection refused)
- at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:62) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
- at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:484) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
- at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:626) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
- at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:240) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
- at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1797) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
- at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1771) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
- at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1752) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
- at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:345) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
- at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.redeclareElementsIfNecessary(AbstractMessageListenerContainer.java:1604) [spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
- at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:995) [spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
- at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
- Caused by: java.net.ConnectException: Connection refused (Connection refused)
- at java.net.PlainSocketImpl.socketConnect(Native Method) ~[?:1.8.0_275]
- at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) ~[?:1.8.0_275]
- at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) ~[?:1.8.0_275]
- at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) ~[?:1.8.0_275]
- at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[?:1.8.0_275]
- at java.net.Socket.connect(Socket.java:607) ~[?:1.8.0_275]
- at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:60) ~[amqp-client-5.1.2.jar:5.1.2]
- at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:955) ~[amqp-client-5.1.2.jar:5.1.2]
- at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:907) ~[amqp-client-5.1.2.jar:5.1.2]
- at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1066) ~[amqp-client-5.1.2.jar:5.1.2]
- at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:466) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
- ... 9 more
- [2021-03-04 09:47:36.262][,][INFO][iccpaasSimpleAsyncTaskExecutor-2][AbstractConnectionFactory:createBareConnection:463] Attempting to connect to: xxx.xx.xx.xx
- [2021-03-04 09:47:36.264][,][ERROR][iccpaasSimpleAsyncTaskExecutor-2][AbstractMessageListenerContainer:redeclareElementsIfNecessary:1618] Failed to check/redeclare auto-delete queue(s).
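The log confirms that the restart message is printed. The worker threads are named SimpleAsyncTaskExecutor-N, and every restart stops the old thread and creates a new one.
However, reading the restart logic shows that for certain exceptions the listener container abandons the restart and simply ends the thread, for example: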
- {
- try{
- // omitted
- }catch (QueuesNotAvailableException ex) {
- logger.error("Consumer received fatal=" + isMismatchedQueuesFatal() + " exception on startup", ex);
- if (isMissingQueuesFatal()) {
- this.startupException = ex;
- // Fatal, but no point re-throwing, so just abort.
- aborted = true;
- }
- publishConsumerFailedEvent("Consumer queue(s) not available", aborted, ex);
- }
- catch (PossibleAuthenticationFailureException ex) {
- logger.error("Consumer received fatal=" + isPossibleAuthenticationFailureFatal() +
- " exception during processing", ex);
- if (isPossibleAuthenticationFailureFatal()) {
- this.startupException =
- new FatalListenerStartupException("Authentication failure",
- new AmqpAuthenticationException(ex));
- // Fatal, but no point re-throwing, so just abort.
- aborted = true;
- }
- publishConsumerFailedEvent("Consumer received PossibleAuthenticationFailure during startup", aborted, ex);
- }
- }
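To see why these two flags matter during a broker restart, here is a minimal plain-Java model of the two catch blocks. It is a sketch under stated assumptions: `FatalFlagSketch`, `Failure`, and `attemptsUntilConsuming` are hypothetical names, not Spring AMQP API, and every failure other than the two shown is simply treated as retryable, which simplifies the real catch chain.

```java
import java.util.Arrays;
import java.util.List;

final class FatalFlagSketch {
    // Hypothetical stand-ins for the startup failures a consumer may see.
    enum Failure { CONNECTION_REFUSED, QUEUES_NOT_AVAILABLE, POSSIBLE_AUTH_FAILURE }

    private final boolean missingQueuesFatal;
    private final boolean possibleAuthenticationFailureFatal;

    FatalFlagSketch(boolean missingQueuesFatal, boolean possibleAuthenticationFailureFatal) {
        this.missingQueuesFatal = missingQueuesFatal;
        this.possibleAuthenticationFailureFatal = possibleAuthenticationFailureFatal;
    }

    // Mirrors the two catch blocks: a failure only aborts when its flag is true.
    boolean aborts(Failure failure) {
        switch (failure) {
            case QUEUES_NOT_AVAILABLE:
                return missingQueuesFatal;
            case POSSIBLE_AUTH_FAILURE:
                return possibleAuthenticationFailureFatal;
            default:
                return false; // simplification: everything else is retried
        }
    }

    // Replays the failures seen while the broker restarts, then assumes the next
    // attempt succeeds. Returns the attempt that starts consuming, or -1 if aborted.
    int attemptsUntilConsuming(List<Failure> failuresDuringRestart) {
        int attempt = 0;
        for (Failure failure : failuresDuringRestart) {
            attempt++;
            if (aborts(failure)) {
                return -1; // aborted = true: the thread ends and is never restarted
            }
        }
        return attempt + 1;
    }

    public static void main(String[] args) {
        List<Failure> restart = Arrays.asList(
                Failure.CONNECTION_REFUSED, Failure.POSSIBLE_AUTH_FAILURE);
        System.out.println(new FatalFlagSketch(true, true).attemptsUntilConsuming(restart));   // -1
        System.out.println(new FatalFlagSketch(false, false).attemptsUntilConsuming(restart)); // 3
    }
}
```

In this model, with both flags at `true`, a single authentication failure during the restart window is enough to leave a queue without a consumer, which is consistent with what was observed on Queue1 and Queue4.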
We also found that the log printed this exception twice:
- [SimpleMessageListenerContainer$AsyncMessageProcessingConsumer:run:1111] Consumer received fatal exception on startup
- org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException: Authentication failure
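At this point the cause is essentially clear: two consumers hit this exception, so their consumer threads were never restarted.
Our tentative guess is that RabbitMQ had not finished starting when the consumers tried to register with the server, so the broker returned a misleading error.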
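Solution:
Set the following two parameters to false; both default to true. With both disabled, neither exception aborts the consumer, so its thread keeps being restarted until it reconnects.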
private boolean missingQueuesFatal = true;
private boolean possibleAuthenticationFailureFatal = true;
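These two exceptions mean, respectively, that the queue has not been created yet and that user authentication failed. Normally a consumer that keeps retrying after either error achieves nothing,
but who would have expected a RabbitMQ restart to make consumer connections fail with an authentication error? Pinning down the exact cause would require reading the startup source code of the RabbitMQ server.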
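For a container that is built by hand, the two flags can be switched off through their setters. The fragment below is a sketch, not the author's actual configuration: it assumes spring-rabbit 2.0.x on the classpath, and `ResilientContainerSketch` and `newContainer` are hypothetical names. Containers created for `@RabbitListener` methods come from a listener container factory, and how the flags are exposed there depends on the Spring AMQP version, so check the documentation for the version in use.

```java
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;

final class ResilientContainerSketch {
    // Builds a container that keeps retrying after the two failures discussed above.
    static SimpleMessageListenerContainer newContainer(ConnectionFactory connectionFactory,
                                                       String... queueNames) {
        SimpleMessageListenerContainer container =
                new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames(queueNames);
        // Do not abort the consumer when a queue is (temporarily) unavailable.
        container.setMissingQueuesFatal(false);
        // Do not abort the consumer on a possible authentication failure.
        container.setPossibleAuthenticationFailureFatal(false);
        return container;
    }
}
```

The caller still has to attach a message listener and start the container. The trade-off is that a genuinely wrong password or a queue that never appears will now be retried indefinitely, so alerting on the repeated error logs becomes more important.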
Original article: https://blog.csdn.net/jy00733505/article/details/114705958