我正在对来自同一原始数据帧的两个数据帧进行联接。然后,这些会遭受一些聚合,并且除了用于连接的列之外,所选的列不相等。
所以我们可以说在 df1 中我们得到列 [a,b,max(c),min(d)] 并且在 df2 中我们选择 [a,b,e, avg(f)] 并且我想加入它们以便我的最终 df 有 [a,b,max(c), min(d),e, avg(f)] 所以类似
df_final = df1.join(df2, (df1.a == df2.a) & (df1.b == df2.b))\
.select('a','b', 'max(c)', 'min(d)', 'e', 'avg(f)')
应该可以解决问题。
在我的具体情况下,连接条件也有一个聚合,但它不相关。
这是带有警告的真实示例:
df_3 =dfs_dict['trigger']\
.groupBy(
F.col('opportunity_id'),
F.col('action_status').alias('latest_urgency')) \
.agg({'user_id': 'max', 'write_date': 'min'}) \
.withColumnRenamed('min(write_date)', 'latest_urgency_date') \
.withColumnRenamed('max(user_id)', 'user_id')
window = Window.partitionBy("opportunity_id") \
.orderBy(F.col("write_date").desc())
df_4 = dfs_dict['trigger'].select(
'opportunity_id',
'write_date',
'action_status',
'opportunity_type',
'user_id',
F.row_number().over(window).alias('row_num')) \
.where(((F.col("action_status").isNotNull()) & (F.col("action_status") != ''))) \
.where(F.col('row_num') == 1)
以及列的输出:
df_3.columns
# ['opportunity_id', 'latest_urgency', 'latest_urgency_date', 'user_id']
df_4.columns
# ['opportunity_id', 'write_date', 'action_status', 'opportunity_type', 'user_id', 'row_num']
和加入:
df_5 = df_3.join(df_4,
(df_3.opportunity_id == df_4.opportunity_id) & (
df_3.latest_urgency == df_4.action_status)
).select(df_3.opportunity_id,
df_3.latest_urgency,
df_4.opportunity_type,
df_3.user_id,
df_3.latest_urgency_date)
这会抱怨列opportunity_type不明确:
AnalysisException:列 opportunity_type#1559 不明确。这可能是因为您将多个数据集连接在一起,并且其中一些数据集是相同的。此列指向其中一个数据集,但 Spark 无法确定是哪一个。在加入数据集之前,请通过
使用不同的名称为数据集别名,并使用限定名称指定列,例如Dataset.as
。您还可以将 Spark.sql.analyzer.failAmbigouslySelfJoin 设置为 false 以禁用此检查。df.as("a").join(df.as("b"), $"a.id" > $"b.id")
我想了解为什么。即使我在聚合之前在 df_3 中进行选择,错误仍然存在。我已经检查了查询计划,由于它们来自相同的原始 df,它们都扫描有问题的列,但最终只选择了一个......
如果我删除 opportunity_type 之前的 df_4 并将其称为“opportunity_type”,错误就会消失,这让我更加困惑。 (我也尝试过使用 dfs 的别名,但出现了同样的错误)
无论如何,我想了解为什么会发生这种情况?避免它/正确执行的最佳方法应该是什么。
请检查输入数据框中的列。 dfs_dict['触发']
您的输入数据框可能具有重复的列名称。