我想创建一个新列,其中包含
lookup
列中列出的列名称的值数组。
input_df = spark.createDataFrame([
Row(id=123, alert=1, operation=1, lookup=[]),
Row(id=234, alert=0, operation=0, lookup=['alert']),
Row(id=345, alert=1, operation=0, lookup=['operation']),
Row(id=456, alert=0, operation=1, lookup=['alert','operation']),
])
id | 警报 | 操作 | 查找 | 查找值 |
---|---|---|---|---|
123 | 1 | 1 |
|
|
234 | 0 | 0 |
|
|
345 | 1 | 0 |
|
|
456 | 0 | 1 |
|
|
input_df.withColumn("lookup_values", F.transform(F.col("lookup"), lambda x: input_df[f'{x}'])).show()
因错误而失败
AnalysisException:[UNRESOLVED_COLUMN.WITH_SUGGESTION] 无法解析名称为
的列或函数参数。做过 您是指以下之一吗? [Column<'x_1'>
、id
、alert
、operation
]。lookup
这个错误令人惊讶,因为下面的代码没有错误,但它也没有产生预期的结果。
input_df.withColumn("lookup_values", F.transform(F.col("lookup"), lambda x: input_df['alert'])).show()
id | 警报 | 操作 | 查找 | 查找值 |
---|---|---|---|---|
123 | 1 | 1 |
|
|
234 | 0 | 0 |
|
|
345 | 1 | 0 |
|
|
456 | 0 | 1 |
|
|
一种方法是将整行传递到 UDF 中,然后根据
lookup
列将查找值放入列表中:
@func.udf(returnType=ArrayType(IntegerType()))
def lookup_values_udf(row):
return [row[field] for field in row["lookup"]]
input_df.withColumn(
"lookup_values",
lookup_values_udf(func.struct([func.col(col) for col in input_df.columns]))
).show(
10, False
)
+---+-----+---------+------------------+-------------+
|id |alert|operation|lookup |lookup_values|
+---+-----+---------+------------------+-------------+
|123|1 |1 |[] |[] |
|234|0 |0 |[alert] |[0] |
|345|1 |0 |[operation] |[0] |
|456|0 |1 |[alert, operation]|[0, 1] |
+---+-----+---------+------------------+-------------+