我正在 Databricks 14.3 ML LTS 集群上运行以下代码。我正在执行从 Databricks 10.4 ML LTS 到 14.3 ML LTS 的代码迁移的验证任务。
window_c = (
Window()
.partitionBy("outlet_id")
.orderBy("year_month")
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
distinct_combination_count = (
master_keys
.withColumn("distributor_id", f.last("distributor_id", True).over(window_c))
.select("item_id","distributor_id")
.distinct()
.count()
)
为了在两个集群上进行验证,我需要获取 item_id 和经销商_id 的唯一组合。但每次我运行上面的代码片段时,我都会在两个集群中得到不同的结果。 (我在同一个集群中多次运行代码片段并得到了不同的结果)。
如有任何帮助,我们将不胜感激。 我还想知道是否有人注意到在 Databricks 10.4 与 14.3 LTS 中运行代码时结果有任何其他重大差异。
为了在不同集群中获得“item_id”和“distributor_id”的独特组合的一致结果,我尝试了以下方法:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as F
data = [
(1, "A", "2022-01-01"),
(2, "B", "2022-01-01"),
(3, None, "2022-01-01"),
(1, "A", "2022-02-01"),
(2, "B", "2022-02-01"),
(3, "C", "2022-02-01"),
]
columns = ["item_id", "distributor_id", "year_month"]
df = spark.createDataFrame(data, columns)
window_c = (
Window()
.partitionBy("item_id")
.orderBy("year_month")
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
filled_df = df.withColumn("filled_distributor_id", F.last("distributor_id", True).over(window_c))
distinct_combinations = filled_df.select("item_id", "filled_distributor_id").distinct()
distinct_combination_count = distinct_combinations.count()
distinct_combinations.show()
print("Distinct Combination Count:", distinct_combination_count)
在这段代码中,我定义了窗口规范,用每个分区中的最后一个非空值填充distributor_id中的空值,获得item_id和distributor_id的不同组合,并对不同组合进行计数。
结果:
+-------+---------------------+
|item_id|filled_distributor_id|
+-------+---------------------+
| 1| A|
| 2| B|
| 3| NULL|
| 3| C|
+-------+---------------------+
Distinct Combination Count: 4