Azure-Databricks autoloader Binaryfile 选项与 foreach() 给出 java.lang.OutOfMemoryError: Java heap space

问题描述 投票:0回答:1

我正在尝试使用 BinaryFile 选项和自动加载器中的 foreach(copy) 将文件从一个位置复制到另一个位置。它在较小的文件(最多 150 MB)上运行良好,但在较大的文件上运行失败,抛出以下异常:

22/09/07 10:25:51 INFO FileScanRDD:读取文件路径:dbfs:/mnt/somefile.csv,范围:0-1652464461,分区值:[空行],修改时间:1662542176000。 22/09/07 10:25:52 错误实用程序:/databricks/python/bin/python 的线程标准输出编写器中未捕获的异常 java.lang.OutOfMemoryError: Java 堆空间 在 org.apache.spark.sql.catalyst.expressions.UnsafeRow.getBinary(UnsafeRow.java:416) 在 org.apache.spark.sql.catalyst.expressions.SpecializedGettersReader.read(SpecializedGettersReader.java:75) 在 org.apache.spark.sql.catalyst.expressions.UnsafeRow.get(UnsafeRow.java:333) 在 org.apache.spark.sql.execution.python.EvaluatePython$.toJava(EvaluatePython.scala:58) 在 org.apache.spark.sql.execution.python.PythonForeachWriter.$anonfun$inputByteIterator$1(PythonForeachWriter.scala:43) 在 org.apache.spark.sql.execution.python.PythonForeachWriter$$Lambda$1830/1643360976.apply(未知来源) 在 scala.collection.Iterator$$anon$10.next(Iterator.scala:461) 在 org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:92) 在 org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:82) 在 scala.collection.Iterator.foreach(Iterator.scala:943) 在 scala.collection.Iterator.foreach$(Iterator.scala:943) 在 org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:82) 在 org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:442) 在 org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:871) 在 org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:573) 在 org.apache.spark.api.python.BasePythonRunner$WriterThread$$Lambda$2008/2134044540.apply(未知来源) 在 org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2275) 在 org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:365) 22/09/07 10:25:52 错误 SparkUncaughtExceptionHandler:线程线程中未捕获的异常 [/databricks/python/bin/python,5,main] 的标准输出编写器 java.lang.OutOfMemoryError: Java 堆空间 在 org.apache.spark.sql.catalyst.expressions.UnsafeRow.getBinary(UnsafeRow.java:416) 在 org.apache.spark.sql.catalyst.expressions.SpecializedGettersReader.read(SpecializedGettersReader.java:75) 在 org.apache.spark.sql.catalyst.expressions.UnsafeRow.get(UnsafeRow.java:333) 在 org.apache.spark.sql.execution.python.EvaluatePython$.toJava(EvaluatePython.scala:58) 在 org.apache.spark.sql.execution.python.PythonForeachWriter.$anonfun$inputByteIterator$1(PythonForeachWriter.scala:43) 在 org.apache.spark.sql.execution.python.PythonForeachWriter$$Lambda$1830/1643360976.apply(未知来源) 在 scala.collection.Iterator$$anon$10.next(Iterator.scala:461) 在 org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:92) 在 org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:82) 在 scala.collection.Iterator.foreach(Iterator.scala:943) 在 scala.collection.Iterator.foreach$(Iterator.scala:943) 在 org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:82) 在 org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:442) 在 org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:871) 在 org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:573) 在 org.apache.spark.api.python.BasePythonRunner$WriterThread$$Lambda$2008/2134044540.apply(未知来源) 在 org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2275) 在 org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:365)

以下是供参考的高级代码片段:

集群大小为 2 个工人和 1 个驱动程序,每个 14 Gb ram 和 4 个内核


cloudfile_options = {
    "cloudFiles.subscriptionId":subscription_ID,
    "cloudFiles.connectionString": queue_SAS_connection_string,
    "cloudFiles.format": "BinaryFile", 
    "cloudFiles.tenantId":tenant_ID,
    "cloudFiles.clientId":client_ID,
    "cloudFiles.clientSecret":client_secret,
    "cloudFiles.useNotifications" :"true"
}

def copy(row):
    source = row['path']
    destination = "somewhere"
    shutil.copy(source,destination)

spark.readStream.format("cloudFiles")
                        .options(**cloudfile_options)
                        .load(storage_input_path)              
                        .writeStream
                        .foreach(copy)
                        .option("checkpointLocation", checkpoint_location)
                        .trigger(once=True)
                        .start()

我还在 foreach() 之外用巨大的文件大小 (20GB) 测试了 shutil.copy 并且它工作得很好。

对此的任何线索将不胜感激😊

apache-spark pyspark databricks azure-databricks databricks-autoloader
1个回答
2
投票

发生这种情况是因为您正在传递包含应从 JVM 序列化到 Python 的文件内容的整行。如果你所做的一切只是复制文件,那么只需在

.select("path")
之前添加
.writeStream
,这样只有文件名将被传递给 Python,但没有内容:

© www.soinside.com 2019 - 2024. All rights reserved.