当前位置:   article > 正文

redis延迟队列实现_redis实现延时消息队列 java redisdelayqueuehandle

redis实现延时消息队列 java redisdelayqueuehandle

最近项目中,有一个需求是需要用到延迟队列的,最开始使用的RocketMQ来实现,后面发现使用RocketMQ时,总是会丢消息,在网上百度下,发现还可以使用redis来做延迟消息,所以就把之前的RocketMQ改成Redis。

延迟消息任务:

  1. @Data
  2. public class DelayTask<T> {
  3. // 消息id
  4. private String id;
  5. // 任务名称
  6. private String taskName;
  7. // 具体任务内容
  8. private T msg;
  9. }

Redis延迟队列监听:

  1. @Component
  2. @Slf4j
  3. public class RedisDealyTaskLister implements CommandLineRunner {
  4. @Resource
  5. private RedisTemplate redisTemplate;
  6. @Resource
  7. private ISplitOrderService iSplitOrderService;
  8. @Resource
  9. private SccOrderProperties sccOrderProperties;
  10. private String delayQueueName = "delayQueue";
  11. @Override
  12. public void run(String... args) throws Exception {
  13. Executors.newSingleThreadExecutor().submit(new Runnable() {
  14. @Override
  15. public void run() {
  16. while (true) {
  17. // 获取一个到点的消息
  18. Set<String> set = redisTemplate.opsForZSet().rangeByScore(delayQueueName, 0, System.currentTimeMillis(), 0, 1);
  19. log.info("RedisDealyTaskLister set={},time={}",Func.toJson(set),LocalDateTime.now());
  20. // 如果没有,就等等
  21. if (set.isEmpty()) {
  22. try {
  23. Thread.sleep(sccOrderProperties.getDelayRedisTime());
  24. } catch (InterruptedException e) {
  25. log.info("RedisDealyTaskLister run e={}",e);
  26. }
  27. // 继续执行
  28. continue;
  29. }
  30. // 获取具体消息的key
  31. String it = set.iterator().next();
  32. // 删除成功
  33. if (redisTemplate.opsForZSet().remove(delayQueueName, it) > 0) {
  34. // 拿到任务
  35. DelayTask delayTask = JSONObject.parseObject(it, DelayTask.class);
  36. log.info("RedisDealyTaskLister delayTask={},time={}",Func.toJson(delayTask),LocalDateTime.now());
  37. OrderDetailInfo orderDetailInfo = JSONObject.parseObject(Func.toJson(delayTask.getMsg()), OrderDetailInfo.class);
  38. iSplitOrderService.invokeInsertNearestCargo(orderDetailInfo);
  39. log.info("消息到期msg={},time={}",delayTask.getMsg().toString(), LocalDateTime.now());
  40. }
  41. }
  42. }
  43. });
  44. }
  45. }

Redis延迟队列:

  1. @Slf4j
  2. public class RedisDelayQueue<T> {
  3. /**
  4. * 延迟队列名称
  5. */
  6. private String delayQueueName = "delayQueue";
  7. private RedisTemplate redisTemplate;
  8. // 传入redis客户端操作
  9. public RedisDelayQueue(RedisTemplate redisTemplate, String delayQueueName) {
  10. this.redisTemplate = redisTemplate;
  11. this.delayQueueName = delayQueueName;
  12. }
  13. /**
  14. * 设置延迟消息
  15. */
  16. public void setDelayTasks(T msg, long delayTime) {
  17. DelayTask<T> delayTask = new DelayTask<>();
  18. delayTask.setId(UUID.randomUUID().toString());
  19. delayTask.setMsg(msg);
  20. Boolean addResult = redisTemplate.opsForZSet().add(delayQueueName, JSONObject.toJSONString(delayTask), delayTime);
  21. if(addResult){
  22. log.info("添加任务成功!delayTask={},当前时间为time={}",JSONObject.toJSONString(delayTask),LocalDateTime.now());
  23. }
  24. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/484302
推荐阅读
相关标签
  

闽ICP备14008679号