需要根据条件通过迭代列表向 PySpark DF 添加新列。
new_line_id = 数组('a', 'b', 'c')
输入DF(LineID不是这个DF中的PK):
|线路 ID |
| --------|
|米|
|空 |
| T |
|空 |
|空 |
| P|
列表包含的项目数量与线路 ID 中的 Null 数量完全相同。基本上只要 LineID 为空,就从列表中选择一个值。
所需输出:
|线路 ID |新专栏|
| --------| ----------|
|中号 |中号 |
|空 |一个 |
| T | T |
|空 | b |
|空 | c |
|普 | P|
以下代码在每一行中添加整个列表。 不起作用的代码:
new_df = df.withColumn("new_col", when(df.line_id.isNull(), array([lit(x) for x in new_line_id]).cast(StringType()).otherwise(df.line_id)
你可以试试这个:
from pyspark.sql import Window
from pyspark.sql.functions import row_number, when, col, udf
from pyspark.sql.types import StringType
df.show()
windowSpec = Window.orderBy(col("LineID")).rowsBetween(Window.unboundedPreceding, Window.currentRow)
df_with_row_num = df.withColumn("row_num", when(col("LineID").isNull(), row_number().over(windowSpec)).otherwise(None))
def assign_value(line_id, row_num):
if line_id is None:
return new_line_id[row_num - 1] # subtract 1 because Python list indexing starts at 0
else:
return line_id
assign_value_udf = udf(assign_value, StringType())
new_df = df_with_row_num.withColumn("new_col", assign_value_udf(col("LineID"), col("row_num")))
new_df.show()