I have a use case similar to the post "How to update a static dataframe with a streaming dataframe in Spark Structured Streaming". You can use the same data as in the example from that post.
static_df = spark.read.schema(schemaName).json(fileName)
streaming_df = spark.readStream(....)
new_reference_data = update_reference_df(streaming_df, static_df)
from pyspark.sql import DataFrame
from pyspark.sql.streaming import StreamingQuery

def update_reference_df(df, static_df):
    query: StreamingQuery = df \
        .writeStream \
        .outputMode("append") \
        .foreachBatch(lambda batch_df, batchId: update_static_df(batch_df, static_df)) \
        .start()
    return query

def update_static_df(batch_df, static_df):
    # Keep every existing static row, then append only the batch rows
    # whose SITE key is not already present (left_anti join).
    df1: DataFrame = static_df.union(batch_df.join(static_df,
                                                   (batch_df.SITE == static_df.SITE),
                                                   "left_anti"))
    return df1
I would like to know how static_df can be refreshed with the new values from the data processed through foreachBatch. As far as I know, foreachBatch returns nothing (void). I need to use the new values in static_df for further processing. Thanks for any help.
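To illustrate the issue, here is a minimal sketch (using a hypothetical rate source rather than the JSON data above, just for demonstration) showing that foreachBatch discards whatever its callback returns; values can only escape through a side effect such as mutating a driver-side variable:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

seen = []  # driver-side list; mutated as a side effect by the callback

def capture(batch_df, batch_id):
    seen.append(batch_df.count())  # side effect: visible after the batch runs
    return batch_df                # return value is ignored by foreachBatch

query = spark.readStream.format("rate").load() \
    .writeStream \
    .foreachBatch(capture) \
    .start()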
Let's use your example:
static_df = spark.read.schema(schemaName).json(fileName)
streaming_df = spark.readStream(....)
# No need for the update_reference_df wrapper here. foreachBatch passes
# (batch_df, batch_id) to its callback, so define the callback first and
# rebind static_df via `global` instead of taking it as a parameter
# (a name cannot be both a parameter and declared global).
def update_static_df(batch_df, batch_id):
    global static_df
    static_df = static_df.union(batch_df.join(static_df,
                                              (batch_df.SITE == static_df.SITE),
                                              "left_anti"))
    # do whatever you want with static_df here

query: StreamingQuery = streaming_df \
    .writeStream \
    .outputMode("append") \
    .foreachBatch(update_static_df) \
    .start()
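For completeness, a self-contained sketch of this pattern under stated assumptions: the SITE/VALUE schema and the /data/sites and /data/updates paths are placeholders, not from the original post. Because foreachBatch runs on the driver, the global rebinding is visible to later batches; note, though, that each union extends the query plan lineage, so periodically caching or checkpointing static_df is worth considering in long-running jobs.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.appName("refresh-static-df").getOrCreate()

schema = StructType([StructField("SITE", StringType()),
                     StructField("VALUE", StringType())])

static_df = spark.read.schema(schema).json("/data/sites")             # placeholder path
streaming_df = spark.readStream.schema(schema).json("/data/updates")  # placeholder path

def update_static_df(batch_df, batch_id):
    global static_df
    # Append only the batch rows whose SITE is not yet in static_df
    # (left_anti join), so existing rows are never duplicated.
    static_df = static_df.union(
        batch_df.join(static_df, batch_df.SITE == static_df.SITE, "left_anti"))

query = streaming_df.writeStream \
    .outputMode("append") \
    .foreachBatch(update_static_df) \
    .start()

query.awaitTermination()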