I have a very large Spark DataFrame that I need to write to an AWS S3 bucket as a single CSV file (I'm using PySpark).
I can't use the standard
csv_df.coalesce(1).write.csv()
approach, because the file is too large to be handled by a single node (it runs out of memory).
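For reference, the plain distributed write below is memory-safe, but it produces a whole directory of part files instead of one object (the bucket name and options are just placeholders):

# Standard parallel write: each partition writes its own part-*.csv
# file under the prefix, so no single node has to hold the whole file,
# but the output is many files rather than a single CSV.
csv_df.write.option("header", "true").mode("overwrite").csv("s3a://my-bucket/output/")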
I also haven't been able to find anything about some kind of append mode for S3.
How can I achieve this? If possible, I'd prefer some sort of parallel write to S3 to speed things up.
I don't know how well this solution performs, but it does let you write the CSV to S3 in parallel.
import boto3
import csv
from io import StringIO
from pyspark.sql import SparkSession
from pyspark.sql.types import Row
from typing import Iterator
def func(partition_num: int, itr: Iterator[Row]):
    global upload_id
    global bucket_name
    global key

    # S3 part numbers start at 1, Spark partition indices at 0.
    partition_num += 1
    with StringIO() as buffer:
        for i, row in enumerate(itr):
            row = row.asDict()
            if not i:
                # Build the writer from the first row's keys; only the
                # first part gets the header line.
                writer = csv.DictWriter(buffer, row.keys(), lineterminator="\n")
                if partition_num == 1:
                    writer.writeheader()
            writer.writerow(row)
        # Each part ends with a newline, so the parts concatenate into
        # valid CSV rows when S3 stitches them together.
        # Unfortunately, s3_client isn't pickleable,
        # so we have to create a new one for each partition.
        s3_client = boto3.client("s3")
        etag = s3_client.upload_part(
            Bucket=bucket_name,
            Key=key,
            PartNumber=partition_num,
            UploadId=upload_id,
            Body=buffer.getvalue().encode("utf-8"),
        )["ETag"]
    yield {"PartNumber": partition_num, "ETag": etag}
bucket_name = "BUCKET"
key = "myfile.csv"
s3_client = boto3.client("s3")
upload_id = s3_client.create_multipart_upload(Bucket=bucket_name, Key=key)["UploadId"]
spark: SparkSession = (
SparkSession.builder.master("local[*]").appName("MyApp").getOrCreate()
)
df = df # Insert your dataframe
try:
# Use a better method of partitioning than this,
# but keep in mind they must all at least be 5mb.
parts = df.rdd.coalesce(4).mapPartitionsWithIndex(func).collect()
except:
s3_client.abort_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id)
raise
s3_client.complete_multipart_upload(
Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={"Parts": parts}
)
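On the partitioning comment in the try block: one rough way to pick the number of parts is to estimate the total CSV size from a small sample and target parts comfortably above S3's 5 MiB multipart minimum. This is only a sketch under my own assumptions; estimate_num_parts, the 64 MiB target and the sample size are placeholders, not part of the code above:

import csv
import math
from io import StringIO

def estimate_num_parts(df, target_part_bytes=64 * 1024 * 1024, sample_size=1000):
    # Estimate the average CSV row size from a small sample, then scale
    # by the total row count to approximate the size of the whole file.
    rows = df.limit(sample_size).collect()
    if not rows:
        return 1
    with StringIO() as buf:
        writer = csv.DictWriter(buf, rows[0].asDict().keys(), lineterminator="\n")
        for row in rows:
            writer.writerow(row.asDict())
        avg_row_bytes = len(buf.getvalue().encode("utf-8")) / len(rows)
    total_bytes = avg_row_bytes * df.count()
    # Target ~64 MiB parts so every part clears the 5 MiB minimum that
    # S3 enforces on all parts except the last.
    return max(1, math.ceil(total_bytes / target_part_bytes))

# Replaces the hard-coded coalesce(4) in the try block above.
num_parts = estimate_num_parts(df)

Keep in mind that coalesce() can only reduce the number of partitions; if the DataFrame currently has fewer partitions than num_parts, you'd need repartition() instead, at the cost of a shuffle.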