当前位置:   article > 正文

springboot redis延迟消息队列实现_spring redis zrem

spring redis zrem

延迟消息队列,下面说一下一些业务场景

实践场景

订单支付失败,每隔一段时间提醒用户

用户并发量的情况,可以延时2分钟给用户发短信

总结就是:间隔一段时间后的,定时、重试、超时任务

可选方案

1、Rabbitmq 延时队列
通过 RabbitMQ 消息队列的 TTL和 DXL这两个属性间接实现的。
2、DelayQueue 延时队列
3、Quartz定时任务
4、时间轮
5、Redis 延迟队列

Redis 的特殊数据结构 ZSet 满足延迟的特性。数据可持久化。

Redis延时队列的实现

主要通过zadd 添加带有score(延迟时间)的有序集合,消费使用zrangebysocre取出到当前时间所到期的消息,也可以通过 zrangebyscore key min max withscores limit 0 1 查询最早的一条任务,来进行消费。取出后使用zrem删除已经消费的消息。

上代码,使用redisTemplate

RedisDelayQueue 延迟队列工具

  1. public class RedisDelayQueue {
  2. @Autowired
  3. private RedisTemplate<String, Object> redisTemplate;
  4. /**
  5. * 注意:脚本zrangebyscore的ARGV[1](score)参数必须为数字,不能传字符串。
  6. */
  7. private final String luaScript = "local resultArray = redis.call('zrangebyscore', KEYS[1], 0, ARGV[1], 'limit' , 0, 1) " +
  8. "if #resultArray > 0 then\n" +
  9. " if redis.call('zrem', KEYS[1], resultArray[1]) > 0 then\n" +
  10. " return resultArray[1]\n" +
  11. " else\n" +
  12. " return ''\n" +
  13. " end\n" +
  14. "else\n" +
  15. " return ''\n" +
  16. "end";
  17. /**
  18. * 可能返回为null。
  19. * @param key
  20. * @return
  21. */
  22. public Object consumeDelayMessage(String key) {
  23. try {
  24. // 指定 lua 脚本,并且指定返回值类型
  25. DefaultRedisScript<Object> redisScript =new DefaultRedisScript<>();
  26. // *!* [如果要返回值,必须设置返回映射对象],不然返回会全部是null
  27. redisScript.setResultType(Object.class);
  28. redisScript.setScriptSource(new StaticScriptSource(luaScript));
  29. // 参数一:redisScript,参数二:key列表,参数三:arg(可多个)
  30. // 脚本zrangebyscore的ARGV[1](score)参数必须为数字,不能传字符串。
  31. Object result = redisTemplate.execute(redisScript,
  32. Collections.singletonList(key), System.currentTimeMillis());
  33. return result;
  34. } catch (Exception e) {
  35. e.printStackTrace();
  36. return null;
  37. }
  38. }
  39. public Boolean produceDelayMessage(String key, Object value, double score) {
  40. try {
  41. return redisTemplate.opsForZSet().add(key,value, score);
  42. } catch (Exception e) {
  43. e.printStackTrace();
  44. return false;
  45. }
  46. }
  47. }

其中的问题(坑-_-|):

1. zrangebyscore 和 zrem 两个操作不是原子的。

T1, T2 和其他更多线程调用 zrangebyscore 获取到了一条消息 A,处理后,多个线程准备开始删除消息 A,但是由于redis文件事务处理器是单线程执行,多线程下都是去调用redis的命令并不会有什么问题,其中第一个成功了,后面线程的删除均会失败了(除非同样的这个消息又被马上加进去了,还没被消费)。

但是上面的算法中同一个任务可能会被多个进程取到之后再使用 zrem 进行争抢,那些没抢到 的进程都是白取了一次任务,这是浪费。

所以使用 lua scripting 来优化一下这个逻辑,将 zrangebyscore 和 zrem 一同挪到服务器端进行原子化操作,这样多个进程之间争抢任务时就不会出现这种浪费了,并且一个消息只能被一个线程取到。

2. 使用redisTemplate来执行lua脚本,返回值问题

 之前没有设置这个,一直都返回null,弄得我以为脚本错了。如果要返回值,必须设置返回映射对象!redisTemplate.execute脚本的返回值和参数序列器,这里使用redisTemplate初始化自定义配置的,如果没自定义过,需要手动传入具体的,不然可能使用redis默认的是jdk的序列化。

 zrangebyscore key min max的min和max需要是数字!

消费延时消息定时任务

  1. public class ReCallScheduledTask {
  2. /**
  3. * timer的schedule和scheduleAtFixedRate方法一般情况下是没什么区别的,
  4. * 只在某个情况出现时会有区别--当前任务没有来得及完成下次任务又交到手上。
  5. * scheduleAtFixedRate保证执行的次数和间隔,
  6. * 所以two or more executions will occur in rapid succession to "catch up."
  7. */
  8. private Timer timer;
  9. RedisQueue redisQueue;
  10. RedisDelayQueue redisDelayQueue;
  11. public ReCallScheduledTask() {
  12. timer = new Timer("ReCallScheduledTask-1");
  13. redisQueue = (RedisQueue) SpringContextUtil.getBean("redisQueue");
  14. redisDelayQueue = (RedisDelayQueue) SpringContextUtil.getBean("redisDelayQueue");
  15. }
  16. public void start() {
  17. log.info("启动—失败重复呼叫,延迟队列消费线程...");
  18. TimerTask task = new TimerTask() {
  19. //不能抛出异常,否则会停止timer
  20. @Override
  21. public void run() {
  22. try {
  23. //获取
  24. Object recall = redisDelayQueue.consumeDelayMessage(RedisKeys.RECALL_QUEUE.key);
  25. log.debug("ReCallScheduledTask-1 gogogo获得重呼名单{}", recall);
  26. //组装,重打。
  27. if (recall instanceof CallNames) {
  28. CallNames names = (CallNames) recall;
  29. Long result = redisQueue.produce(RedisKeys.CALL_QUEUE.key, names);
  30. }
  31. } catch (Exception e) {
  32. e.printStackTrace();
  33. }
  34. }
  35. };
  36. //指定任务task在指定延迟delay后进行固定延迟peroid的执行
  37. timer.schedule(task, 3000, 2000);
  38. }
  39. }

使用timer来定时消费。

Redis延时队列优势

Redis用来进行实现延时队列是具有这些优势的:

1.Redis zset支持高性能的 score 排序。

2.Redis是在内存上进行操作的,速度非常快。

3.Redis可以搭建集群,当消息很多时候,我们可以用集群来提高消息处理的速度,提高可用性。

4.Redis具有持久化机制,当出现故障的时候,可以通过AOF和RDB方式来对数据进行恢复,保证了数据的可靠性



Redis延时队列劣势

使用 Redis 实现的延时消息队列也存在数据持久化, 消息可靠性的问题

没有重试机制 - 处理消息出现异常没有重试机制, 这些需要自己去实现, 包括重试次数的实现等

没有 ACK 机制 - 例如在获取消息并已经删除了消息情况下, 正在处理消息的时候客户端崩溃了, 这条正在处理的这些消息就会丢失, MQ 是需要明确的返回一个值给 MQ 才会认为这个消息是被正确的消费了

如果对消息可靠性要求较高, 推荐使用 MQ 来实现



另三方库:Redission实现延时队列

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/600943
推荐阅读
相关标签
  

闽ICP备14008679号