在 Databricks 上的 PySpark 中序列化压缩文件时出现内存问题

问题描述 投票:0回答:1

我想在 Databricks 上的 PySpark 中解压许多 7z 格式的文件。 zip 文件包含数千个小文件。

我使用二进制文件读取文件,并使用 UDF 解压缩文件:

schema = ArrayType(StringType())

@F.udf(returnType=schema)
def unzip_content_udf(content):
    extracted_files = []

    with py7zr.SevenZipFile(io.BytesIO(content), mode='r') as z:
        for name, bytes_stream in z.readall().items():
            if name.startswith("v1") or name.startswith("v2"):
                unzipped_content = bytes_stream.read().decode(ENCODING)
                extracted_files.append(unzipped_content)
    return extracted_files

df = spark.read.format("binaryFile").load("/mnt/file_pattern*")
df = df.withColumn("unzipped_files", unzip_content_udf(F.col("content")))
df.write.mode("overwrite").parquet("/mnt/test_dump_unzipped")

这对于较小的文件效果很好,但如果我指定较大的文件之一(压缩的 150 MB,解压缩的 4.5 GB),进程就会终止,我得到:

Py4JJavaError: An error occurred while calling o1665.parquet.
ValueError: can not serialize object larger than 2G

我想,这是有道理的,因为序列化限制小于解压缩文件的大小。

您对如何增加限制或将解压缩操作的大小分块到限制以下有什么想法吗?

python pyspark databricks 7zip py7zr
1个回答
0
投票

典型的 stackoverflow 答案是:你做错了。

这似乎是 Spark 的误用,因为您并没有真正使用 Spark 功能。您主要使用它来跨集群的多个节点分发解压缩。例如。你可以使用 dispy 来代替。

can not serialize object larger than 2G

IMO,如果您的 Dataframe/表中的单行超过 2GB,那么您会遇到一些数据建模问题,这是非常合理的。

尝试使用 udf 执行此操作的主要问题是:

  • udf 无法返回多行(即将一个 zip 文件的 3GB 内容拆分为 3x1GB 行)
  • 一行不能大于2GB。

Java 字节数组的最大容量为 2GB。我不知道这是否是 Spark 限制的原因,但这意味着您可能无法在这个问题上投入更多的钱/硬件并更改一些 Spark 配置以序列化 4.5 GB,即运行您发布的代码就像在更大的硬件上一样。

总的来说,在我看来,您遇到了问题,因为您使用了错误的工具(spark)来执行此操作。


选项:

  1. 如果你真的不着急(性能),那么只需使用 ThreadPoolExecutor 或其他东西,并使用简单的多线程 python 代码解压所有文件。问题是它不能水平缩放。
  2. 如果您有无数的文件和 PB 的数据,而选项 1 需要数年时间:
    • 编写简单的 python 程序来列出所有文件,然后向一堆工作节点发出
      ssh
      命令以在集群中并行解压缩文件。列表可以并行完成(ThreadPoolExecutor)。
    • 使用类似 dispy 的工具来解压缩文件。以小于 2GB 的组重新分发解压缩的文件。然后使用 Spark 读取重新分发的解压缩文件并以 parquet 形式写回。如果您愿意,还有其他框架。
© www.soinside.com 2019 - 2024. All rights reserved.