当前位置:   article > 正文

java使用Redis实现延时队列_springmvc+redis实现消息队列及延迟队列

springmvc+redis实现消息队列及延迟队列

java使用Redis实现延时队列需求

前言:有收到一个需求需要对上报上来的告警通过配置的规则(收到消息后x秒后发送、发送x次、每次间隔x秒)发送对应的短信提醒用户,当时想到用mq的延时队列来实现,但是只有这一个地方需要使用到队列,引入mq有点杀鸡用牛刀了,后续了解到Redis也可以实现类型延时队列的功能,就使用Redis实现了,记录一下

1.引入Redis

 <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
        <version>2.3.6.RELEASE</version>
 </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

2.定义任务抽象接口

public interface RedisDelayTask {

    /**
     *  任务ID
     * @return
     */
    String getId();

    /**
     *  队列中的值
     * @return
     */
    String getValue();

    /**
     *  延迟时间(单位:s)
     * @return
     */
    long getDelayTime();

    /**
     *  任务执行
     */
    void execute();

    /**
     * 任务类型(具体业务类型)
     */
     String getTopic();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

3.实现任务抽象接口


import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
import com.platform.maintain.queue.RedisDelayTask;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

@Data
public class AbstractRedisDelayTask implements RedisDelayTask {

    /**
     * 延迟任务的唯一标识,用于检索任务
     */
    @JsonSerialize(using = ToStringSerializer.class)
    protected String id;


    protected String value;

    @ApiModelProperty(value = "过期时间(秒)")
    private long delayTime;

    protected String topic;

    public AbstractRedisDelayTask(String id, String value, long delayTime) {
        this.id = id;
        this.value = value;
        this.delayTime = delayTime;
    }

    public AbstractRedisDelayTask(String id, String value, long delayTime,String topic) {
        this.id = id;
        this.value = value;
        this.delayTime = delayTime;
        this.topic=topic;
    }

    public AbstractRedisDelayTask(String value) {
        this.value=value;
    }

    @Override
    public void execute() {
        //业务
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

4.定义任务管理器

import cn.hutool.core.util.ObjectUtil;
import com.platform.maintain.queue.RedisDelayTask;
import com.platform.maintain.queue.imple.SecureTask;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.Set;
import java.util.concurrent.TimeUnit;

@Component
public class RedisSecureQueueManager implements InitializingBean {
    
    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * 添加延迟任务到队列
     *
     * @param task
     */
    public void addTask(RedisDelayTask task) {
        Long rank = redisTemplate.opsForZSet().rank(task.getId(), task.getValue());
        if(ObjectUtil.isNull(rank)){
            long delayedTime = System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(task.getDelayTime(), TimeUnit.SECONDS);
            boolean r = redisTemplate.opsForZSet().add(task.getId(), task.getValue(), delayedTime);
        }
    }

    public void deleteTask(RedisDelayTask task){
        redisTemplate.opsForZSet().remove("SecureTask", task.getValue());
    }
    /**
     * 检查并执行任务
     */
    private void checkAndExecuteTask() {
        while (true) {
            // score就是任务要执行的时间点,如果<=当前时间,说明任务该执行了
            Set<ZSetOperations.TypedTuple<String>> tuples = redisTemplate.opsForZSet().rangeByScoreWithScores("SecureTask", 0, System.currentTimeMillis());
            if (!CollectionUtils.isEmpty(tuples)) {
                for (ZSetOperations.TypedTuple<String> tuple : tuples) {
                    // 移除并执行任务
                    RedisDelayTask task = new SecureTask(tuple.getValue());
                    if (task != null) {
                        // 从队列中删除
                        redisTemplate.opsForZSet().remove("SecureTask", tuple.getValue());
                        task.execute();
                    }
                }
            }
        }
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // 新起一个线程执行任务
        new Thread(() -> {
            System.out.println("已开启延时任务!!");
            checkAndExecuteTask();
        }, "redis-SecureTask").start();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65

5.继承任务实现类并重写业务接口


import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.platform.common.api.RemoteUserYhService;
import com.platform.common.api.cache.IUserYhCache;
import com.platform.common.api.domain.SysUser;
import com.platform.common.api.model.LoginUser;
import com.platform.common.core.domain.R;
import com.platform.common.core.utils.SpringUtils;
import com.platform.maintain.queue.manager.RedisMessageTaskQueueManager;
import com.platform.maintain.queue.vo.WaterLevelDto;
import com.platform.maintain.util.SmsSendSimple;

import java.util.Date;
import java.util.List;

public class SecureTask extends AbstractRedisDelayTask {

    private RedisMessageTaskQueueManager redisMessageTaskQueueManager = SpringUtils.getBean(RedisMessageTaskQueueManager.class);

    public SecureTask(String id, String value, long delayTime) {
        super(id, value, delayTime);
    }

    public SecureTask(String id, String value, long delayTime, String topic) {
        super(id, value, delayTime, topic);
    }

    public SecureTask(String value) {
        super(value);
    }

    @Override
    public void execute() {
    	//实现具体的业务流程
        WaterLevelDto waterLevelDto = JSONObject.parseObject(this.value, WaterLevelDto.class);
        //如果执行失败可以重新将任务放回队列
        RedisDelayTask task = new NoticeTask("MessageTask",JSONObject.toJSONString(waterLevel), smsTemplate.getSendInterval(),"失败重试");

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/484288
推荐阅读
相关标签
  

闽ICP备14008679号