当前位置:   article > 正文

Redis第18讲——Redis和Redission实现延迟消息_redis延迟消息

redis延迟消息

即使不是做电商业务的同学,也一定知道订单超时关闭这种业务场景,这个场景大致就是用户下单后,如果在一定时间内未支付(比如15分钟、半小时),那么系统就会把这笔订单给关闭掉。这个功能实现的方式有很多种,比如JDK中自带的DelayQueue延迟队列、Timer、ScheduledThreadPoolExecutor,强烈推荐的RocketMQ、RabblitMQ及Kafka等消息队列,还有就是Hutool的SystemTimer、Netty的HashedWheelTimer等等,感兴趣的可以去了解一下今天我们就先看看Redis和Redisson是如何实现延迟消息的。

一、Redis如何实现延迟消息

1.1 过期key监听

很多人都知道Redis有一个过期监听的功能,在redis.conf中加一条notify-keyspace-events开启过期监听,然后在代码中实现KeyExpirationMessageListener就可以监听key的过期消息了。

这样就可以在接收到过期消息的时候进行关单操作了,但是这个方案并不推荐,Redis官方明确说过Redis并不保证key在过期的时候就能被立即删除,更不保证这个消息能被立即发出,所以,消息延迟是必然的,数据量越大延迟的时间越长。

而且,在Redis5.0之前,这个消息是通过PUB/SUB模式发出的,不会做持久化,至于你有没有接收到,有没有消费成功,它不管,所以,如果发消息的时候客户端挂了,之后再恢复的话,这个消息也就彻底丢了。

1.2 Zset

我们可以借助Redis中的有序集合——zset来实现这个功能,zset是一个有序集合,每一个元素(member)都关联了一个score,可以通过score排序来取集合中的值。

我们可以将订单超时时间的时间戳(下单时间+超时时间)作为score订单号orderId作为成员(member),这样redis会对zset按照score进行排序,再通过定时任务获取“当前时间>=score"的延迟任务,获取到之后就可以根据订单号(member)进行关单操作。

这么实现的优点就是可以借助redis持久化和高可用机制,避免数据丢失。

1.3 zset实现超时关单Demo

步骤:

  • 用户下单后,将信息写入redis zset缓存中,score为当前时间+延迟时间的时间戳,member为订单号。

  • 使用ZRangeByScore和WithScores命令,获取当前时间戳之前的所有任务,并通过score判断哪些任务已到期,进行关单处理。

  • 启动一个额外的定时任务周期性检查并处理已到期的订单。

导入依赖:

  1. <dependency>
  2.    <groupId>org.springframework.boot</groupId>
  3.    <artifactId>spring-boot-starter-data-redis</artifactId>
  4. </dependency>

Demo:

  1. @Service
  2. public class OrderDelayService {
  3.    @Autowired
  4.    private RedisTemplate<String,String> redisTemplate;
  5.    //延迟15分钟
  6.    private final long delayTime = 15*60*1000;
  7.    //订单号key前缀
  8.    private static final String ORDER_KEY="order:";
  9.    //延迟关单key
  10.    private static final String DELAY_KEY="close_orders";
  11.    /**
  12.     * 创建订单
  13.     * @param orderId
  14.     */
  15.    public void createOrder(String orderId){
  16.        //...
  17.        //创建订单成功
  18.        //1.获取当前时间戳
  19.        long currentTime = System.currentTimeMillis();
  20.        //2.score:当前时间+延迟时间
  21.        long score=currentTime+delayTime;
  22.        //3.加入redis zset集合
  23.        redisTemplate.opsForZSet().add(
  24.                DELAY_KEY,  //redis key
  25.                ORDER_KEY+orderId, //member
  26.                score //score
  27.       );
  28.   }
  29.    /**
  30.     * 任务间隔一秒执行一次
  31.     */
  32.    @Scheduled(fixedDelay = 1000)
  33.    public void closeExpiredOrders(){
  34.        //当前时间戳
  35.        long currentTime = System.currentTimeMillis();
  36.        ZSetOperations<String, String> zSetOps = redisTemplate.opsForZSet();
  37.        //取出所有数据(已排好序)
  38.        Set<String> orderKeys = zSetOps.range(DELAY_KEY, 0, -1);
  39.        for (String orderKey : orderKeys) {
  40.            //获取score,
  41.            double score = zSetOps.score(DELAY_KEY, orderKey);
  42.            if(currentTime>=score){
  43.                String orderId = orderKey.substring(ORDER_KEY.length());
  44.                //进行关单操作...closeOrder(orderId)
  45.                //从zset里移除该订单
  46.                zSetOps.remove(DELAY_KEY,orderKey);
  47.           }
  48.       }
  49.   }
  50. }

二、Redission如何实现延迟消息

2.1 实现原理

Redission中定义了分布式延迟队列DelayedQueue,其实就是在zset的基础上增加了一个基于内存的延迟队列。

大致的流程如下:

当我们要添加一个数据到延迟队列的时候,redission会把数据+超时时间放到zset中,并且起一个延时任务,当任务到期的时候,再去zset中把数据取出来,返回给客户端使用。

2.2 案例

导入依赖:

  1. <dependency>
  2.     <groupId>org.redisson</groupId>
  3.     <artifactId>redisson-spring-boot-starter</artifactId>
  4.     <version>3.13.1</version>
  5. </dependency>

定义一个客户端:

  1. @Component
  2. public class RedissionConfig {
  3.    @Bean(destroyMethod = "shutdown")
  4.    public RedissonClient redssion(){
  5.        Config config=new Config();
  6.        config.useSingleServer().setAddress("redis://127.0.0.1");
  7.        return Redisson.create(config);
  8.   }
  9. }

实现:

  1. @Component
  2. public class RedissionDelay {
  3.    @Autowired
  4.    private RedissonClient client;
  5.    /**
  6.     * 创建订单,并设置过期时间
  7.     * @param orderId
  8.     */
  9.    public void createOrder(String orderId){
  10.        //...
  11.        //创建订单成功
  12.        RBlockingDeque<Object> blockingDeque = client.getBlockingDeque("orderQueue");
  13.        RDelayedQueue<Object> delayedQueue = client.getDelayedQueue(blockingDeque);
  14.        //将订单加入延迟队列,延迟时间为15分钟
  15.        delayedQueue.offer(orderId,15, TimeUnit.MINUTES);
  16.   }
  17.    /**
  18.     * 关单操作
  19.     */
  20.    public void closeOrder(){
  21.        RBlockingDeque<Object> orderQueue = client.getBlockingDeque("orderQueue");
  22.        new Thread(() -> {
  23.            while (true){
  24.                try {
  25.                    String orderId = (String) orderQueue.take();
  26.                    //进行关单操作,closeOrder(orderId)
  27.               } catch (InterruptedException e) {
  28.                  e.printStackTrace();
  29.               }
  30.           }
  31.       });
  32.   }
  33. }

上述例子,我们用RDelayedQueue的offer方法将订单添加到了延迟队列,并指定了延迟时间,当元素的延迟时间到达时,Redission会将元素从RDelayedQueue转移到与之关联的RBlockingDeque。

然后在检查是否要关单的时候,另起了一个线程,不断循环读取到期的订单。值得注意的是 take方法从RBlockingDeque中获取元素,这是一个阻塞操作,如果没有元素,它会一直等到,直到有元素。

End:希望对大家有所帮助,如果有纰漏或者更好的想法,请您一定不要吝啬你的赐教声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】

推荐阅读
相关标签