当前位置:   article > 正文

在springboot中利用Redis实现延迟队列_spring boot 实现延迟任务

spring boot 实现延迟任务


前言

在开发过程中,有很多场景都需要用到延迟队列来解决。目前支持延迟队列的中间件也不少,特别是基于JMS模式下的消息中间件基本上都支持延迟队列。但是有时我们项目规模可能比较小,用不上JMS这些中间件。那么利用Redis也可以实现延迟队列的功能。


一、基本思路

利用Redis来实现延迟队列的主要思路是借助Redis的Sorted Set数据类型来实现。

具体做法是将任务的执行时间作为分数(score),任务的内容作为值(value),将任务按照执行时间排序存储在有序集合中。然后周期性地检查有序集合中的任务,根据当前时间和任务的执行时间来决定是否执行任务。

当需要添加新的延迟任务时,只需将任务的执行时间和内容添加到有序集合中即可。当然,你可能需要一个后台进程或定时任务来不断地检查有序集合,以执行到期的任务。

二、springboot实现案例

根据上面的思路,我们可以直接来写代码,本案例的完整代码点击下载。

首先,确保你的Spring Boot项目中已经配置好了Redis依赖。你可以在pom.xml文件中添加如下依赖:

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
  • 1
  • 2
  • 3
  • 4

然后,创建一个延迟队列管理器(DelayQueueManager)类,用于添加任务到延迟队列和处理到期任务:

package com.test.spring.redisdelayqueue;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.util.Set;
/**
 * @author code-long
 * @version 1.0.0
 * @ClassName DelayQueueManager.java
 * @description
 */
@Component
public class DelayQueueManager {
    private static final String key = "delayQueue";

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    public void addToDelayQueue(String value, long delayMillis) {
        redisTemplate.opsForZSet().add(key, value, System.currentTimeMillis() + delayMillis);
    }

    public Set<String> getExpiredItems(long currentTime) {
        return redisTemplate.opsForZSet().rangeByScore(key, 0, currentTime);
    }

    public void removeItems(Set<String> items) {
        redisTemplate.opsForZSet().remove(key, items.toArray());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

接下来,我们利用spring的定时任务创建一个定时任务,用于定期检查延迟队列中的到期任务并执行。
同时我们还需要单独创建一个线程来专门处理逻辑,因为如果在定时任务直接处理逻辑可能会导致定时任务阻塞的现象,在这个线程中我们为了保证队列的顺序性,在使用BlockingDeque来模拟一个队列。当然如果你的队列逻辑处理不需要保持顺序性,完全可以使用多线程来处理任务。
具体实现代码:

package com.test.spring.redisdelayqueue;

import lombok.extern.slf4j.Slf4j;
import org.h2.util.DateTimeUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.text.DateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Set;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

/**
 * @author code-long
 * @version 1.0.0
 * @ClassName RedisDelayQueueApplication.java
 * @description
 */
@SpringBootApplication(scanBasePackages = "com.test.spring.redisdelayqueue")
@EnableScheduling
@Slf4j
@RestController
public class RedisDelayQueueApplication {

    @Autowired
    private DelayQueueManager delayQueueManager;
    @Scheduled(fixedRate = 1000) // 每秒执行一次
    public void processDelayQueue() {
        long currentTime = System.currentTimeMillis();
        Set<String> expiredItems = delayQueueManager.getExpiredItems(currentTime);
        // 处理到期任务,这里就可以达到延迟队列的模型
        //如果在这里直接处理逻辑,会影响到定时任务执行不完全的现象,比如一个任务执行需要2秒,那么就会阻塞JOB的执行,所以我们要另外启动一个线程来专门处理逻辑
        for (String item : expiredItems) {
            //将过期数据加入到执行队列
            DelayQueueInstance.getInstance().receive(item);
        }
        // 从延迟队列中移除已处理的任务:这里的删除可以放到线程中逻辑执行完成再删除
        if(!expiredItems.isEmpty()){
            delayQueueManager.removeItems(expiredItems);
        }
    }
    //应用启动成功后,就启动线程
    @EventListener
    void listener(ApplicationReadyEvent event) {
        DelayQueueInstance.getInstance().start();
    }
    //模拟入队操作
    @RequestMapping("/push")
    @Transactional
    public String test1(){
        //模拟一个30秒的延迟队列
        delayQueueManager.addToDelayQueue("{这里可以使json数据:30}",30000);
        delayQueueManager.addToDelayQueue("{这里可以使json数据:10}",10000);
        System.out.println("["+ LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) +"]添加数据到队列");
        return "success";
    }
    public static void main(String[] args) {
        SpringApplication.run(RedisDelayQueueApplication.class, args);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71

三、测试

我们在浏览器中访问或者使用curl调用接口:

curl http://localhost:8881/push
  • 1

后台打印结果为:

[2024-03-14 22:28:54]添加数据到队列
[2024-03-14 22:29:05]收到数据---{这里可以使json数据:10}
[2024-03-14 22:29:25]收到数据---{这里可以使json数据:30}
  • 1
  • 2
  • 3

我们可以看到,基本上能实现延迟队列的功能,只是这里有一点小小的瑕疵,任务可能会存在1秒的误差,但是这依赖于我们定时任务的循环时间,如果时间越短,误差的时间也就越短,定时任务间隔时间越长,误差也就越大。但1秒误差在实际的业务过程中已经是可以接受的了,对服务器来说性能也可以接受。


总结

使用Redis实现延迟队列的好处包括简单、高效,并且Redis本身就具有持久化和高可用性的特性,使得延迟队列的实现更加可靠。如果项目没有必要上JMS中间件,那么使用Redis是一个不错的方案。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/572401
推荐阅读
相关标签
  

闽ICP备14008679号