当前位置:   article > 正文

RabbitMQ之“延时队列”_rabbitmq延时队列

rabbitmq延时队列

延时队列

RabbitMQ是目前最为流行的消息队列之一,它的高可靠性、高可用性和高性能使得它成为众多应用场景下的首选。在实际应用中,我们经常需要实现延时队列来解决一些业务问题,比如订单超时未支付自动取消等。本文将介绍如何使用RabbitMQ实现延时队列

下面先来解释一下

延时队列(也可以称为延迟队列,其实都是一个意思):

延迟队列存储的对象是对应的延迟消息,所谓“延迟消息”是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。

延时消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息

延时任务:设置在一定时间之后才执行的任务

死信

当一个队列中的消息满足下列情况之一时,就会成为死信(dead letter):

1、消费者使用basic.reject或basic.nack声明消息消费失败,并且消息的requeue参数设置为false

2、消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费

3、要投递的队列消息堆积满了,最早的消息可能成为死信

延时队列可以用于以下场景:

  1. 订单处理:在电商网站中,订单处理是一个常见的业务流程。如果订单需要立即处理,可以使用RabbitMQ的延时队列来实现延迟处理。例如,可以将订单发送到一个延时队列中,并设置一个延迟时间(例如30分钟),然后在延迟时间到达后,将订单从队列中取出并进行处理。
  2. 消息推送:在移动应用或Web应用程序中,可以使用RabbitMQ的延时队列来实现消息推送。例如,可以将用户订阅的消息发送到一个延时队列中,并设置一个延迟时间(例如1小时),然后在延迟时间到达后,将消息从队列中取出并推送给用户。
  3. 定时任务:在分布式系统中,可以使用RabbitMQ的延时队列来实现定时任务。例如,可以将需要定期执行的任务发送到一个延时队列中,并设置一个延迟时间(例如每天),然后在延迟时间到达后,将任务从队列中取出并执行。
  4. 数据备份:在数据库中,可以使用RabbitMQ的延时队列来实现数据备份。例如,可以将需要备份的数据发送到一个延时队列中,并设置一个延迟时间(例如每天),然后在延迟时间到达后,将数据从队列中取出并进行备份。
  5. 优惠券发放:您可以设置一个延时队列,将优惠券发放任务添加到队列中,设置一定的延时时间,以保证优惠券在特定时间后才能被消费。
  6. 动态路由:您可以使用延时队列来实现动态路由的功能,将消息发送到延时队列中,并设置一定的路由规则,以实现消息在特定时间后被路由到不同的目标队列中。

业务场景:

我们通常会在电商网站中(或者app比如:京东,淘宝)进行下单,购买商品,但是我们由于没哟及时支付,会出现订单超时未支付自动取消的情况

下面用一张简单的图片来设计一下业务场景:

那我们该如何去实现延时队列呢,下面用一张图片给大家解释一下

话不多说,上代码!!!

作者在这里只创建了一个交换机,这个交换机可以同时绑定两个队列(有两个队列,一个队列设置了它的ttl(消息过期时间),同时设置了消息过期后的路由交换机和路由的routeKey,如果不设置过期策略那么消息过期之后就会进入死信队列,另外一个队列是普通队列,监听的时候只用去监听普通队列,达到延迟队列的效果。跟上图效果一样,消息通过这个交换机到达设置了过期时间的的队列,这个延迟队列没有消费者进行消费,当消息过期之后,会通过这个交换机路由到正常的队列,然后进行消费)

导入依赖

  1. <dependencies>
  2. <dependency>
  3. <groupId>cn.hutool</groupId>
  4. <artifactId>hutool-all</artifactId>
  5. <version>5.8.3</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.springframework.boot</groupId>
  9. <artifactId>spring-boot-starter-amqp</artifactId>
  10. </dependency>
  11. </dependencies>

配置类

  1. package com.atguigu.gulimall.auth.config;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.DirectExchange;
  4. import org.springframework.amqp.core.Exchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. import java.util.HashMap;
  9. import java.util.Map;
  10. /**
  11. * @Author YanShuLing
  12. * @Package:com.atguigu.gulimall.auth.config
  13. * @Project: brook
  14. * @Description TODO
  15. * @name:RabbitMQConfig
  16. * @Date 2024/3/8:9:56
  17. */
  18. @Configuration
  19. public class RabbitMqConfig {
  20. //创建了一个简单的队列
  21. @Bean
  22. public Queue createOrderReleaseQueue(){
  23. return new Queue("gmall.order.release.queue");
  24. }
  25. //这个是一个延时队列
  26. @Bean
  27. public Queue createOrderDeadQueue(){
  28. Map<String,Object> map = new HashMap<>();
  29. //队列消息的过期时间为十秒
  30. map.put("x-message-ttl",10000);
  31. //交换机
  32. map.put("x-dead-letter-exchange","gmall-order-exchange");
  33. //路由key
  34. map.put("x-dead-letter-routing-key","gmall.order.release.queue");
  35. return new Queue("gmall.order.dead.queue",true,false,false,map);
  36. }
  37. //交换机
  38. @Bean
  39. public Exchange createOrderExchange(){
  40. return new DirectExchange("gmall-order-exchange");
  41. }
  42. //交换机和正常队列绑定
  43. @Bean
  44. public Binding createOrderReleaseBind(){
  45. return new Binding("gmall.order.release.queue",Binding.DestinationType.QUEUE,
  46. "gmall-order-exchange","gmall.order.release.queue",null
  47. );
  48. }
  49. //交换机和延迟队列绑定
  50. @Bean
  51. public Binding createOrderDeadBind(){
  52. return new Binding("gmall.order.dead.queue",Binding.DestinationType.QUEUE,
  53. "gmall-order-exchange","gmall.order.dead.queue",null
  54. );
  55. }
  56. }

生产者(作者写了一个发送验证码的代码):

  1. @PostMapping("/createOrder")
  2. public R createOrder(String mobile){
  3. //生成随机的四位数(验证码)
  4. String code = RandomUtil.randomNumbers(4);
  5. //redis给这个验证码设置过期时间为5分钟
  6. redisTemplate.opsForValue().set("send_sms_"+mobile,code,5, TimeUnit.MINUTES);
  7. String content = StrFormatter.format(Constants.SMS_TEMPLATE,code);
  8. //给这个消息生成一个唯一标识,为了解决消息重复消费问题
  9. String messageId = IdUtil.randomUUID();
  10. //生产者发送消息,第一个参数是路由交换机,第二个参数是路由键,作者设置了跟死信队列一样的
  11. 名称,无伤大雅
  12. rabbitRemplate.convertAndSend("gmall-order-exchange","gmall.order.dead.queue",
  13. JSON.toJSONstring(new SmsParamVo(mobile,content,messageId)));
  14. //发送验证码,日志打印
  15. log.info("发送延迟消息给ttl队列,当前时间:{},消息内容:{}",new Date().toString(),content);
  16. // smsService.sendSms(mobile,content);
  17. return R.ok("成功");
  18. }

消费者:用来监听消息

  1. //消费者监听队列为gmall.order.release.queue队列的消息
  2. @RabbitListener(queues = {"gmall.order.release.queue"})
  3. @Component
  4. @Slf4j
  5. public class SmsListener {
  6. private final SmsService smsService;
  7. private final RedisTemplate redisTemplate;
  8. public SmsListener(SmsService smsService, RedisTemplate redisTemplate) {
  9. this.smsService = smsService;
  10. this.redisTemplate = redisTemplate;
  11. }
  12. @RabbitHandler
  13. public void sendSms(String string, Channel channel, Message message){
  14. //消息标签
  15. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  16. try {
  17. SmsParamVo smsParamVo = JSON.parseObject(string, SmsParamVo.class);
  18. if(redisTemplate.hasKey(smsParamVo.getMsgId())){
  19. //拿到消息的唯一标签,如果是已经消费过的消息,直接拒绝签收
  20. channel.basicReject(deliveryTag,false);
  21. return;
  22. }
  23. //打印日志
  24. log.info("发送延迟消息给ttl队列,当前时间:{},消息内容:{}",new Date().toString(),smsPar
  25. mVo);
  26. //调用发送短信
  27. // smsService.sendSms(smsParamVo.getMobile(),smsParamVo.getContext());
  28. redisTemplate.opsForValue().set(smsParamVo.getMsgId(),smsParamVo.getMsgId(),12, TimeUnit.HOURS);
  29. //确认签收,消息会从队列中删除
  30. channel.basicAck(deliveryTag,false);
  31. } catch (IOException e) {
  32. try {
  33. if(deliveryTag<=3){
  34. //如果是由于某种特殊原因,消息没有发送成功,然后重回队列,
  35. channel.basicNack(deliveryTag,false,true);
  36. }
  37. //当重试次数达到一定的数量,就放进死信队列
  38. channel.basicNack(deliveryTag,false,false);
  39. } catch (IOException ex) {
  40. throw new RuntimeException(ex);
  41. }
  42. throw new RuntimeException(e);
  43. }
  44. }
  45. }

测试发送之前,我们先来到rabbitMq可视化界面观察一下

下面我们来测试一下,作者使用的是Postman

看看后台日志打印,我们可以看到我们已经实现了延迟消息的效果

还有一种方式也可以实现延迟消息

那就是延迟消息插件,RabbitMQ的官方也推出了一个插件,原生支持延迟消息功能。该插件的原理是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后再投递到队列

1、前往RabbitMQ官网下载往RabbitMQ添加延迟消息的插件

RabbitMQ官网下载插件的网址:Community Plugins | RabbitMQ

2、下载rabbitmq_delayer_message_exchange插件(注:RabbitMQ是什么版本的,下载的插件就得是什么版本的,得对应上,以下截图为官方文档的对插件版本的要求说明

这里作者的版本是3.9.13所以,作者就下载3.9版本的

选择3.9版本

                

 3、把这个插件传输到服务器上

4、拷贝下载好的插件到容器中

docker cp rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/opt/rabbitmq/plugins/rabbitmq_delayed_message_exchange-3.9.0.ez 

可以看到我已经copy到容器内部了

 5、安装延迟队列插件

进入RabbitMQ安装目录的目录下

  1. //进入容器内部
  2. docker exec -it rabbitmq /bin/bash

进入安装目录

cd /opt/rabbitmq/plugins

使用如下命令启用延迟插件

  1. rabbitmq-plugins enable rabbitmq_delayed_message_exchange

如下我们就安装好了,然后我们重启rabbitmq容器

使用 exit 命令退出容器

使用docker restart rabbitmq 重启容器

我们来rabbitmq的可视化界面查看

这样说明我们的延迟插件就安装好啦!

到此就结束啦!希望可以帮到你,可以帮作者点个关注和小心心嘛!你们的支持就是我最大的动力,以后也会努力更新的哦!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/590343
推荐阅读
相关标签
  

闽ICP备14008679号