当前位置:   article > 正文

redis之zset配合多线程并发提升执行效率_redis zset并发

redis zset并发

目录

一. 应用场景

二. 程序中使用到的zset方法

三. 代码示范

3.1 add 方法向集合中添加元素, 提供给定时任务进行处理
3.2 定时任务获取元素进行消费(上传obs存储)

四. 总结

全文内容

1. 应用场景

提升程序运行稳定性。批量上传dcm至obs存储。

2. zset使用的方法

removeRange 清空指定范围的元素
zCard 获取集合中元素个数
range 根据范围获取队列中元素, 返回元素按照 分值先后顺序排列
zadd 向集合中添加元素 注:使用zadd重复向集合中插入相同数据, 集合数量保持不变

3. 代码示范

  • add 方法向集合中添加元素, 提供给定时任务进行处理new Date().getTime()作为分值使用,OBS_UPLOAD_KEY是自定义的KEY
redisUtils.zadd(DcmUploadService.OBS_UPLOAD_KEY
, JSONObject.toJSONString(obsUploadMessage), new Date().getTime());
  • 1
  • 2

判断集合中是否存在元素, 若存在使用定时任务定时获取集合中的元素

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.cloud.dcm.common.config.redis.RedisUtils;
import com.cloud.dcm.consts.AccessLogStatus;
import com.cloud.dcm.entity.req.StudyReq.StudyInsertReq;
import com.cloud.dcm.entity.req.StudyReq.StudyParam;
import com.cloud.dcm.service.dcm.config.UploadDcmThreadConfig;
import com.cloud.dcm.service.ocrservice.AccessLogService;
import com.cloud.dcm.util.obs.OBSClientUtils;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.io.File;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;
@Scheduled(cron = "${uploaddcm.schedualCron}")
public void consum() {
    long count = getZSetCount(OBS_UPLOAD_KEY, reenTrantLock);
    if(count <= 0) {
        printLogFormat("没有消费任务. {}", count);
        return;
    }
    ...
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

获取元素进行消费。这里为了提升消费性能使用了两次线程池并发处理(第一次:包括业务代码 impl方法, 第二次:与并发上传obs)

@Scheduled(cron = "${uploaddcm.schedualCron}")
public void consum() {
    ...
    // 默认获取的数量
    int getJob = uploadDcmThreadConfig.getConcurrentRegJobNum();
     if(getJob == 0) {
         getJob = 10;
     }
     printLogFormat("配置: 待消费任务: {}, 每次获取带消费任务数: {}", count, getJob);
     while(count > 0) {
         int realJobNum;
         Set set;
         try{
             reenTrantLock.lock();
             // 获取最早时间的任务
             set = redisUtils.range(OBS_UPLOAD_KEY, 0, getJob);
             realJobNum = set.size();
             printLogFormat("实际获取任务数: {}", realJobNum);
             redisUtils.removeRange(OBS_UPLOAD_KEY, 0, realJobNum - 1);
         } finally {
             reenTrantLock.unlock();
         }
         threadJob(OBS_UPLOAD_KEY, realJobNum, set, o -> impl(o));
         // 清楚消费后的任务
         count = getZSetCount(OBS_UPLOAD_KEY, reenTrantLock);
         printLogFormat("队列中剩余任务数: {}", count);
     }
     printLogFormat("消费完成."+ redisUtils.zCard(OBS_UPLOAD_KEY));
}
private void impl(Object setObj) {
    DcmUploadService.ObsUploadMessage obsUploadMessage = JSON.parseObject(setObj.toString(), DcmUploadService.ObsUploadMessage.class);
    // 查询dcm文件地址
    StudyInsertReq studyInsertReq = obsUploadMessage.getStudyInsertReq();
    List<StudyParam> studyList = obsUploadMessage.getStudyInsertReq().getStudyList();
    if(CollUtil.isEmpty(studyList)) {
        printLogFormat("查询的study表数据个数为 0.");
        return;
    }
    printLogFormat("查询到study对象数据为: {}", studyList.stream().mapToInt(study -> study.getSeriesList().stream().mapToInt(series -> series.getInstancesList().size()).sum()));
    // path 前不能有 斜杠, obs生成 tempurl时会因path前斜杠 生成 双斜杠的路径,访问会报错
    String obsPath = StringUtils.joinWith(StrUtil.SLASH, orgCode, obsUploadMessage.getObsRealPath());
    int getJob = uploadDcmThreadConfig.getConcurrentDcmUploadJobNum();
//        printLogFormat("接收到文件数量: {}", studyList);
    studyList.forEach(study -> {
        // 获取路径
        String filePath = study.getFilePath();
        String regId = obsUploadMessage.getRegId();
        File dir = new File(filePath);
        study.setFilePath(obsPath);
        if(dir.exists() && dir.isDirectory()) {
            Object key = study.getDcmStudyID()+new Date().getTime();
            try{
                regLock.lock();
                Arrays.stream(dir.listFiles()).forEach(file -> {
                    ZsetFileMsg zsetFileMsg = new ZsetFileMsg();
                    zsetFileMsg.setFileAbsPath(file.getAbsolutePath());
                    zsetFileMsg.setRegId(regId);
                    zsetFileMsg.setObsPath(obsPath);
                    redisUtils.zadd(key, JSON.toJSONString(zsetFileMsg), new Date().getTime());
                });
            }finally {
                regLock.unlock();
            }
            long count = getZSetCount(key, regLock);
            while(count > 0) {
                Set set;
                int realJobNum;
                try{
                    regLock.lock();
                    set = redisUtils.range(key, 0, getJob);
                    realJobNum = set.size();
                    redisUtils.removeRange(key, 0, realJobNum -1);
                }finally {
                    regLock.unlock();
                }
                threadJob(key.toString(), realJobNum, set, s -> {
                    ZsetFileMsg zsetFileMsg = JSON.parseObject(s.toString(), ZsetFileMsg.class);
                    executeUpload(new File(zsetFileMsg.getFileAbsPath()), zsetFileMsg.getObsPath(), zsetFileMsg.getRegId());
                });
                // 清楚消费后的任务
                count = getZSetCount(key, regLock);
            }
        }
    });
    ...
    // 消费后更新任务
    printLogFormat("更新任务日志. 任务regId: {}", obsUploadMessage.getRegId());
}
private void printLogFormat(String msg, Object ... value) {
   log.info("obs上传任务: "+msg, value);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91

抽象出线程处理任务(第一、二次线程池并发处理时用)

private void threadJob(String jobName, int threadNum, Set set, JobInterface<Object> jobInterface) {
        CountDownLatch countDownLatch = new CountDownLatch(threadNum);
        for (Object o : set) {
            ExecutorService service = Executors.newFixedThreadPool(threadNum);
            service.execute(() -> {
                try{
                    jobInterface.apply(o);
                    countDownLatch.countDown();
                }catch (Throwable e) {
                    countDownLatch.countDown();
                    log.error("消费任务异常. e: ", e);
                }
            });
        }
        try {
            log.info("当前线程: {}, 线程任务: {}, 并行线程总数: {}, 剩余线程数: {}", Thread.currentThread().getName(), jobName, threadNum, countDownLatch.getCount());
            countDownLatch.await();
        } catch (InterruptedException e) {
            log.error("InterruptedException: ", e);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

使用同步锁获取集合数量, 避免并发重复获取

private long getZSetCount(Object key, ReentrantLock reenTrantLock) {
    try{
        reenTrantLock.lock();
        return redisUtils.zCard(key);
    }finally {
        reenTrantLock.unlock();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

创建接口,方便使用匿名方法调用

public interface JobInterface<T> {
 void apply(T t);
}
  • 1
  • 2
  • 3

4. 总结

  1. 使用过程中对于多线程部分需要注意局部变量的使用,避免并发导致多线程内容交叉访问产生了脏数据。
  2. 同步锁必须释放。通过finally块处理。
  3. countdownlatch同步块可以多个线程自己使用。线程之间使用,不同线程之间使用countdownlatch.await()方法不会阻塞其他线程。
  4. 使用 removeRange方法时一定要注意不要误删了元素。只删除实际获取到的set集合中的元素(通过同步锁控制)。
  5. zset.range获取最先进入集合的元素。最先进入的最先消费
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/728109
推荐阅读
相关标签
  

闽ICP备14008679号