赞
踩
首先导入两个依赖的jar
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>4.0.12</version>
</dependency>
<dependency>
<groupId>com.github.tobato</groupId>
<artifactId>fastdfs-client</artifactId>
<version>1.26.2</version>
</dependency>
前端需要把文件分成片段上传,每一个片段都有一个id
在正式把文件上传到文件服务器fastdfs之前,还需要先调用一次检查接口,检查这个文件之前是否已经上传完成,或者是否上传过一部分。
如果该文件之前上传过但没有传完,接口会给前端返回上一次中断处的片段id,前端从这个片段开始继续上传。
public RestResponse<CheckFileResult> checkFile(@RequestParam Map<String, Object> paramMap,HttpServletRequest request) throws BusinessException { String fileMd5 = (String) paramMap.get("fileMd5"); Long userId = Optional.ofNullable(request.getHeader("userId")).map(Long::valueOf).orElse(1L); if (StrUtil.isEmpty(fileMd5)) { return new RestResponse<>(RestRespCode.PARAM_ERROR_md5, message(MessageKeys.PARAM_ERROR_md5), null); } CheckFileResult checkFileResult = new CheckFileResult(); //查询上传过的数据 List<String> fileList= jedisClusterTemplate.lrange(UpLoadConstant.completedList,0,-1); if (CollUtil.isNotEmpty(fileList)){ for (String e:fileList){ JSONObject obj= JSONUtil.parseObj(e); if (obj.get("md5").equals(fileMd5)){ checkFileResult.setTotalSize(obj.getLong("lenght")); checkFileResult.setViewPath(obj.getStr("url")); return new RestResponse<CheckFileResult>(RestRespCode.OK, message(MessageKeys.SYSTEM_SUCCESS), checkFileResult); } } } // 查询锁占用 String lockName = UpLoadConstant.currLocks + fileMd5; Long lock = jedisClusterTemplate.incrBy(lockName, 1); String lockOwner = UpLoadConstant.lockOwner + fileMd5; String chunkCurrkey = UpLoadConstant.chunkCurr + fileMd5; if (lock > 1) { checkFileResult.setLock(1); //检查是否为锁的拥有者,如果是放行 String oWner = jedisClusterTemplate.get(lockOwner); if (StrUtil.isEmpty(oWner)) { return new RestResponse<>(RestRespCode.UNABLE_TO_OBTAIN_FILE_LOCK_OWNER, message(MessageKeys.UNABLE_TO_OBTAIN_FILE_LOCK_OWNER), null); } else { if ( Long.valueOf(oWner).equals(userId)) { String chunkCurr = jedisClusterTemplate.get(chunkCurrkey); if (StrUtil.isEmpty(chunkCurr)) { return new RestResponse<>(RestRespCode.NOT_GET_CURRENT_FILE_CHUNKCURR, message(MessageKeys.NOT_GET_CURRENT_FILE_CHUNKCURR), null); } checkFileResult.setChunkCurr(Convert.toInt(chunkCurr)); return new RestResponse<CheckFileResult>(RestRespCode.OK, message(MessageKeys.SYSTEM_SUCCESS), checkFileResult); } else { return new RestResponse<>(RestRespCode.PARAM_ERROR, message(MessageKeys.SYS_ERROR), null); } } } 
else { // 初始化锁.分块 saveRedisDataToJedis(lockOwner, String.valueOf(userId)); saveRedisDataToJedis(chunkCurrkey, "0"); checkFileResult.setChunkCurr(0); return new RestResponse<CheckFileResult>(RestRespCode.OK, message(MessageKeys.SYSTEM_SUCCESS), checkFileResult); } }
文件分片上传功能需要依赖redis来记录文件当前已上传到第几片,防止重复上传:
public RestResponse<List<Map<String, Object>>> upload_do(@RequestParam Map<String, Object> paramMap, HttpServletRequest request) throws BusinessException { List<Map<String, Object>> resultMap = new ArrayList<Map<String, Object>>(); String noGroupPath;//存储在fastdfs不带组的路径 String fileMd5= (String) paramMap.get("fileMd5"); String chunklockName= UpLoadConstant.chunkLock+fileMd5; String temOwner= RandomUtil.randomUUID(); //真正的拥有者 boolean currOwner=false; try { if (!paramMap.containsKey("chunk")){ paramMap.put("chunk","0"); } if (!paramMap.containsKey("chunks")){ paramMap.put("chunks","1"); } Long lock= jedisClusterTemplate.incrBy(chunklockName,1); if (lock>1){ return new RestResponse<>(RestRespCode.PARAM_ERROR, message(MessageKeys.SYS_ERROR), null); } //写入锁的当前拥有者 currOwner=true; List<MultipartFile> files = ((MultipartHttpServletRequest) request).getFiles("file"); MultipartFile file = null; BufferedOutputStream stream = null; String chunk= (String) paramMap.get("chunk"); String chunkCurrkey= UpLoadConstant.chunkCurr+fileMd5; //redis中记录当前应该穿第几块(从0开始) String chunkCurr= jedisClusterTemplate.get(chunkCurrkey); noGroupPath = ""; Integer chunkSize= Convert.toInt(paramMap.get("chunkSize")); if (StrUtil.isEmpty(chunkCurr)){ return new RestResponse<>(RestRespCode.NOT_GET_CURRENT_FILE_CHUNKCURR, message(MessageKeys.NOT_GET_CURRENT_FILE_CHUNKCURR), null); } Integer chunkCurr_int= Convert.toInt(chunkCurr); Integer chunk_int= Convert.toInt(chunk); if (chunk_int<chunkCurr_int){ return new RestResponse<>(RestRespCode.REPEAT_UPLOAD, message(MessageKeys.REPEAT_UPLOAD), null); }else if (chunk_int>chunkCurr_int){ return new RestResponse<>(RestRespCode.WAIT_A_MOMENT, message(MessageKeys.WAIT_A_MOMENT), null); } StorePath path = null; String name =null; // 暂时不支持多文件上传 for (int i = 0; i < files.size(); ++i) { Map<String, Object> map = new HashMap<String, Object>(); file = files.get(i); String originalFilename = file.getOriginalFilename(); if (!file.isEmpty()) { try { // 获取已经上传文件大小 Long 
historyUpload = 0L; String historyUploadStr = jedisClusterTemplate.get(UpLoadConstant.historyUpload + fileMd5); if (StrUtil.isNotEmpty(historyUploadStr)) { historyUpload = Convert.toLong(historyUploadStr); } LOG.debug("historyUpload大小:" + historyUpload); if (chunk_int == 0) { String s = Convert.toStr(chunk_int + 1); saveRedisDataToJedis(chunkCurrkey, Convert.toStr(chunkCurr_int+1)); LOG.debug(chunk+":redis块+1"); try { name = FileUtil.extName((String) paramMap.get("fileName")); path = appendFileStorageClient.uploadAppenderFile(UpLoadConstant.DEFAULT_GROUP, file.getInputStream(), file.getSize(), name); LOG.debug(chunk+":更新完fastdfs"); if (path== null ){ jedisClusterTemplate.get(chunkCurrkey, Convert.toStr(chunkCurr_int)); return new RestResponse<>(RestRespCode.ERROR_GETTING_REMOTE_FILE_PATH, message(MessageKeys.ERROR_GETTING_REMOTE_FILE_PATH), null); } } catch (Exception e) { jedisClusterTemplate.get(chunkCurrkey, Convert.toStr(chunkCurr_int)); // 还原历史块 LOG.error("初次上传远程文件出错", e); return new RestResponse<>(RestRespCode.FILE_FOR_THE_FIRST_TIME_ERROR, message(MessageKeys.FILE_FOR_THE_FIRST_TIME_ERROR), null); } noGroupPath = path.getPath(); saveRedisDataToJedis(UpLoadConstant.fastDfsPath + fileMd5, path.getPath()); LOG.debug("上传文件 result={}", noGroupPath); } else { saveRedisDataToJedis(chunkCurrkey, Convert.toStr(chunkCurr_int + 1)); LOG.debug(chunk + ":redis块+1"); noGroupPath = jedisClusterTemplate.get(UpLoadConstant.fastDfsPath + fileMd5); if (noGroupPath == null) { return new RestResponse<>(RestRespCode.UPLOADED_REMOTE_SERVER_FILE_ERROR, message(MessageKeys.UPLOADED_REMOTE_SERVER_FILE_ERROR), null); } try { //追加方式实际实用如果中途出错多次,可能会出现重复追加情况,这里改成修改模式,即时多次传来重复文件块,依然可以保证文件拼接正确 appendFileStorageClient.modifyFile(UpLoadConstant.DEFAULT_GROUP, noGroupPath, file.getInputStream(), file.getSize(),historyUpload); LOG.debug(chunk+":更新完fastdfs"); } catch (Exception e) { saveRedisDataToJedis(chunkCurrkey, Convert.toStr(chunkCurr_int)); LOG.error("更新远程文件出错", e); return new 
RestResponse<>(RestRespCode.ERROR_UPDATING_REMOTE_FILE, message(MessageKeys.ERROR_UPDATING_REMOTE_FILE), null); } } // 修改历史上传大小 historyUpload = historyUpload + file.getSize(); saveRedisDataToJedis(UpLoadConstant.historyUpload + fileMd5, Convert.toStr(historyUpload)); // 最后一块,清空upload,写入数据库 String fileName = (String) paramMap.get("name"); Long size = Convert.toLong(paramMap.get("size")); Integer chunks_int = Convert.toInt(paramMap.get("chunks")); if (chunk_int + 1 == chunks_int) { // 持久化上传完成文件,也可以存储在mysql中 FileResult fileResult = new FileResult(); fileResult.setMd5(fileMd5); fileResult.setName(fileName); fileResult.setLenght(size); fileResult.setUrl(UpLoadConstant.DEFAULT_GROUP+"/"+noGroupPath); // todo jedisClusterTemplate.lpush(UpLoadConstant.completedList, JSONUtil.toJsonStr(fileResult)); jedisClusterTemplate.del(UpLoadConstant.chunkCurr + fileMd5); jedisClusterTemplate.del(UpLoadConstant.fastDfsPath + fileMd5); jedisClusterTemplate.del(UpLoadConstant.currLocks + fileMd5); jedisClusterTemplate.del(UpLoadConstant.lockOwner + fileMd5); } map.put("originalFilename", originalFilename); map.put("url", UpLoadConstant.DEFAULT_GROUP+"/"+noGroupPath); resultMap.add(map); } catch (Exception e) { LOG.error("上传文件错误", e); return new RestResponse<>(RestRespCode.ERROR_UPLOADING_FILE, message(MessageKeys.ERROR_UPLOADING_FILE), null); } } break; } } finally { // 锁的当前拥有者才能释放块上传锁 if (currOwner) { saveRedisDataToJedis(chunklockName,"0"); } } return new RestResponse<List<Map<String, Object>>>(RestRespCode.OK, message(MessageKeys.SYSTEM_SUCCESS), resultMap); }
文件分片上传所依赖的工具类
/**
 * Result of the pre-upload check for a chunked file upload.
 *
 * <p>Plain mutable bean; every field is optional and stays {@code null} until set.
 */
public class CheckFileResult {
    /** MD5 of the file. */
    private String fileMd5;
    /** 0: lock not held, 1: lock held. */
    private Integer lock;
    /** Number of chunks the file is split into. */
    private Integer chunkNum;
    /** Size of each chunk. */
    private Integer chunkSize;
    /** Index of the chunk the transfer has reached. */
    private Integer chunkCurr;
    /** Total size of the file. */
    private Long totalSize;
    /** Access path of the file. */
    private String viewPath;

    public String getFileMd5() {
        return fileMd5;
    }

    public void setFileMd5(String fileMd5) {
        this.fileMd5 = fileMd5;
    }

    public Integer getLock() {
        return lock;
    }

    public void setLock(Integer lock) {
        this.lock = lock;
    }

    public Integer getChunkNum() {
        return chunkNum;
    }

    public void setChunkNum(Integer chunkNum) {
        this.chunkNum = chunkNum;
    }

    public Integer getChunkSize() {
        return chunkSize;
    }

    public void setChunkSize(Integer chunkSize) {
        this.chunkSize = chunkSize;
    }

    public Integer getChunkCurr() {
        return chunkCurr;
    }

    public void setChunkCurr(Integer chunkCurr) {
        this.chunkCurr = chunkCurr;
    }

    public Long getTotalSize() {
        return totalSize;
    }

    public void setTotalSize(Long totalSize) {
        this.totalSize = totalSize;
    }

    public String getViewPath() {
        return viewPath;
    }

    public void setViewPath(String viewPath) {
        this.viewPath = viewPath;
    }
}
/**
 * Record describing a stored file.
 *
 * <p>Plain mutable bean. The property name {@code lenght} is misspelled on purpose: it is
 * kept as-is so the serialized property name does not change for existing readers.
 */
public class FileResult {
    /** URL / storage path of the file. */
    private String url;
    /** MD5 of the file. */
    private String md5;
    /** File name. */
    private String name;
    /** File size. */
    private Long lenght;

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getMd5() {
        return md5;
    }

    public void setMd5(String md5) {
        this.md5 = md5;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Long getLenght() {
        return lenght;
    }

    public void setLenght(Long lenght) {
        this.lenght = lenght;
    }
}
/**
 * Redis key prefixes and fixed values used by the chunked-upload code.
 *
 * <p>Each key prefix is meant to be completed by appending the file's MD5.
 */
public final class UpLoadConstant {

    private UpLoadConstant() {
    }

    private static final String uploading = "Uploading:";
    private static final String lock = uploading + "lock:";
    private static final String file = uploading + "file:";

    /** All current locks (whole-file lock taken before different users upload or re-upload). */
    public static final String currLocks = lock + "currLocks:";

    /** Owner of the current lock. */
    public static final String lockOwner = lock + "lockOwner:";

    /** Index of the chunk the current file has been transferred up to. */
    public static final String chunkCurr = file + "chunkCurr:";

    /** FastDFS path the current file is being uploaded to. */
    public static final String fastDfsPath = file + "fastDfsPath:";

    /** Default storage group. */
    public static final String DEFAULT_GROUP = "group1";

    /** List of uploads that completed successfully. */
    public static final String completedList = uploading + "completedList";

    /*
     * Chunk lock. It handles concurrency while the same user is already uploading: for
     * example, the backend is still uploading a large chunk, the user clicks delete on the
     * page and then adds the deleted file again. The lock is needed to block that upload,
     * otherwise chunks get lost.
     *
     * FastDFS upload does not work like a Thunder (Xunlei) download, where the complete file
     * is created up front. If the server could quickly create a large zero-filled file when
     * the first chunk arrives, chunks could be transferred concurrently and out of order
     * and upload speed would multiply; implementing that would probably require studying
     * the FastDFS source.
     */
    public static final String chunkLock = lock + "chunkLock:";

    /** Prefix of the key holding the number of bytes uploaded so far. */
    public static final String historyUpload = "historyUpload:";
}
如果是springboot项目,就不需要下面这个工具类。ssm架构加载配置文件的方式和springboot不一样:在ssm架构的项目里,上面这段文件分片上传的代码读取不到fastdfs文件服务器的地址。所以下面写了一个工具类,用来手动设置文件服务器(tracker)的ip和端口;如果有多台服务器,直接往list集合里继续添加即可。
@Configuration @Import(FdfsClientConfig.class) // 解决jmx重复注册bean的问题 @EnableMBeanExport(registration = RegistrationPolicy.IGNORE_EXISTING) //类名无特定限制 @Component public class FastClient implements BeanPostProcessor , InitializingBean, ApplicationContextAware { @Override public Object postProcessBeforeInitialization(Object o, String s) throws BeansException { if (o instanceof TrackerConnectionManager){ List list=new ArrayList(); String ip="106.52.47.126:22122"; list.add(ip); ((TrackerConnectionManager) o).setTrackerList(list); } return o; } @Override public Object postProcessAfterInitialization(Object o, String s) throws BeansException { return o; } @Override public void afterPropertiesSet() throws Exception { } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。