无法通过 Spring Boot 从 S3 存储桶下载超过 40GB 的数据

问题描述 投票:0回答:1

我在尝试从 S3 Athena 读取超过 40 GB 的 CSV 数据时遇到问题。

2024-06-25 18:16:25.447 警告 509917 --- [pool-6-thread-1] c.a.s.s.internal.S3AbortableInputStream :并非从 S3ObjectInputStream 读取所有字节,中止 HTTP 连接。这可能是一个错误,并可能导致次优行为。通过范围 GET 仅请求您需要的字节,或在使用后耗尽输入流。

private static int saveResultFile(S3Object s3Object, String outputFile, List<ColumnInfo> columnInfoList) {
    System.out.println("This is the save file method.");
    System.out.println(outputFile + "this is output");

    String lastTwoDigits = outputFile.substring(outputFile.length() - 2);

    boolean hasData = false; // Initialize to false
    int totalSum = 0; // Initialize totalSum before the loop
    // Set the maximum records per page
    int maxRecordsPerPage = 1000000;

    try (BufferedReader reader = new BufferedReader(
            new InputStreamReader(s3Object.getObjectContent(), StandardCharsets.UTF_8));
         ZipOutputStream zipOutputStream = new ZipOutputStream(new FileOutputStream(outputFile + ".zip"))) {

        CSVReader csvReader = new CSVReader(reader);

        String[] header = csvReader.readNext();

        if (header == null) {
            // No header, return false
            System.out.println("No header found in the CSV file.");
            return NO_HEADERS_FOUND;
        }

        int recordsProcessed = 0;
        int pageNumber = 1;
        CSVWriter writer = null;

        try {
            // Create a ZipOutputStream to write to the zip file
            writer = new CSVWriter(new FileWriter(outputFile + "_" + pageNumber + ".csv"), ',',
                    CSVWriter.DEFAULT_QUOTE_CHARACTER);

            // Write the header for the current page
            writer.writeNext(header);

            String[] line;
            while ((line = csvReader.readNext()) != null) {
                hasData = true; // Set to true if any data is processed

                if (lastTwoDigits.equals("02")) {
                    // Calculate sum for sms_split column
                    if (line.length > 8) { // Assuming index 7 is the 8th column
                        try {
                            totalSum += Integer.parseInt(line[8]); // Assuming sms_split is an integer
                        } catch (NumberFormatException e) {
                            // Handle parsing error if necessary
                            e.printStackTrace();
                        }
                    }
                }

                int lastColumnIndex = line.length - 1;
                String lastColumnName = header[lastColumnIndex];

                if (isTextColumn(lastColumnName)) {
                    // Perform encryption and decryption operations only if it's a text column
                    String encryptedValue = line[lastColumnIndex];
                    String decryptedText = decrypt(encryptedValue);

                    if (isHexString(decryptedText)) {
                        decryptedText = hexToAscii(decryptedText);
                    }

                    line[lastColumnIndex] = decryptedText;
                }

                if (recordsProcessed >= maxRecordsPerPage) {
                    // Close the current writer and create a new one for the next page
                    writer.close();
                    pageNumber++;
                    recordsProcessed = 0;
                    writer = new CSVWriter(new FileWriter(outputFile + "_" + pageNumber + ".csv"), ',',
                            CSVWriter.DEFAULT_QUOTE_CHARACTER);
                    // Write the header for the new page
                    writer.writeNext(header);
                }

                writer.writeNext(line);
                recordsProcessed++;
            }
        } catch (IOException e) {
            e.printStackTrace();
            System.out.println("Error creating ZIP file: " + outputFile + ".zip");
        } finally {
            if (writer != null) {
                writer.close();
            }
        }

        if (hasData) {
            for (int i = 1; i <= pageNumber; i++) {
                String pageFileName = outputFile + "_" + i + ".csv";

                try (FileInputStream fis = new FileInputStream(new File(pageFileName))) {
                    String zipEntryName = pageFileName.substring(pageFileName.lastIndexOf("/") + 1);
                    ZipEntry zipEntry = new ZipEntry(zipEntryName);
                    zipOutputStream.putNextEntry(zipEntry);

                    byte[] bytes = new byte[1024];
                    int length;
                    while ((length = fis.read(bytes)) >= 0) {
                        zipOutputStream.write(bytes, 0, length);
                    }

                    zipOutputStream.closeEntry();

                    if (new File(pageFileName).delete()) {
                        // System.out.println("Deleted file: " + pageFileName);
                    } else {
                        System.out.println("Failed to delete file: " + pageFileName);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                    System.out.println("Error processing file: " + pageFileName);
                }
            }

            System.out.println("Result files saved successfully with decryption.");
            System.out.println("Total sum of sms_split column: " + totalSum);
        } else {
            System.out.println("No data found to save.");
            return NO_HEADERS_FOUND;
        }

    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        try {
            if (s3Object != null) {
                s3Object.close();
            }``your text``
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    return totalSum;
}
java spring-boot csv amazon-s3
1个回答
0
投票

您可以这样进行批处理:

public void readPartialObject(AmazonS3 s3Client, String bucketName, String objectKey) {
    long objectSize = s3Client.getObjectMetadata(bucketName, objectKey).getContentLength();
    long chunkSize = 20 * 1024 * 1024; // 20 MB
    long byteRangeStart = 0;

    while (byteRangeStart < objectSize) {
        long byteRangeEnd = Math.min(byteRangeStart + chunkSize - 1, objectSize - 1);
        GetObjectRequest rangeObjectRequest = new GetObjectRequest(bucketName, objectKey)
            .withRange(byteRangeStart, byteRangeEnd);

        S3Object objectPortion = s3Client.getObject(rangeObjectRequest);
        // Process the chunk (objectPortion.getObjectContent())

        byteRangeStart += chunkSize;
    }
}

您需要调整方法中的代码。

© www.soinside.com 2019 - 2024. All rights reserved.