我正在运行 pandas udf,如下所示:
def some_udf(df, keys = IDS_COLS, cols_to_keep = COLS_TO_KEEP):
INT_ID_COLUMN = '__iid'
df_keys = df.select(keys).distinct().withColumn(INT_ID_COLUMN, F.monotonically_increasing_id())
df = df.join(df_keys, keys)
pred_df_schema = df.select(INT_ID_COLUMN, *keys, 'tx_id', F.lit(0.0).alias('score'), 'class').schema
pred_with_some_udf = F.pandas_udf(
some_func,
returnType=pred_df_schema,
functionType=F.PandasUDFType.GROUPED_MAP
)
prediction_df = df.select(INT_ID_COLUMN, 'tx_id', 'class', *keys ,*cols_to_keep) \
.groupby(INT_ID_COLUMN) \
.apply(pred_with_some_udf) \
.drop(INT_ID_COLUMN)
return prediction_df
pandas 函数如下:
def some_func(df):
...
return df[[*keys, 'tx_id', 'score', 'class']]
我不断收到错误消息:
'运行时错误:返回的 pandas.DataFrame 的列数与指定的架构不匹配。预期:6 实际:5'
我无法弄清楚这个错误。我明确表示我期望 6 列:1 是按键分组,5 是每个组返回的值。有什么想法如何解决这个问题吗?
我的意思是你确实缺少一列作为回报 - 详细说明官方文档中的示例 - 请参阅使用
.assign
添加列,即所有原始列保持原样+还有一个额外的 - 因此显式返回分组列.
from pyspark.sql.functions import pandas_udf, PandasUDFType
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").apply(subtract_mean).show()