我想在 Databricks 上的 PySpark 中解压许多 7z 格式的文件。 zip 文件包含数千个小文件。
我使用二进制文件读取文件,并使用 UDF 解压缩文件:
schema = ArrayType(StringType())
@F.udf(returnType=schema)
def unzip_content_udf(content):
extracted_files = []
with py7zr.SevenZipFile(io.BytesIO(content), mode='r') as z:
for name, bytes_stream in z.readall().items():
if name.startswith("v1") or name.startswith("v2"):
unzipped_content = bytes_stream.read().decode(ENCODING)
extracted_files.append(unzipped_content)
return extracted_files
df = spark.read.format("binaryFile").load("/mnt/file_pattern*")
df = df.withColumn("unzipped_files", unzip_content_udf(F.col("content")))
df.write.mode("overwrite").parquet("/mnt/test_dump_unzipped")
这对于较小的文件效果很好,但如果我指定较大的文件之一(压缩的 150 MB,解压缩的 4.5 GB),进程就会终止,我得到:
Py4JJavaError: An error occurred while calling o1665.parquet.
ValueError: can not serialize object larger than 2G
我想,这是有道理的,因为序列化限制小于解压缩文件的大小。
您对如何增加限制或将解压缩操作的大小分块到限制以下有什么想法吗?
典型的 stackoverflow 答案是:你做错了。
这似乎是 Spark 的误用,因为您并没有真正使用 Spark 功能。您主要使用它来跨集群的多个节点分发解压缩。例如。你可以使用 dispy 来代替。
can not serialize object larger than 2G
IMO,如果您的 Dataframe/表中的单行超过 2GB,那么您会遇到一些数据建模问题,这是非常合理的。
尝试使用 udf 执行此操作的主要问题是:
Java 字节数组的最大容量为 2GB。我不知道这是否是 Spark 限制的原因,但这意味着您可能无法在这个问题上投入更多的钱/硬件并更改一些 Spark 配置以序列化 4.5 GB,即运行您发布的代码就像在更大的硬件上一样。
总的来说,在我看来,您遇到了问题,因为您使用了错误的工具(spark)来执行此操作。
选项:
ssh
命令以在集群中并行解压缩文件。列表可以并行完成(ThreadPoolExecutor)。