我正在训练Spark MLlib线性回归器,但我认为我不理解库的部分动手用法。
我有1个功能(NameItem
)和一个输出(Accumulator
)。第一个是类别(速度,温度等),第二个是双精度类型的数字。
训练集由几百万个条目组成,并且它们不是线性相关的(我已通过热图和相关索引进行了检查。
问题:我想通过线性回归估计Accumulator
值下的NameItem
值,但我认为这并不是我真正在做的。
问题:我该怎么办?
我首先将数据集划分为training set
和data set
:
(trainDF, testDF) = df.randomSplit((0.80, 0.20), seed=42)
[之后,我尝试了管道方法,如大多数教程所示:
1)我索引了NameItem
indexer = StringIndexer(inputCol="NameItem", outputCol="CategorizedItem", handleInvalid = "keep")
2)然后我对其进行编码
encoderInput = [indexer.getOutputCol()]
encoderOutput = ["EncodedItem"]
encoder = OneHotEncoderEstimator(inputCols=encoderInput, outputCols=encoderOutput)
3)并且也组装了
assemblerInput = encoderOutput
assembler = VectorAssembler(inputCols=assemblerInput, outputCol="features")
此后,我继续进行有效的培训:
lr = LinearRegression(labelCol="Accumulator")
pipeline = Pipeline(stages=[indexer, encoder, assembler, lr])
lrModel = pipeline.fit(trainDF)
这就是将预测应用于测试集时获得的结果:
predictions = lrModel.transform(testDF).show(5, False)
+--------------+-----------------+---------------+-----------------+-------------------------------+------------------+
|NameItem |Accumulator |CategorizedItem|EncodedItem |features |prediction |
+--------------+-----------------+---------------+-----------------+-------------------------------+------------------+
|Speed |44000.00000000 |265.0 |(688,[265],[1.0])|(689,[265,688],[1.0,44000.0]) |44000.100892495786|
|Speed |245000.00000000 |265.0 |(688,[265],[1.0])|(689,[265,688],[1.0,245000.0]) |245000.09963708033|
|Temp |4473860.00000000 |66.0 |(688,[66],[1.0]) |(689,[66,688],[1.0,4473860.0]) |4473859.874261986 |
|Temp |6065.00000000 |66.0 |(688,[66],[1.0]) |(689,[66,688],[1.0,6065.0]) |6065.097757082314 |
|Temp |10140.00000000 |66.0 |(688,[66],[1.0]) |(689,[66,688],[1.0,10140.0]) |10140.097731630483|
+--------------+-----------------+---------------+-----------------+-------------------------------+------------------+
only showing top 5 rows
对于相同的分类特征(例如Temp
),我怎么可能得到3个不同的预测?
尽管它们非常接近预期值,但我还是感觉有些问题。
对于相同的分类特征(例如
Temp
),我怎么可能得到3个不同的预测?
这是因为您的输出Accumulator
某种程度上已经进入features
了(当然不应该这样),因此模型只是“预测”(基本上是复制)输入的这一部分;这就是为什么预测so“准确” ...
似乎VectorAssembler
弄乱了事情。事实是,您实际上并不需要VectorAssembler
,因为实际上您仅具有“单一”功能(EncodedItem
中的单热编码稀疏矢量)。这[[might是VectorAssembler
的行为在这里如此的原因(要求“组装”单个功能),但是无论如何这都是一个bug。
VectorAssembler
,然后将EncodedItem
直接重命名为features
,即:indexer = StringIndexer(inputCol="NameItem", outputCol="CategorizedItem", handleInvalid = "keep")
encoderInput = [indexer.getOutputCol()]
encoderOutput = ["features"] # 1st change
encoder = OneHotEncoderEstimator(inputCols=encoderInput, outputCols=encoderOutput)
lr = LinearRegression(labelCol="Accumulator")
pipeline = Pipeline(stages=[indexer, encoder, lr]) # 2nd change
lrModel = pipeline.fit(trainDF)
UPDATE(在评论后)
不幸的是,仅由于我无法访问您正在使用的Spark 1.4.4,我无法重现此问题。但是我已经确认,在最新版本的Spark2.4.4
中它可以正常工作,这使我更加倾向于相信v1.4中确实存在一些错误,但是后来已解决。 >这是Spark 2.4.4中的复制品,使用了一些类似于您的虚拟数据:
spark.version # '2.4.4' from pyspark.ml.feature import VectorAssembler, OneHotEncoderEstimator, StringIndexer from pyspark.ml.regression import LinearRegression from pyspark.ml import Pipeline # dummy data resembling yours: df = spark.createDataFrame([['Speed', 44000], ['Temp', 23000], ['Temp', 5000], ['Speed', 75000], ['Weight', 5300], ['Height', 34500], ['Weight', 6500]], ['NameItem', 'Accumulator']) df.show() # result: +--------+-----------+ |NameItem|Accumulator| +--------+-----------+ | Speed| 44000| | Temp| 23000| | Temp| 5000| | Speed| 75000| | Weight| 5300| | Height| 34500| | Weight| 6500| +--------+-----------+ indexer = StringIndexer(inputCol="NameItem", outputCol="CategorizedItem", handleInvalid = "keep") encoderInput = [indexer.getOutputCol()] encoderOutput = ["EncodedItem"] encoder = OneHotEncoderEstimator(inputCols=encoderInput, outputCols=encoderOutput) assemblerInput = encoderOutput assembler = VectorAssembler(inputCols=assemblerInput, outputCol="features") lr = LinearRegression(labelCol="Accumulator") pipeline = Pipeline(stages=[indexer, encoder, assembler, lr]) lrModel = pipeline.fit(df) lrModel.transform(df).show() # predicting on the same df, for simplicity
最后transform
的结果是
+--------+-----------+---------------+-------------+-------------+------------------+ |NameItem|Accumulator|CategorizedItem| EncodedItem| features| prediction| +--------+-----------+---------------+-------------+-------------+------------------+ | Speed| 44000| 2.0|(4,[2],[1.0])|(4,[2],[1.0])| 59500.0| | Temp| 23000| 1.0|(4,[1],[1.0])|(4,[1],[1.0])|14000.000000000004| | Temp| 5000| 1.0|(4,[1],[1.0])|(4,[1],[1.0])|14000.000000000004| | Speed| 75000| 2.0|(4,[2],[1.0])|(4,[2],[1.0])| 59500.0| | Weight| 5300| 0.0|(4,[0],[1.0])|(4,[0],[1.0])| 5900.000000000004| | Height| 34500| 3.0|(4,[3],[1.0])|(4,[3],[1.0])| 34500.0| | Weight| 6500| 0.0|(4,[0],[1.0])|(4,[0],[1.0])| 5900.000000000004| +--------+-----------+---------------+-------------+-------------+------------------+
从您可以看到的地方:
features
现在做- 不是确实包含输出变量
Accumulator
的值;实际上,正如我在上面讨论的那样,features
现在与EncodedItem
相同,这使VectorAssembler
变得多余,这正是我们所期望的,因为我们只有一个功能。prediction
值对于相同的NameItem
值现在是相同的,再次如我们期望的那样,加上它们的准确性较差,因此更加真实。因此,最肯定的是,您的问题与所使用的 非常过时 Spark版本1.4.4有关。自v1.4起,Spark取得了飞跃,您应该认真考虑更新...