当前位置:   article > 正文

RabbitMQ-死信队列常见用法

RabbitMQ-死信队列常见用法

目录

一、什么是死信

 二、什么是死信队列 

​编辑 三、第一种情景:消息被拒绝时

四、第二种场景:. 消费者发生异常,超过重试次数 。 其实spring框架调用的就是 basicNack

五、第三种场景: 消息的Expiration 过期时长或队列TTL过期时间。

六、 第四种情景:  消息队列达到最大容量


一、什么是死信

    在RabbitMQ 中充当主角的就是消息,在不同场景下,消息会有不同地表现。

死信就是消息在特定场景下的一种表现形式,这些场景包括:

1. 消息被拒绝访问,即 RabbitMQ返回 basicNack 的信号时 或者拒绝basicReject

2. 消费者发生异常,超过重试次数 。 其实spring框架调用的就是 basicNack

3. 消息的Expiration 过期时长或队列TTL过期时间

4. 消息队列达到最大容量

上述场景经常产生死信,即消息在这些场景中时,被称为死信。

 二、什么是死信队列 

    死信队列就是用于储存死信的消息队列,在死信队列中,有且只有死信构成,不会存在其余类型的消息。

   死信队列在 RabbitMQ 中并不会单独存在,往往死信队列都会绑定这一个普通的业务消息队列,当所绑定的消息队列中,有消息变成死信了,那么这个消息就会重新被交换机路由到指定的死信队列中去,我们可以通过对这个死信队列进行监听,从而手动的去对这一消息进行补偿。 人工干预

 三、第一种情景:消息被拒绝时

#设置消费者手动应答模式

spring.rabbitmq.listener.simple.acknowledge-mode = manual
  1. package com.by.consumer;
  2. import com.by.model.OrderingOk;
  3. import com.rabbitmq.client.Channel;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.springframework.amqp.core.*;
  6. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. import java.io.IOException;
  10. @Configuration
  11. @Slf4j
  12. public class DeadConsumer {
  13. //死信交换机
  14. @Bean
  15. public DirectExchange deadExchange() {
  16. return ExchangeBuilder.directExchange("Dead_E01").build();
  17. }
  18. //死信队列
  19. @Bean
  20. public Queue deadQueue1() {
  21. return QueueBuilder.durable("Dead_Q01").build();
  22. }
  23. //死信交换机与死信队列的绑定
  24. @Bean
  25. public Binding deadBinding1(Queue deadQueue1, DirectExchange deadExchange) {
  26. return BindingBuilder.bind(deadQueue1).to(deadExchange).with("RK_DEAD");
  27. }
  28. //业务队列
  29. @Bean
  30. public Queue queue1() {
  31. return QueueBuilder
  32. .durable("Direct_Q01")
  33. .deadLetterExchange("Dead_E01")
  34. .deadLetterRoutingKey("RK_DEAD")
  35. //.ttl(10*1000) //该属性是队列的属性,设置消息的过期时间,消息在队列里面停留时间n毫秒后,就会把这个消息投递到死信交换机,针对的是所有的消息
  36. //.maxLength(20) //设置队列存放消息的最大个数,x-max-length属性值,当队列里面消息超过20,会把队列之前的消息依次放进死信队列
  37. .build();
  38. }
  39. //业务交换机
  40. @Bean
  41. public DirectExchange exchange() {
  42. return ExchangeBuilder.directExchange("Direct_E01").build();
  43. }
  44. //业务交换机与队列的绑定
  45. @Bean
  46. public Binding binding1(Queue queue1, DirectExchange exchange) {
  47. return BindingBuilder.bind(queue1).to(exchange).with("RK01");
  48. }
  49. //@RabbitListener(queues = "Direct_Q01")
  50. // public void receiveMessage(OrderingOk msg) {
  51. // log.info("消费者1 收到消息:"+ msg );
  52. // int i= 5/0;
  53. // }
  54. @RabbitListener(queues = "Direct_Q01")
  55. public void receiveMessage(OrderingOk msg, Message message, Channel channel) throws IOException {
  56. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  57. System.out.println("消费者1 收到消息:" + msg + " tag:" + deliveryTag);
  58. channel.basicReject(deliveryTag, false);
  59. // try {
  60. // // 处理消息...
  61. // int i= 5/0;
  62. // // 如果处理成功,手动发送ack确认 ,Yes
  63. // channel.basicAck(deliveryTag, false);
  64. // } catch (Exception e) {
  65. // // 处理失败,可以选择重试或拒绝消息(basicNack或basicReject) NO
  66. // channel.basicNack(deliveryTag, false, false); // 并重新入队
  67. //
  68. // }
  69. }
  70. }

四、第二种场景:. 消费者发生异常,超过重试次数 。 其实spring框架调用的就是 basicNack

一般要和自动重启一起使用,否则死信队列收不到消息

  1. #设置消费者自动应答模式
  2. spring.rabbitmq.listener.simple.acknowledge-mode = auto
  3. #开启自动应答重试机制
  4. spring.rabbitmq.listener.simple.retry.enabled=true
  1. package com.by.consumer;
  2. import com.by.model.OrderingOk;
  3. import com.rabbitmq.client.Channel;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.springframework.amqp.core.*;
  6. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. import java.io.IOException;
  10. @Configuration
  11. @Slf4j
  12. public class DeadConsumer {
  13. //死信交换机
  14. @Bean
  15. public DirectExchange deadExchange() {
  16. return ExchangeBuilder.directExchange("Dead_E01").build();
  17. }
  18. //死信队列
  19. @Bean
  20. public Queue deadQueue1() {
  21. return QueueBuilder.durable("Dead_Q01").build();
  22. }
  23. //死信交换机与死信队列的绑定
  24. @Bean
  25. public Binding deadBinding1(Queue deadQueue1, DirectExchange deadExchange) {
  26. return BindingBuilder.bind(deadQueue1).to(deadExchange).with("RK_DEAD");
  27. }
  28. //业务队列
  29. @Bean
  30. public Queue queue1() {
  31. return QueueBuilder
  32. .durable("Direct_Q01")
  33. .deadLetterExchange("Dead_E01")
  34. .deadLetterRoutingKey("RK_DEAD")
  35. //.ttl(10*1000) //该属性是队列的属性,设置消息的过期时间,消息在队列里面停留时间n毫秒后,就会把这个消息投递到死信交换机,针对的是所有的消息
  36. //.maxLength(20) //设置队列存放消息的最大个数,x-max-length属性值,当队列里面消息超过20,会把队列之前的消息依次放进死信队列
  37. .build();
  38. }
  39. //业务交换机
  40. @Bean
  41. public DirectExchange exchange() {
  42. return ExchangeBuilder.directExchange("Direct_E01").build();
  43. }
  44. //业务交换机与队列的绑定
  45. @Bean
  46. public Binding binding1(Queue queue1, DirectExchange exchange) {
  47. return BindingBuilder.bind(queue1).to(exchange).with("RK01");
  48. }
  49. //@RabbitListener(queues = "Direct_Q01")
  50. // public void receiveMessage(OrderingOk msg) {
  51. // log.info("消费者1 收到消息:"+ msg );
  52. // int i= 5/0;
  53. // }
  54. @RabbitListener(queues = "Direct_Q01")
  55. public void receiveMessage(OrderingOk msg, Message message, Channel channel) throws IOException {
  56. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  57. System.out.println("消费者1 收到消息:" + msg + " tag:" + deliveryTag);
  58. int a=10/0;
  59. // channel.basicReject(deliveryTag, false);
  60. // try {
  61. // // 处理消息...
  62. // int i= 5/0;
  63. // // 如果处理成功,手动发送ack确认 ,Yes
  64. // channel.basicAck(deliveryTag, false);
  65. // } catch (Exception e) {
  66. // // 处理失败,可以选择重试或拒绝消息(basicNack或basicReject) NO
  67. // channel.basicNack(deliveryTag, false, false); // 并重新入队
  68. //
  69. // }
  70. }
  71. }

五、第三种场景: 消息的Expiration 过期时长或队列TTL过期时间

  1. //业务队列
  2. @Bean
  3. public Queue queue1() {
  4. return QueueBuilder
  5. .durable("Direct_Q01")
  6. .deadLetterExchange("Dead_E01")
  7. .deadLetterRoutingKey("RK_DEAD")
  8. .ttl(10*1000) //该属性是队列的属性,设置消息的过期时间,消息在队列里面停留时间n毫秒后,就会把这个消息投递到死信交换机,针对的是所有的消息
  9. //.maxLength(20) //设置队列存放消息的最大个数,x-max-length属性值,当队列里面消息超过20,会把队列之前的消息依次放进死信队列
  10. .build();
  11. }

六、 第四种情景:  消息队列达到最大容量

  1. //业务队列
  2. @Bean
  3. public Queue queue1() {
  4. return QueueBuilder
  5. .durable("Direct_Q01")
  6. .deadLetterExchange("Dead_E01")
  7. .deadLetterRoutingKey("RK_DEAD")
  8. // .ttl(10*1000) //该属性是队列的属性,设置消息的过期时间,消息在队列里面停留时间n毫秒后,就会把这个消息投递到死信交换机,针对的是所有的消息
  9. .maxLength(5) //设置队列存放消息的最大个数,x-max-length属性值,当队列里面消息超过5,会把队列之前的消息依次放进死信队列
  10. .build();
  11. }
  1. /**
  2. * 测试直联交换机
  3. *
  4. * @throws IOException
  5. * @throws InterruptedException
  6. */
  7. @Test
  8. void contextLoads() throws IOException, InterruptedException {
  9. for (int i = 0; i < 8; i++) {
  10. OrderingOk orderingOk = OrderingOk.builder().id(1).name("张三"+i).build();
  11. directProvide.send(orderingOk);
  12. }
  13. System.in.read();
  14. }

id为1,2,3的被装进了死信队列,因为数据太老,业务队列优先要新的数据 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/393646?site
推荐阅读
相关标签
  

闽ICP备14008679号