跳过Python Spark Pyspark Databricks未知字段异常中的坏记录

问题描述 投票:0回答:2

我想知道是否有人知道如何跳过我们从 json 文件获取的记录

这是错误

[UNKNOWN_FIELD_EXCEPTION.NEW_FIELDS_IN_RECORD_WITH_FILE_PATH] 解析过程中遇到未知字段: 这是失败的代码

sent = spark.readStream.format('cloudFiles') \
  .option('cloudFiles.format', 'json') \
  .option('multiline', 'true') \
  .option('cloudFiles.inferColumnTypes', 'true') \
  .option('cloudFiles.schemaLocation', checkpoint_path) \
  .load(raw_files) \
  .withColumn('load_ts', F.current_timestamp()) \
  .writeStream \
  .format('delta') \
  .option('checkpointLocation', checkpoint_path) \
  .trigger(availableNow=True) \
  .option('mergeSchema', 'true') \
  .toTable(b_write_path)

谢谢!

我还没有看到任何有关如何修复此错误的文档。

python apache-spark error-handling databricks databricks-autoloader
2个回答
1
投票

这取决于您想如何处理这些数据。默认情况下,Databricks Autoloader 使用

addNewColumns
模式,该模式在遇到新列时会失败,但重新启动后会正确处理它们。

您可以使用

rescue
none
作为模式演化模式,就像这样。

.option("cloudFiles.schemaEvolutionMode", "rescue")

rescue
模式下,新列的数据将被放入所谓的“救援列”中,您可以在必要时对其进行分析,并且该过程不会失败。

none
模式下,新列将被忽略,并且该过程不会失败。

请参阅文档了解更多详细信息。


0
投票

我还要感谢你,因为我也遇到了同样的问题,并且我设法解决了它。关于addNewColumns模式,它对你有用吗?

© www.soinside.com 2019 - 2024. All rights reserved.