赞
踩
消息丢失的情况
(1)生产者方面:生产者发送消息至MQ的数据丢失
(2)RabbitMQ方面:MQ收到消息,暂存内存中,还没消费,自己挂掉,数据会都丢失
(3)消费者方面:消费者刚拿到消息,还没处理,挂掉了,MQ又以为消费者处理完
解决方法
1.配置文件中添加
#消息已发送到交换机(Exchange)时返回
spring.rabbitmq.publisher-confirm-type=correlated
# 消息在未被队列收到的情况下返回
spring.rabbitmq.template.mandatory=true
spring.rabbitmq.publisher-returns=true
# 开启消息手动确认机制
spring.rabbitmq.listener.simple.acknowledge-mode=manual
2.config类配置
package com.example.demo.config; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; @Configuration public class RabbitmqConfig { @Autowired AmqpAdmin amqpAdmin; @Autowired RabbitTemplate rabbitTemplate; @Bean public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); //发送到exchange时调用回调函数 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("ConfirmCallback:"+"相关数据:"+correlationData+" ConfirmCallback:"+"确认情况:"+ack+" ConfirmCallback:"+"原因:"+cause); } }); //设置消息抵达队列的失败回调 rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returned) { System.out.println("二位热热我热太热"); } }); return rabbitTemplate; } /** * 在spring中也可使用@bean的方式去創建 綁定 等操作 这里我创建了一个交换机 */ @Bean public void createNormalExchange(){ DirectExchange mydirect = new DirectExchange("mydirect3", true, false); amqpAdmin.declareExchange(mydirect); } }
3.监听类改写
package com.example.demo.service.impl; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; import java.io.IOException; @Service public class RabbitmqTest2Impl { @RabbitListener(queues = {"mybe"}) public void getMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { byte[] body = message.getBody(); MessageProperties messageProperties = message.getMessageProperties(); Thread.sleep(2000); String s = new String(body); System.out.println("消费消息完成"+s); //进行手动确认 channel.basicAck(deliveryTag,false); } catch (IOException e) { //消息消费方错误后的处理 //deliveryTag消息id //multiple – true to reject all messages up to and including the supplied delivery tag; false to reject just the supplied delivery tag. // requeue是否重新入队 channel.basicNack(deliveryTag,false,false); e.printStackTrace(); } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。