赞
踩
最近一个项目使用了rabbitmq作为消息队列,进行异步解耦操作,因涉及到数据的一致性问题,设置了手动应答和持久化功能。开发过程中一切顺利,然而天将降大任于斯人也必先苦其心智老其筋骨,饿其体肤,空乏其身,好吧偏题了。。。。在最终的测试运行中发现一些偶尔会有消息无法发送的情况,有时候1、2周出现,有时候1、2小时出现完全没有规律。本文记载了相关问题并继续处理。
1、设置publisher-confirm-type和publisher-returns发布确认属性,其中publisher-confirm-type有三类值:NONE、CORRELATED、SIMPLE
2、配置acknowledge-mode为manual手动确认消息
- rabbitmq:
- host: ******
- port: 5672
- publisher-confirm-type: correlated
- publisher-returns: true
- listener:
- simple:
- acknowledge-mode: manual
- retry:
- enabled: true
- public RabbitSend(RabbitTemplate rabbitTemplate) {
- super();
- this.rabbitTemplate = rabbitTemplate;
- this.rabbitTemplate.setMandatory(true);
- this.rabbitTemplate.setReturnsCallback(data -> {
- try {
- Thread.sleep(SLEEP_TIME);
- logger.info("消息发送重试=====>{}", data);
- } catch (Exception e) {
- logger.error("发送失败", e);
- }
- });
- this.rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
- if (ack) {
- logger.info("消息发送确认成功====>{}", correlationData);
- } else {
- logger.info("消息发送失败=====>{}", correlationData);
- }
- });
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
发送消息这里设置消息的持久化属性。
- public void routeSend(String message, String exchange, String routingKey) {
- Message msg = this.setMessage(message);
- logger.info("开始发送消息");
- rabbitTemplate.convertAndSend(exchange, routingKey, msg, new CorrelationData());
- logger.info("消息发送完成");
- }
-
- private Message setMessage(String json) {
- MessageProperties messageProperties = new MessageProperties();
- messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);//持久化
- return new Message(json.getBytes(), messageProperties);
- }
消费消息这里采用手动应答的方式,同时如果出现异常将消息移到队尾。
- try {
- //处理消息,并手动应答
-
- channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
- } catch (Exception e) {
- logger.error("消费失败:" + e.getMessage());
- channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
- channel.basicPublish(message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(),
- MessageProperties.PERSISTENT_TEXT_PLAIN, JSON.toJSONBytes(message));
- }
悲剧的时刻来了,系统运行过程中会出现莫名奇妙的发送消息失败,并且程序假死。于是只能去调试源码,通过日志的打印发现“消息发送完成”这行日志没有打印出来,因此基本确定是rabbitTemplate.convertAndSend这行有问题,跟踪进去:
- //1128行
- public void convertAndSend(String exchange, String routingKey, final Object object,
- @Nullable CorrelationData correlationData) throws AmqpException {
-
- send(exchange, routingKey, convertMessageIfNecessary(object), correlationData);
- }
-
-
- //1063行execute()
-
- public void send(final String exchange, final String routingKey,
- final Message message, @Nullable final CorrelationData correlationData)
- throws AmqpException {
- execute(channel -> {
- doSend(channel, exchange, routingKey, message,
- (RabbitTemplate.this.returnsCallback != null
- || (correlationData != null && StringUtils.hasText(correlationData.getId())))
- && isMandatoryFor(message),
- correlationData);
- return null;
- }, obtainTargetConnectionFactory(this.sendConnectionFactorySelectorExpression, message));
- }
-
- //2136行 doExecute(action, connectionFactory)
- @Nullable
- private <T> T execute(final ChannelCallback<T> action, final ConnectionFactory connectionFactory) {
- if (this.retryTemplate != null) {
- try {
- return this.retryTemplate.execute(
- (RetryCallback<T, Exception>) context -> doExecute(action, connectionFactory),
- (RecoveryCallback<T>) this.recoveryCallback);
- }
- catch (RuntimeException e) { // NOSONAR catch and rethrow needed to avoid next catch
- throw e;
- }
- catch (Exception e) {
- throw RabbitExceptionTranslator.convertRabbitAccessException(e);
- }
- }
- else {
- return doExecute(action, connectionFactory);
- }
- }
-
-
- //ConnectionFactoryUtils.createConnection
- private <T> T doExecute(ChannelCallback<T> action, ConnectionFactory connectionFactory) {
- ...
- connection = ConnectionFactoryUtils.createConnection(connectionFactory,this.usePublisherConnection);
- ...
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
继续ConnectionFactoryUtils.java
- public static Connection createConnection(final ConnectionFactory connectionFactory,
- final boolean publisherConnectionIfPossible) {
-
- if (publisherConnectionIfPossible) {
- ConnectionFactory publisherFactory = connectionFactory.getPublisherConnectionFactory();
- if (publisherFactory != null) {
- return publisherFactory.createConnection();
- }
- }
- return connectionFactory.createConnection();
- }
跳转到CachingConnectionFactory.java
- public final Connection createConnection() throws AmqpException {
- if (this.stopped) {
- throw new AmqpApplicationContextClosedException(
- "The ApplicationContext is closed and the ConnectionFactory can no longer create connections.");
- }
- synchronized (this.connectionMonitor) {//罪魁祸首,最后阻塞在这里
- if (this.cacheMode == CacheMode.CHANNEL) {
- if (this.connection.target == null) {
- this.connection.target = super.createBareConnection();
- // invoke the listener *after* this.connection is assigned
- if (!this.checkoutPermits.containsKey(this.connection)) {
- this.checkoutPermits.put(this.connection, new Semaphore(this.channelCacheSize));
- }
- this.connection.closeNotified.set(false);
- getConnectionListener().onCreate(this.connection);
- }
- return this.connection;
- }
- else if (this.cacheMode == CacheMode.CONNECTION) {
- return connectionFromCache();
- }
- }
- return null; // NOSONAR - never reach here - exceptions
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
经过漫长的调试最终发现阻塞在CachingConnectionFactory的721行synchronized (this.connectionMonitor)。connectionMonitor是一个Object对象,加了synchronized锁,但是在对connectionMonitor加锁的地方都打上断点后发现并没有哪里锁住了对象。。。
本文记录了rabbitmq的问题,期待大神能够提点。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。