当前位置:   article > 正文

RabbitMq实现延迟队列功能

RabbitMq实现延迟队列功能

1、rabbitmq服务端打开延迟插件 (超过 4294967295毫秒 ≈ 1193 小时 ≈ 49.7 天  这个时间会立即触发)

注意:只有RabbitMQ 3.6.x以上才支持

在下载好之后,解压得到.ez结尾的插件包,将其复制到RabbitMQ安装目录下的plugins文件夹。

然后通过命令行启用该插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

该插件在通过上述命令启用后就可以直接使用,不需要重启。

2、添加依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

3、配置交换机和队列

  1. @Configuration
  2. public class DelayedConfig {
  3. public static String EXCHANGE_NAME = "delayed_exchange";
  4. public static String QUEUE_NAME = "delayed_queue";
  5. public static String KEY_NAME = "delayed_key";
  6. /**
  7. * 基于插件实现的交换机,必须是CustomExchange类型,标识这是一个延时类型的交换机
  8. */
  9. @Bean()
  10. public CustomExchange delayedExchange(){
  11. Map<String,Object> params = new HashMap<>();
  12. params.put("x-delayed-type","direct");
  13. //参数1:交换机名字,参数2:交换机的类型,参数3:是否持久化,参数4:是否自动删除队列,参数5:交换机的额外参数设置
  14. return new CustomExchange(EXCHANGE_NAME,"x-delayed-message",true,false,params);
  15. }
  16. @Bean()
  17. public Queue delayedQueue(){
  18. return new Queue(QUEUE_NAME);
  19. }
  20. @Bean
  21. public Binding delayedBinding(){
  22. return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(KEY_NAME).noargs();
  23. }
  24. }

 4、发送和接收消息

  1. @GetMapping("/t5")
  2. public void t5(){
  3. Date date = new Date();
  4. System.out.println("发送时间:" + date.toString());
  5. //发送消息
  6. rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME,DelayedConfig.KEY_NAME,"1延迟消息wxm",
  7. msg->{msg.getMessageProperties().setHeader(MessageProperties.X_DELAY, 15552000000L);
  8. msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
  9. return msg;});
  10. //发送消息
  11. rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME,DelayedConfig.KEY_NAME,"延迟消息wxm",
  12. msg->{msg.getMessageProperties().setDelay(10000);
  13. msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
  14. return msg;});
  15. log.info("发送成功");
  16. }
  17. @RabbitHandler
  18. @RabbitListener(queues = "delayed_queue")
  19. public void getDelayed(Message message, Channel channel) throws Exception{
  20. Date date = new Date();
  21. String rightNow = date.toString();
  22. String msg = new String(message.getBody());
  23. // 手动应答
  24. System.out.println(message.getMessageProperties().getDeliveryTag());
  25. channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
  26. System.out.println("接受成功:"+msg+rightNow);
  27. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/寸_铁/article/detail/1015793
推荐阅读
相关标签
  

闽ICP备14008679号