I have set up a streaming job using the Auto Loader feature; the input lives in Azure ADLS Gen2 in Parquet format. Here is the code:
# Read the staged Parquet files incrementally with Auto Loader
df = spark.readStream.format("cloudFiles") \
    .options(**cloudfile) \
    .schema(schema) \
    .load(staging_path)

# Process a micro-batch every 10 minutes via foreachBatch
df.writeStream \
    .trigger(processingTime="10 minutes") \
    .outputMode("append") \
    .option("checkpointLocation", checkpoint_path) \
    .foreachBatch(writeBatchToADXandDelta) \
    .start()
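For context, cloudfile holds the Auto Loader options and writeBatchToADXandDelta writes each micro-batch to Azure Data Explorer and to a Delta table. They look roughly like the sketch below; the option values, Kusto targets, and Delta path are placeholders rather than the exact production settings, and the connector auth options are elided:

# Sketch of the surrounding definitions (placeholder values)
cloudfile = {
    "cloudFiles.format": "parquet",
    "cloudFiles.useNotifications": "false",
}

def writeBatchToADXandDelta(batch_df, batch_id):
    # Write each micro-batch to Azure Data Explorer via the Kusto Spark
    # connector (authentication options omitted here), then append the
    # same batch to a Delta table.
    batch_df.write.format("com.microsoft.kusto.spark.datasource") \
        .option("kustoCluster", "<cluster>") \
        .option("kustoDatabase", "<database>") \
        .option("kustoTable", "<table>") \
        .mode("append") \
        .save()
    batch_df.write.format("delta").mode("append").save("<delta_path>")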
This code throws the following error:
py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 11.0 failed 4 times, most recent failure: Lost task 5.3 in stage 11.0 (TID 115) (172.20.58.133 executor 1): com.databricks.sql.io.FileReadException: Error while reading file /mnt/adl2/environment=production/data_lake=main/tier=ingress/area=transient/domain=iotdata/entity=screens/topic=sensor/vendor=abc/source_system=iot_hub/parent=external/dataset=screens/kind=data/evolution=2/file_format=parquet/source=kevitsa/ingestion_date=2022/08/03/13/-136567710_c96a862c2aaf43cfbd62025cd3db4a48_1.parquet.
..
Caused by: java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:208)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$2.apply(ParquetFileFormat.scala:397)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$2.apply(ParquetFileFormat.scala:373)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:333)
... 18 more
What could be causing this?
Thanks in advance!