应用模式时,为什么我的 PySpark row_number 列混乱?

问题描述 投票:0回答:1

我想将架构应用于 Spark DataFrame 的特定非技术列。事先,我使用

Window
row_number
添加人工 ID,以便稍后可以将一些其他技术专栏从初始 DataFrame 加入到新 DataFrame 中。但是,应用架构后,生成的 ID 变得混乱。下面是一个代码示例。有人可以解释为什么会发生这种情况以及如何解决问题吗?

from pyspark.sql.functions import row_number, lit, col, monotonically_increasing_id, sum
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Sample DataFrame
data = [(1, "Alice"), (2, "Bob"), (3, "Charlie")]
df = spark.createDataFrame(data, ["id", "name"])


# Schema to apply
schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), False),
])

# Create ID column
w = Window().orderBy(lit('A'))
df = df.withColumn('_special_surrogate_id', row_number().over(w))

# Improved method
surrogate_key_field = StructField("_special_surrogate_id", StringType(), False)
schema_with_surrogate = StructType(schema.fields + [surrogate_key_field])

# Loop because sometimes it works and sometimes it does't work
for i in range(11):
    
    df_filtered = df.select("id", "name", "_special_surrogate_id")   
    df_filtered = spark.createDataFrame(df_filtered.rdd, schema_with_surrogate)

    combined_df = df.withColumnRenamed("id", "id1").join(df_filtered.withColumnRenamed("id", "id2"), on="_special_surrogate_id")

    print("Diffs in Iteration " + str(i) + ":")
    print(combined_df.withColumn("diff", (col("id1") != col("id2")).cast("integer")).agg(sum("diff")).collect()[0][0])

python apache-spark pyspark rdd azure-synapse
1个回答
0
投票

问题:

orderBy(lit('A'))
用于确定 row_number() 排序。这不是定义行数值的确定性方法。

Spark幕后:

Spark 使用多个执行器来完成行编号过程。为此,Spark 正在执行第一遍,其中每个执行程序对执行程序拥有的数据进行排序。然后再进行一次对所有数据进行排序。

Alice
数据行和
Bob
数据行分别由不同的执行器处理时,作为第一遍的结果,它们都可以是
1
。然后 Spark 在第二遍中决定哪个数据行最终是
1
。然而,你的逻辑并没有告诉 Spark 如何做出这个决定,因此每次的决定可能不会以相同的方式做出。

推荐:

如果您的列组合可能会导致每行产生唯一的值集,则应在

orderBy
语句中使用该组合,以便为 Spark 提供一致且确定性的行号分配所需的信息。

例如

orderBy(F.col('id'),F.col('name'))

© www.soinside.com 2019 - 2024. All rights reserved.