当前位置:   article > 正文

Java 使用Redis实现延时队列_redis paramvalidatorschecker消息延时队列 java

redis paramvalidatorschecker消息延时队列 java

A:需求说明:

  1. 如果系统中需要用到定时执行计划的,又不想用到中间件,如果轮询数据库的话,会导致大量资源消耗,这样我们就可以使用Redis来实现类似功(需要使用rabbitMQ的请看这里:https://blog.csdn.net/u010096717/article/details/82148681
  2. 业务类型,如订单一些评论,如果48h用户未对商家评论,系统会自动产生一条默认评论,还有排队到时提醒等

B:实现思路:

  1. 将整个Redis当做消息池,以kv形式存储消息,key为id,value为具体的消息body
  2. 使用ZSET做优先队列,按照score维持优先级(用当前时间+需要延时的时间作为score)
  3. 轮询ZSET,拿出score比当前时间戳大的数据(已过期的)
  4. 根据id拿到消息池的具体消息进行消费
  5. 消费成功,删除改队列和消息
  6. 消费失败,让该消息重新回到队列

C:代码实现

  1. Message消息封装类
    1. @Data
    2. public class Message {
    3. /**
    4. * 消息id
    5. */
    6. private String id;
    7. /**
    8. * 消息延迟/毫秒
    9. */
    10. private long delay;
    11. /**
    12. * 消息存活时间
    13. */
    14. private int ttl;
    15. /**
    16. * 消息体,对应业务内容
    17. */
    18. private String body;
    19. /**
    20. * 创建时间,如果只有优先级没有延迟,可以设置创建时间为0
    21. * 用来消除时间的影响
    22. */
    23. private long createTime;
    24. }

 

2.基于redis的消息队列

  1. @Component
  2. public class RedisMQ {
  3. /**
  4. * 消息池前缀,以此前缀加上传递的消息id作为key,以消息{@link Message}
  5. * 的消息体body作为值存储
  6. */
  7. public static final String MSG_POOL = "Message:Pool:";
  8. /**
  9. * zset队列 名称 queue
  10. */
  11. public static final String QUEUE_NAME = "Message:Queue:";
  12. private static final int SEMIH = 30*60;
  13. @Autowired
  14. private RedisService redisService;
  15. /**
  16. * 存入消息池
  17. * @param message
  18. * @return
  19. */
  20. public boolean addMsgPool(Message message) {
  21. if (null != message) {
  22. return redisService.setExp(MSG_POOL + message.getId(), message.getBody(), Long.valueOf(message.getTtl() + SEMIH));
  23. }
  24. return false;
  25. }
  26. /**
  27. * 从消息池中删除消息
  28. * @param id
  29. * @return
  30. */
  31. public void deMsgPool(String id) {
  32. redisService.remove(MSG_POOL + id);
  33. }
  34. /**
  35. * 向队列中添加消息
  36. * @param key
  37. * @param score 优先级
  38. * @param val
  39. * @return 返回消息id
  40. */
  41. public void enMessage(String key, long score, String val) {
  42. redisService.zsset(key,val,score);
  43. }
  44. /**
  45. * 从队列删除消息
  46. * @param id
  47. * @return
  48. */
  49. public boolean deMessage(String key, String id) {
  50. return redisService.zdel(key, id);
  51. }
  52. }

 

3Redis操作工具类,这个工具类比较多方法,就不贴在这里了(https://blog.csdn.net/u010096717/article/details/83783865)

4.编写消息发送(生产者)

  1. @Component
  2. public class MessageProvider {
  3. static Logger logger = LoggerFactory.getLogger(MessageProvider.class);
  4. private static int delay = 30;//30秒,可自己动态传入
  5. @Resource
  6. private RedisMQ redisMQ;
  7. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  8. //改造成redis
  9. public void sendMessage(String messageContent) {
  10. try {
  11. if (messageContent != null){
  12. String seqId = UUID.randomUUID().toString();
  13. // 将有效信息放入消息队列和消息池中
  14. Message message = new Message();
  15. // 可以添加延迟配置
  16. message.setDelay(delay*1000);
  17. message.setCreateTime(System.currentTimeMillis());
  18. message.setBody(messageContent);
  19. message.setId(seqId);
  20. // 设置消息池ttl,防止长期占用
  21. message.setTtl(delay + 360);
  22. redisMQ.addMsgPool(message);
  23. //当前时间加上延时的时间,作为score
  24. Long delayTime = message.getCreateTime() + message.getDelay();
  25. String d = sdf.format(message.getCreateTime());
  26. System.out.println("当前时间:" + d+",消费的时间:" + sdf.format(delayTime));
  27. redisMQ.enMessage(RedisMQ.QUEUE_NAME,delayTime, message.getId());
  28. }else {
  29. logger.warn("消息内容为空!!!!!");
  30. }
  31. }catch (Exception e){
  32. e.printStackTrace();
  33. }
  34. }
  35. }

5.消息消费者

  1. @Component
  2. public class RedisMQConsumer {
  3. @Resource
  4. private RedisMQ redisMQ;
  5. @Autowired
  6. private RedisService redisService;
  7. @Autowired
  8. private MessageProvider provider;
  9. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  10. /**
  11. * 消息队列监听器<br>
  12. *
  13. */
  14. @Scheduled(cron = "*/1 * * * * *")
  15. public void monitor() {
  16. Set<String> set = redisService.rangeByScore(RedisMQ.QUEUE_NAME, 0, System.currentTimeMillis());
  17. if (null != set) {
  18. long current = System.currentTimeMillis();
  19. for (String id : set) {
  20. long score = redisService.getScore(RedisMQ.QUEUE_NAME, id).longValue();
  21. if (current >= score) {
  22. // 已超时的消息拿出来消费
  23. String str = "";
  24. try {
  25. str = redisService.get(RedisMQ.MSG_POOL + id);
  26. System.out.println("消费了:" + str+ ",消费的时间:" + sdf.format(System.currentTimeMillis()));
  27. } catch (Exception e) {
  28. e.printStackTrace();
  29. //如果出了异常,则重新放回队列
  30. System.out.println("消费异常,重新回到队列");
  31. provider.sendMessage(str);
  32. } finally {
  33. redisMQ.deMessage(RedisMQ.QUEUE_NAME, id);
  34. redisMQ.deMsgPool(id);
  35. }
  36. }
  37. }
  38. }
  39. }
  40. }

6.配置信息

  1. <!--1依赖引入-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-data-redis</artifactId>
  5. </dependency>
  6. 2yml配置
  7. spring:
  8. redis:
  9. database: 1
  10. host: 127.0.0.1
  11. port: 6379

以上代码已经实现了延迟消费功能,现在来测试一下,调用MessageProvider的sendMessage方法,我设定了30秒

可以看到结果

因为我们是用定时器去轮询的,会出现误差

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/451980
推荐阅读
相关标签
  

闽ICP备14008679号