当前位置:   article > 正文

Amazon S3上传和分段上传_flutter 上传大文件到亚马逊 s3 ,如何进行分片

flutter 上传大文件到亚马逊 s3 ,如何进行分片

一、简介

Amazon Simple Storage Service (Amazon S3) 是一种对象存储服务,提供行业领先的可扩展性、数据可用性、安全性和性能。

Amazon S3 提供了一个简单 Web 服务接口,可用于随时在 Web 上的任何位置存储和检索任何数量的数据。此服务让所有开发人员都能访问同一个具备高扩展性、可靠性、安全性和快速价廉的数据存储基础设施。

本文主要介绍一次上传整个文件和将大文件分为几段上传的内容。

二、上传文件

当上传的文件不是太大时,可以采用一次上传整个文件的方式上传,即使失败了,重新开始上传也不会太浪费时间。

// MultipartFile file是接收自前端的文件
String bucketName = "bucketName";
String key = "key";
long size = file.getSize();
AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
ObjectMetadata metadata = new ObjectMetadata();
// 必须设置ContentLength
metadata.setContentLength(size);
try {
    //metadata可以为空
    long time1 = System.currentTimeMillis();
    s3.putObject(bucketName, key, file.getInputStream(), metadata);
    long time2 = System.currentTimeMillis();
    log.info("上传耗时:" + (time2 - time1));
} catch (Exception e) {
    log.error("文件上传异常");
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

主要看s3.putObject(bucketName, key, file.getInputStream(), metadata);这句。

其中,bucketName是桶的名称,key是文件在存储桶中显示的上传的文件路径以及文件名,metadata如果没什么特别的,可以为null。

三、分段上传

3.1、什么是分段上传

S3 的分段上传就像是我们见过的文件分块下载一样,或者说是一个个 Chunk。它可让我们上传大数据(或文件时),分成最小 5M 大小段往 S3 上传,待分段全部成功上传到 S3 后再执行一条指令通知 S3 合并文件。只有最后一个段是可以小于 5MB 的,其他段小于 5MB 上传会有异常的。

S3 不分段上传,单个文件最大 5GB, 而分段后,每个段的大小在 5MB 到 5GB 之间,可以有 10000 个分段数量,所以最大单个文件可以达到 5TB. 分段上传大数据(文件) 的好处是可以提高吞吐量与上传的可靠性,分段可以同时上传,单个分段上传失败只需重传该分段,而无需全部重传。

注意的就是,会采用分段上传的 Bucket 应该设置好它的生命周期,否则烂在上面的未成功合并的并段将得不到清理。

3.2、代码实现(多线程)

Java 代码进行数据的分段上传,主要分以下几步:

  1. 初始化,声明说要开始一个 Multipart Upload, 并获得一个批次 ID, 大概意思是下面应用这个 ID 的分段将会被合并
  2. 上传每一个分段,并指定分段号(从 1 开始), 当前分段大小,并把每次分段请求的 ETag 记录下来
  3. completeMultipartUpload(…) 方法完成分段上传

然后我在这基础上,使用了多线程来实现。

// MultipartFile file是接收自前端的文件
String bucketName = "bucketName";
String key = "key";
long size = file.getSize();
long minPartSize = 5 * 1024 * 1024;
// 得到总共的段数,和 分段后,每个段的开始上传的字节位置
List<Long> positions = new ArrayList<>();
long filePosition = 0;
while (filePosition < size) {
    positions.add(filePosition);
    filePosition += Math.min(minPartSize, (size - filePosition));
}
log.info("总大小:{},分为{}段", size, positions.size());
// 创建一个列表保存所有分传的 PartETag, 在分段完成后会用到
List<PartETag> partETags = new ArrayList<>();
// 第一步,初始化,声明下面将有一个 Multipart Upload
InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(bucketName, key);
InitiateMultipartUploadResult initResponse = s3.initiateMultipartUpload(initRequest);
log.info("开始上传");
long begin = System.currentTimeMillis();
try {
	// MultipartFile 转 File
    File toFile = multipartFileToFile(file);
    // 使用CountDownLatch来协调多个线程之间的同步
    final CountDownLatch latch = new CountDownLatch(positions.size());
    for (int i = 0; i < positions.size(); i++) {
        int finalI = i;
        threadTaskExecutor.execute(() -> {
            try {
                long time1 = System.currentTimeMillis();
                UploadPartRequest uploadRequest = new UploadPartRequest()
                        .withBucketName(bucketName)
                        .withKey(key)
                        .withUploadId(initResponse.getUploadId())
                        .withPartNumber(finalI + 1)
                        .withFileOffset(positions.get(finalI))
                        .withFile(toFile)
                        .withPartSize(Math.min(minPartSize, (size - positions.get(finalI))));
                // 第二步,上传分段,并把当前段的 PartETag 放到列表中
                partETags.add(s3.uploadPart(uploadRequest).getPartETag());
                long time2 = System.currentTimeMillis();
                log.info("第{}段上传耗时:{}", finalI + 1, (time2 - time1));
            } catch (Exception e) {
                log.error("Failed to upload, " + e.getMessage());
            } finally {
                latch.countDown();
            }
        });
    }
    latch.await();
    // 第三步,完成上传,合并分段
    CompleteMultipartUploadRequest compRequest = new CompleteMultipartUploadRequest(bucketName, key,
            initResponse.getUploadId(), partETags);
    s3.completeMultipartUpload(compRequest);
} catch (Exception e) {
    s3.abortMultipartUpload(new AbortMultipartUploadRequest(bucketName, key, initResponse.getUploadId()));
    log.error("Failed to upload, " + e.getMessage());
}
long end = System.currentTimeMillis();
log.info("总上传耗时:{}", (end - begin));
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60

第一步没什么说的,主要看for循环的内容。

循环上传,根据minPartSize与剩余没有上传的部分进行比较,得出这一分段需要上传的大小。将这一分段的信息设置到UploadPartRequest,设置的信息分别为bucketName(文件将要上载到的现有存储桶的名称)、key(在存储桶中显示上传的文件路径以及文件名)、uploadId(文件上传的id)、partNumber(这个分段是第几段)、fileOffset(从文件的哪个字节开始上传)、filepartSize(这个分段的大小),设置这些信息后,就可以开始上传分段了。

UploadPartRequest设置完毕后,使用s3的uploadPart方法将Request上传,得到UploadPartResult,这里要从result里面获取ETag信息出来,方便后续合并分段时使用。

最后所有的分段上传完毕,初始化CompleteMultipartUploadRequest对象,这里需要uploadId,以及每个分段上传的ETag,使用s3.completeMultipartUpload合并分段。

其中,多线程的配置和使用、CountDownLatch 的使用可以参考我之前写的博客:Spring Security中,多线程操作导致安全上下文丢失(附CountDownLatch的用法)

其中,multipartFileToFile是我在网上随便找的一个MultipartFile 转 File 的方法,代码如下:

/**
 * MultipartFile 转 File
 */
public static File multipartFileToFile(MultipartFile file) throws Exception {
    File toFile = null;
	if (file.equals("") || file.getSize() <= 0) {
        file = null;
    } else {
        InputStream ins = null;
        ins = file.getInputStream();
        toFile = new File(file.getOriginalFilename());
        inputStreamToFile(ins, toFile);
        ins.close();
    }
    return toFile;
}

/**
 * 获取流文件
 */
private static void inputStreamToFile(InputStream ins, File file) {
    try {
        OutputStream os = new FileOutputStream(file);
        int bytesRead = 0;
        byte[] buffer = new byte[8192];
        while ((bytesRead = ins.read(buffer, 0, 8192)) != -1) {
            os.write(buffer, 0, bytesRead);
        }
        os.close();
        ins.close();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

3.3、总结

分段上传可以提升我们上传大文件的效率,它不是为了解决流式向 S3 写入数据而产生的。当然,若应用它的 Lambda 中,也确实可以缓解内存的紧张。它一定程度上是像流式写入,只是它的写入单位不是字节,或任意大小的字节数据,而是至少 5MB 的 Chunk.

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/285773
推荐阅读
相关标签
  

闽ICP备14008679号