当前位置:   article > 正文

rabbitmq的延迟队列和死信队列的实现(二种方式,超详细)_mq 延时消息 死信队列

mq 延时消息 死信队列

一、什么是延迟对列和死信队列

死信队列:元素产生后没及时的被消费,一直存放在队列中。

延迟队列:延时队列,队列内部是有顺序的,其最重要的特性是延迟时间,是希望在指定时间到了以后或之前取出和处理,类似一个定时任务,但是比定时任务要节省资源。延时队列其实就是特殊的死信队列。

但是队列的特点是先进先出,如果对应不同的元素有不同的时间,那么如果存在延迟时间小的元素在延迟时间大的元素之后,那么就达不到对应的效果。

如何实现类似定时任务的效果呢?

就是当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。
 

二、如何实现类似于定时任务的实现方式(两种)

1.延迟队列(TTL)+死信队列(DLX)

2.死信队列(DLX)+对应的插件

三、具体实现方式

1.1 ,TTL+DXL

这个方法的目的是,先产生两个队列,其中一个队列作为死信队列,一个作为绑定死信的延迟队列。通过设置TTL(延迟时间:可以再这里设置,也可以在生产者针对消息进行设置,但是这种方法,要求一个死信队列存放的都是同一个延迟时间的消息)。

  1. /**
  2. * 构建TTL死信延迟的exchange,并对其进行命名
  3. */
  4. @Bean
  5. public DirectExchange delayDirectExchange() {
  6. return ExchangeBuilder
  7. .directExchange(QueueEnum.DELAY_QUEUE.getExchangeName())
  8. .durable(true)
  9. .build();
  10. }
  11. /**
  12. * 用来转发死信的exchange(也就是一个普通的exchange)
  13. */
  14. @Bean
  15. public DirectExchange delayBindExchange() {
  16. return ExchangeBuilder
  17. .directExchange(QueueEnum.DELAY_BIND_QUEUE.getExchangeName())
  18. .durable(true)
  19. .build();
  20. }
  21. /**
  22. * TTL死信延迟队列
  23. */
  24. @Bean
  25. public Queue delayQueue() {
  26. return QueueBuilder
  27. .durable(QueueEnum.DELAY_QUEUE.getQueueName())
  28. .withArgument("x-message-ttl", CommonConstant.OUT_ORDER_TIME)
  29. .withArgument("x-dead-letter-exchange", QueueEnum.DELAY_BIND_QUEUE.getExchangeName())
  30. .withArgument("x-dead-letter-routing-key", QueueEnum.DELAY_BIND_QUEUE.getPath())
  31. .build();
  32. }
  33. /**
  34. * 负责转发TTL的队列(也是一个普通的队列)
  35. */
  36. @Bean
  37. public Queue delayBindQueue() {
  38. return QueueBuilder
  39. .durable(QueueEnum.DELAY_BIND_QUEUE.getQueueName())
  40. .build();
  41. }
  42. /**
  43. * TTL死信转发绑定
  44. */
  45. @Bean
  46. Binding delayBindBinding (){
  47. return BindingBuilder
  48. .bind(delayBindQueue())
  49. .to(delayBindExchange())
  50. .with(QueueEnum.DELAY_BIND_QUEUE.getPath());
  51. }
  52. }

其中延迟TTL队列的重要的属性

x-message-ttl:延迟时间
x-dead-letter-exchange:绑定的死信交换机
x-dead-letter-routing-key: 绑定的路由,一般是对应死信队列名称。

其中生产者通过调用方法可以把消息的延迟时间加上去,mq取延迟时间最短的为准。

  1. /**
  2. * 发送延迟消息
  3. * @param msg 消息内容
  4. * @param delayTime 延迟时间,单位毫秒
  5. */
  6. private static AmqpTemplate amqpTemplate;
  7. public static void sendDelayMsg(String msg, Long delayTime,QueueEnum queueEnum){
  8. amqpTemplate.convertAndSend(queueEnum.getExchangeName(),queueEnum.getPath(),msg,message -> {
  9. message.getMessageProperties().setExpiration(delayTime.toString());
  10. return message;
  11. });
  12. }

这个是自己写的方法,其中生产者生产消息的最重要的方法是

convertAndSend()

1.2 2、死信队列(DLX)+对应的插件

上面演示了通过TTL+DLX实现的延迟队列,这样分的时间段越多,需要的交换机及队列也越多。前面的方式实现起来有点复杂,其实有更简单的实现方式,在RabbitMQ 3.5.7及以上的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列功能。同时插件依赖Erlang/OPT 18.0及以上。

这个方法我没有尝试
https://blog.csdn.net/weixin_46991199/article/details/131782214?spm=1001.2014.3001.5502icon-default.png?t=N6B9https://blog.csdn.net/weixin_46991199/article/details/131782214?spm=1001.2014.3001.5502

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/神奇cpp/article/detail/930175
推荐阅读
相关标签
  

闽ICP备14008679号