当前位置:   article > 正文

SpringBoot项目运行初始化数据以及利用redis来实现延迟队列_springboot 启动时初始化redis数据

springboot 启动时初始化redis数据

SpringBoot提供了一种方式对应用服务全部启动成功后来进行初始操作.CommandLineRunner、ApplicationRunner,通过实现这两个接口的run方法就能保证在项目运行后为我们初始化一些操作.
1.redis延迟队列的实现,首先我们选用它的Zset来达成我们的目标
延迟队列的通用接口:

public interface RedisDelayedQueueService {
  
    void add(String queueName, long delayedSecond, String val);
    
    List<String> get(String queueName);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

为其设计的Zset实现类

@Component
public class RedisDelayedQueueServiceImpl implements RedisDelayedQueueService {
    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Override
    public void add(String queueName, long delayedSecond, String val) {
        long expire = System.currentTimeMillis() + delayedSecond * 1000;
        stringRedisTemplate.opsForZSet().add(queueName, val, expire);
    }
    
    @Override
    public List<String> get(String queueName) {
        String now = System.currentTimeMillis() + "";
        List<String> keyList = new ArrayList<>();
        keyList.add(queueName);
        RedisScript<List<String>> redisScript = LuaScript.ZSET_GET_BY_SCORE_AND_DEL.getRedisScript();
        return stringRedisTemplate.execute(redisScript, keyList,"0", now);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

主要关注这个get方法,这个方法是实现延迟的关键

ZSET_GET_BY_SCORE_AND_DEL("延时队列获取元素", null,
            "local zRes=redis.call('zrangebyscore',KEYS[1],ARGV[1],ARGV[2]) " +
                    "if #zRes == 0 then return {} end " +
                    "redis.call('zremrangebyscore',KEYS[1],ARGV[1],ARGV[2]) return zRes",
            List.class)
  • 1
  • 2
  • 3
  • 4
  • 5

利用lua脚本控制延时
2.延时线程类

@Slf4j
public abstract class AbstractDelayedThread extends Thread {
    @Resource
    private RedisDelayedQueueService redisDelayedQueueService;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Override
    public void run() {
        while (true) {
            try {
                List<String> list = redisDelayedQueueService.get(queueKey());
                for (String id : list) {
                    try{
                        ThreadPool.instance().submit(() -> action(id));
                    }catch (Exception e){
                        String key = RedisKey.REDIS_DELAYED_FAULT_LOG.key(Long.valueOf(id));
                        long time = System.currentTimeMillis();
                        Map<String,Object> map=new HashedMap();
                        map.put("queueKey",queueKey());
                        map.put("id",id);
                        map.put("creatTime",time);
                        stringRedisTemplate.opsForValue().set(key, map.toString());
                        log.warn("redis延时任务执行失败");
                    }
                }
                Thread.sleep(1000);
            } catch (Exception e) {
                // nothing
                log.error("监听拼团计时结束队列异常,异常信息:", e);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException interruptedException) {
                    interruptedException.printStackTrace();
                }
            }
        }
    }
    
    protected abstract void action(String id);
    
    protected abstract String queueKey();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

run方法不停的从redis中取出key为设置好的常量再把他的value放到action中去执行
举一个他的子类作为例子

@Component
@Slf4j
public class OrderCancelThread extends AbstractDelayedThread {
    @Resource
    private McOrderService mcOrderService;

    @Override
    protected void action(String id) {
    //要重试的方法
        mcOrderService.cancel(Long.parseLong(id));
    }

    @Override
    protected String queueKey() {
        return RedisDelayedKey.ORDER_CANCEL.getKey();//常量key
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

3.让它能够自己在项目初始化后动起来

@Component
@Slf4j
public class DelayedQueueService implements CommandLineRunner {
    @Resource
    private DelayedQueueProcessorRegistry delayedQueueProcessorRegistry;
    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Override
    public void run(String... args) {
        // redis预加载lua脚本指令
        LuaScript.loadScript(stringRedisTemplate);
        // 启动线程监听队列(定时轮询)
        delayedQueueProcessorRegistry.threadStart();
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

4.实现后置处理器来筛选所有实现AbstractDelayedThread的实例bean

@Component
public class DelayedQueueProcessorRegistry implements BeanPostProcessor {
    private static final List<AbstractDelayedThread> INIT_PROCESS_LIST = new ArrayList<>();

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (bean instanceof AbstractDelayedThread) {
            INIT_PROCESS_LIST.add((AbstractDelayedThread) bean);
        }
        return bean;
    }

    /**
     * 所有队列线程启动
     **/
    public void threadStart(){
        for (AbstractDelayedThread thread : INIT_PROCESS_LIST) {
            thread.start();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

实现了CommandLineRunner 接口的run方法来让它在项目初始化时调用方法,DelayedQueueProcessorRegistry实现了后置处理器接口,只要继承了AbstractDelayedThread的所有类都会被添加到List中,最后在threadStart被启动.
通过这一系列操作,我们最终完成了redis实现延迟队列的操作.

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/600950
推荐阅读
相关标签
  

闽ICP备14008679号