赞
踩
在yml文件中配置:
spring:
rabbitmq:
publisher-confirm-type: CORRELATED
publisher-returns: true
publisher-confirm-type:消息确认的类型
/** * The type of publisher confirms to use. */ public enum ConfirmType { /** * Use {@code RabbitTemplate#waitForConfirms()} (or {@code waitForConfirmsOrDie()} * within scoped operations. */ SIMPLE, /** * Use with {@code CorrelationData} to correlate confirmations with sent * messsages. */ CORRELATED, /** * Publisher confirms are disabled (default). */ NONE }
未投递到Queue退回模式
@Test
public void sendMessageTest(){
/**
* 发送消息
*/
rabbitTemplate.convertAndSend("java-direct-exchange","hello-java","hello world!");
log.info("发送成功");
}
结果:
returnMessage执行。
配置yml文件:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual/auto/none
消费者三种消息确认模式:
/**
* 监听java-queue队列中OrderReturnApplyEntity的消息
* @param msg
* @param orderReturnApplyEntity
*/
@RabbitHandler
public void recivedMessage(Message msg, OrderReturnApplyEntity orderReturnApplyEntity){
System.out.println("orderReturnApplyEntity:"+orderReturnApplyEntity);
//添加异常
int i = 1 / 0;
}
发送消息:
消费者不停重试:
消息一致是未确认:
停掉消费者重新回到队列,消息没有丢失:
在yml中添加配置
acknowledge-mode: none
再次运行上述异常代码 并发送消息:
消费者抛出异常:
消息被消费:
在yml中添加配置
acknowledge-mode: manual
改造消费者代码:
@RabbitHandler public void recivedMessage(Message msg, OrderReturnApplyEntity orderReturnApplyEntity, Channel channel) throws IOException { try { System.out.println("接收到消息:" + msg); int i = 1 / 0; // 确认收到消息,false只确认当前consumer一个消息收到,true确认所有consumer获得的消息 channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { if (msg.getMessageProperties().getRedelivered()) { System.out.println("消息重试后依然失败,拒绝再次接收"); // 拒绝消息,不再重新入队(如果绑定了死信队列消息会进入死信队列,没有绑定死信队列则消息被丢弃,也可以把失败消息记录到redis或者mysql中),也可以设置为true再重试。 channel.basicReject(msg.getMessageProperties().getDeliveryTag(), false); } else { System.out.println("消息消费时出现异常,即将再次返回队列处理"); // Nack消息,重新入队(重试一次) channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true); } e.printStackTrace(); } }
basicAck表示确认消息,false只确认当前一个消息
// 确认收到消息,false只确认当前consumer一个消息收到,true确认所有consumer获得的消息
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
basicReject表示拒绝消息,false表示不再重新入队列
// 拒绝消息,不再重新入队(如果绑定了死信队列消息会进入死信队列,没有绑定死信队列则消息被丢弃,也可以把失败消息记录到redis或者mysql中),也可以设置为true再重试。
channel.basicReject(msg.getMessageProperties().getDeliveryTag(), false);
// Nack消息,重新入队(重试一次)
channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);
发送消息:
消费消息:
接收到消息:(Body:'{"id":1,"orderId":null,"skuId":null,"orderSn":null,"createTime":1624356355348,"memberUsername":null,"returnAmount":null,"returnName":null,"returnPhone":null,"status":null,"handleTime":null,"skuImg":null,"skuName":"iPhone","skuBrand":null,"skuAttrsVals":null,"skuCount":null,"skuPrice":null,"skuRealPrice":null,"reason":null,"description述":null,"descPics":null,"handleNote":null,"handleMan":null,"receiveMan":null,"receiveTime":null,"receiveNote":null,"receivePhone":null,"companyAddress":null}' MessageProperties [headers={spring_listener_return_correlation=79900859-553e-461b-9bc4-83241e4138e4, __TypeId__=com.simon.gulimall.order.entity.OrderReturnApplyEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=java-direct-exchange, receivedRoutingKey=hello-java, deliveryTag=1, consumerTag=amq.ctag-Q5c52wics9W5XNrHtwQTSg, consumerQueue=java-queue])
消息消费时出现异常,即将再次返回队列处理
接收到消息:(Body:'{"id":1,"orderId":null,"skuId":null,"orderSn":null,"createTime":1624356355348,"memberUsername":null,"returnAmount":null,"returnName":null,"returnPhone":null,"status":null,"handleTime":null,"skuImg":null,"skuName":"iPhone","skuBrand":null,"skuAttrsVals":null,"skuCount":null,"skuPrice":null,"skuRealPrice":null,"reason":null,"description述":null,"descPics":null,"handleNote":null,"handleMan":null,"receiveMan":null,"receiveTime":null,"receiveNote":null,"receivePhone":null,"companyAddress":null}' MessageProperties [headers={spring_listener_return_correlation=79900859-553e-461b-9bc4-83241e4138e4, __TypeId__=com.simon.gulimall.order.entity.OrderReturnApplyEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=true, receivedExchange=java-direct-exchange, receivedRoutingKey=hello-java, deliveryTag=2, consumerTag=amq.ctag-Q5c52wics9W5XNrHtwQTSg, consumerQueue=java-queue])
java.lang.ArithmeticException: / by zero
at com.simon.gulimall.order.service.impl.OrderItemServiceImpl.recivedMessage(OrderItemServiceImpl.java:41)
消息重试后依然失败,拒绝再次接收
第一步报错,进行重试,如果重试依然报错,就进行拒绝。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。