赞
踩
延迟消息队列,下面说一下一些业务场景
订单支付失败,每隔一段时间提醒用户
用户并发量的情况,可以延时2分钟给用户发短信
总结就是:间隔一段时间后的,定时、重试、超时任务
1、Rabbitmq 延时队列
通过 RabbitMQ 消息队列的 TTL和 DXL这两个属性间接实现的。
2、DelayQueue 延时队列
3、Quartz定时任务
4、时间轮
5、Redis 延迟队列
Redis 的特殊数据结构 ZSet 满足延迟的特性。数据可持久化。
主要通过zadd 添加带有score(延迟时间)的有序集合,消费使用zrangebysocre取出到当前时间所到期的消息,也可以通过 zrangebyscore key min max withscores limit 0 1
查询最早的一条任务,来进行消费。取出后使用zrem删除已经消费的消息。
上代码,使用redisTemplate
- public class RedisDelayQueue {
- @Autowired
- private RedisTemplate<String, Object> redisTemplate;
-
- /**
- * 注意:脚本zrangebyscore的ARGV[1](score)参数必须为数字,不能传字符串。
- */
- private final String luaScript = "local resultArray = redis.call('zrangebyscore', KEYS[1], 0, ARGV[1], 'limit' , 0, 1) " +
- "if #resultArray > 0 then\n" +
- " if redis.call('zrem', KEYS[1], resultArray[1]) > 0 then\n" +
- " return resultArray[1]\n" +
- " else\n" +
- " return ''\n" +
- " end\n" +
- "else\n" +
- " return ''\n" +
- "end";
-
- /**
- * 可能返回为null。
- * @param key
- * @return
- */
- public Object consumeDelayMessage(String key) {
- try {
- // 指定 lua 脚本,并且指定返回值类型
- DefaultRedisScript<Object> redisScript =new DefaultRedisScript<>();
- // *!* [如果要返回值,必须设置返回映射对象],不然返回会全部是null。
- redisScript.setResultType(Object.class);
- redisScript.setScriptSource(new StaticScriptSource(luaScript));
-
- // 参数一:redisScript,参数二:key列表,参数三:arg(可多个)
- // 脚本zrangebyscore的ARGV[1](score)参数必须为数字,不能传字符串。
- Object result = redisTemplate.execute(redisScript,
- Collections.singletonList(key), System.currentTimeMillis());
- return result;
- } catch (Exception e) {
- e.printStackTrace();
- return null;
- }
- }
-
- public Boolean produceDelayMessage(String key, Object value, double score) {
- try {
- return redisTemplate.opsForZSet().add(key,value, score);
- } catch (Exception e) {
- e.printStackTrace();
- return false;
- }
- }
-
- }
其中的问题(坑-_-|):
1. zrangebyscore 和 zrem 两个操作不是原子的。
T1, T2 和其他更多线程调用 zrangebyscore 获取到了一条消息 A,处理后,多个线程准备开始删除消息 A,但是由于redis文件事务处理器是单线程执行,多线程下都是去调用redis的命令并不会有什么问题,其中第一个成功了,后面线程的删除均会失败了(除非同样的这个消息又被马上加进去了,还没被消费)。
但是上面的算法中同一个任务可能会被多个进程取到之后再使用 zrem 进行争抢,那些没抢到 的进程都是白取了一次任务,这是浪费。
所以使用 lua scripting 来优化一下这个逻辑,将 zrangebyscore 和 zrem 一同挪到服务器端进行原子化操作,这样多个进程之间争抢任务时就不会出现这种浪费了,并且一个消息只能被一个线程取到。
2. 使用redisTemplate来执行lua脚本,返回值问题
之前没有设置这个,一直都返回null,弄得我以为脚本错了。如果要返回值,必须设置返回映射对象!redisTemplate.execute脚本的返回值和参数序列器,这里使用redisTemplate初始化自定义配置的,如果没自定义过,需要手动传入具体的,不然可能使用redis默认的是jdk的序列化。
zrangebyscore key min max的min和max需要是数字!
- public class ReCallScheduledTask {
- /**
- * timer的schedule和scheduleAtFixedRate方法一般情况下是没什么区别的,
- * 只在某个情况出现时会有区别--当前任务没有来得及完成下次任务又交到手上。
- * scheduleAtFixedRate保证执行的次数和间隔,
- * 所以two or more executions will occur in rapid succession to "catch up."
- */
- private Timer timer;
- RedisQueue redisQueue;
- RedisDelayQueue redisDelayQueue;
-
- public ReCallScheduledTask() {
- timer = new Timer("ReCallScheduledTask-1");
- redisQueue = (RedisQueue) SpringContextUtil.getBean("redisQueue");
- redisDelayQueue = (RedisDelayQueue) SpringContextUtil.getBean("redisDelayQueue");
- }
-
- public void start() {
- log.info("启动—失败重复呼叫,延迟队列消费线程...");
- TimerTask task = new TimerTask() {
- //不能抛出异常,否则会停止timer
- @Override
- public void run() {
- try {
- //获取
- Object recall = redisDelayQueue.consumeDelayMessage(RedisKeys.RECALL_QUEUE.key);
- log.debug("ReCallScheduledTask-1 gogogo获得重呼名单{}", recall);
- //组装,重打。
- if (recall instanceof CallNames) {
- CallNames names = (CallNames) recall;
-
- Long result = redisQueue.produce(RedisKeys.CALL_QUEUE.key, names);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- };
- //指定任务task在指定延迟delay后进行固定延迟peroid的执行
- timer.schedule(task, 3000, 2000);
- }
-
- }
使用timer来定时消费。
Redis用来进行实现延时队列是具有这些优势的:
1.Redis zset支持高性能的 score 排序。
2.Redis是在内存上进行操作的,速度非常快。
3.Redis可以搭建集群,当消息很多时候,我们可以用集群来提高消息处理的速度,提高可用性。
4.Redis具有持久化机制,当出现故障的时候,可以通过AOF和RDB方式来对数据进行恢复,保证了数据的可靠性
使用 Redis 实现的延时消息队列也存在数据持久化, 消息可靠性的问题
没有重试机制 - 处理消息出现异常没有重试机制, 这些需要自己去实现, 包括重试次数的实现等
没有 ACK 机制 - 例如在获取消息并已经删除了消息情况下, 正在处理消息的时候客户端崩溃了, 这条正在处理的这些消息就会丢失, MQ 是需要明确的返回一个值给 MQ 才会认为这个消息是被正确的消费了
如果对消息可靠性要求较高, 推荐使用 MQ 来实现
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。