赞
踩
针对于Redis实现延时队列有两种实现方式:
借助redis zset来实现延时队列,具体的实现代码很简单,就是从zset中取出score小于当前时间戳的数据
import cn.hutool.json.JSONUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; @Component public class RedisDelayDemo { @Autowired private RedisTemplate redisTemplate; private static final Long DELETE_SUCCESS = 1L; //线程安全set private Set<String> topic = new CopyOnWriteArraySet<>(); //放入数据 public void push(String key, Object val, long delayTime) { topic.add(key); //hutool的json转换工具会有些问题,性能相对来说比较原生的jackson也会差一些 我这边是为了图方便 String strVal = val instanceof String ? (String) val : JSONUtil.toJsonStr(val); redisTemplate.opsForZSet().add(key, strVal, System.currentTimeMillis() + delayTime); } //获取数据 public String get(String key) { Set<String> sets = redisTemplate.opsForZSet().rangeByScore(key, 0, System.currentTimeMillis(), 0, 3); if (CollectionUtils.isEmpty(sets)) { return null; } for (String val : sets) { if (DELETE_SUCCESS.equals(redisTemplate.opsForZSet().remove(key, val))) { // 删除成功,表示抢占到 return val; } } return null; } }
为什么会这么设计?
为啥将数据返回包在删除中?
redis
的单线程机制,只可能会有一个实例会删除成功,所以拿到并删除成功的那个实例就是幸运儿为啥一次取3个数据
Demo
final String key = "myzset";
for (int i = 0; i < 10; i++) {
redisDelayDemo.push(key, "xiaozhang"+ i, 9000);
}
while (true) {
String value = redisDelayDemo.get(key);
if (StringUtils.isBlank(value)) {
continue;
}
System.out.println(value);
}
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.listener.KeyExpirationEventMessageListener; import org.springframework.data.redis.listener.RedisMessageListenerContainer; public class RedisExpiredListenter extends KeyExpirationEventMessageListener { private static final Logger LOGGER = LoggerFactory.getLogger(RedisExpiredListenter.class); public RedisExpiredListenter(RedisMessageListenerContainer listenerContainer) { super(listenerContainer); } @Override public void onMessage(Message message, byte[] pattern) { String channel = new String(message.getChannel(), StandardCharsets.UTF_8); //过期的key String key = new String(message.getBody(),StandardCharsets.UTF_8); LOGGER.info("redis key 过期:pattern={},channel={},key={}",new String(pattern),channel,key); /** *你可以将你的内容一起拼接到key中或者可以在redis中存储两个key (例如mykey和mykey_backup) *针对mykey设置过期时间,对于mykey_backup不设置过期时间,这样就可以通过mykey_backup获取到value了 **/ } } }
使用Redis
过期监听实现延时队列方法较为便捷,但是该方法也存在一个很大的问题
因为当过期的key数量高于一个阈值的时候, Redis
不能确保 key 在指定时间被删除 , 也就造成了消息可能比你设置的延时时间更长
所以如果你的系统针对于延时队列这个时间要求十分严格,并且在同一时间内会有多个消息需要发生那我就不推荐使用Redis
的延时队列,如果你的系统对于该业务并没有如此严格的要求,并且数量不多的情况下是可以使用的。
使用Redis
实现延时队列还有一个比较大的问题,他并不像消息队列一样保证送达。当订阅事件的客户端会丢失所有在断线期间所有分发给它的事件。 这个问题也是开发者需要考虑的,根据自己的业务场景去判断。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。