当前位置:   article > 正文

SpringBoot中,如何整合RabbitMQ实现延时队列?

rabbitmq delaymillisecond

一、介绍

  • 1、什么是延时队列?
    延时队列即就是放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费
  • 2、适用场景
    (1)商城订单超时未支付,取消订单
    (2)使用权限到期前十分钟提醒用户
    (3)收益项目,投入后一段时间后产生收益

二、实现方式

从以上场景中,我们可以看出,延时队列的主要功能就是在指定的时间之后做指定的事情,那么,我们思考有哪些工具我们可以使用?

  • 1、Redis 监听过期 Key
https://lizhou.blog.csdn.net/article/details/109238083
  • 2、RabbitMQ等实现延时队列

这也是本<typo id="typo-299" data-origin="片" ignoretag="true">片</typo>文章中要讲的知识点,使用 RabbitMQ 实现延时队列有两种方式

(1)利用两个特性: Time To Live(TTL)、Dead Letter Exchanges(DLX)

(2)利用 RabbitMQ 中的插件 x-delay-message

本文主要讲解第二种方式,使用插件的方式

三、下载插件

RabbitMQ 实现了一个插件 x-delay-message 来实现延时队列,我们可以从 这里 下载到它

https://www.rabbitmq.com/community-plugins.html

选择 rabbitmq_delayed_message_exchange 插件,如图所示

image.png
image.png

选择 .ez 格式的文件下载,下载后放置 RabbitMQ 的安装目录下的 plugins 目录下,如我的路径为

D:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.16\plugins

执行命令

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

安装插件完成

四、在SpringBoot整合RabbitMQ

1、引入 RabbitMQ 依赖

  1. <!-- rabbitmq消息队列 -->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>

2、配置 RabbitMQ 信息

  1. spring:
  2. rabbitmq:
  3. host: 127.0.0.1
  4. port: 5672
  5. username: guest
  6. password: guest
  7. listener:
  8. simple:
  9. # 手动ACK 不开启自动ACK模式,目的是防止报错后未正确处理消息丢失 默认 为 none
  10. acknowledge-mode: manual

3、RabbitMQ 常量类

  1. package com.asurplus.common.rabbitmq;
  2. /**
  3. * rabbit常量类
  4. *
  5. * @Author Lizhou
  6. */
  7. public final class RabbitConst {
  8. /**
  9. * 交换机
  10. */
  11. public static final String DELAY_EXCHANGE = "delay_exchange";
  12. /**
  13. * 队列
  14. */
  15. public static final String DELAY_QUEUE = "delay_queue";
  16. /**
  17. * 路由
  18. */
  19. public static final String DELAY_KEY = "delay_key";
  20. }

4、RabbitMQ 配置类

  1. package com.asurplus.common.rabbitmq;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.CustomExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. import java.util.HashMap;
  9. import java.util.Map;
  10. /**
  11. * rabbitmq配置类
  12. *
  13. * @Author Lizhou
  14. */
  15. @Configuration
  16. public class RabbitConfig {
  17. /**
  18. * 延时队列交换机
  19. *
  20. * @return
  21. */
  22. @Bean
  23. public CustomExchange delayExchange() {
  24. Map<String, Object> args = new HashMap<>();
  25. args.put("x-delayed-type", "direct");
  26. return new CustomExchange(RabbitConst.DELAY_EXCHANGE, "x-delayed-message", true, false, args);
  27. }
  28. /**
  29. * 延时队列
  30. *
  31. * @return
  32. */
  33. @Bean
  34. public Queue delayQueue() {
  35. return new Queue(RabbitConst.DELAY_QUEUE, true);
  36. }
  37. /**
  38. * 给延时队列绑定交换机
  39. *
  40. * @return
  41. */
  42. @Bean
  43. public Binding delayBinding(Queue delayQueue, CustomExchange delayExchange) {
  44. return BindingBuilder.bind(delayQueue).to(delayExchange).with(RabbitConst.DELAY_KEY).noargs();
  45. }
  46. }

5、RabbitMQ 生产者

  1. package com.asurplus.common.rabbitmq;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Component;
  6. /**
  7. * rabbitMq生产者
  8. *
  9. * @Author Lizhou
  10. */
  11. @Component
  12. @Slf4j
  13. public class RabbitProducer {
  14. @Autowired
  15. private RabbitTemplate rabbitTemplate;
  16. /**
  17. * 发送消息
  18. *
  19. * @param object 发送对象
  20. * @param millisecond 延时(毫秒)
  21. */
  22. public void sendDelayMessage(Object object, long millisecond) {
  23. this.rabbitTemplate.convertAndSend(
  24. RabbitConst.DELAY_EXCHANGE,
  25. RabbitConst.DELAY_KEY,
  26. object.toString(),
  27. message -> {
  28. message.getMessageProperties().setHeader("x-delay", millisecond);
  29. return message;
  30. }
  31. );
  32. }
  33. }

6、RabbitMQ 消费者

  1. package com.asurplus.common.rabbitmq;
  2. import com.rabbitmq.client.Channel;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.stereotype.Component;
  7. import java.io.IOException;
  8. /**
  9. * activeMq消费者
  10. *
  11. * @Author Lizhou
  12. */
  13. @Component
  14. @Slf4j
  15. public class RabbitConsumer {
  16. /**
  17. * 接收消息
  18. *
  19. * @param object 监听的内容
  20. */
  21. @RabbitListener(queues = RabbitConst.DELAY_QUEUE)
  22. public void cfgUserReceiveDealy(Object object, Message message, Channel channel) throws IOException {
  23. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  24. try {
  25. log.info("接受消息:{}", object.toString());
  26. } catch (Exception e) {
  27. log.error(e.getMessage());
  28. /**
  29. * basicRecover方法是进行补发操作,
  30. * 其中的参数如果为true是把消息退回到queue但是有可能被其它的consumer(集群)接收到,
  31. * 设置为false是只补发给当前的consumer
  32. */
  33. channel.basicRecover(false);
  34. }
  35. }
  36. }

五、测试

  1. package com.asurplus;
  2. import com.asurplus.common.rabbitmq.RabbitProducer;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.boot.SpringApplication;
  5. import org.springframework.boot.autoconfigure.SpringBootApplication;
  6. import org.springframework.web.bind.annotation.GetMapping;
  7. import org.springframework.web.bind.annotation.RestController;
  8. @SpringBootApplication
  9. @RestController
  10. public class RabbitmqApplication {
  11. @Autowired
  12. private RabbitProducer product;
  13. @GetMapping("init")
  14. public void init() {
  15. String message1 = "这是第一条消息";
  16. String message2 = "这是第二条消息";
  17. product.sendDelayMessage(message1, 5000);
  18. product.sendDelayMessage(message2, 10000);
  19. }
  20. public static void main(String[] args) {
  21. SpringApplication.run(RabbitmqApplication.class, args);
  22. }
  23. }

通过测试,第一条消息在 5s后接收到,第二条消息在 10s后接收到,说明我们的延时队列已经成功

作者:Asurplus、
原文链接:https://lizhou.blog.csdn.net/article/details/113917675

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/507361
推荐阅读
相关标签
  

闽ICP备14008679号