当前位置:   article > 正文

RabbitMq基于延迟插件实现延迟队列_基于 rabbitmq_delayed_message_exchange插件,实现延迟队列效果

基于 rabbitmq_delayed_message_exchange插件,实现延迟队列效果

概述:

基于延迟插件实现的延迟队列可以真正实现延迟,不会像死信队列做的延迟那样,无论延迟时间均按顺序排队的情况,基于延迟插件实现的延迟队列是不会按照顺序来排队执行,按时间长短执行,时间长的就延迟慢,时间短的则延迟快,真正意义上实现了消息的延迟

1.安装延迟队列插件

Community Plugins — RabbitMQhttps://www.rabbitmq.com/community-plugins.html下载 rabbitmg_delayed_message_exchange 插件

具体安装参详其它博主博客

2.利用延迟插件实现流程图

1.死信也可以实现延迟队列,但是有很大的缺点,就是不管延迟时长,都得按顺序排队,切死信的方式实现延迟流程结构复杂

2.基于延迟插件实现,主要就是延迟交换机的类型,需要特殊去声明好

3代码实例

1.交换机,队列配置类 

  1. package com.rabbitmqtest.config;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.CustomExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.beans.factory.annotation.Qualifier;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. import java.util.HashMap;
  10. @Configuration
  11. public class DelayedQueueConfig {
  12. //队列
  13. public static final String DELAYED_QUEUE_NAME = "delayed.queue";
  14. //交换机
  15. public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
  16. //routingKey
  17. public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
  18. //声明队列
  19. @Bean
  20. public Queue delayedQueue() {
  21. return new Queue(DELAYED_QUEUE_NAME);
  22. }
  23. //声明交换机 基于延迟插件的交换机
  24. @Bean
  25. public CustomExchange delayedExchange() {
  26. HashMap<String, Object> arguments = new HashMap<>();
  27. arguments.put("x-delayed-type", "direct");
  28. /**
  29. * 1.交换机名称
  30. * 2.交换机的类型
  31. * 3.是否需要持久化
  32. * 4.是否需要自动删除
  33. * 5.其它参数
  34. */
  35. return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message",
  36. true, false, arguments);
  37. }
  38. //绑定 队列和延迟交换机
  39. @Bean
  40. public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue, @Qualifier("delayedExchange") CustomExchange delayedExchange) {
  41. return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
  42. }
  43. }

2. 消费者

  1. package com.rabbitmqtest.consumer;
  2. import com.rabbitmq.client.Channel;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.stereotype.Component;
  7. import java.util.Date;
  8. /**
  9. * 监听死信队列
  10. * 过期时间TTL
  11. */
  12. @Component
  13. @Slf4j
  14. public class DeadLetterQueueConsumer {
  15. //接收消息,监听的队列名称对应config里声明的队列
  16. @RabbitListener(queues = "QD")
  17. public void receiveD(Message message, Channel channel){
  18. String s = new String(message.getBody());
  19. log.info("当前时间: [{}] ,收到死信队列的消息 : [{}] ",new Date().toString(),s);
  20. }
  21. }

3.生产者/发送方

  1. package com.rabbitmqtest.controller;
  2. import com.rabbitmqtest.config.DelayedQueueConfig;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.web.bind.annotation.GetMapping;
  6. import org.springframework.web.bind.annotation.PathVariable;
  7. import org.springframework.web.bind.annotation.RestController;
  8. import javax.annotation.Resource;
  9. import java.util.Date;
  10. @Slf4j
  11. @RestController
  12. public class RabbitTestController {
  13. @Resource
  14. private RabbitTemplate rabbitTemplate;
  15. /**
  16. * 基于延迟插件发送消息
  17. * @param message
  18. * @param delayTime
  19. */
  20. @GetMapping("/sendDelayMsg/{message}/{delayTime}")
  21. public void rabbitSimple1(@PathVariable("message") String message,@PathVariable("delayTime") Integer delayTime){
  22. log.info("当前时间: [{}] ,发送一条时长:[{}] ms 发送信息给延迟队列delayed.queue : [{}] ",new Date().toString(),delayTime,message);
  23. //发送消息对应config声明的交换机关系
  24. rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME,DelayedQueueConfig.DELAYED_ROUTING_KEY,message, msg ->{
  25. //发送消息的时候 延迟时长 单位:ms
  26. msg.getMessageProperties().setDelay(delayTime);
  27. return msg;
  28. });
  29. }
  30. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/400986
推荐阅读
相关标签
  

闽ICP备14008679号