我有一个带有预测输入数据集的数据集,其中包含一些通用列、一些特征列和一个标签列。我还有一个 xgb.Booster 类型的 xgboost 模型。如何使用 xgb 模型对输入数据集进行预测?这是我现在的代码
from pyspark.sql.functions import col, DataFrame, udf, array
from pyspark.sql.types import FloatType
feature_cols = ["feature1", "feature2", "feature3"]
label_col = "label"
all_cols = ["id1", "id2"] + feature_cols + label_col
pred_col_name = "pred"
prediction_path = "/input/path"
prediction_input = spark.read.parquet(prediction_path).select(all_cols)
for column in feature_cols:
prediction_input = prediction_input.withColumn(column, col(column).cast("float"))
def predict_udf(*features):
dmatrix = xgb.DMatrix(list(features))
return float(model.predict(dmatrix)[0])
predict_udf_spark = udf(predict_udf, FloatType())
mask_labeled = prediction_input.filter(col(label_col) != 0)
if mask_labeled.count() > 0:
prediction_labeled = mask_labeled.withColumn(pred_col_name, predict_udf_spark(*[mask_labeled[col] for col in feature_cols]))
prediction_labeled.select(*output_cols).repartition(1000).write.csv(output_path, header=False, mode="append")
但是这段代码给了我以下错误
jc = sc._jvm.functions.array(_to_seq(sc, cols, _to_java_column))
AttributeError: 'NoneType' object has no attribute '_jvm'
如何解决这个问题?
据我所知,您正在尝试在
xgboost
上下文中使用 xgboost 库的 spark
算法。请注意,在 spark
库中有一个专用的 xgboost
实现,您的代码似乎没有使用该实现(从您的 predict_udf
函数中,我了解到您正在尝试整理您的 pyspark
数据,执行预测,并将预测转换回 pyspark
格式)。通过使用 SparkXGBClassifier
来拟合您的模型并应用预测,您可以使您的代码变得更加容易。
SparkXGBClassifier
并使用其预测,其中代码是使用 pyspark==3.5.0
以 xgboost=2.0.0
、python==3.10.12
和 Google Colab
编写的。请注意,我的示例将机器学习部分保持在绝对最低限度,并且没有显示您应该如何训练机器学习模型(例如,实际上没有模型评估、超参数调整、验证等)
请注意,由于您没有提供任何示例数据或有关依赖项的信息,因此很难复制您的问题。使用我的
SparkXGBClassifier
示例中的数据后,我从此函数中收到错误:
def predict_udf(*features):
dmatrix = xgb.DMatrix(list(features))
return float(model.predict(dmatrix)[0])
这给了我错误:
ValueError: Please reshape the input data into 2-dimensional matrix.
如何在(大概)与您的数据类似的数据上训练
SparkXGBClassifier
的可复制示例:
# required for Google Colab
# !pip install pyspark
import random
from xgboost.spark import SparkXGBClassifier
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
spark = SparkSession.builder.appName('spark_session').getOrCreate()
dataset_size = 100_000
labels = [0, 1, 2]
feature_cols = ["feature1", "feature2", "feature3"]
label_col = "label"
pred_col_name = "pred"
all_cols = ["id1", "id2"] + feature_cols + [label_col]
data = [
(i+1, i+1001, random.random(), random.random() * 1_000, random.random() * 1_000, random.choice(labels))
for i in range(dataset_size)
]
prediction_input = spark.createDataFrame(data, ("id1", "id2", "feature1", "feature2", "feature3", 'label'))
# VectorAssembler required if you cannot train on GPU
vec_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
prediction_input = vec_assembler.transform(prediction_input)
prediction_input.show(5)
# +---+----+-------------------+-----------------+------------------+-----+--------------------+
# |id1| id2| feature1| feature2| feature3|label| features|
# +---+----+-------------------+-----------------+------------------+-----+--------------------+
# | 1|1001| 0.5020143404453867|317.9123523833789| 80.99056175390396| 1|[0.50201434044538...|
# | 2|1002| 0.5070869997709387|760.0823371718348| 939.3547071386726| 0|[0.50708699977093...|
# | 3|1003| 0.4075714302471459|196.3132088899051| 618.1676550139244| 2|[0.40757143024714...|
# | 4|1004| 0.2969972133808245|708.3811336815659| 633.2752459199171| 0|[0.29699721338082...|
# | 5|1005|0.18151957704419186|307.2738259905281|416.45499440549196| 2|[0.18151957704419...|
# +---+----+-------------------+-----------------+------------------+-----+--------------------+
# create a xgboost pyspark regressor estimator and set device="cuda"
classifier = SparkXGBClassifier(
features_col='features',
label_col=label_col
)
# train and return the model
model = classifier.fit(prediction_input)
# predict on test data
predict_df = model.transform(prediction_input)
predict_df.show(5)
# +---+----+-------------------+-----------------+------------------+-----+--------------------+--------------------+----------+--------------------+
# |id1| id2| feature1| feature2| feature3|label| features| rawPrediction|prediction| probability|
# +---+----+-------------------+-----------------+------------------+-----+--------------------+--------------------+----------+--------------------+
# | 1|1001| 0.5020143404453867|317.9123523833789| 80.99056175390396| 1|[0.50201434044538...|[0.45034444332122...| 2.0|[0.33440056443214...|
# | 2|1002| 0.5070869997709387|760.0823371718348| 939.3547071386726| 0|[0.50708699977093...|[0.59553760290145...| 0.0|[0.35163819789886...|
# | 3|1003| 0.4075714302471459|196.3132088899051| 618.1676550139244| 2|[0.40757143024714...|[0.48832395672798...| 2.0|[0.33687391877174...|
# | 4|1004| 0.2969972133808245|708.3811336815659| 633.2752459199171| 0|[0.29699721338082...|[0.60149568319320...| 0.0|[0.35905247926712...|
# | 5|1005|0.18151957704419186|307.2738259905281|416.45499440549196| 2|[0.18151957704419...|[0.26893869042396...| 2.0|[0.27391168475151...|
# +---+----+-------------------+-----------------+------------------+-----+--------------------+--------------------+----------+--------------------+