在 Spark 结构化流中使用流数据帧更新静态数据帧

问题描述 投票:0回答:1

我有一个与帖子如何使用 Spark 结构化流中的流数据帧更新静态数据帧类似的用例。您可以采用与上述帖子中的示例相同的数据。

static_df = Spark.read.schema(schemaName).json(文件名)

streaming_df = Spark.readStream(....)

new_reference_data = update_reference_df(streaming_df, static_df)

def update_reference_df(df, static_df): query: StreamingQuery = df \ .writeStream \ .outputMode("append") \ .foreachBatch(lambda batch_df, batchId: update_static_df(batch_df, static_df)) \ .start() return query def update_static_df(batch_df, static_df): df1: DataFrame = static_df.union(batch_df.join(static_df, (batch_df.SITE == static_df.SITE) "left_anti")) return df1
我想知道 static_df 如何使用通过 foreachBatch 处理的数据中的新值进行刷新。据我所知 foreachBatch 什么也不返回(VOID)。我需要在进一步处理中使用 static_df 中的新值。感谢您的帮助。

spark-streaming
1个回答
0
投票
你只需要把 static_df 设为全局变量即可。

让我们用你的例子:

static_df = spark.read.schema(schemaName).json(fileName) streaming_df = spark.readStream(....) # No need to define a function here query = StreamingQuery = df \ .writeStream \ .outputMode("append") \ .foreachBatch(update_static_df) \ .start() def update_static_df(batch_df, static_df): global static_df static_df = static_df.union(batch_df.join(static_df, (batch_df.SITE == static_df.SITE) "left_anti")) # do whatever you want for static_df
    
最新问题
© www.soinside.com 2019 - 2024. All rights reserved.