赞
踩
目录
四、第二种场景:. 消费者发生异常,超过重试次数 。 其实spring框架调用的就是 basicNack
五、第三种场景: 消息的Expiration 过期时长或队列TTL过期时间。
在RabbitMQ 中充当主角的就是消息,在不同场景下,消息会有不同地表现。
死信就是消息在特定场景下的一种表现形式,这些场景包括:
1. 消息被拒绝访问,即 RabbitMQ返回 basicNack 的信号时 或者拒绝basicReject
2. 消费者发生异常,超过重试次数 。 其实spring框架调用的就是 basicNack
3. 消息的Expiration 过期时长或队列TTL过期时间。
4. 消息队列达到最大容量
上述场景经常产生死信,即消息在这些场景中时,被称为死信。
死信队列就是用于储存死信的消息队列,在死信队列中,有且只有死信构成,不会存在其余类型的消息。
死信队列在 RabbitMQ 中并不会单独存在,往往死信队列都会绑定这一个普通的业务消息队列,当所绑定的消息队列中,有消息变成死信了,那么这个消息就会重新被交换机路由到指定的死信队列中去,我们可以通过对这个死信队列进行监听,从而手动的去对这一消息进行补偿。 人工干预
#设置消费者手动应答模式
spring.rabbitmq.listener.simple.acknowledge-mode = manual
-
- package com.by.consumer;
-
-
- import com.by.model.OrderingOk;
- import com.rabbitmq.client.Channel;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.*;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- import java.io.IOException;
-
- @Configuration
- @Slf4j
- public class DeadConsumer {
- //死信交换机
- @Bean
- public DirectExchange deadExchange() {
- return ExchangeBuilder.directExchange("Dead_E01").build();
- }
-
- //死信队列
- @Bean
- public Queue deadQueue1() {
- return QueueBuilder.durable("Dead_Q01").build();
- }
-
- //死信交换机与死信队列的绑定
- @Bean
- public Binding deadBinding1(Queue deadQueue1, DirectExchange deadExchange) {
- return BindingBuilder.bind(deadQueue1).to(deadExchange).with("RK_DEAD");
- }
-
- //业务队列
- @Bean
- public Queue queue1() {
- return QueueBuilder
- .durable("Direct_Q01")
- .deadLetterExchange("Dead_E01")
- .deadLetterRoutingKey("RK_DEAD")
- //.ttl(10*1000) //该属性是队列的属性,设置消息的过期时间,消息在队列里面停留时间n毫秒后,就会把这个消息投递到死信交换机,针对的是所有的消息
- //.maxLength(20) //设置队列存放消息的最大个数,x-max-length属性值,当队列里面消息超过20,会把队列之前的消息依次放进死信队列
- .build();
- }
-
- //业务交换机
- @Bean
- public DirectExchange exchange() {
- return ExchangeBuilder.directExchange("Direct_E01").build();
- }
-
- //业务交换机与队列的绑定
- @Bean
- public Binding binding1(Queue queue1, DirectExchange exchange) {
- return BindingBuilder.bind(queue1).to(exchange).with("RK01");
- }
-
- //@RabbitListener(queues = "Direct_Q01")
- // public void receiveMessage(OrderingOk msg) {
- // log.info("消费者1 收到消息:"+ msg );
- // int i= 5/0;
- // }
-
- @RabbitListener(queues = "Direct_Q01")
- public void receiveMessage(OrderingOk msg, Message message, Channel channel) throws IOException {
-
- long deliveryTag = message.getMessageProperties().getDeliveryTag();
-
- System.out.println("消费者1 收到消息:" + msg + " tag:" + deliveryTag);
-
- channel.basicReject(deliveryTag, false);
- // try {
- // // 处理消息...
- // int i= 5/0;
- // // 如果处理成功,手动发送ack确认 ,Yes
- // channel.basicAck(deliveryTag, false);
- // } catch (Exception e) {
- // // 处理失败,可以选择重试或拒绝消息(basicNack或basicReject) NO
- // channel.basicNack(deliveryTag, false, false); // 并重新入队
- //
- // }
- }
- }
一般要和自动重启一起使用,否则死信队列收不到消息
- #设置消费者自动应答模式
- spring.rabbitmq.listener.simple.acknowledge-mode = auto
- #开启自动应答重试机制
- spring.rabbitmq.listener.simple.retry.enabled=true
-
- package com.by.consumer;
-
-
- import com.by.model.OrderingOk;
- import com.rabbitmq.client.Channel;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.*;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- import java.io.IOException;
-
- @Configuration
- @Slf4j
- public class DeadConsumer {
- //死信交换机
- @Bean
- public DirectExchange deadExchange() {
- return ExchangeBuilder.directExchange("Dead_E01").build();
- }
-
- //死信队列
- @Bean
- public Queue deadQueue1() {
- return QueueBuilder.durable("Dead_Q01").build();
- }
-
- //死信交换机与死信队列的绑定
- @Bean
- public Binding deadBinding1(Queue deadQueue1, DirectExchange deadExchange) {
- return BindingBuilder.bind(deadQueue1).to(deadExchange).with("RK_DEAD");
- }
-
- //业务队列
- @Bean
- public Queue queue1() {
- return QueueBuilder
- .durable("Direct_Q01")
- .deadLetterExchange("Dead_E01")
- .deadLetterRoutingKey("RK_DEAD")
- //.ttl(10*1000) //该属性是队列的属性,设置消息的过期时间,消息在队列里面停留时间n毫秒后,就会把这个消息投递到死信交换机,针对的是所有的消息
- //.maxLength(20) //设置队列存放消息的最大个数,x-max-length属性值,当队列里面消息超过20,会把队列之前的消息依次放进死信队列
- .build();
- }
-
- //业务交换机
- @Bean
- public DirectExchange exchange() {
- return ExchangeBuilder.directExchange("Direct_E01").build();
- }
-
- //业务交换机与队列的绑定
- @Bean
- public Binding binding1(Queue queue1, DirectExchange exchange) {
- return BindingBuilder.bind(queue1).to(exchange).with("RK01");
- }
-
- //@RabbitListener(queues = "Direct_Q01")
- // public void receiveMessage(OrderingOk msg) {
- // log.info("消费者1 收到消息:"+ msg );
- // int i= 5/0;
- // }
-
- @RabbitListener(queues = "Direct_Q01")
- public void receiveMessage(OrderingOk msg, Message message, Channel channel) throws IOException {
-
- long deliveryTag = message.getMessageProperties().getDeliveryTag();
-
- System.out.println("消费者1 收到消息:" + msg + " tag:" + deliveryTag);
- int a=10/0;
- // channel.basicReject(deliveryTag, false);
- // try {
- // // 处理消息...
- // int i= 5/0;
- // // 如果处理成功,手动发送ack确认 ,Yes
- // channel.basicAck(deliveryTag, false);
- // } catch (Exception e) {
- // // 处理失败,可以选择重试或拒绝消息(basicNack或basicReject) NO
- // channel.basicNack(deliveryTag, false, false); // 并重新入队
- //
- // }
- }
- }
- //业务队列
- @Bean
- public Queue queue1() {
- return QueueBuilder
- .durable("Direct_Q01")
- .deadLetterExchange("Dead_E01")
- .deadLetterRoutingKey("RK_DEAD")
- .ttl(10*1000) //该属性是队列的属性,设置消息的过期时间,消息在队列里面停留时间n毫秒后,就会把这个消息投递到死信交换机,针对的是所有的消息
- //.maxLength(20) //设置队列存放消息的最大个数,x-max-length属性值,当队列里面消息超过20,会把队列之前的消息依次放进死信队列
- .build();
- }
- //业务队列
- @Bean
- public Queue queue1() {
- return QueueBuilder
- .durable("Direct_Q01")
- .deadLetterExchange("Dead_E01")
- .deadLetterRoutingKey("RK_DEAD")
- // .ttl(10*1000) //该属性是队列的属性,设置消息的过期时间,消息在队列里面停留时间n毫秒后,就会把这个消息投递到死信交换机,针对的是所有的消息
- .maxLength(5) //设置队列存放消息的最大个数,x-max-length属性值,当队列里面消息超过5,会把队列之前的消息依次放进死信队列
- .build();
- }
- /**
- * 测试直联交换机
- *
- * @throws IOException
- * @throws InterruptedException
- */
- @Test
- void contextLoads() throws IOException, InterruptedException {
- for (int i = 0; i < 8; i++) {
- OrderingOk orderingOk = OrderingOk.builder().id(1).name("张三"+i).build();
- directProvide.send(orderingOk);
- }
-
- System.in.read();
- }
id为1,2,3的被装进了死信队列,因为数据太老,业务队列优先要新的数据
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。