赞
踩
利用redis的排序列表,ZSet进行需求实现, 下面是我的流程图和思路导线
我们把Zset中的score当成时间戳, 这样我们就可以获得以时间戳排序的任务列表, 这我们通过score区间进行拉取任务,进行消费.
- package com.zjrcinfo.zjguahao.common.redis.delayqueue;
-
- import com.zjrcinfo.zjguahao.common.redis.cluster.JedisClusterCache;
- import com.zjrcinfo.zjguahao.common.utils.ThreadPoolUtil;
- import com.zjrcinfo.zjguahao.common.web.log.LoggerName;
- import org.apache.shiro.util.CollectionUtils;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import redis.clients.jedis.Tuple;
-
- import javax.annotation.PostConstruct;
- import java.util.Calendar;
- import java.util.Set;
- import java.util.concurrent.TimeUnit;
-
- /**
- * Description: 延时队列工厂
- * User: zhouzhou
- * Date: 2019-09-26
- * Time: 14:32
- */
- public abstract class AbstractDelayQueueMachineFactory {
-
- protected Logger logger = LoggerFactory.getLogger(LoggerName.KAFKA);
-
- @Autowired
- protected JedisClusterCache jedisClusterCache;
-
- /**
- * 插入任务id
- * @param jobId 任务id(队列内唯一)
- * @param time 延时时间(单位 :秒)
- * @return 是否插入成功
- */
- public boolean addJobId(String jobId, Integer time) {
- Calendar instance = Calendar.getInstance();
- instance.add(Calendar.SECOND, time);
- long delaySeconds = instance.getTimeInMillis() / 1000;
- Long zadd = jedisClusterCache.zadd(setDelayQueueName(), delaySeconds, jobId);
- return zadd > 0;
-
- }
-
- private void startDelayQueueMachine() {
- logger.info(String.format("延时队列机器{%s}开始运作", setDelayQueueName()));
-
- // 发生异常捕获并且继续不能让战斗停下来
- while (true) {
- try {
- // 获取当前时间的时间戳
- long now = System.currentTimeMillis() / 1000;
- // 获取当前时间前的任务列表
- Set<Tuple> tuples = jedisClusterCache.zrangeByScoreWithScores(setDelayQueueName(), 0, now);
- // 如果不为空则遍历判断其是否满足取消要求
- if (!CollectionUtils.isEmpty(tuples)) {
- for (Tuple tuple : tuples) {
-
- String jobId = tuple.getElement();
- Long num = jedisClusterCache.zrem(setDelayQueueName(), jobId);
- // 如果移除成功, 则取消订单
- if (num > 0) {
- ThreadPoolUtil.execute(() ->invoke(jobId));
- }
- }
- }
-
- } catch (Exception e) {
- logger.warn(String.format("处理延时任务发生异常,异常原因为{%s}", e.getMessage()), e);
- } finally {
- // 间隔一秒钟搞一次
- try {
- TimeUnit.SECONDS.sleep(1L);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- }
- }
-
- }
-
-
- /**
- * 最终执行的任务方法
- * @param jobId 任务id
- */
- public abstract void invoke(String jobId);
-
-
- /**
- * 要实现延时队列的名字
- *
- */
- public abstract String setDelayQueueName();
-
-
- @PostConstruct
- public void init(){
- new Thread(this::startDelayQueueMachine).start();
- }
-
- }
- /**
- * Description: 测试订单延时队列
- * User: zhouzhou
- * Date: 2019-09-26
- * Time: 15:14
- */
- @Component
- public class TestOrderDelayQueue extends AbstractDelayQueueMachineFactory {
-
- @Autowired
- private TestOrderDelayQueueService testOrderDelayQueueService;
-
- @Override
- public void invoke(String jobId) {
- testOrderDelayQueueService.cancelOrder(jobId);
- }
-
- @Override
- public String setDelayQueueName() {
- return "TestOrder";
- }
-
-
- }
- package com.zjrcinfo.zjguahao.product.service.impl;
-
- import org.springframework.stereotype.Service;
-
- /**
- * Description:
- * User: zhouzhou
- * Date: 2019-09-26
- * Time: 15:21
- */
- @Service
- public class TestOrderDelayQueueService {
-
-
- public void cancelOrder(String orderNumber) {
- System.out.println(System.currentTimeMillis() + "ms:redis消费了一个任务:消费的订单OrderId为" + orderNumber);
- }
-
- public void cancelReg(String orderNumber) {
- System.out.println(System.currentTimeMillis() + "ms:redis消费了一个任务:消费的挂号订单OrderId为" + orderNumber);
- }
- }
- package com.zjrcinfo.zjguahao.product.controller;
-
-
- import com.zjrcinfo.zjguahao.common.web.log.LoggerName;
- import com.zjrcinfo.zjguahao.product.service.delay.TestOrderDelayQueue;
- import io.swagger.annotations.Api;
- import io.swagger.annotations.ApiOperation;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.PathVariable;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RequestMethod;
- import org.springframework.web.bind.annotation.RestController;
-
- import java.util.Random;
-
- /**
- * Description:
- * User: zhouzhou
- * Date: 2019-06-03
- * Time: 17:47
- */
- @RestController
- @Api("缓存测试")
- @RequestMapping("/redis")
- public class RedisTestController {
-
- private Logger logger = LoggerFactory.getLogger(LoggerName.REMOTE);
-
- @Autowired
- private TestOrderDelayQueue testOrderDelayQueue;
-
- // ------------------------ 延时队列 -------------------
- @ApiOperation("添加定时orderId")
- @RequestMapping(value = "/addDelayOrder/{orderId}/{time}", method = RequestMethod.POST)
- public Object addZset(@PathVariable String orderId, @PathVariable Integer time) {
-
- boolean flag = testOrderDelayQueue.addJobId(orderId, time);
- return String.format("已经存入了订单id{%s},延时{%s}秒", orderId, time);
-
- }
-
- }
最后奉上, github代码示例: https://github.com/zjhzzhouzhou/redis-project 希望帮助到大家.
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。