Autoloader 未在流模式下拾取 .text 文件

问题描述 投票:0回答:1

我正在使用 Databricks Autoloader 以流(微批)模式处理文件。源文件采用.text 格式。虽然创建了检查点并且流没有失败,但未创建 Delta 表,并且似乎根本没有拾取文件。


(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "text")
    .option("encoding", "UTF-16")
    .load(source_path)
    .withColumn("sourcefile", F.col("_metadata.file_path")) 
    # .withColumn("load_timestamp", F.to_timestamp(F.regexp_extract(F.col("sourcefile"), "([0-9]{4}-[0-9]{2}-[0-9]{2}-[0-9]{2}-[0-9]{2}-[0-9]{2})", 1), 'yyyy-MM-dd-HH-mm-ss'))           
    .writeStream
    .option("checkpointLocation", sink_path + '/checkpoints')
    # .option("path", sink_path + '/delta')
    .trigger(availableNow=True)  # Process the file once and stop, new config
    .foreachBatch(lambda batch_df, batch_id: foreach_batch(batch_df, batch_id, sink_path, source_path, table_name, version, meta_data_path))
    .queryName(f"AIDA_{BU}_staging")
    .start()
    .awaitTermination()
)

我检查了以下内容:

正在创建检查点目录。 流运行没有任何错误。 尽管如此,Delta 表并没有被创建,而且 .text 文件似乎没有被处理。

我的配置是否有问题,或者我是否缺少 Autoloader 的一些必要选项?

databricks spark-streaming azure-databricks spark-structured-streaming databricks-autoloader
1个回答
0
投票

我尝试过以下方法:

from pyspark.sql.functions import decode
df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "binaryFile")
      .option("cloudFiles.schemaLocation", schema_loc)
      .load(source_data_loc))
decoded_df = df.withColumn("File_Data", decode(col("content"), "UTF-8"))
(decoded_df.writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_loc)
    .outputMode("append")
    .start(target_data_loc))
display(decoded_df)

在上面的代码中使用二进制格式读取流。 使用 UTF-8 解码内容 将数据写入 Delta 表

结果:

enter image description here

enter image description here

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.