Dynamically build a zip archive on AWS S3 using the Java SDK?

Question (votes: 0, answers: 1)

I currently have thousands of objects on S3. Using a Spring service, the goal is to take an arbitrary list of object keys as input and compile them into a zip archive under a separate folder in the same bucket, for later download.

Building the zip locally is not an option, because the final archive can reach tens of gigabytes. I've read through the AWS Java SDK documentation, but I don't see a way to build the zip on the fly. All of the put/upload functions require a known content length, which I don't have.

Essentially, I need a zip "placeholder" on S3 that I can stream into, without having to hold the source S3 objects in memory. How can I do this?

java spring amazon-s3
1 Answer

0 votes

We had a similar requirement to merge gigabytes of customer reports, and there are a few options.

S3 provides an excellent solution for this using multipart upload (see the sketch after the links below).

https://docs.aws.amazon.com/AmazonS3/latest/userguide/CopyingObjectsMPUapi.html

https://aws.amazon.com/blogs/developer/efficient-amazon-s3-object-concatenation-using-the-aws-sdk-for-ruby/
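For the original zip question, the useful property of multipart upload is that the total object size never has to be declared up front; only each individual part needs a length, and every part except the last must be at least 5 MB. The following is a minimal sketch of that idea (not the Lambda we ran): it streams a ZipOutputStream into roughly 5 MB buffers and uploads each buffer as a part using the v1 AWS SDK for Java (com.amazonaws), matching the code further down. Bucket and key names are placeholders, and retries, abort-on-failure handling, and memory tuning are omitted.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.*;

public class ZipToS3Sketch {

    // Every part except the last must be at least 5 MB.
    private static final int PART_SIZE = 5 * 1024 * 1024;

    /** Streams the given source objects into a zip object at zipKey via multipart upload. */
    public static void zipObjects(final AmazonS3 s3, final String bucket,
                                  final List<String> sourceKeys, final String zipKey) throws IOException {
        final InitiateMultipartUploadResult init =
                s3.initiateMultipartUpload(new InitiateMultipartUploadRequest(bucket, zipKey));
        final List<PartETag> etags = new ArrayList<>();
        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        int partNumber = 1;

        try (ZipOutputStream zip = new ZipOutputStream(buffer)) {
            for (final String key : sourceKeys) {
                zip.putNextEntry(new ZipEntry(key));
                try (InputStream in = s3.getObject(bucket, key).getObjectContent()) {
                    final byte[] chunk = new byte[8192];
                    int read;
                    while ((read = in.read(chunk)) != -1) {
                        zip.write(chunk, 0, read);
                        // Whenever at least one full part has accumulated, ship it.
                        if (buffer.size() >= PART_SIZE) {
                            partNumber = uploadPart(s3, bucket, zipKey, init.getUploadId(),
                                    partNumber, buffer, etags);
                        }
                    }
                }
                zip.closeEntry();
            }
            zip.finish(); // writes the zip central directory into the buffer
            // Upload whatever is left; the final part is allowed to be smaller than 5 MB.
            uploadPart(s3, bucket, zipKey, init.getUploadId(), partNumber, buffer, etags);
        }

        s3.completeMultipartUpload(
                new CompleteMultipartUploadRequest(bucket, zipKey, init.getUploadId(), etags));
    }

    /** Uploads the buffered bytes as one part, clears the buffer, and returns the next part number. */
    private static int uploadPart(final AmazonS3 s3, final String bucket, final String key,
                                  final String uploadId, final int partNumber,
                                  final ByteArrayOutputStream buffer, final List<PartETag> etags) {
        final byte[] bytes = buffer.toByteArray();
        final UploadPartRequest request = new UploadPartRequest()
                .withBucketName(bucket)
                .withKey(key)
                .withUploadId(uploadId)
                .withPartNumber(partNumber)
                .withInputStream(new ByteArrayInputStream(bytes))
                .withPartSize(bytes.length);
        etags.add(s3.uploadPart(request).getPartETag());
        buffer.reset(); // only bytes written after this point belong to the next part
        return partNumber + 1;
    }
}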

We also tried writing a Lambda to merge the S3 files in a folder. Our code merged 32 gzipped CSV files, 9 GB in total, in about 75 seconds.

That 9 GB of gzip corresponds to roughly 28 GB of uncompressed CSV.

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.stream.Collectors;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.SdkClientException;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.*;

/**
 * Handler for requests to Lambda function.
 */
public class App implements RequestHandler<Map<String, String>, String> {

    /**
     * Merges the gzipped CSV objects under a prefix into one destination object
     * by copying each source object as a part of a multipart upload.
     *
     * @param event   the Lambda event payload (unused here).
     * @param context the Lambda execution context (unused here).
     * @return an empty string.
     */
    public String handleRequest(final Map<String, String> event, final Context context) {
        final Regions clientRegion = Regions.US_WEST_2;
        final String sourceBucketName = "";   // placeholder: bucket holding the source objects
        final String destBucketName = "";     // placeholder: bucket for the merged object
        final String destObjectKey = "merged/merge.csv.gz";

        try {
            final AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
                    .withCredentials(new DefaultAWSCredentialsProviderChain())
                    .withRegion(clientRegion)
                    .build();

            // Initiate the multipart upload that the copied parts will be attached to.
            final InitiateMultipartUploadRequest initRequest =
                    new InitiateMultipartUploadRequest(destBucketName, destObjectKey);
            final InitiateMultipartUploadResult initResult = s3Client.initiateMultipartUpload(initRequest);
            final List<CopyPartResult> copyResponses = Collections.synchronizedList(new ArrayList<>());

            // List the source objects to merge.
            final ListObjectsV2Result result = s3Client.listObjectsV2(new ListObjectsV2Request()
                    .withBucketName(sourceBucketName)
                    .withPrefix("resultparallelon/"));

            final List<String> s3Files = result.getObjectSummaries().stream()
                    .map(x -> x.getKey())
                    .collect(Collectors.toList());

            result.getObjectSummaries().stream()
                    .forEach(x -> System.out.println(x.getKey() + " " + x.getSize()));

            final Map<String, Long> sizesByKey = result.getObjectSummaries().stream()
                    .collect(Collectors.toMap(S3ObjectSummary::getKey, S3ObjectSummary::getSize));

            final ExecutorService executorService = Executors.newFixedThreadPool(4);
            for (int i = 0; i < s3Files.size();) {
                // Skip objects smaller than ~6 MB: every part of a multipart copy
                // except the last must be at least 5 MB.
                if (sizesByKey.get(s3Files.get(i)) < 6000000) {
                    System.out.println("skipped (too small for a copy part): " + s3Files.get(i));
                    i++;
                    continue;
                }
                final int s = i;
                i++;
                executorService.execute(() -> {
                    // Copy this source object as one part of the multipart upload.
                    final CopyPartRequest copyRequest = new CopyPartRequest()
                            .withSourceBucketName(sourceBucketName)
                            .withSourceKey(s3Files.get(s))
                            .withDestinationBucketName(destBucketName)
                            .withDestinationKey(destObjectKey)
                            .withUploadId(initResult.getUploadId())
                            .withPartNumber(s + 1);
                    copyResponses.add(s3Client.copyPart(copyRequest));
                    System.out.println("copy complete: " + s3Files.get(s));
                });
            }

            try {
                // Wait for all copy-part tasks to finish before completing the upload.
                executorService.shutdown();
                executorService.awaitTermination(600, TimeUnit.SECONDS);
            } catch (final Exception e) {
                e.printStackTrace();
            }

            // Complete the upload to concatenate all copied parts and make the merged object available.
            final CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest(
                    destBucketName,
                    destObjectKey,
                    initResult.getUploadId(),
                    getETags(copyResponses));
            s3Client.completeMultipartUpload(completeRequest);
            System.out.println("Multipart copy complete.");
        } catch (final AmazonServiceException e) {
            // The call was transmitted successfully, but Amazon S3 couldn't process
            // it, so it returned an error response.
            e.printStackTrace();
        } catch (final SdkClientException e) {
            // Amazon S3 couldn't be contacted for a response, or the client
            // couldn't parse the response from Amazon S3.
            e.printStackTrace();
        }

        return "";
    }

    // Helper that converts the copy-part responses into a list of ETags, sorted by
    // part number: CompleteMultipartUpload requires the parts in ascending order,
    // and the responses arrive out of order from the worker threads.
    private static List<PartETag> getETags(final List<CopyPartResult> responses) {
        final List<PartETag> etags = new ArrayList<>();
        for (final CopyPartResult response : responses) {
            etags.add(new PartETag(response.getPartNumber(), response.getETag()));
        }
        etags.sort(Comparator.comparingInt(PartETag::getPartNumber));
        return etags;
    }
}

The nice thing about gzip is that several gzip files can simply be concatenated to produce a single valid gzip file.
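A small self-contained way to check that property (this sketch assumes Java 9+ for InputStream.readAllBytes): two independently gzipped byte arrays, written back to back, decompress as one stream, because GZIPInputStream reads through all gzip members in the input.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipConcatDemo {
    public static void main(final String[] args) throws IOException {
        // Two independently gzipped chunks, e.g. two CSV parts.
        final byte[] first = gzip("id,name\n1,alice\n");
        final byte[] second = gzip("2,bob\n3,carol\n");

        // Concatenating the compressed bytes yields a valid multi-member gzip stream.
        final ByteArrayOutputStream merged = new ByteArrayOutputStream();
        merged.write(first);
        merged.write(second);

        // Decompressing the merged bytes returns the concatenation of the original contents.
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(merged.toByteArray()))) {
            System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8));
        }
    }

    private static byte[] gzip(final String text) throws IOException {
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        }
        return out.toByteArray();
    }
}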
