赞
踩
定时调度基本是每个项目都会遇到的业务场景,一般地,都会通过任务调度工具执行定时任务完成,定时任务有两点缺陷:
Redis实现延时队列有两种实现方式:
Redisson的RDelayedQueue是一个封装好的zset实现的延时队列,最终选择了这个方案。其实还有一些优秀的方案可供选择,例如rocketmq、pulsar等拥有定时投递功能的消息队列;我这边优先考虑在不引入新的中间键的情况下使用RDelayedQueue技术进行实现。
注意:在不方便获得专业消息队列时可以考虑使用redissondelayqueue等基于redis的延时队列方案,但要为redis崩溃等情况设计补偿保护机制。
<!--redisson-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.20.0</version>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-data-27</artifactId>
<version>3.20.0</version>
</dependency>
import com.geovis.common.redis.utils.RedisUtils;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @date 2023/8/30 15:05
*/
@Configuration
public class RedissonQueueConfig {
private final String queueName = "orderQueue";
@Bean
public RBlockingQueue<String> blockingQueue() {
return RedisUtils.getClient().getBlockingQueue(queueName);
}
@Bean
public RDelayedQueue<String> delayedQueue(RBlockingQueue<String> blockQueue) {
return RedisUtils.getClient().getDelayedQueue(blockQueue);
}
}
其中RedisUtils.getClient()是为了获取RedissonClient 对象,这里我使用Redis工具类直接获取,我把工具类也简单展示出来吧。
import org.redisson.api.*;
/**
*Redis工具类
*/
public class RedisUtils {
private static final RedissonClient CLIENT = SpringUtils.getBean(RedissonClient.class);
/**
* 获取客户端实例
*/
public static RedissonClient getClient() {
return CLIENT;
}
}
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
/**
* @date 2023/8/30 15:09
*/
@Slf4j
@Component
public class OrderTask {
@Resource
private RBlockingQueue<Object> blockingQueue;
@PostConstruct
public void take() {
new Thread(() -> {
while (true) {
try {
log.info(blockingQueue.take().toString()); //将到期的数据取出来,如果一直没有到期数据,就一直等待。
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RDelayedQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.TimeUnit;
/**
* 测试接口类
* @date 2023/8/30 16:56
*/
@Validated
@RequiredArgsConstructor
@RestController
@RequestMapping("/forest")
@Slf4j
public class ForestController {
@Autowired
private RDelayedQueue delayedQueue;
@GetMapping(value = "/offerAsync")
public void offerAsync() {
//20秒后到期,在监听现成哪里可以打印出 1234567890
delayedQueue.offerAsync("1234567890", 20, TimeUnit.SECONDS);
}
}
到这里基本就完成了Demo编码,具体要根据业务修改对应的代码,本demo亲测没有问题。
用户传进来的延迟时间必须大于0,小于0抛出异常代码结束。将用户传进来的时间转换为毫秒,并加上系统当前时间,计算出来的就是过期时间。到了过期时间消费者就可以把该任务取出来消费了。
结合上图所示,首先创建了一个Redisson实现的阻塞队列RBlockingQueue的实例blockingQueue,然后又使用该阻塞队列blockingQueue创建了一个延时队列RDelayedQueue的实例delayedQueue。延时消息添加后并不是立即进入到阻塞队列blockingQueue中,而是到达了设定的延时时间之后才会从延时队列delayedQueue进入到阻塞队列blockingQueue;因此,延时消息的添加由延时队列delayedQueue完成,而延时队列的消费则由阻塞队列blockingQueue完成。注意,这里如果直接对延时队列delayedQueue进行监听,则延时消息刚加入时就会被消费,达不到延时的效果。
相比于Redisson官网文档延时队列中给出的代码示例,这里被包装队列使用阻塞队列RBlockingQueue的好处是blockingQueue.take()会一直阻塞直至队列内有可消费延时消息,避免无意义的循环占用CPU。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。