我正在尝试创建一个自定义变压器作为我的管道中的一个阶段。我通过 SparkNLP 进行一些转换,接下来的一些转换则使用 MLlib。要将某个阶段的 SparkNLP 转换结果传递到下一个 MLlib 转换,我需要提取 Spark_nlp_col.result 列并传递它,并且我为此使用自定义转换阶段。 在安装管道后,我可以保留它,但是当我再次加载它时,我收到错误:
AttributeError:“DummyMod”对象没有属性“MyTransformer”
这是我的课:
from pyspark.ml import Transformer
from pyspark.ml.param.shared import Param,Params,TypeConverters
class MyTransformer(Transformer,DefaultParamsWritable,DefaultParamsReadable):
inputCol = Param(Params._dummy(), "inputCol", "",TypeConverters.toString)
outputCol = Param(Params._dummy(), "outputCol", "",TypeConverters.toString)
def __init__(self,inputCol=None,outputCol=None):
super(MyTransformer, self).__init__()
self._setDefault(inputCol=None)
self._set(inputCol = inputCol)
self._setDefault(outputCol=None)
self._set(outputCol = outputCol)
def getInputCol(self):
return self.getOrDefault(self.inputCol)
def setInputCol(self, inputCol):
self._set(inputCol=inputCol)
def getOutputCol(self):
return self.getOrDefault(self.outputCol)
def setOutputCol(self, outputCol):
self._set(outputCol=outputCol)
def _transform(self, dataset):
in_col = self.getInputCol()
out_col = self.getOutputCol()
final_in_col = in_col+".result"
result = dataset.withColumn(out_col, dataset[final_in_col])
return result
我在其上创建了一个简单的包装函数以实现标准化,然后用它来创建管道、拟合并保存它:
def extract_col(cols, in_suffix, out_suffix):
return [MyTransformer(inputCol=col+in_suff, outputCol=col+out_suffix) for col in cols]
# stages before custom transformer
extractors = extract_col(cols, "_in", "_out")
# stages after custom transformer
stages = s1 + s2 + .. + extractors + .. + snlast + sn
pipeline = Pipeline(stages = stages)
fit_pipeline = pipeline.fit(data)
fit_pipeline.write().overwrite().save("path_to_store_at")
我如何读回它:
saved_pipeline = PipelineModel.load("path_where_stored")
然后我遇到了错误。 我尝试了多种编写自定义类的方法,使用 HasInputCol、HasOutputCol 等,但到目前为止没有任何效果。
有什么办法可以解决这个问题吗?
我也有同样的问题。有人可以帮忙吗?