当前位置:   article > 正文

redis实现延时队列

redis实现延时队列
redis的Zset特点

redis的zset它结合了set和list的特点
 1、集合内元素不会重复
 2、元素以有序的方式排列
zset中的元素都会关联一个分数score,内部将通过这个score对集合元素进行的排序。
虽然zset集合中元素不会重复,但score可以重复。

如果有两个score相同的元素,将按照元素的字典序进行排序

score保证了队列中的消息有序性
延迟队列的实现:
将数据存到redis的zset中并指定score(double),zset会对score进行排序,让最早消费的数据位于最前,拿最前的数据跟当前时间比较,时间到了则消费

延迟消息队列使用场景
  1. 打车场景,在规定时间内,没有车主接单,那么平台就会推送消息给你,提示暂时没有车主接单。
  2. 支付场景,下单了,如果没有在规定时间付款,平台通常会发消息提示订单在有效期内没有支付完成,自动取消订单。
  3. 闹钟场景,时间到了则执行播报声音。
redis作为消息队列的优缺点
  • 优点
    • 使用相对简单
    • 不用专门维护专业的消息中间件,降低服务和运维成本
  • 缺点
    • 没有ack,消息确认机制,存在消息丢失的可能
    • 没有重试机制,建议写代码补偿
    • 对消息的可靠性有很大的要求,建议还是不要使用redis作为延时消息队列

如果是简单的日志推送,消息推送等,可以使用redis队列。

代码实现

生产者

  1. @Slf4j
  2. @Component
  3. public class MessageProvider {
  4. private static ObjectMapper mapper = Jackson2ObjectMapperBuilder.json().build();
  5. @Lazy
  6. @Resource
  7. MsgMapper msgMapper;
  8. @Lazy
  9. @Resource
  10. RedisUtil redisUtil;//这里就不放出来了,大家都有的
  11. private static String USER_CHANNEL = "随便_CHANNEL_";
  12. public static final String KEY_PREFIX ="你的前缀_msg:";
  13. /**
  14. * @Description: 发送消息添加到延迟队列
  15. * @param delay 延迟时间(排序的score)
  16. * @Author: fan
  17. * @Date: 2024/7/2 15:55
  18. */
  19. public void sendMessage(Long id,Long taskId,String messageContent, Long delay,String channel) {
  20. //消息体格式,可根据自己需要调整,这里就不放出来了
  21. Message message = new Message();
  22. String msgId = USER_CHANNEL + id;
  23. long time = System.currentTimeMillis();
  24. LocalDateTime dateTime = Instant.ofEpochMilli(time).atZone(ZoneOffset.ofHours(8)).toLocalDateTime();
  25. message.setCreateTime(dateTime);
  26. message.setDelayTime(delay);
  27. message.setBody(messageContent);
  28. message.setMsgId(msgId);
  29. message.setStatus(ImmobilizationEnum.not_fixed_broadcast.getCode());
  30. message.setChannel(USER_CHANNEL + channel);
  31. Boolean b = false;
  32. try {
  33. //推送到队列
  34. b = pushQUeue(message);
  35. } catch (Exception e) {
  36. log.error("[sendMessage_Exception异常] e={}", e);
  37. } finally {
  38. String value = redisUtil.getKey(YOUR_KEY_PREFIX + msgId);
  39. if (StringUtil.isEmpty(value )) {
  40. //如果没有则插入数据库,代码补偿
  41. Message msg = msgMapper.selectByMsgId(msgId);
  42. if (msg == null) {
  43. msgMapper.insert(message);
  44. redisUtil.setKey(YOUR_KEY_PREFIX + msgId, UUID.toString, 20);
  45. }
  46. }
  47. }
  48. }
  49. public Boolean pushQUeue(Message queueManager){
  50. Boolean b = false;
  51. try {
  52. String messageStr = mapper.writeValueAsString(queueManager);
  53. b = redisUtil.addZset(YOUR_QUEUE_NAME_KEY, messageStr, queueManager.getDelayTime());
  54. redisUtil.expire(YOUR_QUEUE_NAME_KEY, 20, TimeUnit.SECONDS);
  55. } catch (Exception e) {
  56. log.error("[push_Exception异常] e={} ",e);
  57. }
  58. return b;
  59. }
  60. public List<Message> pullZset(){
  61. long currentTimeMillis = System.currentTimeMillis();
  62. List<Message> messageList = new ArrayList<>();
  63. try{
  64. Set<String> strings = redisUtil.rangeByScore(YOUR_QUEUE_NAME_KEY, 0, Long.MAX_VALUE);
  65. if (CollectionUtils.isEmpty(strings)) {
  66. return null;
  67. }
  68. messageList = strings.stream().map(msg -> {
  69. Message message = null;
  70. try {
  71. message = mapper.readValue(msg,Message.class);
  72. } catch (Exception e) {
  73. log.error("[pull_Exception异常] e:{}",e);
  74. }
  75. return message;
  76. }).collect(Collectors.toList());
  77. } catch (Exception e) {
  78. log.error("[pull_Exception异常] Exception={} ", e);
  79. } finally {
  80. if (CollectionUtils.isEmpty(messageList)) {
  81. //如果缓存没有则从数据库取并下发到队列
  82. messageList = MsgMapper.selectByStatus("你的查询条件,这里按状态,具体SQL就不贴了");
  83. }
  84. }
  85. return messageList;
  86. }
  87. }

消费者

  1. //消费者方法
  2. public void delayingQueueConsumer(){
  3. List<Message> msgList = pullZset();
  4. if (!CollectionUtils.isEmpty(msgList)) {
  5. long current = getCurrentTime();
  6. for (int i = 0; i < msgList.size(); i++) {
  7. Message msg = msgList.get(i);
  8. //到点执行
  9. if (current >= msg.getDelayTime()) {
  10. //你的业务
  11. }
  12. }
  13. }
  14. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/代码探险家/article/detail/823754
推荐阅读
相关标签
  

闽ICP备14008679号