当前位置:   article > 正文

SpringBoot 整合 MinIO 实现视频的分片上传/断点续传(亲测可行)

springboot实现视频上传、视频播放功能

点击上方“Java基基”,选择“设为星标”

做积极的人,而不是积极废人!

每天 14:00 更新文章,每天掉亿点点头发...

源码精品专栏

 

来源:blog.csdn.net/weixin_44153131/

article/details/129249169

7930278d814708e719cba3bafa5426d5.jpeg


1、前言

之前做了一个慕课网上的仿短视频开发,里面有很多比较粗糙的实现,比如视频上传部分是直接由前端上传云服务,没考虑到客户的网络环境质量等问题,如果一个视频快上传完了,但是网断了没有上传完成需要客户重新上传,这对于用户体验是极差的。

那么我们对于视频文件的上传可以采取断点续传,上传过程中,如果出现网络异常或程序崩溃导致文件上传失败时,将从断点记录处继续上传未上传完成的部分,断点续传依赖于MD5和分片上传,对于本demo分片上传的流程如图

24eb185ed049266d39f81541aefa64d7.jpeg

通过文件唯一标识MD5,在数据库中查询此前是否创建过该SysUploadTask,如果存在,直接返回TaskInfo;如果不存在,通过amazonS3获取到UploadId并新建一个SysUploadTask返回。

前端将文件分好片后,通过服务器得到每一片的一个预地址,然后由前端直接向minio服务器发起真正的上传请求,避免上传时占用应用服务器的带宽,影响系统稳定。最后再向后端服务器发起合并请求。

基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/ruoyi-vue-pro

  • 视频教程:https://doc.iocoder.cn/video/

2、数据库结构

293124c964961200bbd1de5e1a1d2772.png

基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/yudao-cloud

  • 视频教程:https://doc.iocoder.cn/video/

3、后端实现

3.1、根据MD5获取是否存在相同文件

Controller层

  1. /**
  2.  * 查询是否上传过,若存在,返回TaskInfoDTO
  3.  * @param identifier 文件md5
  4.  * @return
  5.  */
  6. @GetMapping("/{identifier}")
  7. public GraceJSONResult taskInfo (@PathVariable("identifier") String identifier) {
  8.     return GraceJSONResult.ok(sysUploadTaskService.getTaskInfo(identifier));
  9. }

Service层

  1. /**
  2.  * 查询是否上传过,若存在,返回TaskInfoDTO
  3.  * @param identifier
  4.  * @return
  5.  */
  6. public TaskInfoDTO getTaskInfo(String identifier) {
  7.     SysUploadTask task = getByIdentifier(identifier);
  8.     if (task == null) {
  9.         return null;
  10.     }
  11.     TaskInfoDTO result = new TaskInfoDTO().setFinished(true).setTaskRecord(TaskRecordDTO.convertFromEntity(task)).setPath(getPath(task.getBucketName(), task.getObjectKey()));
  12.     boolean doesObjectExist = amazonS3.doesObjectExist(task.getBucketName(), task.getObjectKey());
  13.     if (!doesObjectExist) {
  14.         // 未上传完,返回已上传的分片
  15.         ListPartsRequest listPartsRequest = new ListPartsRequest(task.getBucketName(), task.getObjectKey(), task.getUploadId());
  16.         PartListing partListing = amazonS3.listParts(listPartsRequest);
  17.         result.setFinished(false).getTaskRecord().setExitPartList(partListing.getParts());
  18.     }
  19.     return result;
  20. }
3.2、初始化一个上传任务

Controller层

  1. /**
  2.  * 创建一个上传任务
  3.  * @return
  4.  */
  5. @PostMapping
  6. public GraceJSONResult initTask (@Valid @RequestBody InitTaskParam param) {
  7.     return GraceJSONResult.ok(sysUploadTaskService.initTask(param));
  8. }

Service层

  1. /**
  2.  * 初始化一个任务
  3.  */
  4. public TaskInfoDTO initTask(InitTaskParam param) {
  5.     Date currentDate = new Date();
  6.     String bucketName = minioProperties.getBucketName();
  7.     String fileName = param.getFileName();
  8.     String suffix = fileName.substring(fileName.lastIndexOf(".")+1, fileName.length());
  9.     String key = StrUtil.format("{}/{}.{}", DateUtil.format(currentDate, "YYYY-MM-dd"), IdUtil.randomUUID(), suffix);
  10.     String contentType = MediaTypeFactory.getMediaType(key).orElse(MediaType.APPLICATION_OCTET_STREAM).toString();
  11.     ObjectMetadata objectMetadata = new ObjectMetadata();
  12.     objectMetadata.setContentType(contentType);
  13.     InitiateMultipartUploadResult initiateMultipartUploadResult = amazonS3
  14.             .initiateMultipartUpload(new InitiateMultipartUploadRequest(bucketName, key).withObjectMetadata(objectMetadata));
  15.     String uploadId = initiateMultipartUploadResult.getUploadId();
  16.     SysUploadTask task = new SysUploadTask();
  17.     int chunkNum = (int) Math.ceil(param.getTotalSize() * 1.0 / param.getChunkSize());
  18.     task.setBucketName(minioProperties.getBucketName())
  19.             .setChunkNum(chunkNum)
  20.             .setChunkSize(param.getChunkSize())
  21.             .setTotalSize(param.getTotalSize())
  22.             .setFileIdentifier(param.getIdentifier())
  23.             .setFileName(fileName)
  24.             .setObjectKey(key)
  25.             .setUploadId(uploadId);
  26.     sysUploadTaskMapper.insert(task);
  27.     return new TaskInfoDTO().setFinished(false).setTaskRecord(TaskRecordDTO.convertFromEntity(task)).setPath(getPath(bucketName, key));
  28. }
3.3、获取每个分片的预签名上传地址

Controller层

  1. /**
  2.  * 获取每个分片的预签名上传地址
  3.  * @param identifier
  4.  * @param partNumber
  5.  * @return
  6.  */
  7. @GetMapping("/{identifier}/{partNumber}")
  8. public GraceJSONResult preSignUploadUrl (@PathVariable("identifier") String identifier, @PathVariable("partNumber") Integer partNumber) {
  9.     SysUploadTask task = sysUploadTaskService.getByIdentifier(identifier);
  10.     if (task == null) {
  11.         return GraceJSONResult.error("分片任务不存在");
  12.     }
  13.     Map<String, String> params = new HashMap<>();
  14.     params.put("partNumber", partNumber.toString());
  15.     params.put("uploadId", task.getUploadId());
  16.     return GraceJSONResult.ok(sysUploadTaskService.genPreSignUploadUrl(task.getBucketName(), task.getObjectKey(), params));
  17. }

Service层

  1. /**
  2.  * 生成预签名上传url
  3.  * @param bucket 桶名
  4.  * @param objectKey 对象的key
  5.  * @param params 额外的参数
  6.  * @return
  7.  */
  8. public String genPreSignUploadUrl(String bucket, String objectKey, Map<String, String> params) {
  9.     Date currentDate = new Date();
  10.     Date expireDate = DateUtil.offsetMillisecond(currentDate, PRE_SIGN_URL_EXPIRE.intValue());
  11.     GeneratePresignedUrlRequest request = new GeneratePresignedUrlRequest(bucket, objectKey)
  12.             .withExpiration(expireDate).withMethod(HttpMethod.PUT);
  13.     if (params != null) {
  14.         params.forEach((key, val) -> request.addRequestParameter(key, val));
  15.     }
  16.     URL preSignedUrl = amazonS3.generatePresignedUrl(request);
  17.     return preSignedUrl.toString();
  18. }
3.4、合并分片

Controller层

  1. /**
  2.  * 合并分片
  3.  * @param identifier
  4.  * @return
  5.  */
  6. @PostMapping("/merge/{identifier}")
  7. public GraceJSONResult merge (@PathVariable("identifier") String identifier) {
  8.     sysUploadTaskService.merge(identifier);
  9.     return GraceJSONResult.ok();
  10. }

Service层

  1. /**
  2.  * 合并分片
  3.  * @param identifier
  4.  */
  5. public void merge(String identifier) {
  6.     SysUploadTask task = getByIdentifier(identifier);
  7.     if (task == null) {
  8.         throw new RuntimeException("分片任务不存");
  9.     }
  10.     ListPartsRequest listPartsRequest = new ListPartsRequest(task.getBucketName(), task.getObjectKey(), task.getUploadId());
  11.     PartListing partListing = amazonS3.listParts(listPartsRequest);
  12.     List<PartSummary> parts = partListing.getParts();
  13.     if (!task.getChunkNum().equals(parts.size())) {
  14.         // 已上传分块数量与记录中的数量不对应,不能合并分块
  15.         throw new RuntimeException("分片缺失,请重新上传");
  16.     }
  17.     CompleteMultipartUploadRequest completeMultipartUploadRequest = new CompleteMultipartUploadRequest()
  18.             .withUploadId(task.getUploadId())
  19.             .withKey(task.getObjectKey())
  20.             .withBucketName(task.getBucketName())
  21.             .withPartETags(parts.stream().map(partSummary -> new PartETag(partSummary.getPartNumber(), partSummary.getETag())).collect(Collectors.toList()));
  22.     CompleteMultipartUploadResult result = amazonS3.completeMultipartUpload(completeMultipartUploadRequest);
  23. }

4、分片文件清理问题

视频上传一半不上传了,怎么清理碎片分片。

可以考虑在sys_upload_task表中新加一个status字段,表示是否合并分片,默认为false,merge请求结束后变更为true,通过一个定时任务定期清理为status为false的记录。另外MinIO自身对于临时上传的分片,会实施定时清理。

Demo地址
  • https://github.com/robinsyn/MinIO_Demo



欢迎加入我的知识星球,一起探讨架构,交流源码。加入方式,长按下方二维码噢

e0e6fa382a1470e57fd5cfa59136cb3b.png

已在知识星球更新源码解析如下:

31b97f09f4d5ea2e900392ded2cbcc71.jpeg

0a99830a659d2c3d89436e30dc340c6b.jpeg

020f3a88b3ebae28bbc86938267b12ae.jpeg

0531c8e19b8923bc2054f6bd48c0d71d.jpeg

最近更新《芋道 SpringBoot 2.X 入门》系列,已经 101 余篇,覆盖了 MyBatis、Redis、MongoDB、ES、分库分表、读写分离、SpringMVC、Webflux、权限、WebSocket、Dubbo、RabbitMQ、RocketMQ、Kafka、性能测试等等内容。

提供近 3W 行代码的 SpringBoot 示例,以及超 6W 行代码的电商微服务项目。

获取方式:点“在看”,关注公众号并回复 666 领取,更多内容陆续奉上。

  1. 文章有帮助的话,在看,转发吧。
  2. 谢谢支持哟 (*^__^*)
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/267847
推荐阅读
相关标签
  

闽ICP备14008679号