I am running Spark jobs on Azure Synapse Analytics. The notebook reads from and writes to an Azure Data Lake Storage Gen2 account (the same storage account, but the reads and writes use different paths). It processes large CSV datasets along with small reference data (Parquet/CSV) and writes the final output in Parquet format. The larger CSV dataset is stored as 200 partitioned files.
The notebook job runs on a Spark pool with 980 vCores (4 vCores and 28 GB of memory per node). Overall, the following code logic is applied:
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
spark.conf.set("spark.sql.shuffle.partitions",200)
// Read the data from ADLS Gen2
// Do computations
// Write output
(outputDf
.repartition(980)
.write
.mode("overwrite")
.parquet(outputPath))
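For context, here is a minimal sketch of what the "read" and "compute" placeholders above expand to. The paths, schema, join key, and aggregation are illustrative placeholders only, not the actual business logic:

from pyspark.sql import functions as F

# Hypothetical paths on the same ADLS Gen2 account, different folders
inputPath = "abfss://raw@<storageaccount>.dfs.core.windows.net/large_csv/"
refPath = "abfss://ref@<storageaccount>.dfs.core.windows.net/reference/"
outputPath = "abfss://curated@<storageaccount>.dfs.core.windows.net/output/"

# Large CSV input (stored as 200 partitioned files) and small reference data
largeDf = spark.read.option("header", "true").csv(inputPath)
refDf = spark.read.parquet(refPath)

# Placeholder computation: enrich with reference data, then aggregate
outputDf = (largeDf
    .join(F.broadcast(refDf), on="key", how="left")
    .groupBy("key")
    .agg(F.count("*").alias("cnt")))

# The write then proceeds exactly as in the snippet above:
# repartition(980) followed by an overwrite write to Parquet.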
The executor error log shows the following error:
2024-12-13 04:29:22,453 WARN TaskSetManager [task-result-getter-3]: Lost task 998.0 in stage 13.0 (TID 5210) (vm-d8e97779 executor 118): org.apache.spark.SparkException: Encountered error while reading file abfss://<endpointWithPath>?version=1733996160491?flength=8008249846. Details:
at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFilesError(QueryExecutionErrors.scala:877)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:353)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:158)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:764)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:574)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:577)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Suppressed: org.apache.spark.util.TaskCompletionListenerException: null
Previous exception in task: Encountered error while reading file abfss://<endpointWithPath>?version=1733996160491?flength=8008249846. Details:
org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFilesError(QueryExecutionErrors.scala:877)
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:353)
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:158)
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:764)
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
org.apache.spark.scheduler.Task.run(Task.scala:139)
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:574)
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:577)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:829)
at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:254)
at org.apache.spark.TaskContextImpl.invokeTaskCompletionListeners(TaskContextImpl.scala:144)
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:137)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:172)
... 7 more
Suppressed: java.lang.NullPointerException
at org.apache.hadoop.shaded.com.microsoft.vegas.common.VegasPipeInputStream.close(VegasPipeInputStream.java:806)
at java.base/java.io.FilterInputStream.close(FilterInputStream.java:180)
at org.apache.hadoop.util.LineReader.close(LineReader.java:152)
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.close(LineRecordReader.java:281)
at org.apache.spark.sql.execution.datasources.RecordReaderIterator.close(RecordReaderIterator.scala:69)
at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.close(HadoopFileLinesReader.scala:71)
at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.$anonfun$readFile$2(CSVDataSource.scala:97)
at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.$anonfun$readFile$2$adapted(CSVDataSource.scala:97)
at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:132)
at org.apache.spark.TaskContextImpl.$anonfun$invokeTaskCompletionListeners$1(TaskContextImpl.scala:144)
at org.apache.spark.TaskContextImpl.$anonfun$invokeTaskCompletionListeners$1$adapted(TaskContextImpl.scala:144)
at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:199)
... 10 more
Caused by: java.io.IOException: Could not open pipe [/mnt/vegas/pipes/2575cfd8-c0e5-43d6-b86d-21f829a05087.pipe, pos=0,blocksRead=0; bytesRead=0; availInPipe=0]
Vegas Service: Context=c18dceff-0a3e-4be0-9dfa-81200dbe71ce, Temporarily Unsupported Scenario: Line Separator not in initial block of partition
at org.apache.hadoop.shaded.com.microsoft.vegas.common.VegasPipeInputStream.pipelineThread(VegasPipeInputStream.java:536)
... 3 more
I want to understand the root cause of this issue, because retrying or switching to a different Spark pool often helps, and the business logic itself works correctly. However, these errors keep recurring intermittently, and when they do, the notebook fails to complete successfully.
Caused by: java.io.IOException:
A Java IOException is an input/output (I/O) exception that occurs when an I/O operation fails or is interrupted. For example, trying to read a file that does not exist causes Java to throw an I/O exception.
Error:
Could not open pipe [/mnt/vegas/pipes/2575cfd8-c0e5-43d6-b86d-21f829a05087.pipe, pos=0, blocksRead=0; bytesRead=0; availInPipe=0]
Vegas Service: Context=c18dceff-0a3e-4be0-9dfa-81200dbe71ce, Temporarily Unsupported Scenario: Line Separator not in initial block of partition
at org.apache.hadoop.shaded.com.microsoft.vegas.common.VegasPipeInputStream.pipelineThread(VegasPipeInputStream.java:536)
The error above (Could not open pipe ... Temporarily Unsupported Scenario: Line Separator not in initial block of partition) means the reader could not handle the partition (file split) boundary within the CSV data: the split assigned to the task did not contain a line separator in its initial block. The com.microsoft.vegas classes in the stack trace appear to come from Synapse's I/O layer for ADLS reads rather than from open-source Spark.
When processing large files, you can try the following settings to tune large-file reads:
spark.conf.set("spark.hadoop.fs.azure.read.max.concurrency", "128")
spark.conf.set("spark.hadoop.fs.azure.block.size", "128MB")
As you mentioned, with 980 vCores and 245 executors (assuming 4 vCores per executor), the cluster is configured for a high degree of parallelism.
Instead of using repartition(980) (matching the number of cores), repartition based on the size of the dataset.
You can repartition according to the dataset size or the desired number of output files, for example:
df = df.repartition(200)
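As a minimal sketch, you can derive the partition count from the input size instead of hard-coding it. The 50 GB figure below is purely illustrative; substitute the actual size of your CSV folder:

import math

input_size_bytes = 50 * 1024**3          # hypothetical total input size (~50 GB)
target_partition_bytes = 256 * 1024**2   # aim for roughly 256 MB per partition

num_partitions = max(1, math.ceil(input_size_bytes / target_partition_bytes))
print(num_partitions)                    # 200 for a 50 GB input

df = df.repartition(num_partitions)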
When writing smaller outputs, you can use coalesce to reduce the number of partitions:
df = df.coalesce(100)
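Unlike repartition, coalesce only merges existing partitions and avoids a full shuffle, so it is cheaper; however, it can only reduce the partition count and may produce unevenly sized files. Use repartition when you need evenly balanced partitions.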
With the approach above, you can target roughly 128 MB to 1 GB per partition. You can check the number of partitions with:
print("Number of partitions:", df.rdd.getNumPartitions())
Result:
Number of partitions: 16
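With 16 partitions the write produces 16 Parquet files; if the compressed output were, say, around 4 GB (an illustrative figure), that works out to roughly 256 MB per file, which falls within the 128 MB to 1 GB range mentioned above.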