
Spring Cloud + Vue 3: chunked upload with resumable (breakpoint) transfer


Background

        When building application-level service platforms, users sometimes need to upload a fairly large file, and if the upload fails it should be possible to resume from the point of failure next time. This calls for resumable, chunked (multipart) upload. This article describes one strategy for implementing that scenario; the stack involves Nginx, Spring Boot, Vue, MinIO, the Amazon S3 SDK, and related components.

Preparation

1. Nginx: set the maximum request body size

This step ensures that a chunk is not rejected by Nginx because of its size. Our project currently needs 100 MB, so we set:

client_max_body_size 100m;  # 100 MB is what our project currently needs

http {
    include       /etc/nginx/mime.types;
    default_type  application/octet-stream;

    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                      '$status $body_bytes_sent "$http_referer" '
                      '"$http_user_agent" "$http_x_forwarded_for"';

    access_log  /var/log/nginx/access.log  main;

    # maximum accepted request body size
    client_max_body_size 100m;

    sendfile        on;
    #tcp_nopush     on;
    keepalive_timeout  65;
    ....
}

2. Spring Boot configuration

spring:
  servlet:
    multipart:
      # maximum size of a single uploaded file
      max-file-size: 100MB
      # maximum size of a single request
      max-request-size: 500MB

Integration

Frontend upload component UploadParallel.vue. (The imported ../lib/md5 helper computes the file's MD5 fingerprint and is not shown in this article; it is typically built on a library such as spark-md5.)

<script setup>
import { UploadFilled } from '@element-plus/icons-vue'
import md5 from "../lib/md5";
import { taskInfo, initTask, preSignUrl, merge } from '../lib/api';
import { ElNotification } from "element-plus";
import Queue from 'promise-queue-plus';
import axios from 'axios'
import { ref } from 'vue'

// Chunk-upload queue per file (used to stop a file's queue when it is removed)
// key: fileUid, value: queue object
const fileUploadChunkQueue = ref({}).value

/**
 * Fetch the upload task for this file, initializing one if none exists
 */
const getTaskInfo = async (file) => {
    let task;
    const identifier = await md5(file)
    const { code, data, msg } = await taskInfo(identifier)
    if (code === 200) {
        task = data
        if (!task || Object.keys(task).length === 0) {
            const initTaskData = {
                identifier,
                fileName: file.name,
                totalSize: file.size,
                chunkSize: 5 * 1024 * 1024
            }
            const { code, data, msg } = await initTask(initTaskData)
            if (code === 200) {
                task = data
            } else {
                ElNotification.error({
                    title: 'File upload error',
                    message: msg
                })
            }
        }
    } else {
        ElNotification.error({
            title: 'File upload error',
            message: msg
        })
    }
    return task
}

/**
 * Upload logic. Files that are already complete (chunks merged) never reach this method.
 */
const handleUpload = (file, taskRecord, options) => {

    let lastUploadedSize = 0; // bytes uploaded in previous sessions (resume point)
    let uploadedSize = 0 // bytes uploaded so far
    const totalSize = file.size || 0 // total file size
    const startMs = new Date().getTime(); // upload start time
    const { exitPartList, chunkSize, chunkNum, fileIdentifier } = taskRecord

    // average speed since the upload started (bytes/s)
    const getSpeed = () => {
        // total uploaded - previously uploaded (resume) = bytes uploaded in this session
        const intervalSize = uploadedSize - lastUploadedSize
        const nowMs = new Date().getTime()
        // elapsed time (s)
        const intervalTime = (nowMs - startMs) / 1000
        return intervalSize / intervalTime
    }

    const uploadNext = async (partNumber) => {
        const start = Number(chunkSize) * (partNumber - 1)
        const end = start + Number(chunkSize)
        const blob = file.slice(start, end)
        const { code, data, msg } = await preSignUrl({ identifier: fileIdentifier, partNumber: partNumber })
        if (code === 200 && data) {
            await axios.request({
                url: data,
                method: 'PUT',
                data: blob,
                headers: { 'Content-Type': 'application/octet-stream' }
            })
            return Promise.resolve({ partNumber: partNumber, uploadedSize: blob.size })
        }
        return Promise.reject(`Chunk ${partNumber}: failed to get a presigned upload URL`)
    }

    /**
     * Update the upload progress
     * @param increment number of newly uploaded bytes to add to the progress
     */
    const updateProcess = (increment) => {
        increment = Number(increment)
        const { onProgress } = options
        const factor = 1000; // advance the progress bar 1000 bytes per step
        let from = 0;
        // step the progress bar forward bit by bit
        while (from <= increment) {
            from += factor
            uploadedSize += factor
            const percent = (uploadedSize / totalSize * 100).toFixed(2);
            onProgress({ percent: percent })
        }

        const speed = getSpeed();
        const remainingTime = speed != 0 ? Math.ceil((totalSize - uploadedSize) / speed) + 's' : 'unknown'
        console.log('remaining:', (totalSize - uploadedSize) / 1024 / 1024, 'MB');
        console.log('speed:', (speed / 1024 / 1024).toFixed(2), 'MB/s');
        console.log('ETA:', remainingTime);
    }

    return new Promise(resolve => {
        const failArr = [];
        const queue = Queue(5, {
            "retry": 3,             // number of retries per chunk
            "retryIsJump": false,   // retry immediately, or re-queue at the tail?
            "workReject": function (reason, queue) {
                failArr.push(reason)
            },
            "queueEnd": function (queue) {
                resolve(failArr);
            }
        })
        fileUploadChunkQueue[file.uid] = queue
        for (let partNumber = 1; partNumber <= chunkNum; partNumber++) {
            const exitPart = (exitPartList || []).find(exitPart => exitPart.partNumber == partNumber)
            if (exitPart) {
                // Chunk already uploaded: count it toward the total, and remember the
                // resume point so the speed calculation stays accurate
                lastUploadedSize += Number(exitPart.size)
                updateProcess(exitPart.size)
            } else {
                queue.push(() => uploadNext(partNumber).then(res => {
                    // update the progress only after the chunk finishes uploading
                    updateProcess(res.uploadedSize)
                }))
            }
        }
        if (queue.getLength() == 0) {
            // every chunk is uploaded but not yet merged; return so the merge can run
            resolve(failArr);
            return;
        }
        queue.start()
    })
}

/**
 * Custom upload entry point for el-upload (:http-request)
 */
const handleHttpRequest = async (options) => {
    const file = options.file
    const task = await getTaskInfo(file)
    if (task) {
        const { finished, path, taskRecord } = task
        const { fileIdentifier: identifier } = taskRecord
        if (finished) {
            return path
        } else {
            const errorList = await handleUpload(file, taskRecord, options)
            if (errorList.length > 0) {
                ElNotification.error({
                    title: 'File upload error',
                    message: 'Some chunks failed to upload; please try uploading the file again'
                })
                return;
            }
            const { code, data, msg } = await merge(identifier)
            if (code === 200) {
                return path;
            } else {
                ElNotification.error({
                    title: 'File upload error',
                    message: msg
                })
            }
        }
    } else {
        ElNotification.error({
            title: 'File upload error',
            message: 'Failed to get the upload task'
        })
    }
}

/**
 * Remove a file from the list.
 * If the file has a running upload queue, stop that queue first.
 */
const handleRemoveFile = (uploadFile, uploadFiles) => {
    const queueObject = fileUploadChunkQueue[uploadFile.uid]
    if (queueObject) {
        queueObject.stop()
        fileUploadChunkQueue[uploadFile.uid] = undefined
    }
}
</script>

<template>
    <el-card style="width: 80%; margin: 80px auto" header="Chunked file upload">
        <el-upload
            class="upload-demo"
            drag
            action="/"
            multiple
            :http-request="handleHttpRequest"
            :on-remove="handleRemoveFile">
            <el-icon class="el-icon--upload"><upload-filled /></el-icon>
            <div class="el-upload__text">
                Drag a file here, or <em>click to upload</em>
            </div>
        </el-upload>
    </el-card>
</template>

Frontend API module (../lib/api)

import axios from 'axios'
import axiosExtra from 'axios-extra'

const baseUrl = 'http://172.16.10.74:10003/dsj-file'

const http = axios.create({
    baseURL: baseUrl,
    headers: {
        // auth header used by this project (sample token)
        'Dsj-Auth': 'bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ0ZW5hbnRfaWQiOiIwMDAwMDAiLCJkc2pJZCI6ImFqZGhxaWRicSIsImRpc3RyaWN0Q29kZSI6IjQyMTEwMDAwMDAwMCIsInVzZXJfbmFtZSI6ImFkbWluIiwic29jaWFsQWNjb3VudElkIjpudWxsLCJyZWFsX25hbWUiOiLotoXnuqfnrqHnkIblkZgiLCJjbGllbnRfaWQiOiJzd29yZCIsInJvbGVfaWQiOiIxNDkzODIwMjY0OTY4OTk0ODE3IiwiaXNEZWZhdWx0UGFzc3dvcmQiOmZhbHNlLCJpZGVudGl0eVR5cGUiOiIxIiwic2NvcGUiOlsiYWxsIl0sImRlcHRTQ0NvZGUiOiIxMTQyMTEwMDc0NDYyMTAxMDgiLCJleHAiOjE2OTU4NjczMDUsImp0aSI6IjEwMmE3YzhhLTdiMWYtNDU4NC04ZWJjLWZiYmUyZTQyYmUzNCIsImlkZW50aXR5RHluYURhdGEiOnt9LCJhdmF0YXIiOiIxNjM1MTA4Nzk5MzQ3OTU3NzYyIiwiYXV0aG9yaXRpZXMiOlsiYWRtaW5pc3RyYXRvciJdLCJyb2xlX25hbWUiOiJhZG1pbmlzdHJhdG9yIiwiYWNjb3VudElkIjoiMTQ1Mzk5MzI5MjAxMDM5OTgxMSIsImxpY2Vuc2UiOiJwb3dlcmVkIGJ5IGRzaiIsInBvc3RfaWQiOiIxNTYwMTQ1MzUwMDY5NjY5ODg5IiwidXNlcl9pZCI6IjE1NDczOTY0ODAwNzkyMzcxMjEiLCJwaG9uZSI6IjE1ODcxOTI2MDczIiwibmlja19uYW1lIjoi6LaF57qn566h55CG5ZGYIiwiZGVwdF9pZCI6IjExNDIxMTAwNzQ0NjIxMDEwOCIsImFjY291bnQiOiJhZG1pbiIsImRlcHRDb2RlIjoiMTE0MjExMDA3NDQ2MjEwMTA4In0.5r85ctPgWxNarVdF9kwTNoub7IqQM6RxTHYIU-ajxio'
    }
})

const httpExtra = axiosExtra.create({
    maxConcurrent: 5,   // up to 5 concurrent requests
    queueOptions: {
        retry: 3,           // retry a failed request up to 3 times
        retryIsJump: false  // retry immediately? otherwise re-queue the retry at the tail
    }
})

http.interceptors.response.use(response => {
    return response.data
})

/**
 * Fetch the unfinished upload task for a file by its MD5
 * @param identifier file MD5
 * @returns {Promise<AxiosResponse<any>>}
 */
const taskInfo = (identifier) => {
    return http.get(`/parallel-upload/${identifier}`)
}

/**
 * Initialize a chunked upload task
 * @param identifier file MD5
 * @param fileName file name
 * @param totalSize total file size
 * @param chunkSize chunk size
 * @returns {Promise<AxiosResponse<any>>}
 */
const initTask = ({ identifier, fileName, totalSize, chunkSize }) => {
    return http.post('/parallel-upload/init-task', { identifier, fileName, totalSize, chunkSize })
}

/**
 * Get a presigned upload URL for one chunk
 * @param identifier file MD5
 * @param partNumber chunk number
 * @returns {Promise<AxiosResponse<any>>}
 */
const preSignUrl = ({ identifier, partNumber }) => {
    return http.get(`/parallel-upload/${identifier}/${partNumber}`)
}

/**
 * Merge the chunks
 * @param identifier file MD5
 * @returns {Promise<AxiosResponse<any>>}
 */
const merge = (identifier) => {
    return http.post(`/parallel-upload/merge/${identifier}`)
}

export {
    taskInfo,
    initTask,
    preSignUrl,
    merge,
    httpExtra
}

MinIO configuration for the file service

dsj:
  minio:   # keys must sit under dsj.minio to match @ConfigurationProperties(prefix = "dsj.minio") below
    endpoint: http://172.16.10.74:9000
    address: http://172.16.10.74
    port: 9000
    secure: false
    access-key: minioadmin
    secret-key: XXXXXXXXXX
    bucket-name: gpd
    internet-address: http://XXXXXXXXX:9000

MinioProperties.java

package com.dsj.prod.file.biz.properties;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Component;

import java.util.List;

@Data
@RefreshScope
@Component
@ConfigurationProperties(prefix = "dsj.minio")
public class MinioProperties {

    /**
     * MinIO endpoint, e.g. http://host:9000
     */
    private String endpoint;

    /**
     * Server address
     */
    private String address;

    /**
     * Port
     */
    private String port;

    /**
     * Access key
     */
    private String accessKey;

    /**
     * Secret key
     */
    private String secretKey;

    /**
     * Bucket name
     */
    private String bucketName;

    /**
     * Public (internet-facing) address
     */
    private String internetAddress;

    /**
     * Allowed file extensions:
     * doc docx xls xlsx images pdf
     */
    private List<String> limitFileExtension;
}

Controller: ParallelUploadController.java

package com.dsj.prod.file.biz.controller;

import com.dsj.plf.arch.tool.api.R;
import com.dsj.prod.file.api.dto.parallelUpload.InitTaskParam;
import com.dsj.prod.file.api.dto.parallelUpload.TaskInfoDTO;
import com.dsj.prod.file.api.entity.ParallelUploadTask;
import com.dsj.prod.file.biz.service.ParallelUploadTaskService;
import com.github.xiaoymin.knife4j.annotations.ApiOperationSupport;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.web.bind.annotation.*;

import javax.annotation.Resource;
import javax.validation.Valid;
import java.util.HashMap;
import java.util.Map;

@Api(value = "Chunked file upload API", tags = "Chunked file upload API")
@RestController
@RequestMapping("/parallel-upload")
public class ParallelUploadController {

    @Resource
    private ParallelUploadTaskService sysUploadTaskService;

    @ApiOperationSupport(order = 1)
    @ApiOperation(value = "Get upload progress", notes = "identifier: file MD5")
    @GetMapping("/{identifier}")
    public R<TaskInfoDTO> taskInfo(@PathVariable("identifier") String identifier) {
        TaskInfoDTO result = sysUploadTaskService.getTaskInfo(identifier);
        return R.data(result);
    }

    /**
     * Create an upload task
     *
     * @param param the param
     * @return result
     */
    @ApiOperationSupport(order = 2)
    @ApiOperation(value = "Create an upload task", notes = "param: init parameters")
    @PostMapping("/init-task")
    public R<TaskInfoDTO> initTask(@Valid @RequestBody InitTaskParam param) {
        return R.data(sysUploadTaskService.initTask(param));
    }

    @ApiOperationSupport(order = 3)
    @ApiOperation(value = "Get a presigned upload URL for one chunk", notes = "identifier: file MD5, partNumber: chunk number")
    @GetMapping("/{identifier}/{partNumber}")
    public R preSignUploadUrl(@PathVariable("identifier") String identifier, @PathVariable("partNumber") Integer partNumber) {
        ParallelUploadTask task = sysUploadTaskService.getByIdentifier(identifier);
        if (task == null) {
            return R.fail("Upload task does not exist");
        }
        Map<String, String> params = new HashMap<>();
        params.put("partNumber", partNumber.toString());
        params.put("uploadId", task.getUploadId());
        return R.data(sysUploadTaskService.genPreSignUploadUrl(task.getBucketName(), task.getObjectKey(), params));
    }

    @ApiOperationSupport(order = 4)
    @ApiOperation(value = "Merge chunks", notes = "identifier: file MD5")
    @PostMapping("/merge/{identifier}")
    public R merge(@PathVariable("identifier") String identifier) {
        sysUploadTaskService.merge(identifier);
        return R.success("Merge succeeded");
    }
}
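
The ParallelUploadTask entity is referenced throughout but never shown in the article. Below is a minimal sketch inferred from how its getters and chained setters are used in the controller and service code; the field names, annotations, and table name are assumptions and may differ from the real project.

package com.dsj.prod.file.api.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.experimental.Accessors;

/**
 * Chunk-upload task record. Sketch only: fields are inferred from usage above.
 */
@Data
@Accessors(chain = true) // the impl chains setters: task.setBucketName(...).setChunkNum(...)
@TableName("parallel_upload_task")
public class ParallelUploadTask {

    @TableId(type = IdType.AUTO)
    private Long id;

    /** MD5 of the file; used to look the task up when resuming. */
    private String fileIdentifier;

    /** Multipart uploadId returned by S3/MinIO when the task is initialized. */
    private String uploadId;

    private String fileName;

    private String bucketName;

    /** Object key in the bucket, e.g. 2022-08-22/<uuid>.<suffix>. */
    private String objectKey;

    private Long totalSize;

    private Long chunkSize;

    private Integer chunkNum;
}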

Service interface: ParallelUploadTaskService.java

package com.dsj.prod.file.biz.service;

import com.baomidou.mybatisplus.extension.service.IService;
import com.dsj.prod.file.api.dto.parallelUpload.InitTaskParam;
import com.dsj.prod.file.api.dto.parallelUpload.TaskInfoDTO;
import com.dsj.prod.file.api.entity.ParallelUploadTask;

import java.util.Map;

/**
 * Chunked upload - task record (ParallelUploadTask) service interface
 *
 * @since 2022-08-22 17:47:30
 */
public interface ParallelUploadTaskService extends IService<ParallelUploadTask> {

    /**
     * Get the upload task by the file's MD5 identifier
     * @param identifier file MD5
     * @return the task, or null if none exists
     */
    ParallelUploadTask getByIdentifier(String identifier);

    /**
     * Initialize a task
     */
    TaskInfoDTO initTask(InitTaskParam param);

    /**
     * Get the file's URL
     * @param bucket bucket name
     * @param objectKey object key
     * @return the file URL
     */
    String getPath(String bucket, String objectKey);

    /**
     * Get the upload progress
     * @param identifier file MD5
     * @return task info, or null if no task exists
     */
    TaskInfoDTO getTaskInfo(String identifier);

    /**
     * Generate a presigned upload URL
     * @param bucket bucket name
     * @param objectKey object key
     * @param params extra request parameters (partNumber, uploadId)
     * @return the presigned URL
     */
    String genPreSignUploadUrl(String bucket, String objectKey, Map<String, String> params);

    /**
     * Merge the chunks
     * @param identifier file MD5
     */
    void merge(String identifier);
}
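
The InitTaskParam, TaskInfoDTO, and TaskRecordDTO classes are also not shown in the article. Here is a hedged sketch consistent with how the frontend and the service layer use them: the frontend sends identifier/fileName/totalSize/chunkSize on init, and getTaskInfo returns finished/path/taskRecord with an exitPartList of already-uploaded parts. In a real project each class would be public in its own file; field names are inferred and may differ.

package com.dsj.prod.file.api.dto.parallelUpload;

import com.amazonaws.services.s3.model.PartSummary;
import com.dsj.prod.file.api.entity.ParallelUploadTask;
import lombok.Data;
import lombok.experimental.Accessors;

import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import java.util.List;

/** Payload of POST /parallel-upload/init-task. Field names mirror the frontend request. */
@Data
class InitTaskParam {
    @NotBlank
    private String identifier; // file MD5
    @NotBlank
    private String fileName;
    @NotNull
    private Long totalSize;
    @NotNull
    private Long chunkSize;
}

/** Wrapper returned to the frontend: whether the upload is finished, the final path, and the task record. */
@Data
@Accessors(chain = true)
class TaskInfoDTO {
    private boolean finished;
    private String path;
    private TaskRecordDTO taskRecord;
}

/** Task record plus the list of already-uploaded parts (used for resuming). */
@Data
@Accessors(chain = true)
class TaskRecordDTO {
    private Long id;
    private String fileIdentifier;
    private String uploadId;
    private String fileName;
    private String bucketName;
    private String objectKey;
    private Long totalSize;
    private Long chunkSize;
    private Integer chunkNum;
    /** Parts already present in MinIO; the frontend skips these chunk numbers. */
    private List<PartSummary> exitPartList;

    static TaskRecordDTO convertFromEntity(ParallelUploadTask task) {
        return new TaskRecordDTO()
                .setId(task.getId())
                .setFileIdentifier(task.getFileIdentifier())
                .setUploadId(task.getUploadId())
                .setFileName(task.getFileName())
                .setBucketName(task.getBucketName())
                .setObjectKey(task.getObjectKey())
                .setTotalSize(task.getTotalSize())
                .setChunkSize(task.getChunkSize())
                .setChunkNum(task.getChunkNum());
    }
}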

Implementation: ParallelUploadTaskServiceImpl.java

package com.dsj.prod.file.biz.service.impl;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import com.amazonaws.HttpMethod;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.*;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.dsj.prod.file.api.constants.MinioConstant;
import com.dsj.prod.file.api.dto.parallelUpload.InitTaskParam;
import com.dsj.prod.file.api.dto.parallelUpload.TaskInfoDTO;
import com.dsj.prod.file.api.dto.parallelUpload.TaskRecordDTO;
import com.dsj.prod.file.api.entity.ParallelUploadTask;
import com.dsj.prod.file.biz.mapper.ParallelUploadMapper;
import com.dsj.prod.file.biz.properties.MinioProperties;
import com.dsj.prod.file.biz.service.ParallelUploadTaskService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.http.MediaTypeFactory;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.net.URL;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Chunked upload - task record (ParallelUploadTask) service implementation
 *
 * @since 2022-08-22 17:47:31
 */
@Slf4j
@Service("sysUploadTaskService")
public class ParallelUploadTaskServiceImpl extends ServiceImpl<ParallelUploadMapper, ParallelUploadTask> implements ParallelUploadTaskService {

    @Resource
    private AmazonS3 amazonS3;

    @Resource
    private MinioProperties minioProperties;

    @Resource
    private ParallelUploadMapper sysUploadTaskMapper;

    @Override
    public ParallelUploadTask getByIdentifier(String identifier) {
        return sysUploadTaskMapper.selectOne(new QueryWrapper<ParallelUploadTask>().lambda().eq(ParallelUploadTask::getFileIdentifier, identifier));
    }

    @Override
    public TaskInfoDTO initTask(InitTaskParam param) {
        Date currentDate = new Date();
        String bucketName = minioProperties.getBucketName();
        String fileName = param.getFileName();
        String suffix = fileName.substring(fileName.lastIndexOf(".") + 1);
        // note lower-case yyyy: upper-case YYYY is the week-based year and misbehaves around New Year
        String key = StrUtil.format("{}/{}.{}", DateUtil.format(currentDate, "yyyy-MM-dd"), IdUtil.randomUUID(), suffix);
        String contentType = MediaTypeFactory.getMediaType(key).orElse(MediaType.APPLICATION_OCTET_STREAM).toString();
        ObjectMetadata objectMetadata = new ObjectMetadata();
        objectMetadata.setContentType(contentType);
        // start the multipart upload against S3/MinIO and keep its uploadId
        InitiateMultipartUploadResult initiateMultipartUploadResult = amazonS3.initiateMultipartUpload(new InitiateMultipartUploadRequest(bucketName, key)
                .withObjectMetadata(objectMetadata));
        String uploadId = initiateMultipartUploadResult.getUploadId();

        ParallelUploadTask task = new ParallelUploadTask();
        int chunkNum = (int) Math.ceil(param.getTotalSize() * 1.0 / param.getChunkSize());
        task.setBucketName(minioProperties.getBucketName())
                .setChunkNum(chunkNum)
                .setChunkSize(param.getChunkSize())
                .setTotalSize(param.getTotalSize())
                .setFileIdentifier(param.getIdentifier())
                .setFileName(fileName)
                .setObjectKey(key)
                .setUploadId(uploadId);
        sysUploadTaskMapper.insert(task);
        return new TaskInfoDTO().setFinished(false).setTaskRecord(TaskRecordDTO.convertFromEntity(task)).setPath(getPath(bucketName, key));
    }

    @Override
    public String getPath(String bucket, String objectKey) {
        return StrUtil.format("{}/{}/{}", minioProperties.getEndpoint(), bucket, objectKey);
    }

    @Override
    public TaskInfoDTO getTaskInfo(String identifier) {
        ParallelUploadTask task = getByIdentifier(identifier);
        if (task == null) {
            return null;
        }
        TaskInfoDTO result = new TaskInfoDTO().setFinished(true).setTaskRecord(TaskRecordDTO.convertFromEntity(task)).setPath(getPath(task.getBucketName(), task.getObjectKey()));

        boolean doesObjectExist = amazonS3.doesObjectExist(task.getBucketName(), task.getObjectKey());
        if (!doesObjectExist) {
            // not finished yet: return the chunks that are already uploaded
            ListPartsRequest listPartsRequest = new ListPartsRequest(task.getBucketName(), task.getObjectKey(), task.getUploadId());
            PartListing partListing = amazonS3.listParts(listPartsRequest);
            result.setFinished(false).getTaskRecord().setExitPartList(partListing.getParts());
        }
        return result;
    }

    @Override
    public String genPreSignUploadUrl(String bucket, String objectKey, Map<String, String> params) {
        Date currentDate = new Date();
        Date expireDate = DateUtil.offsetMillisecond(currentDate, MinioConstant.PRE_SIGN_URL_EXPIRE.intValue());
        GeneratePresignedUrlRequest request = new GeneratePresignedUrlRequest(bucket, objectKey)
                .withExpiration(expireDate).withMethod(HttpMethod.PUT);
        if (params != null) {
            params.forEach(request::addRequestParameter);
        }
        URL preSignedUrl = amazonS3.generatePresignedUrl(request);
        return preSignedUrl.toString();
    }

    @Override
    public void merge(String identifier) {
        ParallelUploadTask task = getByIdentifier(identifier);
        if (task == null) {
            log.error("Upload task does not exist, identifier: {}", identifier);
            throw new RuntimeException("Upload task does not exist");
        }
        log.info("Merging chunks, task id: {}", task.getId());
        ListPartsRequest listPartsRequest = new ListPartsRequest(task.getBucketName(), task.getObjectKey(), task.getUploadId());
        PartListing partListing = amazonS3.listParts(listPartsRequest);
        List<PartSummary> parts = partListing.getParts();
        if (!task.getChunkNum().equals(parts.size())) {
            // the number of uploaded parts does not match the record: refuse to merge
            log.error("Missing chunks, task id: {}, uploaded parts: {}, expected: {}", task.getId(), parts.size(), task.getChunkNum());
            throw new RuntimeException("Missing chunks, please upload again");
        }
        CompleteMultipartUploadRequest completeMultipartUploadRequest = new CompleteMultipartUploadRequest()
                .withUploadId(task.getUploadId())
                .withKey(task.getObjectKey())
                .withBucketName(task.getBucketName())
                .withPartETags(parts.stream().map(partSummary -> new PartETag(partSummary.getPartNumber(), partSummary.getETag())).collect(Collectors.toList()));
        CompleteMultipartUploadResult result = amazonS3.completeMultipartUpload(completeMultipartUploadRequest);
        log.info("Chunks merged, result: {}", result);
    }
}
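
Two small pieces the implementation depends on are also missing from the article: MinioConstant, which holds the presigned-URL time-to-live, and the MyBatis-Plus mapper. Plausible minimal versions follow; the class names come from the imports above, but the one-hour TTL is an assumption.

package com.dsj.prod.file.api.constants;

/** MinIO-related constants. Only the TTL is used above; the one-hour value is an assumption. */
public interface MinioConstant {
    /** Presigned upload URL time-to-live, in milliseconds (assumed: 1 hour). */
    Long PRE_SIGN_URL_EXPIRE = 60 * 60 * 1000L;
}

package com.dsj.prod.file.biz.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.dsj.prod.file.api.entity.ParallelUploadTask;
import org.apache.ibatis.annotations.Mapper;

/** Plain MyBatis-Plus mapper; the code above needs no custom SQL. */
@Mapper
public interface ParallelUploadMapper extends BaseMapper<ParallelUploadTask> {
}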

Amazon S3 client configuration: AmazonS3Config.java

package com.dsj.prod.file.biz.config;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.dsj.prod.file.biz.properties.MinioProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;

@Configuration
public class AmazonS3Config {

    @Resource
    private MinioProperties minioProperties;

    @Bean(name = "amazonS3Client")
    public AmazonS3 amazonS3Client() {
        ClientConfiguration config = new ClientConfiguration();
        config.setProtocol(Protocol.HTTP);
        config.setConnectionTimeout(60000);
        config.setUseExpectContinue(true);
        AWSCredentials credentials = new BasicAWSCredentials(minioProperties.getAccessKey(), minioProperties.getSecretKey());
        // point the S3 client at the MinIO endpoint; the region value is arbitrary for MinIO
        AwsClientBuilder.EndpointConfiguration endpointConfiguration =
                new AwsClientBuilder.EndpointConfiguration(minioProperties.getEndpoint(), Regions.CN_NORTH_1.name());
        return AmazonS3ClientBuilder.standard()
                .withClientConfiguration(config)
                .withCredentials(new AWSStaticCredentialsProvider(credentials))
                .withEndpointConfiguration(endpointConfiguration)
                // MinIO requires path-style access (http://host:9000/bucket/key)
                .withPathStyleAccessEnabled(true)
                .build();
    }
}
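
Nothing in the article creates the gpd bucket; if it does not already exist, initiateMultipartUpload will fail. A hedged sketch of a startup check one could add, using standard aws-java-sdk-s3 calls (this helper is hypothetical and not part of the original project):

import com.amazonaws.services.s3.AmazonS3;

/** Hypothetical helper (not in the original project): ensure the bucket exists at startup. */
public class BucketInitializer {

    public static void ensureBucket(AmazonS3 amazonS3, String bucketName) {
        // doesBucketExistV2 replaces the deprecated doesBucketExist
        if (!amazonS3.doesBucketExistV2(bucketName)) {
            amazonS3.createBucket(bucketName); // MinIO supports the S3 CreateBucket call
        }
    }
}

It could be invoked from an ApplicationRunner bean, or from the amazonS3Client() factory method right after the client is built.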

Overall flow

The rough flow:

1. Initialize a task and obtain the presigned upload URLs. (The file's MD5 is checked to see whether chunks, or the whole file, already exist in object storage. If partial chunks exist, the upload resumes from the last uploaded chunk; if the whole file exists, its URL is returned directly.)

2. Upload the file chunk by chunk.

3. Merge the chunks.

4. Finish, and report that the merge succeeded.
