I am running into an issue when trying to read more than 40 GB of Athena CSV data from S3.
2024-06-25 18:16:25.447 WARN 509917 --- [pool-6-thread-1] c.a.s.s.internal.S3AbortableInputStream : Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.
private static int saveResultFile(S3Object s3Object, String outputFile, List<ColumnInfo> columnInfoList) {
System.out.println("This is the save file method.");
System.out.println(outputFile + "this is output");
String lastTwoDigits = outputFile.substring(outputFile.length() - 2);
boolean hasData = false; // Initialize to false
int totalSum = 0; // Initialize totalSum before the loop
// Set the maximum records per page
int maxRecordsPerPage = 1000000;
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(s3Object.getObjectContent(), StandardCharsets.UTF_8));
CSVReader csvReader = new CSVReader(reader);
ZipOutputStream zipOutputStream = new ZipOutputStream(new FileOutputStream(outputFile + ".zip"))) {
String[] header = csvReader.readNext();
if (header == null) {
// No header row found; nothing to process
System.out.println("No header found in the CSV file.");
return NO_HEADERS_FOUND;
}
int recordsProcessed = 0;
int pageNumber = 1;
CSVWriter writer = null;
try {
// Create a CSVWriter for the first page file
writer = new CSVWriter(new FileWriter(outputFile + "_" + pageNumber + ".csv"), ',',
CSVWriter.DEFAULT_QUOTE_CHARACTER);
// Write the header for the current page
writer.writeNext(header);
String[] line;
while ((line = csvReader.readNext()) != null) {
hasData = true; // Set to true if any data is processed
if (lastTwoDigits.equals("02")) {
// Accumulate the sum of the sms_split column
if (line.length > 8) { // sms_split is expected at index 8 (the 9th column)
try {
totalSum += Integer.parseInt(line[8]); // assuming sms_split holds an integer value
} catch (NumberFormatException e) {
// Handle parsing error if necessary
e.printStackTrace();
}
}
}
int lastColumnIndex = line.length - 1;
String lastColumnName = header[lastColumnIndex];
if (isTextColumn(lastColumnName)) {
// Perform encryption and decryption operations only if it's a text column
String encryptedValue = line[lastColumnIndex];
String decryptedText = decrypt(encryptedValue);
if (isHexString(decryptedText)) {
decryptedText = hexToAscii(decryptedText);
}
line[lastColumnIndex] = decryptedText;
}
if (recordsProcessed >= maxRecordsPerPage) {
// Close the current writer and create a new one for the next page
writer.close();
pageNumber++;
recordsProcessed = 0;
writer = new CSVWriter(new FileWriter(outputFile + "_" + pageNumber + ".csv"), ',',
CSVWriter.DEFAULT_QUOTE_CHARACTER);
// Write the header for the new page
writer.writeNext(header);
}
writer.writeNext(line);
recordsProcessed++;
}
} catch (IOException e) {
e.printStackTrace();
System.out.println("Error writing CSV page files for: " + outputFile);
} finally {
if (writer != null) {
writer.close();
}
}
if (hasData) {
for (int i = 1; i <= pageNumber; i++) {
String pageFileName = outputFile + "_" + i + ".csv";
try (FileInputStream fis = new FileInputStream(new File(pageFileName))) {
String zipEntryName = pageFileName.substring(pageFileName.lastIndexOf("/") + 1);
ZipEntry zipEntry = new ZipEntry(zipEntryName);
zipOutputStream.putNextEntry(zipEntry);
byte[] bytes = new byte[1024];
int length;
while ((length = fis.read(bytes)) >= 0) {
zipOutputStream.write(bytes, 0, length);
}
zipOutputStream.closeEntry();
if (!new File(pageFileName).delete()) {
System.out.println("Failed to delete file: " + pageFileName);
}
} catch (IOException e) {
e.printStackTrace();
System.out.println("Error processing file: " + pageFileName);
}
}
System.out.println("Result files saved successfully with decryption.");
System.out.println("Total sum of sms_split column: " + totalSum);
} else {
System.out.println("No data found to save.");
return NO_HEADERS_FOUND;
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (s3Object != null) {
s3Object.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
return totalSum;
}
You can batch the reads like this:
public void readPartialObject(AmazonS3 s3Client, String bucketName, String objectKey) {
long objectSize = s3Client.getObjectMetadata(bucketName, objectKey).getContentLength();
long chunkSize = 20 * 1024 * 1024; // 20 MB
long byteRangeStart = 0;
while (byteRangeStart < objectSize) {
long byteRangeEnd = Math.min(byteRangeStart + chunkSize - 1, objectSize - 1);
GetObjectRequest rangeObjectRequest = new GetObjectRequest(bucketName, objectKey)
.withRange(byteRangeStart, byteRangeEnd);
// Close each chunk once it has been processed so the HTTP connection is fully released
try (S3Object objectPortion = s3Client.getObject(rangeObjectRequest)) {
// Process the chunk (objectPortion.getObjectContent()) and read the range to the end
} catch (IOException e) {
e.printStackTrace();
}
byteRangeStart += chunkSize;
}
}
You will need to adjust the code in your method accordingly.
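The warning in the question is logged when an S3ObjectInputStream is closed before all of its bytes have been read; with ranged GETs each range is small enough to be consumed in full. The main detail to watch when adapting saveResultFile is that a byte range almost never ends exactly on a line break, so the partial last line of every chunk has to be carried over and prepended to the next chunk. Below is a minimal sketch of that pattern, assuming the same AWS SDK for Java v1 API used above; processLine() is a hypothetical placeholder for the per-row work (decryption of the last column, paging, zipping) currently done in saveResultFile, and the sketch decodes each chunk as UTF-8, which is fine for ASCII data but would need byte-level carry-over if rows can contain multi-byte characters.

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

public class ChunkedCsvReader {

    private static final long CHUNK_SIZE = 20L * 1024 * 1024; // 20 MB per ranged GET

    // Reads the object in byte ranges and feeds complete lines to processLine().
    public static void readInChunks(AmazonS3 s3Client, String bucketName, String objectKey) throws IOException {
        long objectSize = s3Client.getObjectMetadata(bucketName, objectKey).getContentLength();
        String carryOver = ""; // partial line left over from the previous chunk
        long byteRangeStart = 0;

        while (byteRangeStart < objectSize) {
            long byteRangeEnd = Math.min(byteRangeStart + CHUNK_SIZE - 1, objectSize - 1);
            GetObjectRequest rangeRequest = new GetObjectRequest(bucketName, objectKey)
                    .withRange(byteRangeStart, byteRangeEnd);

            try (S3Object portion = s3Client.getObject(rangeRequest);
                 BufferedReader reader = new BufferedReader(
                         new InputStreamReader(portion.getObjectContent(), StandardCharsets.UTF_8))) {

                // Read the whole range into memory (at most CHUNK_SIZE bytes).
                StringBuilder buffer = new StringBuilder(carryOver);
                char[] chars = new char[8192];
                int read;
                while ((read = reader.read(chars)) != -1) {
                    buffer.append(chars, 0, read);
                }

                // Emit every complete line; keep whatever follows the last newline.
                int lineStart = 0;
                int newline;
                while ((newline = buffer.indexOf("\n", lineStart)) != -1) {
                    processLine(buffer.substring(lineStart, newline));
                    lineStart = newline + 1;
                }
                carryOver = buffer.substring(lineStart);
            }

            byteRangeStart += CHUNK_SIZE;
        }

        if (!carryOver.isEmpty()) {
            processLine(carryOver); // last line of the file (no trailing newline)
        }
    }

    // Hypothetical placeholder for the per-row work in saveResultFile
    // (CSV field parsing, decryption of the last column, paging, zipping).
    private static void processLine(String line) {
        System.out.println(line);
    }
}

The 20 MB range size mirrors the snippet above; larger ranges mean fewer requests per object but more memory held per chunk, so pick a value that fits your heap.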