Pyspark 在连接后选择会引起歧义,但列应该只出现在其中一个数据帧中

问题描述 投票:0回答:1

我正在对来自同一原始数据帧的两个数据帧进行联接。然后,这些会遭受一些聚合,并且除了用于连接的列之外,所选的列不相等。

所以我们可以说在 df1 中我们得到列 [a,b,max(c),min(d)] 并且在 df2 中我们选择 [a,b,e, avg(f)] 并且我想加入它们以便我的最终 df 有 [a,b,max(c), min(d),e, avg(f)] 所以类似

df_final = df1.join(df2, (df1.a == df2.a) & (df1.b == df2.b))\
    .select('a','b', 'max(c)', 'min(d)', 'e', 'avg(f)') 

应该可以解决问题。

在我的具体情况下,连接条件也有一个聚合,但它不相关。

这是带有警告的真实示例:

df_3 =dfs_dict['trigger']\
            .groupBy(
            F.col('opportunity_id'),
            F.col('action_status').alias('latest_urgency')) \
            .agg({'user_id': 'max', 'write_date': 'min'}) \
            .withColumnRenamed('min(write_date)', 'latest_urgency_date') \
            .withColumnRenamed('max(user_id)', 'user_id')

window = Window.partitionBy("opportunity_id") \
            .orderBy(F.col("write_date").desc())
df_4 = dfs_dict['trigger'].select(
            'opportunity_id',
            'write_date',
            'action_status',
            'opportunity_type',
            'user_id',
            F.row_number().over(window).alias('row_num')) \
            .where(((F.col("action_status").isNotNull()) & (F.col("action_status") != ''))) \
            .where(F.col('row_num') == 1)

以及列的输出:

df_3.columns
# ['opportunity_id', 'latest_urgency', 'latest_urgency_date', 'user_id']
df_4.columns
# ['opportunity_id', 'write_date', 'action_status', 'opportunity_type', 'user_id', 'row_num']

和加入:

df_5 = df_3.join(df_4,
               (df_3.opportunity_id == df_4.opportunity_id) & (
                   df_3.latest_urgency == df_4.action_status)
               ).select(df_3.opportunity_id,
                        df_3.latest_urgency,
                        df_4.opportunity_type,
                        df_3.user_id,
                        df_3.latest_urgency_date)

这会抱怨列opportunity_type不明确:

AnalysisException:列 opportunity_type#1559 不明确。这可能是因为您将多个数据集连接在一起,并且其中一些数据集是相同的。此列指向其中一个数据集,但 Spark 无法确定是哪一个。在加入数据集之前,请通过

Dataset.as
使用不同的名称为数据集别名,并使用限定名称指定列,例如
df.as("a").join(df.as("b"), $"a.id" > $"b.id")
。您还可以将 Spark.sql.analyzer.failAmbigouslySelfJoin 设置为 false 以禁用此检查。

我想了解为什么。即使我在聚合之前在 df_3 中进行选择,错误仍然存在。我已经检查了查询计划,由于它们来自相同的原始 df,它们都扫描有问题的列,但最终只选择了一个......

如果我删除 opportunity_type 之前的 df_4 并将其称为“opportunity_type”,错误就会消失,这让我更加困惑。 (我也尝试过使用 dfs 的别名,但出现了同样的错误)

无论如何,我想了解为什么会发生这种情况?避免它/正确执行的最佳方法应该是什么。

dataframe apache-spark pyspark apache-spark-sql
1个回答
0
投票

请检查输入数据框中的列。 dfs_dict['触发']

您的输入数据框可能具有重复的列名称。

© www.soinside.com 2019 - 2024. All rights reserved.