赞
踩
之前一直是自动ACK,现在需要用到手动ACK;
但是我在网上找了一阵子,感觉都是比较旧的,我用的时候出一些奇奇怪怪的情况;
我自己摸索了一阵子,发现按如下方式是可以完成手动ACK的;
因为我目前用到的是fanout模式下的手动ACK;
因此我就以fanout模式为例子;
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: msg
password: msg
virtual-host: tubai_test
listener:
simple:
prefetch: 1 #一次只能消费一条消息
acknowledge-mode: manual #手动ACK
#在消息没有被路由到指定的queue时将消息返回,而不是丢弃
publisher-returns: true
#发布消息成功到交换器后会触发回调方法
publisher-confirm-type: correlated
此处直接用接口来当生产者了;
package com.tubai; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; @RestController @CrossOrigin @RequestMapping("/receive") public class TestController{ @Autowired private RabbitTemplate rabbitTemplate;//封装了一些操作 /** * 将信号放入MQ * @param message * @return */ @PostMapping("/msg/muscle") public String receiveMuscleSign(@RequestBody String message) { //处理业务 for (int i = 1; i <= 5; i++) { rabbitTemplate.convertAndSend("muscle_fanout_exchange","",message+i); } return "ok"; } }
此处用一个类下的两个方法来模拟2个消费者
package com.tubai; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; @Component public class MyConsumer { @RabbitListener(bindings = { @QueueBinding( value = @Queue("consumer_queue_1"), //绑定交换机 exchange = @Exchange(value = "muscle_fanout_exchange", type = "fanout") ) }) public void consumer1(String msg,Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { System.out.println("消费者1 => " + msg); //channel.basicAck(deliveryTag, false); //第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 } catch (Exception e) { channel.basicReject(deliveryTag, false); e.printStackTrace(); } } @RabbitListener(bindings = { @QueueBinding( value = @Queue("consumer_queue_2"), //绑定交换机 exchange = @Exchange(value = "muscle_fanout_exchange", type = "fanout") ) }) public void consumer2(String msg,Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { System.out.println("消费者2 => " + msg); channel.basicAck(deliveryTag, false); } catch (Exception e) { channel.basicReject(deliveryTag, false); e.printStackTrace(); } } }
注意一点,消费者1的手动ACK我们是注释掉了
而消费者2的手动ACK我们是开着的
原因是为了对照试验
我们期望的情况是:一共5条消息,消费者1和2都一一处理;
处理完毕后再取下一条,否则不让取;
那么按我们代码这样写;
消费者1只能取一条
而消费者2则能取满5条(因为消费者1的手动ACK被我们注释了,此处又不是自动ACK)
以下是实验截图
首先用postman发送请求
生产者生产消息完毕
接着看消费者
结果和我们预想的是一致的;
我们在看看MQ的管理页面来确认
可以看到,消费者2已经搞完了,而消费者1那边卡住了;
细心的小伙伴可能发现了我们在消费者的catch处写了这样一行代码
channel.basicReject(deliveryTag, false);
以下是解释
一般是有3种确认的,其中1种是正确确认,另外2种是错误确认;
就是以下三个
void basicReject(long deliveryTag, boolean requeue) throws IOException;
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
void basicAck(long deliveryTag, boolean multiple) throws IOException;
其中Ack是正确确认,而另外2个是错误确认;
这两个错误确认的区别在哪?
请看源代码的注释
显然可以看出
reject:只能否定一条消息
nack:可以否定一条或者多条消息
而错误确认的这两个,都有一个属性
boolean requeue
当它是true的时候,表示重新入队;
当它是false的时候,则表示抛弃掉;
使用拒绝后重新入列这个确认模式要谨慎,因为触发错误确认一般都是出现异常的时候,那么就可能导致死循环,即不断的入队-消费-报错-重新入队…;这将导致消息积压,万一就炸了…
我们将上述的消费者代码加一行代码;
此处只改动了消费者1,消费者2不变
新增一条抛异常的语句
int num = 1/0;
package com.tubai; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; @Component public class MyConsumer { @RabbitListener(bindings = { @QueueBinding( value = @Queue("consumer_queue_1"), //绑定交换机 exchange = @Exchange(value = "muscle_fanout_exchange", type = "fanout") ) }) public void consumer1(String msg,Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { System.out.println("消费者1 => " + msg); int num = 1/0; channel.basicAck(deliveryTag, false); //第二个参数,手动确认可以被批处理,当该参数为 true 时 } catch (Exception e) { channel.basicReject(deliveryTag, false); e.printStackTrace(); } } @RabbitListener(bindings = { @QueueBinding( value = @Queue("consumer_queue_2"), //绑定交换机 exchange = @Exchange(value = "muscle_fanout_exchange", type = "fanout") ) }) public void consumer2(String msg,Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { System.out.println("消费者2 => " + msg); channel.basicAck(deliveryTag, false); } catch (Exception e) { channel.basicReject(deliveryTag, false); e.printStackTrace(); } } }
可以看到我们的消费者1也正常了,因为我们是先打印后确认,因此1~5也会被打印出来;
如果重复入队…那么我们的程序就会死循环了,疯狂打印,各位可以自己试试(大雾
重复入队见下面
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。