我想将架构应用于 Spark DataFrame 的特定非技术列。事先,我使用
Window
和 row_number
添加人工 ID,以便稍后可以将一些其他技术专栏从初始 DataFrame 加入到新 DataFrame 中。但是,应用架构后,生成的 ID 变得混乱。下面是一个代码示例。有人可以解释为什么会发生这种情况以及如何解决问题吗?
from pyspark.sql.functions import row_number, lit, col, monotonically_increasing_id, sum
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# Sample DataFrame
data = [(1, "Alice"), (2, "Bob"), (3, "Charlie")]
df = spark.createDataFrame(data, ["id", "name"])
# Schema to apply
schema = StructType([
StructField("id", IntegerType(), False),
StructField("name", StringType(), False),
])
# Create ID column
w = Window().orderBy(lit('A'))
df = df.withColumn('_special_surrogate_id', row_number().over(w))
# Improved method
surrogate_key_field = StructField("_special_surrogate_id", StringType(), False)
schema_with_surrogate = StructType(schema.fields + [surrogate_key_field])
# Loop because sometimes it works and sometimes it does't work
for i in range(11):
df_filtered = df.select("id", "name", "_special_surrogate_id")
df_filtered = spark.createDataFrame(df_filtered.rdd, schema_with_surrogate)
combined_df = df.withColumnRenamed("id", "id1").join(df_filtered.withColumnRenamed("id", "id2"), on="_special_surrogate_id")
print("Diffs in Iteration " + str(i) + ":")
print(combined_df.withColumn("diff", (col("id1") != col("id2")).cast("integer")).agg(sum("diff")).collect()[0][0])
问题:
orderBy(lit('A'))
用于确定 row_number() 排序。这不是定义行数值的确定性方法。
Spark幕后:
Spark 使用多个执行器来完成行编号过程。为此,Spark 正在执行第一遍,其中每个执行程序对执行程序拥有的数据进行排序。然后再进行一次对所有数据进行排序。
当
Alice
数据行和Bob
数据行分别由不同的执行器处理时,作为第一遍的结果,它们都可以是1
。然后 Spark 在第二遍中决定哪个数据行最终是 1
。然而,你的逻辑并没有告诉 Spark 如何做出这个决定,因此每次的决定可能不会以相同的方式做出。
推荐:
如果您的列组合可能会导致每行产生唯一的值集,则应在
orderBy
语句中使用该组合,以便为 Spark 提供一致且确定性的行号分配所需的信息。
例如
orderBy(F.col('id'),F.col('name'))