当前位置:   article > 正文

延时任务三种实现方式——延时队列、Redis、Rabbitmq_zset延时队列 和 rabbit

zset延时队列 和 rabbit

前言

延迟任务在工作的业务中有很多的应用场景,比如下订单后监听是否支付成功,若多少分钟后未支付则自动取消订单;比如注册用户后,再三分钟后提醒用户完成某项操作等,应用的场景很多,同时实现的方式也很多,在这里我引入一个我看过的博客,使用Redis实现延时任务(lua语言),写的应该还是可以的。
在我这篇博客里,介绍三种实现延时任务的方式,它们的优劣点可以看一下这个图吧,总结的很好
在这里插入图片描述
这三种分别是JDK的延时队列、Redis、Rabbitmq,这里解释一下为什么没有介绍另外两种,因为基于数据库的方式在现在的项目中,尤其是大数据的项目中,很难用到。第二种我对其中的技术不太了解,不好误人子弟。
好了,正式开始吧

一、延时队列

延时队列DelayQueue是JDK1.5提出来的,当前在这之前我不太了解这个类,这个也是我的薄弱点,看一下官方的解释吧
在这里插入图片描述
这块的解释吧,先用网上比较普遍的总结,那就是DelayQueue即具备队列的功能,又具备延时的功能。重点点说,可以将元素放到DelayQueue中,并且只有元素到期才可以取出来,如何判断元素到期呢?其实这个地方就要注意了,继承实现Delayed的时候需要去重写一个方法,那就是getDelay,当这个getDelay返回是0或者比0小的时候,则认为任务过期了,可以拿出来。
从DelayQueue取出任务的方式呢,就是take(),这个方法也就有解释的,就是当任务过期才可以取出来了,如果没有任务过期,则阻塞当前线程。还有一些其他的比如不可以存储null等等,这些就不解释了,最重要的就是take以及getDelay
接下来看一下我写的例子吧

1.1 业务实际执行类

/**
 * @program: delayTasks
 * @description: 实际业务执行类
 * @author: hs
 * @create: 2021-01-10 20:14
 **/
public class DelayTask implements Runnable{
   

    private String taskName;

    public DelayTask(String name){
   
        taskName = name;
    }

    @Override
    public void run() {
   
        System.out.println("********START************");
        System.out.println("当前线程名:" + Thread.currentThread().getName());
        System.out.println("当前任务名:" + taskName);
        System.out.println("实际业务执行位置");
        System.out.println("********END************");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

1.2 任务封装类

/**
 * @program: delayTasks
 * @description: 延时队列中消息体,对任务进行了封装
 * @author: hs
 * @create: 2021-01-10 20:17
 **/
public class DelayMessage<T extends Runnable> implements Delayed {
   

    private final Long time;

    private final T task;

    public DelayMessage(Long time, T task) {
   
        this.time = System.nanoTime()+time;
        this.task = task;
    }

    public Long getTime() {
   
        return time;
    }

    public T getTask() {
   
        return task;
    }

    /**
     * 根据给定的时间单位,给出当前任务的剩余延迟时间
     * @param unit TimeUnit
     * @return 返回剩余延迟时间
     */
    @Override
    public long getDelay(TimeUnit unit) {
   
        return unit.convert(this.time-System.nanoTime(),TimeUnit.NANOSECONDS);
    }

    /**
     * 比较方法,未到期返回1,消息到期返回0或-1
     * @param o
     * @return
     */
    @Override
    public int compareTo(Delayed o) {
   
        DelayMessage other = (DelayMessage) o;
        Long diff = this.time-other.time;
        if(diff>0){
   
            return 1;
        }else if(diff<0){
   
            return -1;
        }
        return 0;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60

1.3 自定义延时队列管理类

/**
 * @program: delayTasks
 * @description: 自定义延迟队列管理类
 * @author: hs
 * @create: 2021-01-13 20:09
 **/
public class DelayQueueCostomizedManager {
   
    /**
     * 核心线程,长期保持的线程数——自定义线程池所用参数
     */
    private final static Integer COREPOOLSIZE = 10;
    /**
     * 最大线程数——自定义线程池所用参数
     */
    private final static Integer MAXIMUMPOOLSIZE = 100;
    /**
     * 闲置线程存活时间——自定义线程池所用参数
     */
    private final static Long KEEPALIVETIME = 50L;
    /**
     * 线程池执行器
     */
    private ExecutorService executorService;
    /**
     * 守护线程,由此线程去维护执行延时队列操作
     */
    private Thread daemonThread;
    /**
     * 延时队列存放位置
     */
    private DelayQueue<DelayMessage<?>> delayQueue;
    /**
     * 单例模式——饿汉模式
     */
    private static DelayQueueCostomizedManager instance = new DelayQueueCostomizedManager();


    private DelayQueueCostomizedManager(){
   
        // 创建线程池执行器
        executorService = new ThreadPoolExecutor(COREPOOLSIZE,MAXIMUMPOOLSIZE,KEEPALIVETIME, TimeUnit.MILLISECONDS,new LinkedBlockingDeque<Runnable>(),new ThreadPoolExecutor.AbortPolicy());
        // 初始化延迟队列
        delayQueue = new DelayQueue<DelayMessage<?>>();
        init();
    }

    public void init(){
   
        // 自启后,监听延迟队列
        daemonThread = new Thread(() -> {
   
            execute();
        });
        daemonThread.setName("守护线程,以此线程去执行");
        // 因为是测试,所以将设置守护线程注释掉,否则main线程结束,守护线程也随之关闭,这样就看不到延迟队列的效果了
        // daemonThread.setDaemon(true);
        daemonThread.start();
    }
    // 单例,饿汉模式
    public static DelayQueueCostomizedManager getInstance(){
   
        return instance;
    }

    /**
     * 监听延迟队列
     */
    private void execute(){
   
        while (true){
   
            // 获得当前进程中线程痕迹
            Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
            System.out.println("当前存活线程数量:" + map.size());
            System.out.println("当前延时任务数量:" + delayQueue.size());
            try {
   
                // 获得延迟队列中的消息,必须消息到期才可以被获取,否则等待
                DelayMessag
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/443191
推荐阅读
相关标签
  

闽ICP备14008679号