pandas udf RuntimeError:返回的 pandas.DataFrame 的列数与指定的架构不匹配

问题描述 投票:0回答:1

我正在运行 pandas udf,如下所示:

def some_udf(df, keys = IDS_COLS, cols_to_keep = COLS_TO_KEEP):
    INT_ID_COLUMN = '__iid'

    df_keys = df.select(keys).distinct().withColumn(INT_ID_COLUMN, F.monotonically_increasing_id())
    df = df.join(df_keys, keys)

    pred_df_schema = df.select(INT_ID_COLUMN, *keys, 'tx_id', F.lit(0.0).alias('score'), 'class').schema

    pred_with_some_udf = F.pandas_udf(
        some_func,
        returnType=pred_df_schema,
        functionType=F.PandasUDFType.GROUPED_MAP
    )

    prediction_df = df.select(INT_ID_COLUMN, 'tx_id', 'class', *keys ,*cols_to_keep) \
        .groupby(INT_ID_COLUMN) \
        .apply(pred_with_some_udf) \
        .drop(INT_ID_COLUMN)
        
    return prediction_df

pandas 函数如下:

def some_func(df):
    ...
    return df[[*keys, 'tx_id', 'score', 'class']]

我不断收到错误消息:

'运行时错误:返回的 pandas.DataFrame 的列数与指定的架构不匹配。预期:6 实际:5'

我无法弄清楚这个错误。我明确表示我期望 6 列:1 是按键分组,5 是每个组返回的值。有什么想法如何解决这个问题吗?

pyspark pandas-udf
1个回答
0
投票

我的意思是你确实缺少一列作为回报 - 详细说明官方文档中的示例 - 请参阅使用

.assign
添加列,即所有原始列保持原样+还有一个额外的 - 因此显式返回分组列.

from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").apply(subtract_mean).show()

请参阅此处文档

最新问题
© www.soinside.com 2019 - 2024. All rights reserved.