赞
踩
最近项目中有一个需求需要用到延迟队列。最开始使用 RocketMQ 来实现,后来发现使用 RocketMQ 时总是会丢消息;在网上搜索后,发现还可以使用 Redis 的有序集合(ZSet)来做延迟消息,所以就把之前的 RocketMQ 改成了 Redis。
延迟消息任务:
- // Envelope for one delayed message. RedisDelayQueue.setDelayTasks serializes it to JSON
- // and stores it as a sorted-set member; RedisDealyTaskLister parses it back.
- // Accessors are expected to come from Lombok's @Data (getId/setId, getMsg/setMsg, ...).
- @Data
- public class DelayTask<T> {
- // Message id (setDelayTasks fills this with a random UUID string)
- private String id;
- // Task name (NOTE(review): never assigned by the code shown alongside this class)
- private String taskName;
- // The concrete task payload
- private T msg;
-
- }
Redis延迟队列监听:
- @Component
- @Slf4j
- public class RedisDealyTaskLister implements CommandLineRunner {
-
- @Resource
- private RedisTemplate redisTemplate;
-
- @Resource
- private ISplitOrderService iSplitOrderService;
-
- @Resource
- private SccOrderProperties sccOrderProperties;
-
- private String delayQueueName = "delayQueue";
-
- @Override
- public void run(String... args) throws Exception {
- Executors.newSingleThreadExecutor().submit(new Runnable() {
- @Override
- public void run() {
- while (true) {
- // 获取一个到点的消息
- Set<String> set = redisTemplate.opsForZSet().rangeByScore(delayQueueName, 0, System.currentTimeMillis(), 0, 1);
- log.info("RedisDealyTaskLister set={},time={}",Func.toJson(set),LocalDateTime.now());
- // 如果没有,就等等
- if (set.isEmpty()) {
- try {
- Thread.sleep(sccOrderProperties.getDelayRedisTime());
- } catch (InterruptedException e) {
- log.info("RedisDealyTaskLister run e={}",e);
- }
- // 继续执行
- continue;
- }
- // 获取具体消息的key
- String it = set.iterator().next();
- // 删除成功
- if (redisTemplate.opsForZSet().remove(delayQueueName, it) > 0) {
- // 拿到任务
- DelayTask delayTask = JSONObject.parseObject(it, DelayTask.class);
- log.info("RedisDealyTaskLister delayTask={},time={}",Func.toJson(delayTask),LocalDateTime.now());
- OrderDetailInfo orderDetailInfo = JSONObject.parseObject(Func.toJson(delayTask.getMsg()), OrderDetailInfo.class);
- iSplitOrderService.invokeInsertNearestCargo(orderDetailInfo);
- log.info("消息到期msg={},time={}",delayTask.getMsg().toString(), LocalDateTime.now());
- }
- }
- }
- });
-
-
- }
- }
Redis延迟队列:
- @Slf4j
- public class RedisDelayQueue<T> {
- /**
- * 延迟队列名称
- */
- private String delayQueueName = "delayQueue";
-
- private RedisTemplate redisTemplate;
-
- /**
-  * Creates a queue facade backed by the given Redis client.
-  *
-  * @param redisTemplate  Redis client used for the sorted-set operations
-  * @param delayQueueName key of the sorted set; when {@code null} or empty the field's
-  *                       default name is kept instead of using an unusable key
-  */
- public RedisDelayQueue(RedisTemplate redisTemplate, String delayQueueName) {
-     this.redisTemplate = redisTemplate;
-     // The field initializer has already run, so skipping the assignment keeps the default.
-     if (delayQueueName != null && !delayQueueName.isEmpty()) {
-         this.delayQueueName = delayQueueName;
-     }
- }
- /**
-  * Enqueues {@code msg} as a new delayed task.
-  *
-  * <p>The message is wrapped in a {@link DelayTask} with a random UUID id, serialized to
-  * JSON and added to the sorted set {@code delayQueueName} with {@code delayTime} as score.
-  *
-  * @param msg       payload to deliver
-  * @param delayTime value stored unchanged as the sorted-set score. Because
-  *                  RedisDealyTaskLister selects members whose score is
-  *                  {@code <= System.currentTimeMillis()}, this must be the absolute due
-  *                  time in epoch milliseconds (e.g. {@code System.currentTimeMillis() + delayMs}),
-  *                  not a relative duration.
-  */
- public void setDelayTasks(T msg, long delayTime) {
-     DelayTask<T> delayTask = new DelayTask<>();
-     delayTask.setId(UUID.randomUUID().toString());
-     delayTask.setMsg(msg);
-     // Serialize once; the same string is the stored member and the logged value.
-     String payload = JSONObject.toJSONString(delayTask);
-     Boolean addResult = redisTemplate.opsForZSet().add(delayQueueName, payload, delayTime);
-     // addResult is a boxed Boolean and may be null (presumably when the template runs in
-     // a pipeline/transaction — confirm against the Spring Data Redis docs), so never unbox it.
-     if (Boolean.TRUE.equals(addResult)) {
-         log.info("添加任务成功!delayTask={},当前时间为time={}", payload, LocalDateTime.now());
-     } else {
-         log.warn("添加任务失败!delayTask={},addResult={},当前时间为time={}", payload, addResult, LocalDateTime.now());
-     }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。