I am currently implementing a fuzzy classifier with the PySpark ML / Pipeline API. My problem is in the last stage, after generating the rule base (RB): I want to write the predicted result into a new "prediction" column.
To do this I use df.withColumn() with infrence_udf, which applies the fuzzy inference principle to make the decision. I don't know why this approach does not work for large datasets. (If I move the last four functions into the class, I run into serialization problems.) With a very small dataset it works fine, and with a large dataset the rules are generated and shipped correctly, so is the problem simply that withColumn is slow? Do you have similar experience or any ideas?
import random

import numpy as np
import pyspark.sql.functions as F
from pyspark.ml import Estimator, Model
from pyspark.ml.param.shared import HasFeaturesCol, HasInputCol, HasLabelCol, HasPredictionCol
from pyspark.sql.types import DoubleType

# Fuzzifier, GenerateRules and the RB_Pos/RB_Neg params are defined elsewhere.

class FuzzyClassifier(Estimator, HasFeaturesCol, HasLabelCol, HasInputCol, HasPredictionCol):
    ...

    def _fit(self, df):
        # Mine the positive and negative rule bases from the training data.
        rb_p, rb_n = self.GenerateRules(df)
        fuzzyModel = FuzzyModel(labelCol=self.getLabelCol(),
                                featuresCol=self.getFeaturesCol(),
                                RB_Pos=rb_p, RB_Neg=rb_n)
        return fuzzyModel
class FuzzyModel(Model, HasFeaturesCol, HasLabelCol):
    ...

    def _transform(self, df):
        RB_N = self.getRB_Neg()
        RB_P = self.getRB_Pos()
        MFsParams = Fuzzifier().getMFsParams()
        # The rule bases and MF parameters are captured in the UDF closure,
        # so they are serialized and shipped to the executors with every task.
        df = df.withColumn('prediction', infrence_udf(RB_P, RB_N, MFsParams)('features'))
        return df
def infrence_udf(RB_P, RB_N, MFsParams):
    return F.udf(lambda l: infrence(l, RB_P, RB_N, MFsParams), DoubleType())

def infrence(sampleRow, RB_P, RB_N, MFsParams):
    p = getCoverageDegree(sampleRow, RB_P, MFsParams)
    n = getCoverageDegree(sampleRow, RB_N, MFsParams)
    if p > n:    # degree 'PositiveClass' > 'NegativeClass'
        predictedLabel = 1.0
    elif p < n:  # degree 'PositiveClass' < 'NegativeClass'
        predictedLabel = 0.0
    else:        # tie: break it at random
        predictedLabel = random.randrange(0, 2)
    return float(predictedLabel)
def getCoverageDegree(sampleRow, RB, MFsParams):
    # Sum the matching degrees over all rules for one sample row.
    RB = RB.reshape(RB.shape[0], RB.shape[2])
    x = np.array([sampleRow] * len(RB))  # sampleRow replicated once per rule
    degree = np.sum(np.array([getMatchingDegree(xp, r, MFsParams) for xp, r in zip(x, RB)]))
    return degree
def getMatchingDegree(xp, r_i, MFsParams):
    # Product over features of the triangular membership values picked out
    # by the rule's 1-based antecedent indices.
    mfValues = np.array([trimf(xp_i, MFsParams[a_ji - 1]) for xp_i, a_ji in zip(xp, r_i)])
    m = np.prod(mfValues, axis=0)
    return m
def trimf(z, p):
    # Triangular membership function with corners a <= b <= c.
    a, b, c = p[0], p[1], p[2]
    if (z <= a) or (z >= c):
        y = 0.0
    elif a <= z <= b:
        y = (z - a) / (b - a)
    else:  # b < z < c, so y is always assigned
        y = (c - z) / (c - b)
    return y
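For completeness, the membership helpers behave as expected when checked on the driver with toy values (the numbers are made up purely for illustration):

mfs_params = np.array([[0.0, 0.25, 0.5],
                       [0.25, 0.5, 0.75],
                       [0.5, 0.75, 1.0]])

trimf(0.25, mfs_params[0])    # 1.0  (apex of the first triangle)
trimf(0.375, mfs_params[0])   # 0.5  (halfway down the right slope)
getMatchingDegree(np.array([0.25, 0.5]), np.array([1, 2]), mfs_params)  # 1.0 * 1.0 = 1.0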
This comes down to the way it is coded. I would recommend:

1. rewriting the logic with native Spark operations, and
2. keeping getCoverageDegree before the withColumn statement, so its row-independent work (e.g. the reshape) is not redone for every row.
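To make recommendation 1 concrete, here is a minimal sketch of one possible rewrite, assuming Spark >= 3.0 (for vector_to_array and type-hinted pandas UDFs) with pyarrow installed. The names predict_udf and coverage_degree are hypothetical; rb_p, rb_n and mfs_params stand in for the fitted rule bases (1-based integer antecedent indices, as in the question) and the membership-function parameter array. The row-independent data is broadcast once, which also covers recommendation 2 by hoisting the reshape out of the per-row path:

import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType
from pyspark.ml.functions import vector_to_array

spark = SparkSession.builder.getOrCreate()

# Hoist the row-independent work and broadcast it once, instead of
# capturing it in a UDF closure that is serialized with every task.
# rb_p, rb_n, mfs_params are placeholders for the fitted model state.
rb_p_2d = rb_p.reshape(rb_p.shape[0], rb_p.shape[2])
rb_n_2d = rb_n.reshape(rb_n.shape[0], rb_n.shape[2])
bc = spark.sparkContext.broadcast((rb_p_2d, rb_n_2d, np.asarray(mfs_params)))

def coverage_degree(x, rb_2d, mfs):
    # Vectorized equivalent of getCoverageDegree/getMatchingDegree/trimf:
    # gather the (a, b, c) corners for every rule antecedent, evaluate the
    # triangular MF, multiply across features, then sum across rules.
    p = mfs[rb_2d - 1]                      # (n_rules, n_features, 3)
    a, b, c = p[..., 0], p[..., 1], p[..., 2]
    with np.errstate(divide='ignore', invalid='ignore'):
        y = np.maximum(np.minimum((x - a) / (b - a), (c - x) / (c - b)), 0.0)
    y = np.nan_to_num(y)                    # guard degenerate b == a or c == b
    return np.prod(y, axis=1).sum()

@pandas_udf(DoubleType())
def predict_udf(features: pd.Series) -> pd.Series:
    rb_p_2d, rb_n_2d, mfs = bc.value
    out = np.empty(len(features))
    for i, row in enumerate(features):
        x = np.asarray(row)
        p = coverage_degree(x, rb_p_2d, mfs)
        n = coverage_degree(x, rb_n_2d, mfs)
        out[i] = 1.0 if p > n else (0.0 if p < n else float(np.random.randint(0, 2)))
    return pd.Series(out)

# pandas UDFs cannot read ml VectorUDT columns directly, so convert first.
df = (df.withColumn('features_arr', vector_to_array('features'))
        .withColumn('prediction', predict_udf('features_arr'))
        .drop('features_arr'))

A pandas UDF receives rows in Arrow batches, so the Python-call overhead is paid once per batch instead of once per row, and the broadcast keeps the rule bases off every task closure; both are common reasons a plain F.udf crawls on large datasets while working fine on small ones.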