DataFrame.withColumn() with a custom UDF in a pipeline is extremely slow

Problem description (votes: -1, answers: 1)

I am currently implementing a fuzzy classifier with the PySpark ML and Pipeline APIs. My problem is in the final stage, after generating the rule bases (RB), where I want to write the predicted result into a new 'prediction' column.

To do this I use df.withColumn() with infrence_udf, which applies fuzzy inference to make the decision. I don't understand why this approach fails on large datasets. (If I move the last four functions into the class, I run into serialization problems.) With a very small dataset it works fine, and even with a large dataset the rules are generated and shipped correctly, so is the problem simply that withColumn is slow? Have you had similar experience, or any ideas?
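(For context: the arrays captured in a Python UDF's closure are pickled and shipped along with the function, so large rule bases make every task heavier. A minimal sketch of shipping them once per executor via broadcast variables instead, reusing the infrence function defined below; the wrapper name infrence_udf_broadcast is made up for illustration:)

def infrence_udf_broadcast(sc, RB_P, RB_N, MFsParams):
    # Broadcast the read-only rule bases and MF parameters once per executor
    # instead of embedding them in the pickled UDF closure.
    bc_p = sc.broadcast(RB_P)
    bc_n = sc.broadcast(RB_N)
    bc_m = sc.broadcast(MFsParams)
    return F.udf(lambda l: infrence(l, bc_p.value, bc_n.value, bc_m.value), DoubleType())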

import random

import numpy as np
from pyspark.ml import Estimator, Model
from pyspark.ml.param.shared import HasFeaturesCol, HasInputCol, HasLabelCol, HasPredictionCol
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

class FuzzyClassifier(Estimator, HasFeaturesCol, HasLabelCol, HasInputCol, HasPredictionCol):
    .
    .
    .
    def _fit(self, df):
        # Generate the positive and negative rule bases and wrap them in the model.
        rb_p, rb_n = self.GenerateRules(df)
        fuzzyModel = FuzzyModel(labelCol=self.getLabelCol(), featuresCol=self.getFeaturesCol(),
                                RB_Pos=rb_p, RB_Neg=rb_n)
        return fuzzyModel

class FuzzyModel(Model, HasFeaturesCol, HasLabelCol):
    .
    .
    .
    def _transform(self, df):
        RB_N = self.getRB_Neg()
        RB_P = self.getRB_Pos()
        MFsParams = Fuzzifier().getMFsParams()
        # Row-at-a-time Python UDF: every row is pickled to a Python worker here.
        df = df.withColumn('prediction', infrence_udf(RB_P, RB_N, MFsParams)('features'))
        return df

def infrence_udf(RB_P, RB_N, MFsParams):
    # UDF factory; the rule bases and MF parameters are captured in the closure.
    return F.udf(lambda l: infrence(l, RB_P, RB_N, MFsParams), DoubleType())

def infrence(sampleRow, RB_P, RB_N, MFsParams):
    p = getCoverageDegree(sampleRow, RB_P, MFsParams)
    n = getCoverageDegree(sampleRow, RB_N, MFsParams)
    if p > n:       # coverage degree of 'PositiveClass' > 'NegativeClass'
        predictedLabed = 1.0
    elif p < n:     # coverage degree of 'PositiveClass' < 'NegativeClass'
        predictedLabed = 0.0
    else:           # tie: break it at random
        predictedLabed = random.randrange(0, 2)
    return float(predictedLabed)

def getCoverageDegree(sampleRow, RB, MFsParams):
    # Sum of the matching degrees between the sample row and every rule in RB.
    RB = RB.reshape(RB.shape[0], RB.shape[2])
    x = np.array([sampleRow] * len(RB))  # sampleRow is replicated once per rule
    degree = np.sum(np.array([getMatchingDegree(xp, r, MFsParams) for xp, r in zip(x, RB)]))
    return degree


def getMatchingDegree(xp, r_i, MFsParams):
    # Product over features of the membership value selected by rule antecedent a_ji.
    mfValues = np.array([trimf(xp_i, MFsParams[a_ji - 1]) for xp_i, a_ji in zip(xp, r_i)])
    m = np.prod(mfValues, axis=0)
    return m

def trimf(z, p):
    # Triangular membership function with feet a, c and peak b.
    a, b, c = p[0], p[1], p[2]
    if (z <= a) or (z >= c):
        y = 0.0
    elif a <= z <= b:
        y = (z - a) / (b - a)
    else:  # b < z < c; a final 'else' also keeps y from being unbound
        y = (c - z) / (c - b)
    return y
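For reference, trimf is the standard triangular membership function; a quick sanity check with hypothetical parameters (a, b, c) = (0.0, 1.0, 2.0):

print(trimf(0.5, [0.0, 1.0, 2.0]))   # 0.5 on the rising edge (z - a) / (b - a)
print(trimf(1.0, [0.0, 1.0, 2.0]))   # 1.0 at the peak b
print(trimf(1.5, [0.0, 1.0, 2.0]))   # 0.5 on the falling edge (c - z) / (c - b)
print(trimf(3.0, [0.0, 1.0, 2.0]))   # 0.0 outside the support [a, c]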
apache-spark pyspark apache-spark-sql bigdata user-defined-functions
1 Answer (score: 0)

This is down to the way the code is written. I would recommend: 1. rewriting the logic with native Spark functions instead of a row-at-a-time Python UDF, and 2. keeping the getCoverageDegree computation before the withColumn statement.
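As a step toward the first suggestion, here is a minimal sketch using a vectorized pandas UDF, assuming Spark 3.x (for pyspark.ml.functions.vector_to_array) and that RB_P, RB_N and MFsParams are plain numpy arrays; it reuses the question's getCoverageDegree but processes whole Arrow batches instead of pickling one row at a time (make_prediction_udf and features_arr are illustrative names):

import random
import numpy as np
import pandas as pd
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

def make_prediction_udf(RB_P, RB_N, MFsParams):
    @pandas_udf(DoubleType())
    def predict(features: pd.Series) -> pd.Series:
        # Each batch arrives as a pandas Series of numpy arrays via Arrow.
        out = []
        for row in features:
            x = np.asarray(row)
            p = getCoverageDegree(x, RB_P, MFsParams)
            n = getCoverageDegree(x, RB_N, MFsParams)
            out.append(1.0 if p > n else 0.0 if p < n else float(random.randrange(0, 2)))
        return pd.Series(out)
    return predict

# ML Vector columns are not Arrow-friendly, so convert to a plain array column first.
df = df.withColumn('features_arr', vector_to_array(df['features']))
df = df.withColumn('prediction', make_prediction_udf(RB_P, RB_N, MFsParams)('features_arr'))

This keeps the fuzzy logic in Python but amortizes the JVM-to-Python serialization over whole batches; going further, as the answer suggests, would mean expressing getCoverageDegree itself with Spark's native array functions.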
