Spark fails with the error: Line Separator not in initial block of partition


I am running a Spark job on Azure Synapse Analytics. The notebook reads data from and writes data to an Azure Data Lake Storage Gen2 account (the same storage account, but reads and writes use different paths). It processes a large chunk of CSV data plus small reference data (parquet/CSV) and writes the final output in parquet format. The larger, CSV-formatted dataset is stored as 200 partitioned files.

The notebook job runs on a Spark compute cluster with 980 vCores (each node has 4 vCores and 28 GB of memory). Overall, the following code logic is applied:

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
spark.conf.set("spark.sql.shuffle.partitions",200)

# Read the data from ADLS Gen2

# Do computations

# Write output
(outputDf
    .repartition(980)
    .write
    .mode("overwrite")
    .parquet(outputPath))
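
For reference, a minimal sketch of what the read step looks like; the container, account, and paths below are placeholders, not the actual ones used:

# Read the large CSV dataset (stored as ~200 partitioned files) and the small
# reference data; all paths here are hypothetical placeholders.
largeDf = (spark.read
    .option("header", "true")
    .csv("abfss://container@account.dfs.core.windows.net/input/large_csv/"))

refDf = spark.read.parquet("abfss://container@account.dfs.core.windows.net/input/reference/")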

The executor error logs show the following error:

2024-12-13 04:29:22,453 WARN TaskSetManager [task-result-getter-3]: Lost task 998.0 in stage 13.0 (TID 5210) (vm-d8e97779 executor 118): org.apache.spark.SparkException: Encountered error while reading file abfss://<endpointWithPath>?version=1733996160491?flength=8008249846. Details:
  at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFilesError(QueryExecutionErrors.scala:877)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:353)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:158)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:764)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
  at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
  at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
  at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
  at org.apache.spark.scheduler.Task.run(Task.scala:139)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:574)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:577)
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  at java.base/java.lang.Thread.run(Thread.java:829)
  Suppressed: org.apache.spark.util.TaskCompletionListenerException: null

Previous exception in task: Encountered error while reading file abfss://<endpointWithPath>?version=1733996160491?flength=8008249846. Details:
  org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFilesError(QueryExecutionErrors.scala:877)
  org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:353)
  org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:158)
  scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
  org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
  org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:764)
  scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
  org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
  org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
  org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
  org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
  org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
  org.apache.spark.scheduler.Task.run(Task.scala:139)
  org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:574)
  org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
  org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:577)
  java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
  java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  java.base/java.lang.Thread.run(Thread.java:829)
    at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:254)
    at org.apache.spark.TaskContextImpl.invokeTaskCompletionListeners(TaskContextImpl.scala:144)
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:137)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:172)
    ... 7 more
    Suppressed: java.lang.NullPointerException
      at org.apache.hadoop.shaded.com.microsoft.vegas.common.VegasPipeInputStream.close(VegasPipeInputStream.java:806)
      at java.base/java.io.FilterInputStream.close(FilterInputStream.java:180)
      at org.apache.hadoop.util.LineReader.close(LineReader.java:152)
      at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.close(LineRecordReader.java:281)
      at org.apache.spark.sql.execution.datasources.RecordReaderIterator.close(RecordReaderIterator.scala:69)
      at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.close(HadoopFileLinesReader.scala:71)
      at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.$anonfun$readFile$2(CSVDataSource.scala:97)
      at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.$anonfun$readFile$2$adapted(CSVDataSource.scala:97)
      at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:132)
      at org.apache.spark.TaskContextImpl.$anonfun$invokeTaskCompletionListeners$1(TaskContextImpl.scala:144)
      at org.apache.spark.TaskContextImpl.$anonfun$invokeTaskCompletionListeners$1$adapted(TaskContextImpl.scala:144)
      at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:199)
      ... 10 more
Caused by: java.io.IOException: Could not open pipe [/mnt/vegas/pipes/2575cfd8-c0e5-43d6-b86d-21f829a05087.pipe, pos=0,blocksRead=0; bytesRead=0; availInPipe=0]
Vegas Service: Context=c18dceff-0a3e-4be0-9dfa-81200dbe71ce, Temporarily Unsupported Scenario: Line Separator not in initial block of partition
  at org.apache.hadoop.shaded.com.microsoft.vegas.common.VegasPipeInputStream.pipelineThread(VegasPipeInputStream.java:536)
  ... 3 more

I want to understand the root cause of this issue, because retrying or changing the Spark pool often helps and the business logic itself works fine. However, these errors keep appearing intermittently, and in those cases the notebook does not complete successfully.

apache-spark pyspark azure-storage azure-synapse
1 Answer

Caused by: java.io.IOException:

A Java IOException is an input/output (I/O) exception that occurs when an input or output operation fails or is interrupted. For example, trying to read a file that does not exist will cause Java to throw an I/O exception.

Error:

Could not open pipe [/mnt/vegas/pipes/2575cfd8-c0e5-43d6-b86d-21f829a05087.pipe, pos=0,blocksRead=0; bytesRead=0; availInPipe=0] Vegas Service: Context=c18dceff-0a3e-4be0-9dfa-81200dbe71ce, Temporarily Unsupported Scenario: Line Separator not in initial block of partition at org.apache.hadoop.shaded.com.microsoft.vegas.common.VegasPipeInputStream.pipelineThread(VegasPipeInputStream.java:536)

The error above ("Could not open pipe ... Temporarily Unsupported Scenario: Line Separator not in initial block of partition") means the partition boundary in the data could not be handled: while reading the CSV input, no line separator was found within the initial block of that partition (file split).
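
To see where those split boundaries come from on the Spark side, you can inspect how the CSV input is split into read partitions and, if needed, adjust the maximum split size. This is only a minimal sketch of that idea; inputPath is a placeholder, and treating the split size as related to the boundary handling above is an assumption, not something stated in the error:

# Inspect how the CSV input is split into read partitions (inputPath is hypothetical).
inputDf = spark.read.option("header", "true").csv(inputPath)
print("Read partitions:", inputDf.rdd.getNumPartitions())

# spark.sql.files.maxPartitionBytes caps the split size per read task (default 128 MB);
# larger splits mean fewer split boundaries per file. Raising it is an assumption here,
# not a documented fix for the Vegas error.
spark.conf.set("spark.sql.files.maxPartitionBytes", str(256 * 1024 * 1024))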

When processing large files, you can use the following settings to tune how they are read:

spark.conf.set("spark.hadoop.fs.azure.read.max.concurrency", "128") spark.conf.set("spark.hadoop.fs.azure.block.size", "128MB")


As you mention, with 980 vCores and 245 executors (assuming 4 vCores per executor), the cluster is configured for a high degree of parallelism.

Instead of using repartition(980) (matching the number of cores), repartition based on the size of your dataset.

You can repartition based on the dataset size or the desired number of output files:

df = df.repartition(200)

When writing smaller outputs, you can use coalesce to reduce the number of partitions:

df = df.coalesce(100)

With the approaches above, you can target roughly 128 MB to 1 GB per partition. You can check the number of partitions with the following command:

 print("Number of partitions:", df.rdd.getNumPartitions())

Result:

Number of partitions: 16
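
As a rough illustration of sizing partitions by data volume rather than by core count, here is a minimal sketch; the estimated data size and the 512 MB target per partition are assumptions for illustration, not values from the question:

# Pick a partition count from an estimated data size (both numbers are illustrative).
target_partition_bytes = 512 * 1024 * 1024    # ~512 MB per partition
estimated_output_bytes = 100 * 1024 ** 3      # assume ~100 GB of output data
num_partitions = max(1, estimated_output_bytes // target_partition_bytes)

(outputDf
    .repartition(int(num_partitions))
    .write
    .mode("overwrite")
    .parquet(outputPath))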