使用两个增量表(tableA,tableB)作为流管道的输入,我想实现以下目标:
我从以下开始:
tableA = spark.readstream.format("delta").load(path_to_tableA)
tableB = spark.readstream.format("delta").load(path_to_tableB)
mergedTable = tableA.join(tableB, ...., "inner")
def process_microbatch(df, batch_id):
...transformations on df...
df.write.mode("append").saveAsTable(path_to_tableB)
mergedTable.writeStream.foreachBatch(process_microbatch).start()
我如何确保只有 tableA 的更新才会触发微批量处理?当然也很重要的是,tableB 的新行在下一批的第 2 点中被识别。
如果
tableB
仅在流开头加载一次且此后未更新,则微批次内对其所做的任何更改都不会反映在后续微批次中。为了解决这个问题,您需要确保在每个微批次中重新加载 tableB
,以便它包含上一个微批次中所做的更新。
因此,在
tableB
函数中重新加载 process_microbatch
。
这是代码。
from pyspark.sql.functions import expr
# Read tableA and apply watermarking
tableA = spark.readStream.format("delta").load(path_to_tableA)
# Define the processing function
def process_microbatch(df, batch_id):
# Read tableB within the processing function
tableB = spark.read.format("delta").load(path_to_tableB)
# Join tables
mergedTable = df.join(tableB, ...., "inner")
transformed_df = ...
transformed_df.write.mode("append").format("delta").save(path_to_tableB)
streamingQuery = tableA.writeStream.foreachBatch(process_microbatch)
.start()
streamingQuery.awaitTermination()
通过在每个微批次中重新加载
tableB
,您可以确保在之前的微批次中对其所做的任何更改都会在后续微批次中得到考虑。