I am trying to read large chunks of a file from S3 without cutting off any lines, so the chunks can be processed in parallel.
Let me explain with an example: there is a 1 GB file on S3, and I want to split it into 64 MB chunks. I can easily do that like this:
S3Object s3object = s3.getObject(new GetObjectRequest(bucketName, key));
InputStream stream = s3object.getObjectContent();
byte[] content = new byte[64 * 1024 * 1024];
int bytesRead;
while ((bytesRead = stream.read(content)) != -1) {
    // process the first bytesRead bytes of content here
    // (read() may return fewer bytes than the buffer holds)
}
But the problem with a chunk is that it may contain, say, 100 complete lines and one incomplete line. I cannot process an incomplete line, and I don't want to discard it either.
Is there a way to handle this, i.e. so that no chunk ends with a partial line?
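To make the goal concrete, what I am after is roughly the following: scan back from the end of each chunk to the last newline, process only the complete lines, and carry the leftover bytes into the next chunk. A rough sketch of that idea (processLines is a placeholder for whatever parallel handler I would plug in):

byte[] chunk = new byte[64 * 1024 * 1024];
byte[] carry = new byte[0]; // unfinished line left over from the previous chunk
int bytesRead;
while ((bytesRead = stream.read(chunk)) != -1) {
    // scan back to the last newline in this chunk
    int cut = bytesRead - 1;
    while (cut >= 0 && chunk[cut] != '\n') {
        cut--;
    }

    if (cut < 0) {
        // no newline at all: the entire chunk is a partial line, carry it forward
        byte[] grown = new byte[carry.length + bytesRead];
        System.arraycopy(carry, 0, grown, 0, carry.length);
        System.arraycopy(chunk, 0, grown, carry.length, bytesRead);
        carry = grown;
        continue;
    }

    // complete lines = carry + chunk[0 .. cut]
    byte[] block = new byte[carry.length + cut + 1];
    System.arraycopy(carry, 0, block, 0, carry.length);
    System.arraycopy(chunk, 0, block, carry.length, cut + 1);
    processLines(block); // placeholder: hand a block of complete lines to a worker

    // keep the tail after the last newline for the next iteration
    carry = new byte[bytesRead - cut - 1];
    System.arraycopy(chunk, cut + 1, carry, 0, carry.length);
}
if (carry.length > 0) {
    processLines(carry); // final line with no trailing newline
}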
The aws-java-sdk already provides streaming for S3 objects. You call getObject and the result is an InputStream:
1) AmazonS3Client.getObject(GetObjectRequest getObjectRequest) -> S3Object
2) S3Object.getObjectContent()
Note: the method is a simple getter and does not actually create a stream. If you retrieve an S3Object, you should close this input stream as soon as possible, because the object content is not buffered in memory but streamed directly from Amazon S3. Furthermore, failure to close this stream can cause the request pool to become blocked.
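For example, a try-with-resources block keeps the stream open only as long as needed (a sketch; bucketName and key are assumed to be defined):

S3Object s3object = s3.getObject(new GetObjectRequest(bucketName, key));
try (InputStream stream = s3object.getObjectContent()) {
    byte[] content = new byte[64 * 1024 * 1024];
    int bytesRead;
    while ((bytesRead = stream.read(content)) != -1) {
        // process the first bytesRead bytes of content here
    }
} // the stream is closed here, releasing the HTTP connection back to the pool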
My usual approach (InputStream -> BufferedReader.lines() -> batches of lines -> CompletableFuture) won't work here, because the underlying S3ObjectInputStream eventually times out for very large files.
So I created a new class, S3InputStream, which doesn't care how long it stays open and reads byte blocks on demand using short-lived AWS SDK calls. You provide a byte[] that will be reused. new byte[1 << 24] (16 MB) appears to work well.
package org.harrison;
import java.io.IOException;
import java.io.InputStream;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.GetObjectRequest;
/**
* An {@link InputStream} for S3 files that does not care how big the file is.
*
* @author stephen harrison
*/
public class S3InputStream extends InputStream {
    private static class LazyHolder {
        private static final AmazonS3 S3 = AmazonS3ClientBuilder.defaultClient();
    }

    private final String bucket;
    private final String file;
    private final byte[] buffer;
    private long lastByteOffset;

    private long offset = 0;
    private int next = 0;
    private int length = 0;

    public S3InputStream(final String bucket, final String file, final byte[] buffer) {
        this.bucket = bucket;
        this.file = file;
        this.buffer = buffer;
        this.lastByteOffset = LazyHolder.S3.getObjectMetadata(bucket, file).getContentLength() - 1;
    }

    @Override
    public int read() throws IOException {
        if (next >= length) {
            fill();

            if (length <= 0) {
                return -1;
            }

            next = 0;
        }

        // Mask to an unsigned 0..255 value: a raw byte cast yields -1..-128 for
        // bytes >= 0x80, which callers would mistake for end-of-stream.
        return buffer[next++] & 0xFF;
    }

    private void fill() throws IOException {
        // Use > rather than >=: when offset == lastByteOffset there is still
        // exactly one byte left to fetch.
        if (offset > lastByteOffset) {
            length = -1;
        } else {
            try (final InputStream inputStream = s3Object()) {
                length = 0;
                int bytesRead;

                // Drain the ranged GET in bulk rather than byte-by-byte.
                while (length < buffer.length
                        && (bytesRead = inputStream.read(buffer, length, buffer.length - length)) != -1) {
                    length += bytesRead;
                }

                if (length > 0) {
                    offset += length;
                }
            }
        }
    }

    private InputStream s3Object() {
        // Short-lived ranged GET for the next buffer-sized block; S3 truncates
        // the range automatically at the end of the object.
        final GetObjectRequest request = new GetObjectRequest(bucket, file)
                .withRange(offset, offset + buffer.length - 1);

        return LazyHolder.S3.getObject(request).getObjectContent();
    }
}
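Usage is then the familiar reader pipeline, for example (a sketch; the bucket and key are placeholders, and InputStreamReader/StandardCharsets come from java.io and java.nio.charset):

byte[] buffer = new byte[1 << 24]; // 16 MB, reused across ranged GETs
try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(new S3InputStream("my-bucket", "big-file.log", buffer),
                StandardCharsets.UTF_8))) {
    reader.lines().forEach(line -> {
        // process each complete line; the underlying stream never stays open
        // long enough to time out, because each fill() is a fresh request
    });
}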
100 complete lines and 1 incomplete line
Do you mean you need to read the stream line by line? If so, instead of using a plain InputStream, try reading the S3 object stream with a BufferedReader so that you can read it line by line, though I think it will be a bit slower than reading by chunk:
S3Object s3object = s3.getObject(new GetObjectRequest(bucketName, key));
BufferedReader in = new BufferedReader(new InputStreamReader(s3object.getObjectContent()));
String line;
while ((line = in.readLine()) != null) {
//process line here
}
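If you still want parallelism on top of line-by-line reading, one option (just a sketch; the batch size and pool size are arbitrary, and ExecutorService/Executors come from java.util.concurrent) is to collect lines from the reader above into batches and hand each batch to an executor:

ExecutorService pool = Executors.newFixedThreadPool(4);
List<String> batch = new ArrayList<>();
String batchLine;
while ((batchLine = in.readLine()) != null) {
    batch.add(batchLine);
    if (batch.size() == 100_000) { // placeholder batch size
        final List<String> lines = batch;
        pool.submit(() -> lines.forEach(l -> { /* process line here */ }));
        batch = new ArrayList<>();
    }
}
if (!batch.isEmpty()) {
    final List<String> lines = batch;
    pool.submit(() -> lines.forEach(l -> { /* process line here */ }));
}
pool.shutdown();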
You can read all of the files in the bucket by checking the continuation token, and you can read the file contents with other Java libraries, e.g. PDFBox for PDF:
import java.io.IOException;
import java.io.InputStream;

import org.apache.pdfbox.pdmodel.PDDocument;
import org.apache.pdfbox.text.PDFTextStripper;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
//..
// in your main class
private static AWSCredentials credentials = null;
private static AmazonS3 amazonS3Client = null;
public static void initializeAmazonObjects() {
    credentials = new BasicAWSCredentials(ACCESS_KEY, SECRET_ACCESS_KEY);
    amazonS3Client = new AmazonS3Client(credentials);
}
public void mainMethod() throws IOException, AmazonS3Exception {
    // connect to AWS
    initializeAmazonObjects();

    ListObjectsV2Request req = new ListObjectsV2Request().withBucketName(bucketName);
    ListObjectsV2Result listObjectsResult;

    do {
        listObjectsResult = amazonS3Client.listObjectsV2(req);

        for (S3ObjectSummary objectSummary : listObjectsResult.getObjectSummaries()) {
            System.out.printf(" - %s (size: %d)\n", objectSummary.getKey(), objectSummary.getSize());

            // only try to read PDF files
            String key = objectSummary.getKey();
            if (!key.endsWith(".pdf")) {
                continue;
            }

            // read the source file as text
            String pdfFileInText = readAwsFile(objectSummary.getBucketName(), key);
            if (pdfFileInText.isEmpty()) {
                continue;
            }
            // process pdfFileInText here
        } // end of current batch

        // If there are more keys in the bucket than maxKeys (1000 by default),
        // get a continuation token and list the next batch of objects.
        String token = listObjectsResult.getNextContinuationToken();
        System.out.println("Next Continuation Token: " + token);
        req.setContinuationToken(token);
    } while (listObjectsResult.isTruncated());
}
public String readAwsFile(String bucketName, String keyName) {
    String pdfFileInText = "";

    try {
        S3Object object = amazonS3Client.getObject(new GetObjectRequest(bucketName, keyName));
        InputStream objectData = object.getObjectContent();

        // close the document to release the S3 stream and any temp resources
        try (PDDocument document = PDDocument.load(objectData)) {
            if (!document.isEncrypted()) {
                PDFTextStripper stripper = new PDFTextStripper();
                stripper.setSortByPosition(true);
                pdfFileInText = stripper.getText(document);
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }

    return pdfFileInText;
}