当前位置:   article > 正文

rabbitmq无法连接问题_rabbitmq连接不上

rabbitmq连接不上

背景

        最近一个项目使用了rabbitmq作为消息队列,进行异步解耦操作,因涉及到数据的一致性问题,设置了手动应答和持久化功能。开发过程中一切顺利,然而天将降大任于斯人也必先苦其心智老其筋骨,饿其体肤,空乏其身,好吧偏题了。。。。在最终的测试运行中发现一些偶尔会有消息无法发送的情况,有时候1、2周出现,有时候1、2小时出现完全没有规律。本文记载了相关问题并继续处理。

Rabbit配置

        1、设置publisher-confirm-type和publisher-returns发布确认属性,其中publisher-confirm-type有三类值:NONE、CORRELATED、SIMPLE

  • NONE:禁用发布确认模式,是默认值;
  • CORRELATED:发布消息成功到交换器后会触发回调方法;
  • SIMPLE:触发回调方法,并在发布消息成功后,调用waitForConfirms或waitForConfirmsOrDie方法等待返回发送结果。

        2、配置acknowledge-mode为manual手动确认消息

  • acknowledge-mode 三种值
  • none 自动确认,收到消息就通知broker,是默认值
  • manual 手动确认
  • auto 根据异常情况确认
  1. rabbitmq:
  2. host: ******
  3. port: 5672
  4. publisher-confirm-type: correlated
  5. publisher-returns: true
  6. listener:
  7. simple:
  8. acknowledge-mode: manual
  9. retry:
  10. enabled: true

 RabbitTemplate配置

  1. public RabbitSend(RabbitTemplate rabbitTemplate) {
  2. super();
  3. this.rabbitTemplate = rabbitTemplate;
  4. this.rabbitTemplate.setMandatory(true);
  5. this.rabbitTemplate.setReturnsCallback(data -> {
  6. try {
  7. Thread.sleep(SLEEP_TIME);
  8. logger.info("消息发送重试=====>{}", data);
  9. } catch (Exception e) {
  10. logger.error("发送失败", e);
  11. }
  12. });
  13. this.rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
  14. if (ack) {
  15. logger.info("消息发送确认成功====>{}", correlationData);
  16. } else {
  17. logger.info("消息发送失败=====>{}", correlationData);
  18. }
  19. });
  20. }

发送消息

        发送消息这里设置消息的持久化属性。

  1. public void routeSend(String message, String exchange, String routingKey) {
  2. Message msg = this.setMessage(message);
  3. logger.info("开始发送消息");
  4. rabbitTemplate.convertAndSend(exchange, routingKey, msg, new CorrelationData());
  5. logger.info("消息发送完成");
  6. }
  7. private Message setMessage(String json) {
  8. MessageProperties messageProperties = new MessageProperties();
  9. messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);//持久化
  10. return new Message(json.getBytes(), messageProperties);
  11. }

消费消息

        消费消息这里采用手动应答的方式,同时如果出现异常将消息移到队尾。

  1. try {
  2. //处理消息,并手动应答
  3. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  4. } catch (Exception e) {
  5. logger.error("消费失败:" + e.getMessage());
  6. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  7. channel.basicPublish(message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(),
  8. MessageProperties.PERSISTENT_TEXT_PLAIN, JSON.toJSONBytes(message));
  9. }

问题

        悲剧的时刻来了,系统运行过程中会出现莫名奇妙的发送消息失败,并且程序假死。于是只能去调试源码,通过日志的打印发现“消息发送完成”这行日志没有打印出来,因此基本确定是rabbitTemplate.convertAndSend这行有问题,跟踪进去:

  1. //1128
  2. public void convertAndSend(String exchange, String routingKey, final Object object,
  3. @Nullable CorrelationData correlationData) throws AmqpException {
  4. send(exchange, routingKey, convertMessageIfNecessary(object), correlationData);
  5. }
  6. //1063行execute()
  7. public void send(final String exchange, final String routingKey,
  8. final Message message, @Nullable final CorrelationData correlationData)
  9. throws AmqpException {
  10. execute(channel -> {
  11. doSend(channel, exchange, routingKey, message,
  12. (RabbitTemplate.this.returnsCallback != null
  13. || (correlationData != null && StringUtils.hasText(correlationData.getId())))
  14. && isMandatoryFor(message),
  15. correlationData);
  16. return null;
  17. }, obtainTargetConnectionFactory(this.sendConnectionFactorySelectorExpression, message));
  18. }
  19. //2136行 doExecute(action, connectionFactory)
  20. @Nullable
  21. private <T> T execute(final ChannelCallback<T> action, final ConnectionFactory connectionFactory) {
  22. if (this.retryTemplate != null) {
  23. try {
  24. return this.retryTemplate.execute(
  25. (RetryCallback<T, Exception>) context -> doExecute(action, connectionFactory),
  26. (RecoveryCallback<T>) this.recoveryCallback);
  27. }
  28. catch (RuntimeException e) { // NOSONAR catch and rethrow needed to avoid next catch
  29. throw e;
  30. }
  31. catch (Exception e) {
  32. throw RabbitExceptionTranslator.convertRabbitAccessException(e);
  33. }
  34. }
  35. else {
  36. return doExecute(action, connectionFactory);
  37. }
  38. }
  39. //ConnectionFactoryUtils.createConnection
  40. private <T> T doExecute(ChannelCallback<T> action, ConnectionFactory connectionFactory) {
  41. ...
  42. connection = ConnectionFactoryUtils.createConnection(connectionFactory,this.usePublisherConnection);
  43. ...
  44. }

继续ConnectionFactoryUtils.java

  1. public static Connection createConnection(final ConnectionFactory connectionFactory,
  2. final boolean publisherConnectionIfPossible) {
  3. if (publisherConnectionIfPossible) {
  4. ConnectionFactory publisherFactory = connectionFactory.getPublisherConnectionFactory();
  5. if (publisherFactory != null) {
  6. return publisherFactory.createConnection();
  7. }
  8. }
  9. return connectionFactory.createConnection();
  10. }

跳转到CachingConnectionFactory.java

  1. public final Connection createConnection() throws AmqpException {
  2. if (this.stopped) {
  3. throw new AmqpApplicationContextClosedException(
  4. "The ApplicationContext is closed and the ConnectionFactory can no longer create connections.");
  5. }
  6. synchronized (this.connectionMonitor) {//罪魁祸首,最后阻塞在这里
  7. if (this.cacheMode == CacheMode.CHANNEL) {
  8. if (this.connection.target == null) {
  9. this.connection.target = super.createBareConnection();
  10. // invoke the listener *after* this.connection is assigned
  11. if (!this.checkoutPermits.containsKey(this.connection)) {
  12. this.checkoutPermits.put(this.connection, new Semaphore(this.channelCacheSize));
  13. }
  14. this.connection.closeNotified.set(false);
  15. getConnectionListener().onCreate(this.connection);
  16. }
  17. return this.connection;
  18. }
  19. else if (this.cacheMode == CacheMode.CONNECTION) {
  20. return connectionFromCache();
  21. }
  22. }
  23. return null; // NOSONAR - never reach here - exceptions
  24. }

经过漫长的调试最终发现阻塞在CachingConnectionFactory的721行synchronized (this.connectionMonitor)。connectionMonitor是一个Object对象,加了synchronized锁,但是在对connectionMonitor加锁的地方都打上断点后发现并没有哪里锁住了对象。。。

        本文记录了rabbitmq的问题,期待大神能够提点。

        

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/559761
推荐阅读
相关标签
  

闽ICP备14008679号