我正在使用 foreachbatch 将流数据写入多个目标,并且它在第一次微批处理执行时工作正常。当它尝试运行第二个微批次时,它失败并出现以下错误。 “StreamingQueryException:查询 [id = 0d8e45ff-4f3a-42c0-964d-6f41c93df801,runId = 186a22bf-c75e-482b-bd4b-19b039a9aa38] 异常终止: abfss://xxxx@xxxxxxxxxx.dfs.core.windows.net/primary/directory1 已经存在”
下面是我使用的 foreach 片段。
df_new = <<<some streaming dataset>>>
val appId = "1dbcd4f2-eeb7-11ed-a05b-0242ac120003"
df_new.writeStream.format("delta").option("mergeSchema", "true").outputMode("append").option("checkpointLocation", "abfss://xxx@xxxxxxxxxx.dfs.core.windows.net/checkpoint/chkdir").foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.persist()
val fc_final= batchDF.filter(col("msg_type") === "FC" ).drop(columnlist_fc:_*)
fc_final.write.option("txnVersion", batchId).option("txnAppId", appId).save("abfss://xxxx@xxxxxxxxxx.dfs.core.windows.net/primary/directory1")
val hb_final = batchDF.filter(col("msg_type") =!= "FC" ).drop(columnlist_hb:_*)
hb_final.write.partitionBy("occurrence_month").option("txnVersion", batchId).option("txnAppId", appId).save("abfss://xxx@xxxxxxxxxx.dfs.core.windows.net/primary/directory2")
batchDF.unpersist()
()
}.start().awaitTermination()
我在这里缺少什么?为什么它不能将数据文件追加到 delta 目录,即使我指定了 mode=append。非常感谢您的帮助。