Performing xgboost prediction with a pyspark DataFrame


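I have a prediction input dataset that contains some generic columns, some feature columns, and a label column. I also have an xgboost model of type xgb.Booster. How can I use this model to make predictions on the input dataset? Here is my current code: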

import xgboost as xgb
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, udf, array
from pyspark.sql.types import FloatType

feature_cols = ["feature1", "feature2", "feature3"]
label_col = "label"
all_cols = ["id1", "id2"] + feature_cols + [label_col]
pred_col_name = "pred"
prediction_path = "/input/path"

prediction_input = spark.read.parquet(prediction_path).select(all_cols)

for column in feature_cols:
    prediction_input = prediction_input.withColumn(column, col(column).cast("float"))

def predict_udf(*features):
    dmatrix = xgb.DMatrix(list(features))
    return float(model.predict(dmatrix)[0])

predict_udf_spark = udf(predict_udf, FloatType())

mask_labeled = prediction_input.filter(col(label_col) != 0)

if mask_labeled.count() > 0:
    prediction_labeled = mask_labeled.withColumn(pred_col_name, predict_udf_spark(*[mask_labeled[c] for c in feature_cols]))
    prediction_labeled.select(*output_cols).repartition(1000).write.csv(output_path, header=False, mode="append")

But this code gives me the following error:

jc = sc._jvm.functions.array(_to_seq(sc, cols, _to_java_column))
AttributeError: 'NoneType' object has no attribute '_jvm'

How can I fix this?

Answer

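As far as I can tell, you are trying to use the xgboost library inside a Spark pipeline. Note that xgboost ships a dedicated Spark implementation (the xgboost.spark module), which your code does not seem to use: judging by your predict_udf function, you are trying to wrangle your pyspark data into xgboost's format, run the prediction, and convert the predictions back to pyspark. You can make your code considerably simpler by using SparkXGBClassifier to fit your model and apply its predictions.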

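Below is a fully reproducible example showing how to fit a SparkXGBClassifier and use its predictions. The code was written on Google Colab with pyspark==3.5.0, xgboost==2.0.0 and python==3.10.12. Note that the example keeps the machine-learning part to an absolute minimum and does not show how you should train a model (there is no model evaluation, hyperparameter tuning, validation, etc.).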

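Note that since you did not provide any sample data or information about your dependencies, your problem is hard to reproduce. Using the data from my SparkXGBClassifier example, I got an error from this function: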

def predict_udf(*features):
    dmatrix = xgb.DMatrix(list(features))
    return float(model.predict(dmatrix)[0])

It raises the following error:

ValueError: Please reshape the input data into 2-dimensional matrix.
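The ValueError comes from the input shape: the UDF receives one row's feature values as separate arguments, so list(features) is a flat 1-D list, while xgb.DMatrix expects a 2-D (rows × features) matrix. A minimal sketch of the reshape, assuming numpy is installed; the helper name to_single_row_matrix and the sample values are hypothetical:

```python
import numpy as np


def to_single_row_matrix(*features):
    """Stack one row's scalar feature values into a (1, n_features) float32 array."""
    # np.asarray(features) alone is 1-D with shape (n_features,);
    # reshape(1, -1) turns it into a single-row 2-D matrix.
    return np.asarray(features, dtype=np.float32).reshape(1, -1)


# Hypothetical feature values for a single row with three features
row = to_single_row_matrix(0.502, 317.91, 80.99)
print(row.shape)  # (1, 3)
```

Inside predict_udf this would become model.predict(xgb.DMatrix(to_single_row_matrix(*features))). Two caveats: building one DMatrix per row is slow on a large dataset, which is one reason the SparkXGBClassifier route is simpler; and if the Booster was trained with a multi-class multi:softprob objective, predict returns one probability per class, so float(...[0]) would fail until the per-class output is reduced (for example with an argmax).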

A reproducible example of training a SparkXGBClassifier on data that is (presumably) similar to yours:

# required for Google Colab 
# !pip install pyspark

import random

from xgboost.spark import SparkXGBClassifier
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler

spark = SparkSession.builder.appName('spark_session').getOrCreate()

dataset_size = 100_000
labels = [0, 1, 2]

feature_cols = ["feature1", "feature2", "feature3"]
label_col = "label"
pred_col_name = "pred"
all_cols = ["id1", "id2"] + feature_cols + [label_col]

data = [
    (i+1, i+1001, random.random(), random.random() * 1_000, random.random() * 1_000, random.choice(labels))
    for i in range(dataset_size)
]

prediction_input = spark.createDataFrame(data, ("id1", "id2", "feature1", "feature2", "feature3", 'label'))

# VectorAssembler required if you cannot train on GPU
vec_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
prediction_input = vec_assembler.transform(prediction_input)


prediction_input.show(5)

# +---+----+-------------------+-----------------+------------------+-----+--------------------+
# |id1| id2|           feature1|         feature2|          feature3|label|            features|
# +---+----+-------------------+-----------------+------------------+-----+--------------------+
# |  1|1001| 0.5020143404453867|317.9123523833789| 80.99056175390396|    1|[0.50201434044538...|
# |  2|1002| 0.5070869997709387|760.0823371718348| 939.3547071386726|    0|[0.50708699977093...|
# |  3|1003| 0.4075714302471459|196.3132088899051| 618.1676550139244|    2|[0.40757143024714...|
# |  4|1004| 0.2969972133808245|708.3811336815659| 633.2752459199171|    0|[0.29699721338082...|
# |  5|1005|0.18151957704419186|307.2738259905281|416.45499440549196|    2|[0.18151957704419...|
# +---+----+-------------------+-----------------+------------------+-----+--------------------+

# create an xgboost pyspark classifier estimator (trains on CPU here)
classifier = SparkXGBClassifier(
    features_col='features',
    label_col=label_col
)

# train and return the model
model = classifier.fit(prediction_input)

# predict (here on the training data itself, for illustration only)
predict_df = model.transform(prediction_input)
predict_df.show(5)

# +---+----+-------------------+-----------------+------------------+-----+--------------------+--------------------+----------+--------------------+
# |id1| id2|           feature1|         feature2|          feature3|label|            features|       rawPrediction|prediction|         probability|
# +---+----+-------------------+-----------------+------------------+-----+--------------------+--------------------+----------+--------------------+
# |  1|1001| 0.5020143404453867|317.9123523833789| 80.99056175390396|    1|[0.50201434044538...|[0.45034444332122...|       2.0|[0.33440056443214...|
# |  2|1002| 0.5070869997709387|760.0823371718348| 939.3547071386726|    0|[0.50708699977093...|[0.59553760290145...|       0.0|[0.35163819789886...|
# |  3|1003| 0.4075714302471459|196.3132088899051| 618.1676550139244|    2|[0.40757143024714...|[0.48832395672798...|       2.0|[0.33687391877174...|
# |  4|1004| 0.2969972133808245|708.3811336815659| 633.2752459199171|    0|[0.29699721338082...|[0.60149568319320...|       0.0|[0.35905247926712...|
# |  5|1005|0.18151957704419186|307.2738259905281|416.45499440549196|    2|[0.18151957704419...|[0.26893869042396...|       2.0|[0.27391168475151...|
# +---+----+-------------------+-----------------+------------------+-----+--------------------+--------------------+----------+--------------------+
© www.soinside.com 2019 - 2025. All rights reserved.