赞
踩
SpringBoot提供了一种方式对应用服务全部启动成功后来进行初始操作.CommandLineRunner、ApplicationRunner,通过实现这两个接口的run方法就能保证在项目运行后为我们初始化一些操作.
1.redis延迟队列的实现,首先我们选用它的Zset来达成我们的目标
延迟队列的通用接口:
public interface RedisDelayedQueueService {
void add(String queueName, long delayedSecond, String val);
List<String> get(String queueName);
}
为其设计的Zset实现类
@Component public class RedisDelayedQueueServiceImpl implements RedisDelayedQueueService { @Resource private StringRedisTemplate stringRedisTemplate; @Override public void add(String queueName, long delayedSecond, String val) { long expire = System.currentTimeMillis() + delayedSecond * 1000; stringRedisTemplate.opsForZSet().add(queueName, val, expire); } @Override public List<String> get(String queueName) { String now = System.currentTimeMillis() + ""; List<String> keyList = new ArrayList<>(); keyList.add(queueName); RedisScript<List<String>> redisScript = LuaScript.ZSET_GET_BY_SCORE_AND_DEL.getRedisScript(); return stringRedisTemplate.execute(redisScript, keyList,"0", now); } }
主要关注这个get方法,这个方法是实现延迟的关键
ZSET_GET_BY_SCORE_AND_DEL("延时队列获取元素", null,
"local zRes=redis.call('zrangebyscore',KEYS[1],ARGV[1],ARGV[2]) " +
"if #zRes == 0 then return {} end " +
"redis.call('zremrangebyscore',KEYS[1],ARGV[1],ARGV[2]) return zRes",
List.class)
利用lua脚本控制延时
2.延时线程类
@Slf4j public abstract class AbstractDelayedThread extends Thread { @Resource private RedisDelayedQueueService redisDelayedQueueService; @Resource private StringRedisTemplate stringRedisTemplate; @Override public void run() { while (true) { try { List<String> list = redisDelayedQueueService.get(queueKey()); for (String id : list) { try{ ThreadPool.instance().submit(() -> action(id)); }catch (Exception e){ String key = RedisKey.REDIS_DELAYED_FAULT_LOG.key(Long.valueOf(id)); long time = System.currentTimeMillis(); Map<String,Object> map=new HashedMap(); map.put("queueKey",queueKey()); map.put("id",id); map.put("creatTime",time); stringRedisTemplate.opsForValue().set(key, map.toString()); log.warn("redis延时任务执行失败"); } } Thread.sleep(1000); } catch (Exception e) { // nothing log.error("监听拼团计时结束队列异常,异常信息:", e); try { Thread.sleep(1000); } catch (InterruptedException interruptedException) { interruptedException.printStackTrace(); } } } } protected abstract void action(String id); protected abstract String queueKey(); }
run方法不停的从redis中取出key为设置好的常量再把他的value放到action中去执行
举一个他的子类作为例子
@Component @Slf4j public class OrderCancelThread extends AbstractDelayedThread { @Resource private McOrderService mcOrderService; @Override protected void action(String id) { //要重试的方法 mcOrderService.cancel(Long.parseLong(id)); } @Override protected String queueKey() { return RedisDelayedKey.ORDER_CANCEL.getKey();//常量key } }
3.让它能够自己在项目初始化后动起来
@Component @Slf4j public class DelayedQueueService implements CommandLineRunner { @Resource private DelayedQueueProcessorRegistry delayedQueueProcessorRegistry; @Resource private StringRedisTemplate stringRedisTemplate; @Override public void run(String... args) { // redis预加载lua脚本指令 LuaScript.loadScript(stringRedisTemplate); // 启动线程监听队列(定时轮询) delayedQueueProcessorRegistry.threadStart(); } }
4.实现后置处理器来筛选所有实现AbstractDelayedThread的实例bean
@Component public class DelayedQueueProcessorRegistry implements BeanPostProcessor { private static final List<AbstractDelayedThread> INIT_PROCESS_LIST = new ArrayList<>(); @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { if (bean instanceof AbstractDelayedThread) { INIT_PROCESS_LIST.add((AbstractDelayedThread) bean); } return bean; } /** * 所有队列线程启动 **/ public void threadStart(){ for (AbstractDelayedThread thread : INIT_PROCESS_LIST) { thread.start(); } } }
实现了CommandLineRunner 接口的run方法来让它在项目初始化时调用方法,DelayedQueueProcessorRegistry实现了后置处理器接口,只要继承了AbstractDelayedThread的所有类都会被添加到List中,最后在threadStart被启动.
通过这一系列操作,我们最终完成了redis实现延迟队列的操作.
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。