当前位置:   article > 正文

Springcloud集成 RabbitMQ延时队列_springcloud rabbitmq 延迟队列

springcloud rabbitmq 延迟队列

一、场景

     当涉及到需要延时处理的业务,比如订单30分钟后过期,2小时后操作业务数据等操作,这里选择用MQ的延时队列+插件来处理,本文记录具体代码实现供参考。

二、代码配置

统一集成在common包中,供各服务集成调用。

1.  mq   config配置

  1. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  2. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.config.ConfigurableBeanFactory;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. import org.springframework.context.annotation.Scope;
  8. /**
  9. * 描述:RabbitMQ配置类
  10. *
  11. * @author: winy_work
  12. * @date: 2022-08-24 9:50
  13. */
  14. @Configuration
  15. public class RabbitMQConfig {
  16. /**
  17. * 模板配置
  18. * @param connectionFactory
  19. * @return
  20. */
  21. @Bean
  22. /** 因为要设置回调类,所以应是prototype类型,如果是singleton类型,则回调类为最后一次设置 */
  23. @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
  24. public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
  25. CachingConnectionFactory cachingConnectionFactory = (CachingConnectionFactory) connectionFactory;
  26. /** 如果要进行消息回调,则这里必须要设置为true */
  27. cachingConnectionFactory.setPublisherConfirms(true);
  28. //rabbitmq心跳20s
  29. cachingConnectionFactory.setRequestedHeartBeat(20);
  30. // 设置自动恢复
  31. cachingConnectionFactory.getRabbitConnectionFactory().setAutomaticRecoveryEnabled(true);
  32. // 设置 每10s ,重试一次
  33. cachingConnectionFactory.getRabbitConnectionFactory().setNetworkRecoveryInterval(10);
  34. // 设置不重新声明交换器,队列等信息。
  35. cachingConnectionFactory.getRabbitConnectionFactory().setTopologyRecoveryEnabled(false);
  36. cachingConnectionFactory.setChannelCacheSize(50);
  37. RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
  38. return template;
  39. }
  40. }

2.  交换机 exchange 声明 和 队列声明,绑定

  1. import com.bossien.common.constants.RabbitMQConstant;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.CustomExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.beans.factory.annotation.Qualifier;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. import java.util.HashMap;
  10. import java.util.Map;
  11. /**
  12. * 描述:RabbitMQ 延时队列声明集合类
  13. *
  14. * @author: winy_work
  15. * @date: 2023-08-24 9:50
  16. */
  17. @Configuration
  18. public class RabbitMQDelayedDeclare {
  19. /**
  20. * 声明交换机
  21. *
  22. * @return
  23. */
  24. @Bean
  25. public CustomExchange delayedExchange() {
  26. Map<String, Object> args = new HashMap<>(2);
  27. // 交换机类型
  28. args.put("x-delayed-type", "direct");
  29. return new CustomExchange(RabbitMQConstant.DELAYED_EXCHANGE,
  30. "x-delayed-message",
  31. true,
  32. false,
  33. args);
  34. }
  35. /**
  36. * 声明队列
  37. *
  38. * @return
  39. */
  40. @Bean
  41. public Queue delayedQueue() {
  42. Queue queue = new Queue(RabbitMQConstant.DELAYED_QUEUE, true, false, false);
  43. return queue;
  44. }
  45. /**
  46. * 绑定交换机
  47. *
  48. * @param delayedQueue
  49. * @param delayedExchange
  50. * @return
  51. */
  52. @Bean
  53. public Binding bindingDelayedQueue(
  54. @Qualifier("delayedQueue") Queue delayedQueue,
  55. @Qualifier("delayedExchange") CustomExchange delayedExchange
  56. ) {
  57. return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(RabbitMQConstant.DELAYED_QUEUE).noargs();
  58. }
  59. }

3. 发送消息生产(工具)类

  1. import com.alibaba.fastjson.JSONObject;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.core.MessageBuilder;
  6. import org.springframework.amqp.core.MessageProperties;
  7. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.stereotype.Component;
  10. import java.util.UUID;
  11. /**
  12. * 消息发送服务接口
  13. *
  14. * @author winy_work
  15. */
  16. @Component
  17. public class RabbitMQProducer {
  18. private final static Logger logger = LoggerFactory.getLogger(RabbitMQProducer.class);
  19. @Autowired
  20. private RabbitTemplate rabbitTemplate;
  21. /**
  22. * 发送延时消息到队列(定时需求的才用)
  23. *
  24. * @param message 消息内容
  25. * @param delayedTime 延时时间 单位毫秒
  26. */
  27. public void sendDelayMsg(Object message, Integer delayedTime) {
  28. String correlationDataId = UUID.randomUUID().toString();
  29. String msgStr = JSONObject.toJSONString(message);
  30. logger.info("messageId is " + correlationDataId + ",消息内容=" + msgStr);
  31. if (msgStr == null) {
  32. logger.error("messageId is " + correlationDataId + ",send message failed: message is null");
  33. return;
  34. }
  35. Message messageObj = MessageBuilder.withBody(msgStr.getBytes())
  36. .setContentType(MessageProperties.CONTENT_TYPE_JSON)
  37. .setContentEncoding("utf-8")
  38. .setMessageId(UUID.randomUUID() + "")
  39. .build();
  40. // 消息发送
  41. rabbitTemplate.convertAndSend(RabbitMQConstant.DELAYED_EXCHANGE,
  42. RabbitMQConstant.DELAYED_QUEUE,
  43. messageObj,
  44. (msg) -> {
  45. msg.getMessageProperties().setDelay(delayedTime);
  46. return msg;
  47. });
  48. }
  49. }

4. 注入bean, 交给spring管理, 支持bean注入调用

common resource下新建文件夹 META-INF,  创建文件  spring.factories, 添加内容如下:

  1. org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  2. com.bossien.common.core.rabbitmq.RabbitMQConfig,\
  3. com.bossien.common.core.rabbitmq.RabbitMQProducer,\
  4. com.bossien.common.core.rabbitmq.RabbitMQDelayedDeclare

5. 上面队列常量类涉及常量  RabbitMQConstant

  1. /**
  2. * 延时队列专用交换机
  3. */
  4. public static final String DELAYED_EXCHANGE = "delayed.exchange";
  5. /**
  6. * 延时队列
  7. */
  8. public static final String DELAYED_QUEUE = "delayed.queue";

三、消息生产者测试类

  1. import org.springframework.beans.factory.annotation.Autowired;
  2. import org.springframework.web.bind.annotation.GetMapping;
  3. import org.springframework.web.bind.annotation.RequestMapping;
  4. import org.springframework.web.bind.annotation.RestController;
  5. /**
  6. * 描述:mq测试控制类
  7. *
  8. * @author: winy_work
  9. * @date: 2022-08-22 14:12
  10. */
  11. @RestController
  12. @RequestMapping("/rabbitmq")
  13. public class TestRabbitmqController {
  14. @Autowired
  15. private RabbitMQProducer producer;
  16. /**
  17. * 测试延时队列场景
  18. * @return
  19. */
  20. @GetMapping("/testRabbitmqDelayed")
  21. public Response testRabbitmqDelayed(){
  22. // 如果正常情况,那么消费者会根据时间从小到大依次消费执行
  23. producer.sendDelayMsg("测试延时队列-5s",5000);
  24. producer.sendDelayMsg("测试延时队列-30s",30000);
  25. producer.sendDelayMsg("测试延时队列-20s",20000);
  26. producer.sendDelayMsg("测试延时队列-15s",15000);
  27. producer.sendDelayMsg("测试延时队列-2s",2000);
  28. return new Response().success("发送成功!");
  29. }
  30. }

四、消息消费者测试类

  1. import com.alibaba.fastjson.JSONObject;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.stereotype.Component;
  7. /**
  8. * 测试mq监听类
  9. *
  10. * @author winy_work
  11. * @desc
  12. */
  13. @Component
  14. public class TestListenter {
  15. private final static Logger logger = LoggerFactory.getLogger(TestListenter.class);
  16. /**
  17. * 消费测试延时队列
  18. * @param message
  19. */
  20. @RabbitListener(queues = RabbitMQConstant.DELAYED_QUEUE)
  21. public void handleDelayedMessage(Object obj) {
  22. logger.info("mq接收到信息:message={}", obj);
  23. }
  24. }

五、注意:上面四步配置完成后工程是启动不起来的,因为声明延时队列的 类型 

x-delayed-messageMQ中还没有。需要安装该插件。

插件下载地址:这里需要根据自己安装的mq版本号来下载对应的插件(.ez 文件)。如下图是3.10.2版本的插件。

Releases · rabbitmq/rabbitmq-delayed-message-exchange · GitHub

六、插件安装

下面的地址根据自己的安装路径替换。试了不重启也可以。可以不重启。

  1. # RabbitMQ 的安装目录
  2. cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.10.0/plugins
  3. # 将下载的插件放到 RabbitMQ 安装目录的 plugins 目录下
  4. cp /usr/local/rabbitmq/rabbitmq_delayed_message_exchange-3.10.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.10.0/plugins
  5. # 安装插件
  6. rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  7. # 重启 RabbitMQ
  8. systemctl restart rabbitmq-server

安装成功后登录MQ控制台,点击exchange  tab页面,查看是否多了如下图选择项,如有,则安装成功。

 七、启动程序,调用rest接口测试   localhost:8080/rabbitmq/testRabbitmqDelayed

可以看到控制台打印日志,是按照时间从小到大依次执行的。2s  5s  15s  20s 30s

至此,延时队列就可以成功使用了。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/414733
推荐阅读
相关标签
  

闽ICP备14008679号