赞
踩
你来,我们一起精进!你不来,我和你的竞争对手一起精进!
blog.csdn.net/LoveLacie
一、Consumer Ack的三种方式
(1)、自动确认:acknowledge = “none”,这是默认的方式,如果不配置的话,默认就是自动确认,消费方从消息队列中拿出消息后,消息队列中都会清除掉这条消息(不安全).
(2)、手动确认:acknowledge = “manual”,手动确认就是当消费者取出来消息其后的操作正常执行后,返回给消息队列,让其清除该条消息;如果后续执行有异常,可以设置requeue=true返回其消息队列,再让其消息队列重新给消费者发送消息.
(3)、根据异常情况确认(很麻烦):acknowledge = “auto”.
二、进入主题:
SpringBoot+RabbitMQ实现手动Consumer Ack
1、pom文件中导入依赖坐标
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
-
2、在生产者与消费者工程yml配置文件中开启手动Ack
- spring:
- rabbitmq:
- host: 192.168.253.128 #ip
- username: guest
- password: guest
- virtual-host: /
- port: 5672
- listener:
- simple:
- acknowledge-mode: manual #开启手动Ack
-
-
3、在生产者工程中创建一个配置类声明队列与交换机的关系
-
- import org.springframework.amqp.core.*;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
-
- @Configuration
- public class RabbitMQConfig {
-
- //交换机的名称;
- public static final String DIRECT_EXCHANGE_NAME = "direct_boot_exchange";
- //队列名称;
- public static final String DIRECT_QUEUE_NAME = "direct_boot_queue";
-
-
- /**
- * 声明交换机,在以后我们会定义多个交换机,
- * 所以给这个注入的Bean起一个名字,同理在绑定的时候用@Qualifier注解;
- * durablie:持久化
- */
-
- @Bean("directExchange")
- public Exchange directExchange(){
- return ExchangeBuilder.directExchange(DIRECT_EXCHANGE_NAME).durable(true).build();
- }
-
- //声明队列;
- @Bean("directQueue")
- public Queue testQueue(){
- return QueueBuilder.durable(DIRECT_QUEUE_NAME).build();
- }
-
- //绑定交换机和队列,把上述声明的交换机、队列作为参数传入进来;
- @Bean
- public Binding bindDirectExchangeQueue(@Qualifier("directQueue") Queue queue,
- @Qualifier("directExchange") Exchange exchange){
-
- return BindingBuilder.bind(queue).to(exchange).with("info").noargs();
-
- }
-
- }
-
-
4、在消费者工程中创建一个组件监听在生产者声明的队列
-
- import com.rabbitmq.client.Channel;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.amqp.support.AmqpHeaders;
- import org.springframework.messaging.handler.annotation.Header;
- import org.springframework.stereotype.Component;
- import java.io.IOException;
-
- @Component
- public class MyAckListener {
-
- /**
- *
- * @param message 队列中的消息;
- * @param channel 当前的消息队列;
- * @param tag 取出来当前消息在队列中的的索引,
- * 用这个@Header(AmqpHeaders.DELIVERY_TAG)注解可以拿到;
- * @throws IOException
- */
-
- @RabbitListener(queues = "direct_boot_queue")
- public void myAckListener(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
-
- System.out.println(message);
- try {
-
- /**
- * 无异常就确认消息
- * basicAck(long deliveryTag, boolean multiple)
- * deliveryTag:取出来当前消息在队列中的的索引;
- * multiple:为true的话就是批量确认,如果当前deliveryTag为5,那么就会确认
- * deliveryTag为5及其以下的消息;一般设置为false
- */
- channel.basicAck(tag, false);
- }catch (Exception e){
- /**
- * 有异常就绝收消息
- * basicNack(long deliveryTag, boolean multiple, boolean requeue)
- * requeue:true为将消息重返当前消息队列,还可以重新发送给消费者;
- * false:将消息丢弃
- */
- channel.basicNack(tag,false,true);
- }
-
- }
-
- }
-
5、在生产者中创建一个测试类来发送消息
- import com.itlw.config.RabbitMQConfig;
- import org.junit.Test;
- import org.junit.runner.RunWith;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
- import org.springframework.test.context.junit4.SpringRunner;
-
-
- @RunWith(SpringRunner.class)
- @SpringBootTest
- public class ProducedTest {
-
- //从IOC容器中拿模板类;
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Test
- public void test(){
- //发送消息;
- rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE_NAME,
- "info","这是一条测试消息....");
- }
- }
-
5、启动消费者工程来接收此队列的消息
可以看到控制台输出了接收到的消息,并且因为已经被确认,所以队列中消息已经为0,要测出效果,手动添加一个异常.
6、手动添加一个异常
- try {
- /**
- * 无异常就确认消息
- * basicAck(long deliveryTag, boolean multiple)
- * deliveryTag:取出来当前消息在队列中的的索引;
- * multiple:为true的话就是批量确认,如果当前deliveryTag为5,那么就会确认
- * deliveryTag为5及其以下的消息;一般设置为false
- */
- int i = 3 / 0;//手动添加异常
- channel.basicAck(tag, false);
- } catch (Exception e) {
- /**
- * 有异常就绝收消息
- * basicNack(long deliveryTag, boolean multiple, boolean requeue)
- * requeue:true为将消息重返当前消息队列,还可以重新发送给消费者;
- * false:将消息丢弃
- */
- channel.basicNack(tag, false, true);
- }
-
7、再次运行看结果
我设置了 channel.basicNack(tag, false, true);第三个requeue属性为true由队列又重新发送给消费者,消费者接收到消息后确认之前遇到了错误又重新拒收消息…所以进入了一个死循环
等暂停运行后,可以看到消息队列中还剩一条消息,就是消费者绝收的这条消息,如果把requeue设置为false,那么这个队列中将没有这条消息.
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。