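A delayed message is one that the producer sends immediately but the consumer only processes after a specified waiting time. A typical use case is order handling: if the user has not paid within a fixed time after placing an order, the order is cancelled. The classic way to implement delayed messages in RabbitMQ combines TTL with dead-lettering (message time-to-live + dead-letter exchange). RabbitMQ also offers a dedicated delayed-message exchange plugin, rabbitmq_delayed_message_exchange (the author uses it with RabbitMQ 3.8; the community plugin itself dates back to the 3.5.x series), and Spring AMQP has supported the plugin since version 1.6. This article integrates RabbitMQ with Spring Boot 2.x to implement a delayed message queue based on that plugin.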
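For comparison with the plugin approach used in the rest of this article, the TTL + dead-letter technique is driven entirely by queue arguments. The sketch below only builds that argument map in plain Java. The class name, method name, and the exchange and routing-key values are made up for illustration; `x-message-ttl`, `x-dead-letter-exchange` and `x-dead-letter-routing-key` are the standard RabbitMQ queue arguments. In a Spring application, a map like this could be passed as the last constructor argument of `Queue`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: arguments for a "waiting" queue that has no consumers. */
class TtlDeadLetterArgs {

    /**
     * Messages sit in the waiting queue for ttlMillis, then RabbitMQ
     * republishes them to deadLetterExchange with deadLetterRoutingKey,
     * where the real consumer is listening.
     */
    static Map<String, Object> build(int ttlMillis,
                                     String deadLetterExchange,
                                     String deadLetterRoutingKey) {
        if (ttlMillis < 0) {
            throw new IllegalArgumentException("ttlMillis must not be negative");
        }
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", ttlMillis);
        args.put("x-dead-letter-exchange", deadLetterExchange);
        args.put("x-dead-letter-routing-key", deadLetterRoutingKey);
        return args;
    }

    public static void main(String[] unused) {
        // "order.dlx" and "order.expired" are illustrative names only.
        System.out.println(build(30000, "order.dlx", "order.expired"));
    }
}
```

One known limitation of this approach is that expired messages are only dead-lettered when they reach the head of the queue, so mixing different delays in one queue can hold short delays behind long ones. Setting the delay per message at an exchange, as the plugin does, avoids that.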
For the basic Spring Boot and RabbitMQ integration, see the earlier article: Quickly Integrating the RabbitMQ Message Queue Framework with Spring Boot
Plugin download page:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
Choose the plugin release that matches your RabbitMQ version.
RabbitMQ plugin directories:
Operating system | Plugin directory |
---|---|
Linux | /usr/lib/rabbitmq/plugins and /usr/lib/rabbitmq/lib/rabbitmq_server-{version}/plugins |
Windows | C:\Program Files\RabbitMQ\rabbitmq_server-{version}\plugins by default; for a custom installation, the plugins folder under the chosen installation directory |
macOS (Homebrew) | /usr/local/Cellar/rabbitmq/{version}/plugins |
The author uses Linux (CentOS), where the directory is:
/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.0-beta.4/plugins
Copy the downloaded plugin file into the plugin directory.
Then enable the plugin:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
./demo-base-web/src/main/java/com/ljq/demo/springboot/baseweb/config/RabbitMQConfig.java
    package com.ljq.demo.springboot.baseweb.config;

    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    import java.util.HashMap;
    import java.util.Map;

    /**
     * @Description: RabbitMQ configuration
     * @Author: junqiang.lu
     * @Date: 2019/1/21
     */
    @Configuration
    public class RabbitMQConfig {

        /**
         * Queue name (also the name the consumer listens on)
         */
        public static final String QUEUE_NAME_DELAY_CART = "rabbitmq_delay_cart";
        /**
         * Exchange name
         */
        public static final String DIRECT_EXCHANGE_NAME_DELAY_CART = "rabbitmq_direct_exchange_delay_cart";
        /**
         * Routing key used to bind the queue to the exchange
         */
        public static final String DIRECT_EXCHANGE_ROUT_KEY_DELAY_CART = "rabbitmq.spring.boot.cart";
        /**
         * Routing key used by the producer when sending
         */
        public static final String QUEUE_SENDER_ROUTING_KEY_DELAY_CART = "rabbitmq.spring.boot.cart";
        /**
         * Delay (unit: milliseconds)
         */
        public static final int QUEUE_DELAY_TIME_CART = 30000;

        /**
         * Define the delay queue (cart)
         *
         * @return the queue that receives messages once their delay has elapsed
         */
        @Bean("queueDelayCart")
        public Queue queueDelayCart() {
            // durable = true, exclusive = false, autoDelete = true
            return new Queue(QUEUE_NAME_DELAY_CART, true, false, true);
        }

        /**
         * Define the delayed exchange (cart)
         *
         * @return an exchange of the plugin's "x-delayed-message" type
         */
        @Bean("delayExchangeCart")
        public CustomExchange delayExchangeCart() {
            Map<String, Object> args = new HashMap<>(16);
            // After the delay, route messages the way a direct exchange would
            args.put("x-delayed-type", "direct");
            // durable = false, autoDelete = true
            CustomExchange exchange = new CustomExchange(DIRECT_EXCHANGE_NAME_DELAY_CART,
                    "x-delayed-message", false, true, args);
            return exchange;
        }

        /**
         * Bind the delayed exchange to the queue (cart)
         *
         * @param queue the delay queue
         * @param customExchange the delayed exchange
         * @return the binding between the two
         */
        @Bean
        public Binding bindingDirectExchangeDelayCart(@Qualifier("queueDelayCart") Queue queue,
                                                      @Qualifier("delayExchangeCart") CustomExchange customExchange) {
            return BindingBuilder.bind(queue).to(customExchange)
                    .with(DIRECT_EXCHANGE_ROUT_KEY_DELAY_CART).noargs();
        }
    }
./demo-base-web/src/main/java/com/ljq/demo/springboot/baseweb/rabbitmq/RabbitMQSender.java
    package com.ljq.demo.springboot.baseweb.rabbitmq;

    import com.ljq.demo.springboot.baseweb.config.RabbitMQConfig;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;

    /**
     * @Description: RabbitMQ message sender
     * @Author: junqiang.lu
     * @Date: 2019/1/21
     */
    @Service
    public class RabbitMQSender {

        private static final Logger logger = LoggerFactory.getLogger(RabbitMQSender.class);

        @Autowired
        private AmqpTemplate rabbitTemplate;

        /**
         * Send a delayed message through the delayed exchange (direct routing)
         * Exchange type: {@link org.springframework.amqp.core.CustomExchange} ("x-delayed-message")
         * Exchange name: {@link RabbitMQConfig#DIRECT_EXCHANGE_NAME_DELAY_CART}
         *
         * @param message the message body
         */
        public void sendDirectDelayCart(String message) {
            logger.info("exchangeName = {}, queue sender outing key = {}, message = {}",
                    RabbitMQConfig.DIRECT_EXCHANGE_NAME_DELAY_CART,
                    RabbitMQConfig.QUEUE_SENDER_ROUTING_KEY_DELAY_CART, message);
            rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE_NAME_DELAY_CART,
                    RabbitMQConfig.QUEUE_SENDER_ROUTING_KEY_DELAY_CART, message, message1 -> {
                        // Set the per-message delay in milliseconds
                        message1.getMessageProperties().setDelay(RabbitMQConfig.QUEUE_DELAY_TIME_CART);
                        return message1;
                    });
        }
    }
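The sender above always uses the fixed constant `QUEUE_DELAY_TIME_CART`. In the order-timeout scenario from the introduction, the delay would more likely be derived from a deadline. The following is a minimal sketch of that calculation in plain Java; `DelayCalculator` and `delayMillis` are hypothetical names, not part of the article's project. It assumes the result is handed to `setDelay` as an `int`, as the sender does, which is why values beyond `Integer.MAX_VALUE` milliseconds are rejected.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper: turns a deadline into a delay in milliseconds. */
class DelayCalculator {

    static int delayMillis(Instant now, Instant deadline) {
        long millis = Duration.between(now, deadline).toMillis();
        if (millis <= 0) {
            // Deadline already passed: deliver without delay.
            return 0;
        }
        if (millis > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("delay too long: " + millis + " ms");
        }
        return (int) millis;
    }

    public static void main(String[] unused) {
        Instant created = Instant.parse("2021-10-13T08:31:37Z");
        Instant deadline = created.plusSeconds(30);
        System.out.println(delayMillis(created, deadline)); // prints 30000
    }
}
```

The returned value could replace the constant in the `setDelay(...)` call inside the message post-processor.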
./demo-base-web/src/main/java/com/ljq/demo/springboot/baseweb/rabbitmq/RabbitMQReceiver.java
    package com.ljq.demo.springboot.baseweb.rabbitmq;

    import com.ljq.demo.springboot.baseweb.config.RabbitMQConfig;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;

    /**
     * @Description: RabbitMQ message consumer
     * @Author: junqiang.lu
     * @Date: 2019/1/21
     */
    @Service
    public class RabbitMQReceiver {

        private static final Logger logger = LoggerFactory.getLogger(RabbitMQReceiver.class);

        /**
         * Receive messages from the delay queue
         * (a method-level @RabbitListener is sufficient; @RabbitHandler is only
         * needed when @RabbitListener is placed on the class)
         *
         * @param message the message body
         */
        @RabbitListener(queues = {RabbitMQConfig.QUEUE_NAME_DELAY_CART})
        public void receiveDelayCart(String message) {
            logger.info("Received queueName = {}, message = {}", RabbitMQConfig.QUEUE_NAME_DELAY_CART, message);
        }
    }
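The receiver in this demo only logs the message. For the unpaid-order use case mentioned in the introduction, the consumer would typically check the order state when the delayed message arrives and cancel only if the order is still unpaid. The sketch below illustrates that check in plain Java; `OrderTimeoutHandler`, its methods and the in-memory map are hypothetical stand-ins for a real order store and are not part of the article's project.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical consumer-side logic for an order-timeout message. */
class OrderTimeoutHandler {

    enum Status { UNPAID, PAID, CANCELLED }

    // In-memory stand-in for a database table of orders.
    private final Map<String, Status> orders = new ConcurrentHashMap<>();

    void create(String orderId) {
        orders.put(orderId, Status.UNPAID);
    }

    void pay(String orderId) {
        // Only an unpaid order can become paid.
        orders.replace(orderId, Status.UNPAID, Status.PAID);
    }

    /**
     * Called when the delayed message for orderId is consumed.
     * Returns true only if this call cancelled the order, so a
     * redelivered message has no further effect.
     */
    boolean onTimeout(String orderId) {
        return orders.replace(orderId, Status.UNPAID, Status.CANCELLED);
    }

    Status status(String orderId) {
        return orders.get(orderId);
    }
}
```

A listener method such as `receiveDelayCart` could delegate to `onTimeout` with the order id carried in the message body.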
./demo-web/src/main/java/com/ljq/demo/springboot/web/controller/RabbitMQController.java
    package com.ljq.demo.springboot.web.controller;

    import com.ljq.demo.springboot.baseweb.api.ApiResult;
    import com.ljq.demo.springboot.baseweb.rabbitmq.RabbitMQSender;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;

    /**
     * @Description: RabbitMQ message queue test controller
     * @Author: junqiang.lu
     * @Date: 2019/1/21
     */
    @RestController
    @RequestMapping(value = "api/demo/rabbitmq")
    public class RabbitMQController {

        @Autowired
        private RabbitMQSender rabbitMQSender;

        /**
         * Send test: delay queue
         *
         * @return a success result once the message has been handed to the sender
         */
        @GetMapping(value = "/send/delay")
        public ApiResult sendDelay() {
            rabbitMQSender.sendDirectDelayCart("send direct delay cart");
            return ApiResult.success();
        }
    }
Start the project and call the test endpoint:
http://127.0.0.1:8088/api/demo/rabbitmq/send/delay
Server log:
    2021-10-13 16:31:37 | INFO | http-nio-8088-exec-1 | c.ljq.demo.springboot.web.acpect.SimpleInterceptor 29| preHandle
    2021-10-13 16:31:37 | INFO | http-nio-8088-exec-1 | com.ljq.demo.springboot.web.acpect.LogAspect 66| [AOP-LOG-START] requestMark: 70e83f06-9ec4-427e-9a4d-d8ebacfab13b requestIP: 127.0.0.1 contentType:null requestUrl: http://127.0.0.1:8088/api/demo/rabbitmq/send/delay requestMethod: GET requestParams: targetClassAndMethod: com.ljq.demo.springboot.web.controller.RabbitMQController#sendDelay
    2021-10-13 16:31:37 | INFO | http-nio-8088-exec-1 | c.l.d.springboot.baseweb.rabbitmq.RabbitMQSender 108| exchangeName = rabbitmq_direct_exchange_delay_cart, queue sender outing key = rabbitmq.spring.boot.cart, message = send direct delay cart
    2021-10-13 16:31:37 | INFO | http-nio-8088-exec-1 | com.ljq.demo.springboot.web.acpect.LogAspect 72| [AOP-LOG-END] requestMark: 70e83f06-9ec4-427e-9a4d-d8ebacfab13b requestUrl: http://127.0.0.1:8088/api/demo/rabbitmq/send/delay response: ApiResult(code=200, msg=成功, data=null, extraData=null, timestamp=1634113897994)
    2021-10-13 16:31:38 | INFO | http-nio-8088-exec-1 | com.ljq.demo.springboot.baseweb.log.LogService 44| [LOG-RESPONSE] requestIp: 127.0.0.1 requestUrl: http://127.0.0.1:8088/api/demo/rabbitmq/send/delay response: ApiResult(code=200, msg=成功, data=null, extraData=null, timestamp=1634113897994)
    2021-10-13 16:31:38 | INFO | http-nio-8088-exec-1 | c.ljq.demo.springboot.web.acpect.SimpleInterceptor 38| postHandle
    2021-10-13 16:31:38 | INFO | http-nio-8088-exec-1 | c.ljq.demo.springboot.web.acpect.SimpleInterceptor 44| afterCompletion
    2021-10-13 16:32:08 | INFO | SimpleAsyncTaskExecutor-1 | c.l.d.springboot.baseweb.rabbitmq.RabbitMQReceiver 50| Received queueName = rabbitmq_delay_cart, message = send direct delay cart
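The log shows that the message was consumed about 30 seconds after it was sent: the sender logged it at 16:31:37 and the receiver logged it at 16:32:08.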
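As a quick check of that observation, the gap between the sender's and the receiver's log timestamps can be computed directly. This is only an illustrative sketch; `LogDelayCheck` is a hypothetical helper, and the two timestamps are copied from the log entries of `RabbitMQSender` and `RabbitMQReceiver`.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper: seconds elapsed between two log timestamps. */
class LogDelayCheck {

    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss");

    static long secondsBetween(String sent, String received) {
        LocalDateTime from = LocalDateTime.parse(sent, FORMAT);
        LocalDateTime to = LocalDateTime.parse(received, FORMAT);
        return Duration.between(from, to).getSeconds();
    }

    public static void main(String[] unused) {
        // Timestamps of the send and receive log entries.
        System.out.println(secondsBetween("2021-10-13 16:31:37", "2021-10-13 16:32:08")); // prints 31
    }
}
```

The result is 31 seconds rather than exactly 30. Since the log only has one-second resolution, the extra second is plausibly rounding plus delivery overhead, which is consistent with the configured 30000 ms delay.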
This completes the Spring Boot and RabbitMQ integration for a delayed message queue.
GitHub source code: https://github.com/Flying9001/springBootDemo