我使用了 PySpark DataFrame,在其中调用了 UDF 函数。此 UDF 函数进行 API 调用并将响应存储回 DataFrame。我的目标是存储 DataFrame 并重用它,而不是多次调用同一个 DataFrame
df = df_source.withColumn("Id",generateid_udf(col("ExID"),col("DueDate"),col("MoxiePDMID"),col("Details"),col("Name")))
df.persist()
data = df.collect()
print(data)
**Merge condition **
targetDF = DeltaTable.forPath(spark, targetPath)
(targetDF.alias("tgt")
.merge(df.alias("src"), "tgt.ExID = src.ExID" )
.whenMatchedUpdate(set =
{
"tgt.Id": "src.Id",
"tgt.UpdatedBy": lit("Created"),
"tgt.UpdatedDate": current_timestamp()})
.execute()
)
假设在 Collect 操作中,ID 字段返回 101,但在合并条件下,它存储为 102。我想避免进行多个 API 调用。根据我的理解,如果我们缓存并持久化数据,它将存储在磁盘上,以便我们重用它
您需要在创建数据框后立即对其进行持久化。
还要确保在持久化之前不执行收集。
我已经完成了以下测试。
source_data = [(1,)]
df_source = spark.createDataFrame(source_data, ["ExID"])
df = df_source.withColumn("Id", add_one(col("ExID")))
df.persist()
print(df.collect())
print(df.collect())
print(df.collect())
输出:
[Row(ExID=1, Id=710)]
[Row(ExID=1, Id=710)]
[Row(ExID=1, Id=710)]
对于下面的代码。
source_data = [(1,)]
df_source = spark.createDataFrame(source_data, ["ExID"])
df = df_source.withColumn("Id", add_one(col("ExID")))
print(df.collect())
df.persist()
print(df.collect())
print(df.collect())
print(df.collect())
输出:
[Row(ExID=1, Id=675)]
[Row(ExID=1, Id=182)]
[Row(ExID=1, Id=658)]
[Row(ExID=1, Id=387)]
如果您在这里观察,当在持久化之前完成收集时,值正在发生变化。
所以,请确保立即坚持下去。
如果仍然遇到相同的问题,请从创建数据帧的开头运行代码。