当前位置:   article > 正文

redisson实现延迟队列_redisson延迟队列

redisson延迟队列

1.pom引入redisson

        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.20.1</version>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  1. 整合springboot配置,这个可以参考之前整合redisson的文章,或者上面一步直接引用
    redisson整合好的springboot的包 如下(本人是引入的redisson自己整合的springboot,实际一样的
    只要redisson可以使用,就成功)
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.27.0</version>
</dependency>

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

3.配置redis的队列

import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @email 1965654634@qq.com
 */
@Configuration
public class RedisQueueConfig {

    @Bean
    public RBlockingQueue<String> blockingQueue(RedissonClient redissonClient) {
        //队列名称可以自己定义
        return redissonClient.getBlockingQueue("delayedQueue");
    }

    @Bean
    public RDelayedQueue<String> delayedQueue(RBlockingQueue<String> blockingQueue,
                                              RedissonClient redissonClient) {
        return redissonClient.getDelayedQueue(blockingQueue);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

4.创建redis的队列类


import cn.hutool.core.date.LocalDateTimeUtil;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.time.LocalDateTime;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;


/**
 * @email 1965654634@qq.com
 */
@Slf4j
@Service
public class DelayedQueueService{

    @Autowired
    private RDelayedQueue<String> delayedQueue;
    @Autowired
    private  RBlockingQueue<String> blockingQueue;

    @PostConstruct
    public void init() {
        // 创建延迟队列
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        executorService.submit(() -> {
            while (true) {
                try {
                    // 从延迟队列中取出任务
                    String task = blockingQueue.take();
                    // 处理延迟任务,例如执行某个操作
                    log.info("redis的延迟队列:{},当前时间:{}", task, LocalDateTimeUtil.formatNormal(LocalDateTime.now()));
                } catch (Exception e) {
                    log.error("redis的延迟队列抛出异常", e);
                }
            }
        });
    }

    public String addDelayedTask(String task, long delay) {
        log.info("redis的延迟队列(添加)的key:{},time:{},当前时间:{}",task,delay, LocalDateTimeUtil.formatNormal(LocalDateTime.now()));
        // 将任务加入延迟队列
        delayedQueue.offerAsync(task, delay, TimeUnit.SECONDS);
        // 返回任务的唯一标识
        return task;
    }

    public void cancelDelayedTask(String task) {
        log.info("redis的延迟队列(移除)的key:{},当前时间:{}",task, LocalDateTimeUtil.formatNormal(LocalDateTime.now()));
        // 从延迟队列中移除任务
        delayedQueue.remove(task);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58

5.测试controller

    @PostMapping("/queueAdd")
    public ResponseResult<Boolean> queueAdd(String task,Long time){
        log.info("测试添加的key:{},time:{},当前时间:{}",task,time, LocalDateTimeUtil.formatNormal(LocalDateTime.now()));
        delayedQueueService.addDelayedTask(task,time);
        return ResponseResult.ok();
    }

    @PostMapping("/queueDel")
    public ResponseResult<Boolean> dianTiAlarm(String task,Long time){
        log.info("测试删除的key:{},time:{},当前时间:{}",task,time, LocalDateTimeUtil.formatNormal(LocalDateTime.now()));
        delayedQueueService.cancelDelayedTask(task);
        return ResponseResult.ok();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/秋刀鱼在做梦/article/detail/823222
推荐阅读
  

闽ICP备14008679号