当前位置:   article > 正文

手把手教你SpringBoot+RabbitMQ实现手动Consumer Ack

springboot consumer ack

你知道的越多,不知道的就越多,业余的像一棵小草!

你来,我们一起精进!你不来,我和你的竞争对手一起精进!

编辑:业余草

blog.csdn.net/LoveLacie

推荐:https://www.xttblog.com/?p=5162

一、Consumer Ack的三种方式

(1)、自动确认:acknowledge = “none”,这是默认的方式,如果不配置的话,默认就是自动确认,消费方从消息队列中拿出消息后,消息队列中都会清除掉这条消息(不安全).

(2)、手动确认:acknowledge = “manual”,手动确认就是当消费者取出来消息其后的操作正常执行后,返回给消息队列,让其清除该条消息;如果后续执行有异常,可以设置requeue=true返回其消息队列,再让其消息队列重新给消费者发送消息.

(3)、根据异常情况确认(很麻烦):acknowledge = “auto”.

二、进入主题:

SpringBoot+RabbitMQ实现手动Consumer Ack

1、pom文件中导入依赖坐标

  1.         <dependency>
  2.             <groupId>org.springframework.boot</groupId>
  3.             <artifactId>spring-boot-starter-amqp</artifactId>
  4.         </dependency>

2、在生产者与消费者工程yml配置文件中开启手动Ack

  1. spring:
  2.   rabbitmq:
  3.     host: 192.168.253.128 #ip
  4.     username: guest
  5.     password: guest
  6.     virtual-host: /
  7.     port: 5672
  8.     listener:
  9.       simple:
  10.         acknowledge-mode: manual #开启手动Ack

3、在生产者工程中创建一个配置类声明队列与交换机的关系

  1. import org.springframework.amqp.core.*;
  2. import org.springframework.beans.factory.annotation.Qualifier;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. @Configuration
  6. public class RabbitMQConfig {
  7.     //交换机的名称;
  8.     public static final String DIRECT_EXCHANGE_NAME = "direct_boot_exchange";
  9.     //队列名称;
  10.     public static final String DIRECT_QUEUE_NAME = "direct_boot_queue";
  11.     /**
  12.      * 声明交换机,在以后我们会定义多个交换机,
  13.      * 所以给这个注入的Bean起一个名字,同理在绑定的时候用@Qualifier注解;
  14.      * durablie:持久化
  15.      */
  16.     @Bean("directExchange")
  17.     public Exchange directExchange(){
  18.         return ExchangeBuilder.directExchange(DIRECT_EXCHANGE_NAME).durable(true).build();
  19.     }
  20.     //声明队列;
  21.     @Bean("directQueue")
  22.     public Queue testQueue(){
  23.         return QueueBuilder.durable(DIRECT_QUEUE_NAME).build();
  24.     }
  25.     //绑定交换机和队列,把上述声明的交换机、队列作为参数传入进来;
  26.     @Bean
  27.     public Binding bindDirectExchangeQueue(@Qualifier("directQueue") Queue queue,
  28.                                            @Qualifier("directExchange") Exchange exchange){
  29.                                            
  30.         return BindingBuilder.bind(queue).to(exchange).with("info").noargs();
  31.     }
  32. }

4、在消费者工程中创建一个组件监听在生产者声明的队列

  1. import com.rabbitmq.client.Channel;
  2. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  3. import org.springframework.amqp.support.AmqpHeaders;
  4. import org.springframework.messaging.handler.annotation.Header;
  5. import org.springframework.stereotype.Component;
  6. import java.io.IOException;
  7. @Component
  8. public class MyAckListener {
  9.     /**
  10.      *
  11.      * @param message 队列中的消息;
  12.      * @param channel 当前的消息队列;
  13.      * @param tag 取出来当前消息在队列中的的索引,
  14.      * 用这个@Header(AmqpHeaders.DELIVERY_TAG)注解可以拿到;
  15.      * @throws IOException
  16.      */
  17.     @RabbitListener(queues = "direct_boot_queue")
  18.     public void myAckListener(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
  19.         System.out.println(message);
  20.         try {
  21.             /**
  22.              * 无异常就确认消息
  23.              * basicAck(long deliveryTag, boolean multiple)
  24.              * deliveryTag:取出来当前消息在队列中的的索引;
  25.              * multiple:为true的话就是批量确认,如果当前deliveryTag为5,那么就会确认
  26.              * deliveryTag为5及其以下的消息;一般设置为false
  27.              */
  28.             channel.basicAck(tag, false);
  29.         }catch (Exception e){
  30.             /**
  31.              * 有异常就绝收消息
  32.              * basicNack(long deliveryTag, boolean multiple, boolean requeue)
  33.              * requeue:true为将消息重返当前消息队列,还可以重新发送给消费者;
  34.              *         false:将消息丢弃
  35.              */
  36.             channel.basicNack(tag,false,true);
  37.         }
  38.         
  39.     }
  40. }

5、在生产者中创建一个测试类来发送消息

  1. import com.itlw.config.RabbitMQConfig;
  2. import org.junit.Test;
  3. import org.junit.runner.RunWith;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.boot.test.context.SpringBootTest;
  7. import org.springframework.test.context.junit4.SpringRunner;
  8. @RunWith(SpringRunner.class)
  9. @SpringBootTest
  10. public class ProducedTest {
  11.     //从IOC容器中拿模板类;
  12.     @Autowired
  13.     private RabbitTemplate rabbitTemplate;
  14.     @Test
  15.     public void test(){
  16.         //发送消息;
  17.         rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE_NAME,
  18.                 "info","这是一条测试消息....");
  19.     }
  20. }

5、启动消费者工程来接收此队列的消息

可以看到控制台输出了接收到的消息,并且因为已经被确认,所以队列中消息已经为0,要测出效果,手动添加一个异常.

6、手动添加一个异常

  1.  try {
  2.             /**
  3.              * 无异常就确认消息
  4.              * basicAck(long deliveryTag, boolean multiple)
  5.              * deliveryTag:取出来当前消息在队列中的的索引;
  6.              * multiple:为true的话就是批量确认,如果当前deliveryTag为5,那么就会确认
  7.              * deliveryTag为5及其以下的消息;一般设置为false
  8.              */
  9.             int i = 3 / 0;//手动添加异常
  10.             channel.basicAck(tag, false);
  11.         } catch (Exception e) {
  12.             /**
  13.              * 有异常就绝收消息
  14.              * basicNack(long deliveryTag, boolean multiple, boolean requeue)
  15.              * requeue:true为将消息重返当前消息队列,还可以重新发送给消费者;
  16.              *         false:将消息丢弃
  17.              */
  18.             channel.basicNack(tag, falsetrue);
  19.         }

7、再次运行看结果

我设置了 channel.basicNack(tag, false, true);第三个requeue属性为true由队列又重新发送给消费者,消费者接收到消息后确认之前遇到了错误又重新拒收消息…所以进入了一个死循环

等暂停运行后,可以看到消息队列中还剩一条消息,就是消费者绝收的这条消息,如果把requeue设置为false,那么这个队列中将没有这条消息.

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/127081?site
推荐阅读
相关标签
  

闽ICP备14008679号