赞
踩
RabbitMQ是目前最为流行的消息队列之一,它的高可靠性、高可用性和高性能使得它成为众多应用场景下的首选。在实际应用中,我们经常需要实现延时队列来解决一些业务问题,比如订单超时未支付自动取消等。本文将介绍如何使用RabbitMQ实现延时队列
下面先来解释一下
延时队列(也可以称为延迟队列,其实都是一个意思):
延迟队列存储的对象是对应的延迟消息,所谓“延迟消息”是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。
延时消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息
延时任务:设置在一定时间之后才执行的任务
死信:
当一个队列中的消息满足下列情况之一时,就会成为死信(dead letter):
1、消费者使用basic.reject或basic.nack声明消息消费失败,并且消息的requeue参数设置为false
2、消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费
3、要投递的队列消息堆积满了,最早的消息可能成为死信
延时队列可以用于以下场景:
业务场景:
我们通常会在电商网站中(或者app比如:京东,淘宝)进行下单,购买商品,但是我们由于没哟及时支付,会出现订单超时未支付自动取消的情况
下面用一张简单的图片来设计一下业务场景:
那我们该如何去实现延时队列呢,下面用一张图片给大家解释一下
话不多说,上代码!!!
作者在这里只创建了一个交换机,这个交换机可以同时绑定两个队列(有两个队列,一个队列设置了它的ttl(消息过期时间),同时设置了消息过期后的路由交换机和路由的routeKey,如果不设置过期策略那么消息过期之后就会进入死信队列,另外一个队列是普通队列,监听的时候只用去监听普通队列,达到延迟队列的效果。跟上图效果一样,消息通过这个交换机到达设置了过期时间的的队列,这个延迟队列没有消费者进行消费,当消息过期之后,会通过这个交换机路由到正常的队列,然后进行消费)
导入依赖
- <dependencies>
- <dependency>
- <groupId>cn.hutool</groupId>
- <artifactId>hutool-all</artifactId>
- <version>5.8.3</version>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- </dependencies>
配置类
- package com.atguigu.gulimall.auth.config;
-
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.DirectExchange;
- import org.springframework.amqp.core.Exchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- import java.util.HashMap;
- import java.util.Map;
-
- /**
- * @Author YanShuLing
- * @Package:com.atguigu.gulimall.auth.config
- * @Project: brook
- * @Description TODO
- * @name:RabbitMQConfig
- * @Date 2024/3/8:9:56
- */
- @Configuration
- public class RabbitMqConfig {
-
- //创建了一个简单的队列
- @Bean
- public Queue createOrderReleaseQueue(){
- return new Queue("gmall.order.release.queue");
- }
-
- //这个是一个延时队列
- @Bean
- public Queue createOrderDeadQueue(){
-
- Map<String,Object> map = new HashMap<>();
- //队列消息的过期时间为十秒
- map.put("x-message-ttl",10000);
- //交换机
- map.put("x-dead-letter-exchange","gmall-order-exchange");
- //路由key
- map.put("x-dead-letter-routing-key","gmall.order.release.queue");
-
- return new Queue("gmall.order.dead.queue",true,false,false,map);
- }
- //交换机
- @Bean
- public Exchange createOrderExchange(){
- return new DirectExchange("gmall-order-exchange");
- }
- //交换机和正常队列绑定
- @Bean
- public Binding createOrderReleaseBind(){
- return new Binding("gmall.order.release.queue",Binding.DestinationType.QUEUE,
- "gmall-order-exchange","gmall.order.release.queue",null
- );
- }
- //交换机和延迟队列绑定
- @Bean
- public Binding createOrderDeadBind(){
- return new Binding("gmall.order.dead.queue",Binding.DestinationType.QUEUE,
- "gmall-order-exchange","gmall.order.dead.queue",null
- );
- }
- }
生产者(作者写了一个发送验证码的代码):
- @PostMapping("/createOrder")
- public R createOrder(String mobile){
- //生成随机的四位数(验证码)
- String code = RandomUtil.randomNumbers(4);
- //redis给这个验证码设置过期时间为5分钟
- redisTemplate.opsForValue().set("send_sms_"+mobile,code,5, TimeUnit.MINUTES);
-
- String content = StrFormatter.format(Constants.SMS_TEMPLATE,code);
- //给这个消息生成一个唯一标识,为了解决消息重复消费问题
- String messageId = IdUtil.randomUUID();
- //生产者发送消息,第一个参数是路由交换机,第二个参数是路由键,作者设置了跟死信队列一样的
- 名称,无伤大雅
- rabbitRemplate.convertAndSend("gmall-order-exchange","gmall.order.dead.queue",
- JSON.toJSONstring(new SmsParamVo(mobile,content,messageId)));
-
- //发送验证码,日志打印
- log.info("发送延迟消息给ttl队列,当前时间:{},消息内容:{}",new Date().toString(),content);
- // smsService.sendSms(mobile,content);
- return R.ok("成功");
- }
消费者:用来监听消息
- //消费者监听队列为gmall.order.release.queue队列的消息
- @RabbitListener(queues = {"gmall.order.release.queue"})
- @Component
- @Slf4j
- public class SmsListener {
-
- private final SmsService smsService;
-
- private final RedisTemplate redisTemplate;
-
- public SmsListener(SmsService smsService, RedisTemplate redisTemplate) {
- this.smsService = smsService;
- this.redisTemplate = redisTemplate;
- }
-
- @RabbitHandler
- public void sendSms(String string, Channel channel, Message message){
-
- //消息标签
- long deliveryTag = message.getMessageProperties().getDeliveryTag();
- try {
- SmsParamVo smsParamVo = JSON.parseObject(string, SmsParamVo.class);
- if(redisTemplate.hasKey(smsParamVo.getMsgId())){
- //拿到消息的唯一标签,如果是已经消费过的消息,直接拒绝签收
- channel.basicReject(deliveryTag,false);
- return;
- }
- //打印日志
- log.info("发送延迟消息给ttl队列,当前时间:{},消息内容:{}",new Date().toString(),smsPar
- mVo);
- //调用发送短信
- // smsService.sendSms(smsParamVo.getMobile(),smsParamVo.getContext());
- redisTemplate.opsForValue().set(smsParamVo.getMsgId(),smsParamVo.getMsgId(),12, TimeUnit.HOURS);
- //确认签收,消息会从队列中删除
- channel.basicAck(deliveryTag,false);
-
-
- } catch (IOException e) {
- try {
- if(deliveryTag<=3){
- //如果是由于某种特殊原因,消息没有发送成功,然后重回队列,
- channel.basicNack(deliveryTag,false,true);
- }
- //当重试次数达到一定的数量,就放进死信队列
- channel.basicNack(deliveryTag,false,false);
- } catch (IOException ex) {
- throw new RuntimeException(ex);
- }
- throw new RuntimeException(e);
- }
-
-
- }
-
- }
测试发送之前,我们先来到rabbitMq可视化界面观察一下
下面我们来测试一下,作者使用的是Postman
看看后台日志打印,我们可以看到我们已经实现了延迟消息的效果
还有一种方式也可以实现延迟消息
那就是延迟消息插件,RabbitMQ的官方也推出了一个插件,原生支持延迟消息功能。该插件的原理是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后再投递到队列
1、前往RabbitMQ官网下载往RabbitMQ添加延迟消息的插件
RabbitMQ官网下载插件的网址:Community Plugins | RabbitMQ
2、下载rabbitmq_delayer_message_exchange插件(注:RabbitMQ是什么版本的,下载的插件就得是什么版本的,得对应上,以下截图为官方文档的对插件版本的要求说明
这里作者的版本是3.9.13所以,作者就下载3.9版本的
选择3.9版本
3、把这个插件传输到服务器上
4、拷贝下载好的插件到容器中
docker cp rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/opt/rabbitmq/plugins/rabbitmq_delayed_message_exchange-3.9.0.ez
可以看到我已经copy到容器内部了
5、安装延迟队列插件
进入RabbitMQ安装目录的目录下
- //进入容器内部
- docker exec -it rabbitmq /bin/bash
进入安装目录
cd /opt/rabbitmq/plugins
使用如下命令启用延迟插件
- rabbitmq-plugins enable rabbitmq_delayed_message_exchange
-
如下我们就安装好了,然后我们重启rabbitmq容器
使用 exit 命令退出容器
使用docker restart rabbitmq 重启容器
我们来rabbitmq的可视化界面查看
这样说明我们的延迟插件就安装好啦!
到此就结束啦!希望可以帮到你,可以帮作者点个关注和小心心嘛!你们的支持就是我最大的动力,以后也会努力更新的哦!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。