当前位置:   article > 正文

RabbitMQ - 死信队列,延时队列_rabbitmq 死信队列和延迟队列

rabbitmq 死信队列和延迟队列

Time-To-Live and Expiration — RabbitMQ

一、死信队列

Dead Letter Exchanges — RabbitMQ

死信队列:

DLX 全称(Dead-Letter-Exchange),称之为死信交换器,当消息变成一个死信之后,如果这个消息所在的队列存在x-dead-letter-exchange参数,那么它会被发送到x-dead-letter-exchange对应值的交换器上,这个交换器就称之为死信交换器,与这个死信交换器绑定的队列就是死信队列

死信消息:

  • 消息被拒绝(Basic.Reject或Basic.Nack)并且设置 requeue 参数的值为 false
  • 消息过期(消息TTL过期。TTL:Time To Live的简称,即过期时间)
  • 队列达到最大的长度

过期消息:

rabbitmq 中存在2种方法可设置消息的过期时间:

  • 第一种通过对队列进行设置,这种设置后,该队列中所有的消息都存在相同的过期时间
  • 第二种通过对消息本身进行设置,那么每条消息的过期时间都不一样

如果同时使用这2种方法,那么以过期时间小的那个数值为准。当消息达到过期时间还没有被消费,那么那个消息就成为了一个 死信 消息

队列设置:在队列申明的时候使用** x-message-ttl **参数,单位为 毫秒;

  • 队列中这个属性的设置要在第一次声明队列的时候设置才有效,如果队列一开始已存在且没有这个属性,则要删掉队列再重新声明才可以。
  • 队列的 TTL 只能被设置为某个固定的值,一旦设置后则不能更改,否则会抛出异常

单个消息设置:是设置消息属性的 expiration 参数的值,单位为 毫秒。

说明:

对于第一种设置队列属性的方法,一旦消息过期,就会从队列中抹去;而在第二种方法中,即使消息过期,也不会马上从队列中抹去,因为每条消息是否过期是在即将投递到消费者之前判定的

 1. 生产者:
  声明队列的时候用属性指定其死信队列交换机名称。

测试:

  1. package rabbitmq;
  2. import java.io.IOException;
  3. import java.util.HashMap;
  4. import java.util.Map;
  5. import java.util.concurrent.TimeoutException;
  6. import com.rabbitmq.client.BuiltinExchangeType;
  7. import com.rabbitmq.client.Channel;
  8. import com.rabbitmq.client.Connection;
  9. import com.rabbitmq.client.ConnectionFactory;
  10. public class Producer {
  11.     public static ConnectionFactory getConnectionFactory() {
  12.         // 创建连接工程,下面给出的是默认的case
  13.         ConnectionFactory factory = new ConnectionFactory();
  14.         factory.setHost("192.168.99.100");
  15.         factory.setPort(5672);
  16.         factory.setUsername("guest");
  17.         factory.setPassword("guest");
  18.         factory.setVirtualHost("/");
  19.         return factory;
  20.     }
  21.     public static void main(String[] args) throws IOException, TimeoutException  {
  22.         ConnectionFactory connectionFactory = getConnectionFactory();
  23.         Connection newConnection = null;
  24.         Channel createChannel = null;
  25.         try {
  26.             newConnection = connectionFactory.newConnection();
  27.             createChannel = newConnection.createChannel();
  28.             
  29.             // 声明一个正常的direct类型的交换机
  30.             createChannel.exchangeDeclare("order.exchange", BuiltinExchangeType.DIRECT);
  31.             // 声明死信交换机为===order.dead.exchange
  32.             String dlxName = "order.dead.exchange";
  33.             createChannel.exchangeDeclare(dlxName, BuiltinExchangeType.DIRECT);
  34.             // 声明队列并指定死信交换机为上面死信交换机
  35.             Map<String, Object> arg = new HashMap<String, Object>();
  36.             arg.put("x-dead-letter-exchange", dlxName);
  37.             createChannel.queueDeclare("myQueue", true, false, false, arg);
  38.             
  39.             String message = "测试消息";
  40.             createChannel.basicPublish("order.exchange", "routing_key_myQueue", null, message.getBytes());
  41.             System.out.println("消息发送成功");
  42.         } catch (Exception e) {
  43.             e.printStackTrace();
  44.         } finally {
  45.             if (createChannel != null) {
  46.                 createChannel.close();
  47.             }
  48.             if (newConnection != null) {
  49.                 newConnection.close();
  50.             }
  51.         }
  52.         
  53.     }
  54. }

结果:

(1)生成两个Exchange

 (2)队列myQueue的死信队列有属性

2. 消费者: 
  一个消费者监听正常队列,一个消费者监听死信队列。(只是绑定的交换机不同)

消费者一:监听正常队列

  1. package rabbitmq;
  2. import java.io.IOException;
  3. import java.util.Date;
  4. import java.util.concurrent.TimeUnit;
  5. import java.util.concurrent.TimeoutException;
  6. import com.rabbitmq.client.Channel;
  7. import com.rabbitmq.client.Connection;
  8. import com.rabbitmq.client.ConnectionFactory;
  9. import com.rabbitmq.client.DefaultConsumer;
  10. import com.rabbitmq.client.Envelope;
  11. import com.rabbitmq.client.AMQP.BasicProperties;
  12. public class Consumer {
  13.     public static ConnectionFactory getConnectionFactory() {
  14.         // 创建连接工程,下面给出的是默认的case
  15.         ConnectionFactory factory = new ConnectionFactory();
  16.         factory.setHost("192.168.99.100");
  17.         factory.setPort(5672);
  18.         factory.setUsername("guest");
  19.         factory.setPassword("guest");
  20.         factory.setVirtualHost("/");
  21.         return factory;
  22.     }
  23.     public static void main(String[] args) throws IOException, TimeoutException {
  24.         ConnectionFactory connectionFactory = getConnectionFactory();
  25.         Connection newConnection = null;
  26.         Channel createChannel = null;
  27.         try {
  28.             newConnection = connectionFactory.newConnection();
  29.             createChannel = newConnection.createChannel();
  30.             // 队列绑定交换机-channel.queueBind(队列名, 交换机名, 路由key[广播消息设置为空串])
  31.             createChannel.queueBind("myQueue", "order.exchange", "routing_key_myQueue");
  32.             createChannel.basicConsume("myQueue", false, "", new DefaultConsumer(createChannel) {
  33.                 @Override
  34.                 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
  35.                         byte[] body) throws IOException {
  36.                     System.out.println("consumerTag: " + consumerTag);
  37.                     System.out.println("envelope: " + envelope);
  38.                     System.out.println("properties: " + properties);
  39.                     String string = new String(body, "UTF-8");
  40.                     System.out.println("接收到消息: -》 " + string);
  41.                     long deliveryTag = envelope.getDeliveryTag();
  42.                     Channel channel = this.getChannel();
  43.                     System.out.println("拒绝消息, 使之进入死信队列");
  44.                     System.out.println("时间: " + new Date());
  45.                     try {
  46.                         TimeUnit.SECONDS.sleep(3);
  47.                     } catch (InterruptedException e) {
  48.                     }
  49.                     
  50.                     // basicReject第二个参数为false进入死信队列或丢弃
  51.                     channel.basicReject(deliveryTag, false);
  52.                 }
  53.             });
  54.         } catch (Exception e) {
  55.             e.printStackTrace();
  56.         } finally {
  57.         }
  58.     }
  59. }

消费者二:监听死信队列

  1. package rabbitmq;
  2. import java.io.IOException;
  3. import java.util.Date;
  4. import java.util.concurrent.TimeoutException;
  5. import com.rabbitmq.client.Channel;
  6. import com.rabbitmq.client.Connection;
  7. import com.rabbitmq.client.ConnectionFactory;
  8. import com.rabbitmq.client.DefaultConsumer;
  9. import com.rabbitmq.client.Envelope;
  10. import com.rabbitmq.client.AMQP.BasicProperties;
  11. public class Consumer2 {
  12.     public static ConnectionFactory getConnectionFactory() {
  13.         // 创建连接工程,下面给出的是默认的case
  14.         ConnectionFactory factory = new ConnectionFactory();
  15.         factory.setHost("192.168.99.100");
  16.         factory.setPort(5672);
  17.         factory.setUsername("guest");
  18.         factory.setPassword("guest");
  19.         factory.setVirtualHost("/");
  20.         return factory;
  21.     }
  22.     public static void main(String[] args) throws IOException, TimeoutException {
  23.         ConnectionFactory connectionFactory = getConnectionFactory();
  24.         Connection newConnection = null;
  25.         Channel createChannel = null;
  26.         try {
  27.             newConnection = connectionFactory.newConnection();
  28.             createChannel = newConnection.createChannel();
  29.             // 队列绑定交换机-channel.queueBind(队列名, 交换机名, 路由key[广播消息设置为空串])
  30.             createChannel.queueBind("myQueue", "order.dead.exchange", "routing_key_myQueue");
  31.             createChannel.basicConsume("myQueue", false, "", new DefaultConsumer(createChannel) {
  32.                 @Override
  33.                 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
  34.                         byte[] body) throws IOException {
  35.                     System.out.println("时间: " + new Date());
  36.                     
  37.                     System.out.println("consumerTag: " + consumerTag);
  38.                     System.out.println("envelope: " + envelope);
  39.                     System.out.println("properties: " + properties);
  40.                     String string = new String(body, "UTF-8");
  41.                     System.out.println("接收到消息: -》 " + string);
  42.                     long deliveryTag = envelope.getDeliveryTag();
  43.                     Channel channel = this.getChannel();
  44.                     channel.basicAck(deliveryTag, true);
  45.                     System.out.println("死信队列中处理完消息息");
  46.                 }
  47.             });
  48.         } catch (Exception e) {
  49.             e.printStackTrace();
  50.         } finally {
  51.         }
  52.     }
  53. }

结果: 消费者一先正常监听到,basicReject为false拒绝后进入死信队列;消费者二监听的死信队列收到消息。

消费者一打出的日志如下:

  1. consumerTag: amq.ctag-0noHs24F0FsGe-dfwwqWNw
  2. envelope: Envelope(deliveryTag=1, redeliver=false, exchange=order.exchange, routingKey=routing_key_myQueue)
  3. properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
  4. 接收到消息: -》 测试消息
  5. 拒绝消息, 使之进入死信队列
  6. 时间: Sat Nov 07 12:18:44 CST 2020

消费者二打出的日志如下:

  1. 时间: Sat Nov 07 12:18:47 CST 2020
  2. consumerTag: amq.ctag-ajYMpMFkXHDiYWkD3XFJ7Q
  3. envelope: Envelope(deliveryTag=1, redeliver=false, exchange=order.dead.exchange, routingKey=routing_key_myQueue)
  4. properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers={x-death=[{reason=rejected, count=1, exchange=order.exchange, time=Sat Nov 07 01:52:19 CST 2020, routing-keys=[routing_key_myQueue], queue=myQueue}]}, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
  5. 接收到消息: -》 测试消息
  6. 死信队列中处理完消息息

注意:

  进入死信队列之后,headers 加了一些死信相关的信息,包括原队列以及进入死信的原因。

补充:在队列进入死信队列之前也可以修改其routingKey,而且只有在指定x-dead-letter-exchange的前提下才能修改下面属性,否则会报错

(1)修改生产者声明队列的方式,如下:

  1. // 声明一个正常的direct类型的交换机
  2.             createChannel.exchangeDeclare("order.exchange", BuiltinExchangeType.DIRECT);
  3.             // 声明死信交换机为===order.dead.exchange
  4.             String dlxName = "order.dead.exchange";
  5.             createChannel.exchangeDeclare(dlxName, BuiltinExchangeType.DIRECT);
  6.             // 声明队列并指定死信交换机为上面死信交换机
  7.             Map<String, Object> arg = new HashMap<String, Object>();
  8.             arg.put("x-dead-letter-exchange", dlxName);
  9.             // 修改进入死信队列的routingkey,如果不修改会使用默认的routingKey
  10.             arg.put("x-dead-letter-routing-key", "routing_key_myQueue_dead");
  11.             createChannel.queueDeclare("myQueue", true, false, false, arg);

(2)修改监听死信队列的消费者二:

  1. // 队列绑定交换机-channel.queueBind(队列名, 交换机名, 路由key[广播消息设置为空串])
  2.             createChannel.queueBind("myQueue", "order.dead.exchange", "routing_key_myQueue_dead");

结果,收到消费者二收到的信息如下:

  1. 时间: Sat Nov 07 12:27:08 CST 2020
  2. consumerTag: amq.ctag-THqpEdYH_-iNeCIccgpuaw
  3. envelope: Envelope(deliveryTag=1, redeliver=false, exchange=order.dead.exchange, routingKey=routing_key_myQueue_dead)
  4. properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers={x-death=[{reason=rejected, count=1, exchange=order.exchange, time=Sat Nov 07 02:00:41 CST 2020, routing-keys=[routing_key_myQueue], queue=myQueue}]}, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
  5. 接收到消息: -》 测试消息
  6. 死信队列中处理完消息

二、延时队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费

RabbitMQ本身没提供延时队列,我们可以利用消息的生存时间和死信队列实现延时

典型的应用场景就是订单30分钟内未支付就关闭订单,还有一种场景,账单24小时未确认,就发送提醒消息

延时队列插件安装

2.1.1、yml配置

  1. spring:
  2. rabbitmq:
  3. host: 192.168.99.12
  4. port: 5672
  5. username: guest
  6. password: guest
  7. # 发送确认
  8. publisher-confirms: true
  9. # 路由失败回调
  10. publisher-returns: true
  11. template:
  12. # 必须设置成true 消息路由失败通知监听者,false 将消息丢弃
  13. mandatory: true
  14. #消费端
  15. listener:
  16. simple:
  17. # 每次从RabbitMQ获取的消息数量
  18. prefetch: 1
  19. default-requeue-rejected: false
  20. # 每个队列启动的消费者数量
  21. concurrency: 1
  22. # 每个队列最大的消费者数量
  23. max-concurrency: 1
  24. # 签收模式为手动签收-那么需要在代码中手动ACK
  25. acknowledge-mode: manual
  26. #邮件队列
  27. email:
  28. queue:
  29. name: demo.email
  30. #邮件交换器名称
  31. exchange:
  32. name: demoTopicExchange
  33. #死信队列
  34. dead:
  35. letter:
  36. queue:
  37. name: demo.dead.letter
  38. exchange:
  39. name: demoDeadLetterTopicExchange
  40. #延时队列
  41. delay:
  42. queue:
  43. name: demo.delay
  44. exchange:
  45. name: demoDelayTopicExchange

2.1.2、延时队列配置

  1. /**
  2. * rabbitmq 配置
  3. *
  4. * @author DUCHONG
  5. * @since 2020-08-23 14:05
  6. **/
  7. @Configuration
  8. @Slf4j
  9. public class RabbitmqConfig {
  10. @Value("${email.queue.name}")
  11. private String emailQueue;
  12. @Value("${exchange.name}")
  13. private String topicExchange;
  14. @Value("${dead.letter.queue.name}")
  15. private String deadLetterQueue;
  16. @Value("${dead.letter.exchange.name}")
  17. private String deadLetterExchange;
  18. @Value("${delay.queue.name}")
  19. private String delayQueue;
  20. @Value("${delay.exchange.name}")
  21. private String delayExchange;
  22. @Bean
  23. public Queue emailQueue() {
  24. Map<String, Object> arguments = new HashMap<>(2);
  25. // 绑定死信交换机
  26. arguments.put("x-dead-letter-exchange", deadLetterExchange);
  27. // 绑定死信的路由key
  28. arguments.put("x-dead-letter-routing-key", deadLetterQueue+".#");
  29. return new Queue(emailQueue,true,false,false,arguments);
  30. }
  31. @Bean
  32. TopicExchange emailExchange() {
  33. return new TopicExchange(topicExchange);
  34. }
  35. @Bean
  36. Binding bindingEmailQueue() {
  37. return BindingBuilder.bind(emailQueue()).to(emailExchange()).with(emailQueue+".#");
  38. }
  39. //私信队列和交换器
  40. @Bean
  41. public Queue deadLetterQueue() {
  42. return new Queue(deadLetterQueue);
  43. }
  44. @Bean
  45. TopicExchange deadLetterExchange() {
  46. return new TopicExchange(deadLetterExchange);
  47. }
  48. @Bean
  49. Binding bindingDeadLetterQueue() {
  50. return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(deadLetterQueue+".#");
  51. }
  52. //延时队列
  53. @Bean
  54. public Queue delayQueue() {
  55. return new Queue(delayQueue);
  56. }
  57. @Bean
  58. CustomExchange delayExchange() {
  59. Map<String, Object> args = new HashMap<>();
  60. args.put("x-delayed-type", "topic");
  61. //参数二为类型:必须是x-delayed-message
  62. return new CustomExchange(delayExchange, "x-delayed-message", true, false, args);
  63. }
  64. @Bean
  65. Binding bindingDelayQueue() {
  66. return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(delayQueue+".#").noargs();
  67. }
  68. }

2.2、消息发送方

30分钟时间太久了,这里延时2分钟来看效果

  1. @Configuration
  2. @EnableScheduling
  3. @Slf4j
  4. public class ScheduleController {
  5. @Autowired
  6. RabbitTemplate rabbitTemplate;
  7. @Value("${exchange.name}")
  8. private String topicExchange;
  9. @Value("${delay.exchange.name}")
  10. private String delayTopicExchange;
  11. @Scheduled(cron = "0 0/1 * * * ?")
  12. public void sendEmailMessage() {
  13. String msg = RandomStringUtils.randomAlphanumeric(8);
  14. JSONObject email=new JSONObject();
  15. email.put("content",msg);
  16. email.put("to","duchong@qq.com");
  17. CorrelationData correlationData=new CorrelationData(UUID.randomUUID().toString());
  18. rabbitTemplate.convertAndSend(topicExchange,"demo.email.x",email.toJSONString(),correlationData);
  19. log.info("---发送 email 消息---{}---messageId---{}",email,correlationData.getId());
  20. }
  21. @Scheduled(cron = "0 0/1 * * * ?")
  22. public void sendDelayOrderMessage() throws Exception{
  23. //订单号 id实际是保存订单后返回的,这里用uuid代替
  24. String orderId = UUID.randomUUID().toString();
  25. // 模拟订单信息
  26. JSONObject order=new JSONObject();
  27. order.put("orderId",orderId);
  28. order.put("goodsName","vip充值");
  29. order.put("orderAmount","99.00");
  30. CorrelationData correlationData=new CorrelationData(orderId);
  31. MessageProperties messageProperties = new MessageProperties();
  32. messageProperties.setMessageId(orderId);
  33. //30分钟时间太长,这里延时120s消费
  34. messageProperties.setHeader("x-delay", 120000);
  35. Message message = new Message(order.toJSONString().getBytes(CharEncoding.UTF_8), messageProperties);
  36. rabbitTemplate.convertAndSend(delayTopicExchange,"demo.delay.x",message,correlationData);
  37. log.info("---发送 order 消息---{}---orderId---{}",order,correlationData.getId());
  38. //睡一会,为了看延迟效果
  39. TimeUnit.MINUTES.sleep(10);
  40. }
  41. }

2.3、消息消费方

  1. @Component
  2. @Slf4j
  3. public class MessageHandler {
  4. /**
  5. * 邮件发送
  6. * @param message
  7. * @param channel
  8. * @param headers
  9. * @throws IOException
  10. */
  11. @RabbitListener(queues ="demo.email")
  12. @RabbitHandler
  13. public void handleEmailMessage(Message message, Channel channel, @Headers Map<String,Object> headers) throws IOException {
  14. try {
  15. String msg=new String(message.getBody(), CharEncoding.UTF_8);
  16. JSONObject jsonObject = JSON.parseObject(msg);
  17. jsonObject.put("messageId",headers.get("spring_returned_message_correlation"));
  18. log.info("---接受到消息---{}",jsonObject);
  19. //主动异常
  20. int m=1/0;
  21. //手动签收
  22. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  23. }
  24. catch (Exception e) {
  25. log.info("handleEmailMessage捕获到异常,拒绝重新入队---消息ID---{}", headers.get("spring_returned_message_correlation"));
  26. //异常,ture 重新入队,或者false,进入死信队列
  27. channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
  28. }
  29. }
  30. /**
  31. * 死信消费者,自动签收开启状态下,超过重试次数,或者手动签收,reject或者Nack
  32. * @param message
  33. */
  34. @RabbitListener(queues = "demo.dead.letter")
  35. public void handleDeadLetterMessage(Message message, Channel channel,@Headers Map<String,Object> headers) throws IOException {
  36. //可以考虑数据库记录,每次进来查数量,达到一定的数量,进行预警,人工介入处理
  37. log.info("接收到死信消息:---{}---消息ID---{}", new String(message.getBody()),headers.get("spring_returned_message_correlation"));
  38. //回复ack
  39. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  40. }
  41. /**
  42. * 延时队列消费
  43. * @param message
  44. * @param channel
  45. * @param headers
  46. * @throws IOException
  47. */
  48. @RabbitListener(queues ="demo.delay")
  49. @RabbitHandler
  50. public void handleOrderDelayMessage(Message message, Channel channel, @Headers Map<String,Object> headers) throws IOException {
  51. try {
  52. String msg=new String(message.getBody(), CharEncoding.UTF_8);
  53. JSONObject jsonObject = JSON.parseObject(msg);
  54. log.info("---接受到订单消息---orderId---{}",message.getMessageProperties().getMessageId());
  55. log.info("---订单信息---order---{}",jsonObject);
  56. //业务逻辑,根据订单id获取订单信息,如果还未支付,设置关闭状态,如果已支付,不做任何处理
  57. //手动签收
  58. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  59. }
  60. catch (Exception e) {
  61. log.info("handleOrderDelayMessage捕获到异常,重新入队---orderId---{}", headers.get("spring_returned_message_correlation"));
  62. //异常,ture 重新入队,或者false,进入死信队列
  63. channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
  64. }
  65. }
  66. }

2.4、结果

运行结果显示,同一个订单号的消息,发送过后2分钟,消费者才接受到,符合预期

https://www.cnblogs.com/geekdc/p/13550620.html

消息队列RabbitMQ(五):死信队列与延迟队列

rabbitmq的延迟队列和死信队列_死信队列和延时队列的区别_zhuwenaptx的博客-CSDN博客

RabbitMQ的死信队列和延时队列 - 简书

RabbitMQ死信队列与延迟队列_51CTO博客_rabbitmq延迟队列

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/空白诗007/article/detail/930166
推荐阅读
相关标签
  

闽ICP备14008679号