
After an unexpected RabbitMQ restart, some queues are no longer consumed


RabbitMQ version:
RabbitMQ 3.6.8, Erlang 19.0.4

Dependency (jar) version:
spring-boot-starter-amqp 2.0.4.RELEASE

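Background:
Previously, one of our production services stopped consuming messages after an MQ restart; the symptom was that the queues had no consumers. We did not capture any logs at the time, and trusting middleware like RabbitMQ, we did not dig deeper and wrote it off as an extremely rare event. This morning, however, the RabbitMQ used by a production service restarted for unknown reasons, and the same symptom appeared.
The service listens on four queues, referred to here as Queue1, Queue2, Queue3 and Queue4.
After MQ recovered, only Queue2 and Queue3 were consumed normally, while messages piled up in the other two queues.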

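Investigation:
First I checked the consumers of Queue1 and Queue4: the consumer count was 0, so those queues had no consumers at all, while Queue2 and Queue3 were normal. In the service code, the four listeners are identical except for their message-handling logic, and they only call interfaces provided by Spring.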

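This led to an initial guess: Spring's RabbitMQ consumer threads failed to recover from the error, and some of them were never started again.
So I went through the source code that restarts consumer threads: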

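A @RabbitListener queue listener is implemented by SimpleMessageListenerContainer. When the worker thread AsyncMessageProcessingConsumer fails with an exception, the container restarts it; once the exception has been handled, the following code runs: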

  {
      // Whether the consumer is restarted is controlled by the aborted flag
      if (!isActive(this.consumer) || aborted) {
          // other code omitted
          this.consumer.stop();
          // other code omitted
      } else {
          logger.info("Restarting " + this.consumer);
          // Restarting means running the consumer on a new thread; internally:
          // getTaskExecutor().execute(new AsyncMessageProcessingConsumer(consumer));
          restart(this.consumer);
      }
  }
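To make the branch above concrete, here is a minimal Java sketch that models it outside Spring, assuming a plain new Thread(...) in place of the container's task executor. RestartingWorker, simulate, the event strings and the maxAttempts stand-in for isActive(consumer) are all hypothetical names, and every session is assumed to fail, as it does while the broker is down. What it shows: a restart always hands over to a new thread, and once aborted is true nothing starts a replacement.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.IntPredicate;

// Hypothetical model of the restart branch above; RestartingWorker is not a Spring class.
// Each session runs on its own thread, and a restart hands over to a brand-new thread,
// much like getTaskExecutor().execute(new AsyncMessageProcessingConsumer(consumer)).
class RestartingWorker implements Runnable {
    private final int attempt;             // 1-based session number
    private final int maxAttempts;         // stand-in for isActive(consumer) turning false
    private final IntPredicate fatalOn;    // true if this session ends with a fatal exception
    private final List<String> events;     // synchronized list shared by all threads
    private final CountDownLatch stopped;  // released when the restart chain ends

    RestartingWorker(int attempt, int maxAttempts, IntPredicate fatalOn,
                     List<String> events, CountDownLatch stopped) {
        this.attempt = attempt;
        this.maxAttempts = maxAttempts;
        this.fatalOn = fatalOn;
        this.events = events;
        this.stopped = stopped;
    }

    @Override
    public void run() {
        // The consuming work is elided; every session is assumed to fail.
        boolean aborted = fatalOn.test(attempt);
        boolean active = attempt < maxAttempts;
        if (!active || aborted) {
            // Like this.consumer.stop(): this thread ends and no replacement is started.
            events.add("stop@" + attempt);
            stopped.countDown();
        } else {
            // Like restart(this.consumer): the old thread ends, a new one takes over.
            events.add("restart@" + attempt);
            new Thread(new RestartingWorker(attempt + 1, maxAttempts, fatalOn, events, stopped),
                    "sim-consumer-" + (attempt + 1)).start();
        }
    }

    /** Runs one restart chain and returns the recorded events in order. */
    static List<String> simulate(int maxAttempts, IntPredicate fatalOn) {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch stopped = new CountDownLatch(1);
        new Thread(new RestartingWorker(1, maxAttempts, fatalOn, events, stopped),
                "sim-consumer-1").start();
        try {
            if (!stopped.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("restart chain did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(events);
    }
}
```

For example, RestartingWorker.simulate(5, attempt -> attempt == 2) returns [restart@1, stop@2]: the chain dies at the second session and no thread is left consuming, which is the state Queue1 and Queue4 ended up in.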

Exception log:

  [2021-03-04 09:47:35.986][,][INFO][SimpleAsyncTaskExecutor-1][SimpleMessageListenerContainer$AsyncMessageProcessingConsumer:run:1212] Restarting Consumer@504a9352: tags=[{amq.ctag-WWlW_DAeVfp570-uQvvXDg=queue1}], channel=Cached Rabbit Channel: AMQChannel(amqp://root@xxx.xx.xx.xx,4), conn: Proxy@3920f51 Shared Rabbit Connection: null, acknowledgeMode=MANUAL local queue size=0
  [2021-03-04 09:47:35.987][,][INFO][SimpleAsyncTaskExecutor-2][AbstractConnectionFactory:createBareConnection:463] Attempting to connect to xxx.xx.xx.xx
  [2021-03-04 09:47:35.989][,][ERROR][SimpleAsyncTaskExecutor-2][AbstractMessageListenerContainer:redeclareElementsIfNecessary:1618] Failed to check/redeclare auto-delete queue(s).
  org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused (Connection refused)
      at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:62) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
      at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:484) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
      at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:626) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
      at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:240) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
      at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1797) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
      at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1771) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
      at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1752) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
      at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:345) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
      at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.redeclareElementsIfNecessary(AbstractMessageListenerContainer.java:1604) [spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
      at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:995) [spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
      at java.lang.Thread.run(Thread.java:748) [1.8.0_275]
  Caused by: java.net.ConnectException: Connection refused (Connection refused)
      at java.net.PlainSocketImpl.socketConnect(Native Method) ~[1.8.0_275]
      at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) ~[1.8.0_275]
      at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) ~[1.8.0_275]
      at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) ~[1.8.0_275]
      at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[1.8.0_275]
      at java.net.Socket.connect(Socket.java:607) ~[1.8.0_275]
      at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:60) ~[amqp-client-5.1.2.jar:5.1.2]
      at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:955) ~[amqp-client-5.1.2.jar:5.1.2]
      at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:907) ~[amqp-client-5.1.2.jar:5.1.2]
      at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1066) ~[amqp-client-5.1.2.jar:5.1.2]
      at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:466) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
      ... 9 more
  [2021-03-04 09:47:36.262][,][INFO][iccpaasSimpleAsyncTaskExecutor-2][AbstractConnectionFactory:createBareConnection:463] Attempting to connect to xxx.xx.xx.xx
  [2021-03-04 09:47:36.264][,][ERROR][iccpaasSimpleAsyncTaskExecutor-2][AbstractMessageListenerContainer:redeclareElementsIfNecessary:1618] Failed to check/redeclare auto-delete queue(s).

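The log does show the restart messages. The worker threads are named SimpleAsyncTaskExecutor-N, and each restart ends the old thread and starts a new one.
However, the restart logic shows that for certain exceptions the listener container gives up on restarting and simply lets the thread end, for example: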

  {
      try {
          // omitted
      }
      catch (QueuesNotAvailableException ex) {
          // note: the message prints isMismatchedQueuesFatal(), but the branch below tests isMissingQueuesFatal()
          logger.error("Consumer received fatal=" + isMismatchedQueuesFatal() + " exception on startup", ex);
          if (isMissingQueuesFatal()) {
              this.startupException = ex;
              // Fatal, but no point re-throwing, so just abort.
              aborted = true;
          }
          publishConsumerFailedEvent("Consumer queue(s) not available", aborted, ex);
      }
      catch (PossibleAuthenticationFailureException ex) {
          logger.error("Consumer received fatal=" + isPossibleAuthenticationFailureFatal() +
                  " exception during processing", ex);
          if (isPossibleAuthenticationFailureFatal()) {
              this.startupException =
                      new FatalListenerStartupException("Authentication failure",
                              new AmqpAuthenticationException(ex));
              // Fatal, but no point re-throwing, so just abort.
              aborted = true;
          }
          publishConsumerFailedEvent("Consumer received PossibleAuthenticationFailure during startup", aborted, ex);
      }
  }
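The catch blocks above boil down to one rule: a failure aborts the consumer only if its matching flag is true. Below is a hedged, runnable mirror of that rule in plain Java. SimQueuesNotAvailableException, SimAuthFailureException and ConsumerStartAttempt are hypothetical stand-ins, not the Spring AMQP classes, and treating every other exception as restartable is an assumption based on the repeated Restarting lines in the log.

```java
// Hypothetical stand-ins for QueuesNotAvailableException and
// PossibleAuthenticationFailureException; these are NOT the Spring classes.
class SimQueuesNotAvailableException extends RuntimeException {
    SimQueuesNotAvailableException(String message) { super(message); }
}

class SimAuthFailureException extends RuntimeException {
    SimAuthFailureException(String message) { super(message); }
}

// Simplified, runnable mirror of the two catch blocks above (not the real container).
class ConsumerStartAttempt {
    private final boolean missingQueuesFatal;
    private final boolean possibleAuthenticationFailureFatal;
    private RuntimeException startupException; // set only when a failure is treated as fatal

    ConsumerStartAttempt(boolean missingQueuesFatal, boolean possibleAuthenticationFailureFatal) {
        this.missingQueuesFatal = missingQueuesFatal;
        this.possibleAuthenticationFailureFatal = possibleAuthenticationFailureFatal;
    }

    /** Runs one start attempt and returns true if the consumer is aborted (no restart). */
    boolean run(Runnable start) {
        boolean aborted = false;
        try {
            start.run();
        } catch (SimQueuesNotAvailableException ex) {
            if (missingQueuesFatal) {
                startupException = ex;
                aborted = true;
            }
        } catch (SimAuthFailureException ex) {
            if (possibleAuthenticationFailureFatal) {
                // Stand-in for new FatalListenerStartupException("Authentication failure", ...)
                startupException = new IllegalStateException("Authentication failure", ex);
                aborted = true;
            }
        } catch (RuntimeException ex) {
            // Assumption: any other failure (e.g. connection refused) only leads to a restart,
            // matching the repeated "Restarting" lines in the log while the broker was down.
        }
        return aborted;
    }

    RuntimeException startupException() {
        return startupException;
    }
}
```

With new ConsumerStartAttempt(true, true), i.e. the defaults, a SimAuthFailureException makes run(...) return true and records an "Authentication failure" startup exception; with (false, false) the same failure returns false, meaning the consumer would simply be restarted.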

The log also printed the following exception twice:

  [SimpleMessageListenerContainer$AsyncMessageProcessingConsumer:run:1111] Consumer received fatal exception on startup
  org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException: Authentication failure

At this point the cause is essentially clear: two consumers hit this exception, and their consumer threads were not restarted.

My tentative guess is that RabbitMQ was not yet fully up when the consumers tried to register with it, which produced a misleading error.

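Solution:
Set the following two parameters to false (both default to true). The container then keeps restarting the consumer threads indefinitely, even when these two exceptions occur.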

private boolean missingQueuesFatal = true;

private boolean possibleAuthenticationFailureFatal = true;
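How the two flags change the outcome of the incident can be replayed with a small Java model. It assumes each consumer sees one broker reply per (re)start attempt; BrokerReply, ConsumerState, IncidentReplay and the reply sequences are hypothetical, since the article only knows that an authentication failure appeared in the log. In a real application the two values are set on the listener container itself, presumably through the matching setters (setMissingQueuesFatal / setPossibleAuthenticationFailureFatal); check the API of the Spring AMQP version in use.

```java
import java.util.List;

// Hypothetical replay model, not Spring code: one broker reply per (re)start attempt.
enum BrokerReply { CONNECTION_REFUSED, QUEUE_MISSING, AUTH_FAILURE, OK }

// Where a consumer ends up once the scripted replies are used up.
enum ConsumerState { CONSUMING, ABORTED, STILL_RETRYING }

final class IncidentReplay {
    private IncidentReplay() {
    }

    /**
     * Walks the broker replies in order, applying the rule from the catch blocks:
     * a failure stops the restarts only when its matching flag is true.
     */
    static ConsumerState replay(List<BrokerReply> replies,
                                boolean missingQueuesFatal,
                                boolean possibleAuthenticationFailureFatal) {
        for (BrokerReply reply : replies) {
            switch (reply) {
                case OK:
                    return ConsumerState.CONSUMING;   // registered again, queue has a consumer
                case QUEUE_MISSING:
                    if (missingQueuesFatal) {
                        return ConsumerState.ABORTED; // thread ends, queue stays at 0 consumers
                    }
                    break;                            // not fatal: restart and try again
                case AUTH_FAILURE:
                    if (possibleAuthenticationFailureFatal) {
                        return ConsumerState.ABORTED;
                    }
                    break;
                default:
                    break;                            // CONNECTION_REFUSED: always restart
            }
        }
        return ConsumerState.STILL_RETRYING;          // no OK reply yet, but not aborted
    }
}
```

With a hypothetical sequence [CONNECTION_REFUSED, AUTH_FAILURE, OK], replay(..., true, true) ends ABORTED while replay(..., false, false) ends CONSUMING; a sequence without AUTH_FAILURE, such as [CONNECTION_REFUSED, OK], ends CONSUMING under both settings, which would match Queue2 and Queue3 surviving. The trade-off, as the next paragraph points out, is that a genuinely wrong password or a queue that is never declared will now be retried forever too.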

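These two exceptions mean, respectively, that the queue has not been created yet and that user authentication failed. Normally, when a consumer hits either of them, retrying forever is pointless.
But who would have guessed that a RabbitMQ restart could make consumer connections fail with an authentication error? Finding the exact cause would require reading the RabbitMQ server's startup code.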
 

Original article: https://blog.csdn.net/jy00733505/article/details/114705958
