How can I write a very large Spark DataFrame to a single CSV file in AWS S3?


I have a very large Spark DataFrame that I need to write to an AWS S3 bucket as a single CSV file (I'm using pySpark).

I can't use the standard

csv_df.coalesce(1).write.csv()

approach, because the file is too large for a single node to handle (it runs out of memory).
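
For context, the single-file write I'm referring to looks roughly like this (the bucket path is just a placeholder):

# coalesce(1) pulls every row onto one executor before writing,
# which is where the job runs out of memory on a very large DataFrame.
csv_df.coalesce(1).write.csv("s3a://my-bucket/output/", mode="overwrite", header=True)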

I couldn't find any information about S3 supporting some kind of append mode.

How can I achieve this? If possible, I'd prefer some kind of parallel write to S3 to speed things up.

apache-spark amazon-s3 pyspark
1 Answer

I don't know how well this solution performs, but it does allow writing the CSV to S3 in parallel.

import boto3
import csv
from io import StringIO
from pyspark.sql import SparkSession
from pyspark.sql.types import Row
from typing import Iterator


def func(partition_num: int, itr: Iterator[Row]):
    global upload_id
    global bucket_name
    global key
    partition_num += 1
    with StringIO() as buffer:
        for i, row in enumerate(itr):
            row = row.asDict()
            if not i:
                writer = csv.DictWriter(buffer, row.keys(), lineterminator="\n")
                if partition_num == 1:
                    writer.writeheader()
            writer.writerow(row)
        if buffer.tell() == 0:
            # Empty partition: upload nothing. S3 part numbers do not have to
            # be consecutive, so the gap in numbering is harmless.
            return
        # Keep the trailing newline on each part so the parts concatenate
        # into a valid CSV when S3 joins them together.

        # Unfortunately, the s3_client isn't picklable,
        # so we have to create a new one for each partition.
        s3_client = boto3.client("s3")
        etag = s3_client.upload_part(
            Bucket=bucket_name,
            Key=key,
            PartNumber=partition_num,
            UploadId=upload_id,
            Body=buffer.getvalue().encode("utf-8"),
        )["ETag"]

        yield {"PartNumber": partition_num, "ETag": etag}
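
# Driver-side setup: start a single multipart upload that every partition will add a part to.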
bucket_name = "BUCKET"
key = "myfile.csv"
s3_client = boto3.client("s3")
upload_id = s3_client.create_multipart_upload(Bucket=bucket_name, Key=key)["UploadId"]

spark: SparkSession = (
    SparkSession.builder.master("local[*]").appName("MyApp").getOrCreate()
)
df = df  # Insert your dataframe

try:
    # Use a better partitioning strategy than this hard-coded coalesce(4);
    # every part except the last must be at least 5 MiB (see the sizing sketch below).
    parts = df.rdd.coalesce(4).mapPartitionsWithIndex(func).collect()
except BaseException:
    s3_client.abort_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id)
    raise

s3_client.complete_multipart_upload(
    Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={"Parts": parts}
)
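
As the comment above says, the hard-coded coalesce(4) is only a placeholder. One possible way to size the partitions (a rough sketch, not part of the answer code itself; the 1,000-row sample and the 64 MiB target are arbitrary choices, and df is assumed to be non-empty) is to estimate the CSV size from a small sample and replace the parts = df.rdd.coalesce(4)... line with something like:

import csv
from io import StringIO

# Estimate the average CSV bytes per row from a small sample.
sample_rows = [row.asDict() for row in df.limit(1000).collect()]
with StringIO() as buf:
    writer = csv.DictWriter(buf, sample_rows[0].keys(), lineterminator="\n")
    for row in sample_rows:
        writer.writerow(row)
    bytes_per_row = len(buf.getvalue().encode("utf-8")) / len(sample_rows)

total_bytes = bytes_per_row * df.count()
target_part_bytes = 64 * 1024 * 1024  # aim for roughly 64 MiB parts; anything >= 5 MiB is valid
num_parts = min(10_000, int(total_bytes // target_part_bytes) + 1)  # S3 allows at most 10,000 parts

# repartition (unlike coalesce) can also increase the partition count, at the cost of a shuffle.
parts = df.repartition(num_parts).rdd.mapPartitionsWithIndex(func).collect()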