赞
踩
目录
全文内容
提升程序运行稳定性。批量上传dcm至obs存储。
removeRange
清空指定范围的元素
zCard
获取集合中元素个数
range
根据范围获取队列中元素, 返回元素按照 分值先后顺序排列
zadd
向集合中添加元素 注:使用zadd重复向集合中插入相同数据, 集合数量保持不变
new Date().getTime()作为分值使用,OBS_UPLOAD_KEY是自定义的KEY
redisUtils.zadd(DcmUploadService.OBS_UPLOAD_KEY
, JSONObject.toJSONString(obsUploadMessage), new Date().getTime());
判断集合中是否存在元素, 若存在使用定时任务定时获取集合中的元素
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSON; import com.cloud.dcm.common.config.redis.RedisUtils; import com.cloud.dcm.consts.AccessLogStatus; import com.cloud.dcm.entity.req.StudyReq.StudyInsertReq; import com.cloud.dcm.entity.req.StudyReq.StudyParam; import com.cloud.dcm.service.dcm.config.UploadDcmThreadConfig; import com.cloud.dcm.service.ocrservice.AccessLogService; import com.cloud.dcm.util.obs.OBSClientUtils; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.io.File; import java.util.Arrays; import java.util.Date; import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.ReentrantLock; @Scheduled(cron = "${uploaddcm.schedualCron}") public void consum() { long count = getZSetCount(OBS_UPLOAD_KEY, reenTrantLock); if(count <= 0) { printLogFormat("没有消费任务. {}", count); return; } ... }
获取元素进行消费。这里为了提升消费性能使用了两次线程池并发处理(第一次:包括业务代码 impl方法, 第二次:与并发上传obs)
@Scheduled(cron = "${uploaddcm.schedualCron}") public void consum() { ... // 默认获取的数量 int getJob = uploadDcmThreadConfig.getConcurrentRegJobNum(); if(getJob == 0) { getJob = 10; } printLogFormat("配置: 待消费任务: {}, 每次获取带消费任务数: {}", count, getJob); while(count > 0) { int realJobNum; Set set; try{ reenTrantLock.lock(); // 获取最早时间的任务 set = redisUtils.range(OBS_UPLOAD_KEY, 0, getJob); realJobNum = set.size(); printLogFormat("实际获取任务数: {}", realJobNum); redisUtils.removeRange(OBS_UPLOAD_KEY, 0, realJobNum - 1); } finally { reenTrantLock.unlock(); } threadJob(OBS_UPLOAD_KEY, realJobNum, set, o -> impl(o)); // 清楚消费后的任务 count = getZSetCount(OBS_UPLOAD_KEY, reenTrantLock); printLogFormat("队列中剩余任务数: {}", count); } printLogFormat("消费完成."+ redisUtils.zCard(OBS_UPLOAD_KEY)); } private void impl(Object setObj) { DcmUploadService.ObsUploadMessage obsUploadMessage = JSON.parseObject(setObj.toString(), DcmUploadService.ObsUploadMessage.class); // 查询dcm文件地址 StudyInsertReq studyInsertReq = obsUploadMessage.getStudyInsertReq(); List<StudyParam> studyList = obsUploadMessage.getStudyInsertReq().getStudyList(); if(CollUtil.isEmpty(studyList)) { printLogFormat("查询的study表数据个数为 0."); return; } printLogFormat("查询到study对象数据为: {}", studyList.stream().mapToInt(study -> study.getSeriesList().stream().mapToInt(series -> series.getInstancesList().size()).sum())); // path 前不能有 斜杠, obs生成 tempurl时会因path前斜杠 生成 双斜杠的路径,访问会报错 String obsPath = StringUtils.joinWith(StrUtil.SLASH, orgCode, obsUploadMessage.getObsRealPath()); int getJob = uploadDcmThreadConfig.getConcurrentDcmUploadJobNum(); // printLogFormat("接收到文件数量: {}", studyList); studyList.forEach(study -> { // 获取路径 String filePath = study.getFilePath(); String regId = obsUploadMessage.getRegId(); File dir = new File(filePath); study.setFilePath(obsPath); if(dir.exists() && dir.isDirectory()) { Object key = study.getDcmStudyID()+new Date().getTime(); try{ regLock.lock(); Arrays.stream(dir.listFiles()).forEach(file -> { ZsetFileMsg zsetFileMsg = new ZsetFileMsg(); zsetFileMsg.setFileAbsPath(file.getAbsolutePath()); zsetFileMsg.setRegId(regId); zsetFileMsg.setObsPath(obsPath); redisUtils.zadd(key, JSON.toJSONString(zsetFileMsg), new Date().getTime()); }); }finally { regLock.unlock(); } long count = getZSetCount(key, regLock); while(count > 0) { Set set; int realJobNum; try{ regLock.lock(); set = redisUtils.range(key, 0, getJob); realJobNum = set.size(); redisUtils.removeRange(key, 0, realJobNum -1); }finally { regLock.unlock(); } threadJob(key.toString(), realJobNum, set, s -> { ZsetFileMsg zsetFileMsg = JSON.parseObject(s.toString(), ZsetFileMsg.class); executeUpload(new File(zsetFileMsg.getFileAbsPath()), zsetFileMsg.getObsPath(), zsetFileMsg.getRegId()); }); // 清楚消费后的任务 count = getZSetCount(key, regLock); } } }); ... // 消费后更新任务 printLogFormat("更新任务日志. 任务regId: {}", obsUploadMessage.getRegId()); } private void printLogFormat(String msg, Object ... value) { log.info("obs上传任务: "+msg, value); }
抽象出线程处理任务(第一、二次线程池并发处理时用)
private void threadJob(String jobName, int threadNum, Set set, JobInterface<Object> jobInterface) { CountDownLatch countDownLatch = new CountDownLatch(threadNum); for (Object o : set) { ExecutorService service = Executors.newFixedThreadPool(threadNum); service.execute(() -> { try{ jobInterface.apply(o); countDownLatch.countDown(); }catch (Throwable e) { countDownLatch.countDown(); log.error("消费任务异常. e: ", e); } }); } try { log.info("当前线程: {}, 线程任务: {}, 并行线程总数: {}, 剩余线程数: {}", Thread.currentThread().getName(), jobName, threadNum, countDownLatch.getCount()); countDownLatch.await(); } catch (InterruptedException e) { log.error("InterruptedException: ", e); } }
使用同步锁获取集合数量, 避免并发重复获取
private long getZSetCount(Object key, ReentrantLock reenTrantLock) {
try{
reenTrantLock.lock();
return redisUtils.zCard(key);
}finally {
reenTrantLock.unlock();
}
}
创建接口,方便使用匿名方法调用
public interface JobInterface<T> {
void apply(T t);
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。