Joblib does not call a custom function when n_jobs > 1

Question (votes: 1, answers: 1)

I have an example with data.

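As you can see from the code, every call to the function fit_by_idx() should print 'here', but it does not. Everything works fine with n_jobs=1, but if n_jobs is greater than 1, joblib does not call the function.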

Code:

import statsmodels.tsa.holtwinters as holtwinters
import pandas as pd
import numpy as np
from joblib import Parallel, delayed

train = pd.read_csv('train.csv').drop(columns=['id'])


def iter_predict(data, model, steps, fit_args=[], fit_kwargs={}):  # steps - number of points to forecast
    def fit_by_idx(idx):
        print('here')
        endog = data.iloc[idx]
        fitted = model(endog).fit(*fit_args, optimized=False, **fit_kwargs)
        res[idx, :] = fitted.forecast(steps)

    res = np.zeros((data.shape[0], steps))
    Parallel(n_jobs=2)(delayed(fit_by_idx)(idx) for idx in range(data.shape[0]))
    return res

iter_predict(train, holtwinters.SimpleExpSmoothing, 2, fit_kwargs={'smoothing_level': 0.5})

Here is a link to the dataset.

python python-3.x parallel-processing joblib parallelism-amdahl
1 Answer
0 votes

Q : "If n_jobs is more than one, joblib does not call the function."

Well, it does call it (you can check the PID and PPID numbers). It just does not show the output of print( "here" ).

Going by the definition in the API documentation:

print( *objects, sep = ' ', end = '\n', file = sys.stdout, flush = False ), call it with flush = True.
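As a minimal sketch of that advice (the helper name `report` is hypothetical and not from the original post), a worker can tag each message with its parent and own process IDs and flush it immediately, so the text does not sit in a child process's stdout buffer:

```python
import os


def report(idx):
    """Print a progress marker tagged with parent/own PID and flush it at once."""
    msg = "here[{0:d}] PPID={1:d} PID={2:d}".format(idx, os.getppid(), os.getpid())
    # flush=True pushes the text out of this process's stdout buffer immediately
    print(msg, flush=True)
    return msg


if __name__ == "__main__":
    report(0)
```

Returning the message as well as printing it is only a convenience for checking what a worker produced. It is not something the original answer requires.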

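However, expect more trouble later on. The joblib-spawned workers do not fall back to pure-[SERIAL], GIL-controlled, re-[SERIAL]-ised code execution unless forced to, and forcing that comes at the cost of an adverse performance impact. Running the n_jobs one after another again makes no sense: you pay all the instantiation costs and other overheads, yet get no speedup benefit in return, do you? Use: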

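import os
import numpy as np
from joblib import Parallel, delayed, parallel_backend

N_JOBS = 2  # assumed value; the original answer does not show how N_JOBS was set

def iter_preDEMO( data,            # Pandas DF-alike data
                  #other args removed for MCVE-clarity
                  ):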

    def fit_by_idx( idx ): #-------------------------------------[FUNCTION]-def-<start> To be transferred to each remote-joblib-initiated process(es)

        print( 'here[{0:_>4d}(PPID:PID={1:_>7d}:{2::>7d})]'.format( idx,
                                                                    os.getppid(), # test joblib-[FUNCTION]-def-transfer here with: lambda x = "_{0:}_" : x.format( os.getppid() )
                                                                    os.getpid()   # test joblib-[FUNCTION]-def-transfer here with: lambda x = "_{0:}_" : x.format( os.getpid()  )
                                                                    ),
                end   = "\t",
                flush = True
                )
    #------------------------------------------------------------[FUNCTION]-def-<end>

    res = np.zeros( ( data.shape[0], 3 ) )
    for aBackEND in ( 'threading', 'loky', 'multiprocessing' ):
        try:
             print( "\n____________________________Going into ['{0:}']-backend".format( aBackEND ) )
             with parallel_backend( aBackEND, n_jobs = N_JOBS ):
                  Parallel( n_jobs = N_JOBS )( delayed( fit_by_idx )( pickled_SER_DES_copy_of_idx )
                                               for                    pickled_SER_DES_copy_of_idx in range( data.shape[0] )
                                               )
        finally:
             print( "\n_____________________________Exit from ['{0:}']-backend".format( aBackEND ) )
    return res

You will see how things work from the slightly more detailed print()-ed output:

START: PID=_____22528

____________________________Going into ['threading']-backend
here[___0(PPID:PID=__22527:::22528)]    here[___1(PPID:PID=__22527:::22528)]    here[___2(PPID:PID=__22527:::22528)]    here[___3(PPID:PID=__22527:::22528)]    here[___4(PPID:PID=__22527:::22528)]    here[___5(PPID:PID=__22527:::22528)]    here[___6(PPID:PID=__22527:::22528)]    here[___7(PPID:PID=__22527:::22528)]    here[___8(PPID:PID=__22527:::22528)]    here[___9(PPID:PID=__22527:::22528)]    here[__10(PPID:PID=__22527:::22528)]    here[__11(PPID:PID=__22527:::22528)]    here[__12(PPID:PID=__22527:::22528)]    here[__13(PPID:PID=__22527:::22528)]    here[__14(PPID:PID=__22527:::22528)]    here[__15(PPID:PID=__22527:::22528)]    here[__16(PPID:PID=__22527:::22528)]    
_____________________________Exit from ['threading']-backend

____________________________Going into ['loky']-backend
here[___0(PPID:PID=__22527:::22528)]    here[___1(PPID:PID=__22527:::22528)]    here[___2(PPID:PID=__22527:::22528)]    here[___3(PPID:PID=__22527:::22528)]    here[___4(PPID:PID=__22527:::22528)]    here[___5(PPID:PID=__22527:::22528)]    here[___6(PPID:PID=__22527:::22528)]    here[___7(PPID:PID=__22527:::22528)]    here[___8(PPID:PID=__22527:::22528)]    here[___9(PPID:PID=__22527:::22528)]    here[__10(PPID:PID=__22527:::22528)]    here[__11(PPID:PID=__22527:::22528)]    here[__12(PPID:PID=__22527:::22528)]    here[__13(PPID:PID=__22527:::22528)]    here[__14(PPID:PID=__22527:::22528)]    here[__15(PPID:PID=__22527:::22528)]    here[__16(PPID:PID=__22527:::22528)]    
_____________________________Exit from ['loky']-backend

____________________________Going into ['multiprocessing']-backend
here[___0(PPID:PID=__22527:::22528)]    here[___1(PPID:PID=__22527:::22528)]    here[___2(PPID:PID=__22527:::22528)]    here[___3(PPID:PID=__22527:::22528)]    here[___4(PPID:PID=__22527:::22528)]    here[___5(PPID:PID=__22527:::22528)]    here[___6(PPID:PID=__22527:::22528)]    here[___7(PPID:PID=__22527:::22528)]    here[___8(PPID:PID=__22527:::22528)]    here[___9(PPID:PID=__22527:::22528)]    here[__10(PPID:PID=__22527:::22528)]    here[__11(PPID:PID=__22527:::22528)]    here[__12(PPID:PID=__22527:::22528)]    here[__13(PPID:PID=__22527:::22528)]    here[__14(PPID:PID=__22527:::22528)]    here[__15(PPID:PID=__22527:::22528)]    here[__16(PPID:PID=__22527:::22528)]    
_____________________________Exit from ['multiprocessing']-backend

 [[0. 0. 0.]
  [0. 0. 0.]
  ...
  ]
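The all-zero array above is expected in this demo, because its fit_by_idx() only prints. In the question's original code the assignment into `res` is a separate concern: a process-based backend such as loky runs each task in another process, so writes into an array captured from the enclosing function should not be expected to reach the parent's `res`. A minimal sketch of the usual alternative, returning each row from the worker and assembling the array in the parent, is below. The names `row_forecast` and `iter_predict_sketch` are hypothetical, and the row mean stands in for the actual statsmodels fit:

```python
import numpy as np
from joblib import Parallel, delayed


def row_forecast(row, steps):
    """Hypothetical stand-in for the model fit: repeat the row mean `steps` times."""
    return np.full(steps, float(np.mean(row)))


def iter_predict_sketch(data, steps, n_jobs=2):
    """Collect per-row results from the workers instead of writing into a shared array."""
    rows = Parallel(n_jobs=n_jobs)(
        delayed(row_forecast)(data[idx], steps) for idx in range(data.shape[0])
    )
    # Parallel returns the results in the order the tasks were submitted
    return np.vstack(rows)


if __name__ == "__main__":
    demo = np.arange(12, dtype=float).reshape(4, 3)
    print(iter_predict_sketch(demo, steps=2))
```

Because the results travel back as return values, the same function behaves identically whether the backend uses threads or processes.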

Also, please check this and this on your operating system, with your actual versions of joblib and of the (hidden) pickling SER/DES tools.
