赞
踩
前言:收到一个需求,需要对上报上来的告警按照配置的规则(收到消息 x 秒后发送、发送 x 次、每次间隔 x 秒)发送对应的短信提醒用户。当时想到用 MQ 的延时队列来实现,但是只有这一个地方需要用到队列,引入 MQ 有点杀鸡用牛刀了。后来了解到 Redis 也可以实现类似延时队列的功能,就使用 Redis 实现了,在此记录一下。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.3.6.RELEASE</version>
</dependency>
public interface RedisDelayTask { /** * 任务ID * @return */ String getId(); /** * 队列中的值 * @return */ String getValue(); /** * 延迟时间(单位:s) * @return */ long getDelayTime(); /** * 任务执行 */ void execute(); /** * 任务类型(具体业务类型) */ String getTopic(); }
import com.fasterxml.jackson.databind.annotation.JsonSerialize; import com.fasterxml.jackson.databind.ser.std.ToStringSerializer; import com.platform.maintain.queue.RedisDelayTask; import io.swagger.annotations.ApiModelProperty; import lombok.Data; @Data public class AbstractRedisDelayTask implements RedisDelayTask { /** * 延迟任务的唯一标识,用于检索任务 */ @JsonSerialize(using = ToStringSerializer.class) protected String id; protected String value; @ApiModelProperty(value = "过期时间(秒)") private long delayTime; protected String topic; public AbstractRedisDelayTask(String id, String value, long delayTime) { this.id = id; this.value = value; this.delayTime = delayTime; } public AbstractRedisDelayTask(String id, String value, long delayTime,String topic) { this.id = id; this.value = value; this.delayTime = delayTime; this.topic=topic; } public AbstractRedisDelayTask(String value) { this.value=value; } @Override public void execute() { //业务 } }
import cn.hutool.core.util.ObjectUtil; import com.platform.maintain.queue.RedisDelayTask; import com.platform.maintain.queue.imple.SecureTask; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.ZSetOperations; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import java.util.Set; import java.util.concurrent.TimeUnit; @Component public class RedisSecureQueueManager implements InitializingBean { @Autowired private RedisTemplate redisTemplate; /** * 添加延迟任务到队列 * * @param task */ public void addTask(RedisDelayTask task) { Long rank = redisTemplate.opsForZSet().rank(task.getId(), task.getValue()); if(ObjectUtil.isNull(rank)){ long delayedTime = System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(task.getDelayTime(), TimeUnit.SECONDS); boolean r = redisTemplate.opsForZSet().add(task.getId(), task.getValue(), delayedTime); } } public void deleteTask(RedisDelayTask task){ redisTemplate.opsForZSet().remove("SecureTask", task.getValue()); } /** * 检查并执行任务 */ private void checkAndExecuteTask() { while (true) { // score就是任务要执行的时间点,如果<=当前时间,说明任务该执行了 Set<ZSetOperations.TypedTuple<String>> tuples = redisTemplate.opsForZSet().rangeByScoreWithScores("SecureTask", 0, System.currentTimeMillis()); if (!CollectionUtils.isEmpty(tuples)) { for (ZSetOperations.TypedTuple<String> tuple : tuples) { // 移除并执行任务 RedisDelayTask task = new SecureTask(tuple.getValue()); if (task != null) { // 从队列中删除 redisTemplate.opsForZSet().remove("SecureTask", tuple.getValue()); task.execute(); } } } } } @Override public void afterPropertiesSet() throws Exception { // 新起一个线程执行任务 new Thread(() -> { System.out.println("已开启延时任务!!"); checkAndExecuteTask(); }, "redis-SecureTask").start(); } }
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.platform.common.api.RemoteUserYhService;
import com.platform.common.api.cache.IUserYhCache;
import com.platform.common.api.domain.SysUser;
import com.platform.common.api.model.LoginUser;
import com.platform.common.core.domain.R;
import com.platform.common.core.utils.SpringUtils;
import com.platform.maintain.queue.manager.RedisMessageTaskQueueManager;
import com.platform.maintain.queue.vo.WaterLevelDto;
import com.platform.maintain.util.SmsSendSimple;
import java.util.Date;
import java.util.List;

/**
 * Delay task whose queued value is a JSON document that {@link #execute()}
 * parses into a {@code WaterLevelDto}.
 *
 * NOTE(review): this looks like an abridged snippet. {@code execute()} uses
 * {@code waterLevel}, {@code smsTemplate}, {@code NoticeTask} and
 * {@code RedisDelayTask}, none of which is declared or imported in the visible
 * code, so the class does not compile as shown. Restore the elided parts
 * before relying on it.
 */
public class SecureTask extends AbstractRedisDelayTask {
    // Looked up from the Spring context when the task object is created.
    // Not referenced anywhere else in the visible code.
    private RedisMessageTaskQueueManager redisMessageTaskQueueManager = SpringUtils.getBean(RedisMessageTaskQueueManager.class);

    /**
     * Creates a task without a topic.
     *
     * @param id        the task identifier
     * @param value     the queued value
     * @param delayTime the delay, passed through to the superclass
     */
    public SecureTask(String id, String value, long delayTime) {
        super(id, value, delayTime);
    }

    /**
     * Creates a fully populated task.
     *
     * @param id        the task identifier
     * @param value     the queued value
     * @param delayTime the delay, passed through to the superclass
     * @param topic     the task type
     */
    public SecureTask(String id, String value, long delayTime, String topic) {
        super(id, value, delayTime, topic);
    }

    /**
     * Creates a task that only carries a value.
     *
     * @param value the queued value
     */
    public SecureTask(String value) {
        super(value);
    }

    /**
     * Parses the queued value and prepares a follow-up {@code NoticeTask}.
     */
    @Override
    public void execute() {
        // Implement the concrete business flow here.
        WaterLevelDto waterLevelDto = JSONObject.parseObject(this.value, WaterLevelDto.class);
        // If execution fails, the task can be put back into the queue.
        // NOTE(review): `waterLevel` is not declared here -- presumably
        // `waterLevelDto` was meant; `smsTemplate` is not declared either.
        // The task built below is never enqueued in the visible code; the
        // otherwise unused redisMessageTaskQueueManager is presumably the
        // intended target -- confirm.
        RedisDelayTask task = new NoticeTask("MessageTask",JSONObject.toJSONString(waterLevel), smsTemplate.getSendInterval(),"失败重试");
    }
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。