当前位置:   article > 正文

延时任务通知服务的设计及实现(二)-- redisson的延迟队列RDelayedQueue_redisson rdelayqueue

redisson rdelayqueue

一、接着上文

RDelayedQueue作为redisson封装的一个分布式延迟队列,直接拿来使用还是比较简单的。

本文主要包括以下几部分:

  • 保存至延迟队列(生产者)
  • 读取延迟队列(消费者)
  • 从延迟队列移除任务

在这里插入图片描述

二、redission配置


import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Redisson配置类
 *
 * @author xxx
 */
@Configuration
public class RedissonConfig {
    @Value("${spring.application.name}")
    private String serverName;

    @Bean
    public RedissonClient redissonClient(RedisProperties redisProperties) {
        Config config = new Config();
        SingleServerConfig singleServerConfig = config.useSingleServer();
        singleServerConfig.setAddress("redis://" + redisProperties.getHost() + ":" + redisProperties.getPort());
        singleServerConfig.setPassword(redisProperties.getPassword());
        singleServerConfig.setKeepAlive(true);
        singleServerConfig.setDatabase(redisProperties.getDatabase());
        singleServerConfig.setConnectionMinimumIdleSize(2);
        singleServerConfig.setConnectionPoolSize(4);
        singleServerConfig.setClientName(serverName);
        return Redisson.create(config);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
spring:
  application:
    name: delay-task-service
  redis:
    host: 192.168.8.18
    port: 6379
    database: 0
    timeout: 3000
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

三、保存至延迟队列(生产者)

作为延迟任务的生产者,你需要根据预期的回调时间,计算出delay延迟时间。

伪代码见下:

public static final  String REDISSON_QUEUE_NAME = "DelayTaskQueue";

private final RedissonClient redissonClient;

RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(REDISSON_QUEUE_NAME);
RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);

long delay = DateUtil.between(event.getNotifyDate(), new DateTime(), DateUnit.SECOND);

delayedQueue.offer(event.getTransNo(), delay < 0 ? 1 : delay, TimeUnit.SECONDS);

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

四、读取延迟队列(消费者)

    public static final  String REDISSON_QUEUE_NAME = "DelayTaskQueue";

    private final RedissonClient redissonClient;
    
    @PostConstruct
    public void init() {
        new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>())
                .execute(() -> {
                    while (true) {
                        try {
                            RBlockingDeque<String> blockingDeque = redissonClient.getBlockingDeque(REDISSON_QUEUE_NAME);
                            RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);

                            String transNo = blockingDeque.take();

                            if (null == transNo) {
                                return;
                            }

                            if (log.isInfoEnabled()) {
                                log.info("开始执行延迟队列中的任务,transNo={}", transNo);
                            }
                            // 异步执行你的操作
                            notifyTaskService.handleTask(transNo, null);
                        } catch (Exception e) {
                            log.error("延时队列的任务执行出现异常", e);
                        }
                    }
                });
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

五、从延迟队列移除任务

public static final  String REDISSON_QUEUE_NAME = "DelayTaskQueue";

private final RedissonClient redissonClient;

RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(REDISSON_QUEUE_NAME);
RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);

delayedQueue.remove(transNo);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

六、总结

本文主要是摘要一些源码,仅供参考。

附:相关系列文章链接

延时任务通知服务的设计及实现(一)-- 设计方案

延时任务通知服务的设计及实现(二)-- redisson的延迟队列RDelayedQueue

延时任务通知服务的设计及实现(三)-- JDK的延迟队列DelayQueue

延时任务通知服务的设计及实现(四)-- webhook执行任务

延时任务通知服务的设计及实现(五)-- Netty时间轮HashedWheelTimer

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/秋刀鱼在做梦/article/detail/823267
推荐阅读
  

闽ICP备14008679号