Spark:坚持没有按预期工作

问题描述 投票:0回答:1

我使用了 PySpark DataFrame,在其中调用了 UDF 函数。此 UDF 函数进行 API 调用并将响应存储回 DataFrame。我的目标是存储 DataFrame 并重用它,而不是多次调用同一个 DataFrame

df = df_source.withColumn("Id",generateid_udf(col("ExID"),col("DueDate"),col("MoxiePDMID"),col("Details"),col("Name")))

df.persist() 
data = df.collect()   
print(data)

**Merge condition **
targetDF = DeltaTable.forPath(spark, targetPath)  
(targetDF.alias("tgt")
.merge(df.alias("src"), "tgt.ExID = src.ExID" )
    .whenMatchedUpdate(set =
    {
    "tgt.Id": "src.Id",
    "tgt.UpdatedBy": lit("Created"),
    "tgt.UpdatedDate": current_timestamp()})
.execute()
)

假设在 Collect 操作中,ID 字段返回 101,但在合并条件下,它存储为 102。我想避免进行多个 API 调用。根据我的理解,如果我们缓存并持久化数据,它将存储在磁盘上,以便我们重用它

apache-spark pyspark caching databricks azure-databricks
1个回答
0
投票

您需要在创建数据框后立即对其进行持久化。

还要确保在持久化之前不执行收集。

我已经完成了以下测试。

source_data = [(1,)]
df_source = spark.createDataFrame(source_data, ["ExID"])
df = df_source.withColumn("Id", add_one(col("ExID")))
df.persist()
print(df.collect())
print(df.collect())
print(df.collect())

输出:

[Row(ExID=1, Id=710)]
[Row(ExID=1, Id=710)]
[Row(ExID=1, Id=710)]

对于下面的代码。

source_data = [(1,)]
df_source = spark.createDataFrame(source_data, ["ExID"])
df = df_source.withColumn("Id", add_one(col("ExID")))
print(df.collect())
df.persist()
print(df.collect())
print(df.collect())
print(df.collect())

输出:

[Row(ExID=1, Id=675)]
[Row(ExID=1, Id=182)]
[Row(ExID=1, Id=658)]
[Row(ExID=1, Id=387)]

如果您在这里观察,当在持久化之前完成收集时,值正在发生变化。

所以,请确保立即坚持下去。

如果仍然遇到相同的问题,请从创建数据帧的开头运行代码。

© www.soinside.com 2019 - 2024. All rights reserved.