I currently have a few thousand objects in S3. Using a Spring service, the goal is to take an arbitrary list of object keys as input and compile those objects into a zip archive in a separate folder of the same bucket, for later download.
Building the zip locally is not an option, since the final archive can run to tens of gigabytes. I have read through the AWS Java SDK documentation, but I don't see a way to build a zip on the fly: all of the put/upload operations require a known content length, which I don't have.
Essentially I need a zip "placeholder" on S3 that I can stream into, without holding the source S3 objects in memory. How can I do this?
We had a similar requirement to merge gigabytes of customer reports, and there are a few options.
S3's multipart upload API, with its server-side part-copy operation, provides a good solution:
https://docs.aws.amazon.com/AmazonS3/latest/userguide/CopyingObjectsMPUapi.html
https://aws.amazon.com/blogs/developer/efficient-amazon-s3-object-concatenation-using-the-aws-sdk-for-ruby/
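For the original zip use case, the key point is that a multipart upload never needs the total content length up front: you only declare each part's size as you upload it. Below is a minimal sketch of the buffering side; the class name `PartBufferingOutputStream` and the consumer wiring are my own illustration, not SDK types. You would wrap your `ZipOutputStream` around an `OutputStream` like this one, which accumulates at least 5 MiB (S3's minimum part size, except for the last part) and hands each full chunk to a callback; in production that callback would issue an `UploadPartRequest` against the open multipart upload.

```java
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.util.function.BiConsumer;

/**
 * Hypothetical helper (not an SDK class): buffers writes and emits parts of
 * at least 5 MiB, the S3 minimum for every part except the last.
 */
class PartBufferingOutputStream extends OutputStream {
    static final int PART_SIZE = 5 * 1024 * 1024;
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final BiConsumer<Integer, byte[]> partConsumer; // (partNumber, bytes) -> upload
    private int partNumber = 1;

    PartBufferingOutputStream(final BiConsumer<Integer, byte[]> partConsumer) {
        this.partConsumer = partConsumer;
    }

    @Override
    public void write(final int b) {
        buffer.write(b);
        if (buffer.size() >= PART_SIZE) {
            flushPart();
        }
    }

    @Override
    public void write(final byte[] b, final int off, final int len) {
        buffer.write(b, off, len);
        if (buffer.size() >= PART_SIZE) {
            flushPart();
        }
    }

    // Hand the accumulated bytes to the consumer as the next numbered part.
    private void flushPart() {
        partConsumer.accept(partNumber++, buffer.toByteArray());
        buffer.reset();
    }

    @Override
    public void close() {
        // The final part is allowed to be smaller than 5 MiB.
        if (buffer.size() > 0) {
            flushPart();
        }
    }
}
```

In the consumer you would call `s3Client.uploadPart(...)` with the part number, a `ByteArrayInputStream` over the bytes, and the chunk's length as the part size, then finish with `completeMultipartUpload` once the stream is closed — the total archive size is never needed.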
We also tried writing a Lambda to merge the S3 files under a folder. Our code merged 32 gzipped CSV files, 9 GB in total, in about 75 seconds (that 9 GB of gzip expands to roughly 28 GB of uncompressed CSV).
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.SdkClientException;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.*;
/**
* Handler for requests to Lambda function.
*/
public class App implements RequestHandler<Map<String, String>, String> {
    /**
     * Merges the gzipped files under a source prefix into a single S3 object,
     * using a multipart upload whose parts are server-side copies.
     *
     * @param event   the Lambda event payload (unused here).
     * @param context the Lambda execution context.
     * @return an empty string on completion.
     */
public String handleRequest(final Map<String, String> event, final Context context) {
        final Regions clientRegion = Regions.US_WEST_2;
        final String srcBucketName = "";   // bucket that holds the source files
        final String destBucketName = "";  // bucket for the merged object
        final String destObjectKey = "merged/merge.csv.gz";
        try {
            final AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
                    .withCredentials(new DefaultAWSCredentialsProviderChain())
                    .withRegion(clientRegion)
                    .build();

            // Initiate the multipart upload.
            final InitiateMultipartUploadRequest initRequest =
                    new InitiateMultipartUploadRequest(destBucketName, destObjectKey);
            final InitiateMultipartUploadResult initResult = s3Client.initiateMultipartUpload(initRequest);
            final List<CopyPartResult> copyResponses = Collections.synchronizedList(new ArrayList<>());

            // List the source objects. Note: a single listObjectsV2 call returns at most
            // 1,000 keys; paginate with continuation tokens if the prefix holds more.
            final ListObjectsV2Result result = s3Client.listObjectsV2(new ListObjectsV2Request()
                    .withBucketName(srcBucketName)
                    .withPrefix("resultparallelon/"));
            final List<String> s3Files = result.getObjectSummaries().stream()
                    .map(S3ObjectSummary::getKey)
                    .collect(Collectors.toList());
            final Map<String, Long> sizesByKey = result.getObjectSummaries().stream()
                    .collect(Collectors.toMap(S3ObjectSummary::getKey, S3ObjectSummary::getSize));

            final ExecutorService executorService = Executors.newFixedThreadPool(4);
            for (int i = 0; i < s3Files.size(); i++) {
                // Every part except the last must be at least 5 MiB, so skip files
                // below that limit (they would have to be merged client-side).
                if (sizesByKey.get(s3Files.get(i)) < 6_000_000L) {
                    System.out.println("Skipping undersized file: " + s3Files.get(i));
                    continue;
                }
                final int partIndex = i;
                executorService.execute(() -> {
                    // Server-side copy of this source object as one part of the destination.
                    // Part numbers need not be consecutive, only ascending at completion.
                    final CopyPartRequest copyRequest = new CopyPartRequest()
                            .withSourceBucketName(srcBucketName)
                            .withSourceKey(s3Files.get(partIndex))
                            .withDestinationBucketName(destBucketName)
                            .withDestinationKey(destObjectKey)
                            .withUploadId(initResult.getUploadId())
                            .withPartNumber(partIndex + 1); // part numbers start at 1
                    copyResponses.add(s3Client.copyPart(copyRequest));
                    System.out.println("Copy complete: " + s3Files.get(partIndex));
                });
            }
            executorService.shutdown();
            if (!executorService.awaitTermination(600, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Part copies did not finish within the timeout.");
            }

            // Complete the upload to concatenate all copied parts into one object.
            final CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest(
                    destBucketName,
                    destObjectKey,
                    initResult.getUploadId(),
                    getETags(copyResponses));
            s3Client.completeMultipartUpload(completeRequest);
            System.out.println("Multipart copy complete.");
        } catch (final AmazonServiceException e) {
            // The call was transmitted successfully, but Amazon S3 couldn't process
            // it, so it returned an error response.
            e.printStackTrace();
        } catch (final SdkClientException e) {
            // Amazon S3 couldn't be contacted for a response, or the client
            // couldn't parse the response from Amazon S3.
            e.printStackTrace();
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
return "";
}
    // Helper to build the part list. CompleteMultipartUpload requires parts in
    // ascending part-number order, and the concurrent copies finish out of order.
    private static List<PartETag> getETags(final List<CopyPartResult> responses) {
        final List<PartETag> etags = new ArrayList<>();
        for (final CopyPartResult response : responses) {
            etags.add(new PartETag(response.getPartNumber(), response.getETag()));
        }
        etags.sort((a, b) -> Integer.compare(a.getPartNumber(), b.getPartNumber()));
        return etags;
    }
}
The nice thing about gzip is that multiple gzip files can be concatenated byte-for-byte and the result is still a valid gzip file.
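That property is easy to verify locally: compress two pieces separately, concatenate the raw bytes (which is exactly what the multipart copy above does with whole objects), and `GZIPInputStream`, which handles multi-member gzip streams, decompresses the result as one file. The class and method names here are just for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

class GzipConcatDemo {
    /** Gzip a string into its own complete gzip member. */
    static byte[] gzip(final String s) {
        final ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(s.getBytes(StandardCharsets.UTF_8));
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    /** Concatenate two byte arrays, as S3 multipart copy does with whole objects. */
    static byte[] concat(final byte[] a, final byte[] b) {
        final byte[] merged = new byte[a.length + b.length];
        System.arraycopy(a, 0, merged, 0, a.length);
        System.arraycopy(b, 0, merged, a.length, b.length);
        return merged;
    }

    /** Decompress a (possibly multi-member) gzip byte array back to a string. */
    static String gunzip(final byte[] data) {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(final String[] args) {
        final byte[] merged = concat(gzip("Hello, "), gzip("world!"));
        System.out.println(gunzip(merged)); // prints "Hello, world!"
    }
}
```

This is why gzipped CSVs can be merged with server-side part copies alone: no part ever has to be decompressed or recompressed.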