https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers availableNow 模式应该以微批处理方式处理当前可用的所有数据。我的问题是关于流处理现有数据的同时到达的数据。是否也处理了新到达的数据? 我没有在文档中找到这种情况。我的代码是这样的:
bronze_query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema(schema)
.load(<some_path>)
.writeStream
.format("delta")
.outputMode("append")
.trigger(availableNow=True)
.trigger(processingTime='5 seconds')
.option("checkpointLocation", f"<path>")
.table("bronze"))
但是与正在运行的这段代码并发,在初始化流之后,在另一个数据块的笔记本中,我将另一个文件放到上面指定的加载路径中。因此,新文件会在流打开时登陆。