Python wrapper for multiprocessing Pool | AssertionError: data can be loaded only once


I'm trying to train quite a few scikit-learn models on a fairly large dataset (25k-250k rows, about 100 columns). The code looks like this:

def workers(params):
    X_train, X_test, Y_train, Y_test, classifier, ... = params
    # training...
    # assessing...
    return assessment, trainedmodel

def wrapper(X_train, X_test, Y_train, Y_test, classifier, ...):
    params = (X_train, X_test, Y_train, Y_test, classifier, ...)
    with multiprocessing.Pool(processes=workercount) as pool:
        results = pool.map(workers, params)
    pool.close()
    assessments, trainedmodels = zip(*results)
    return df, models

def anotherwrapper(X_train, X_test, Y_train, Y_test, classifier, ...):
    # other code...
    results2, models2 = differentwrapper(X_train, X_test, Y_train, Y_test, classifier, ...)
    # ...other code...
    return ..., resultscombined, models2, ...

def main():
    # data preparation...
    resultscombined, ..., models2, ... = anotherwrapper(X_train, X_test, Y_train, Y_test, classifier, ...)

and it gives me a multiprocessing.pool.RemoteTraceback:

Traceback (most recent call last): 
pool.py line 125, in worker
    result = (True, func(*args, **kwds))
pool.py line 48, in mapstar
    return list(map(*args))
[[[mycode]]] line 336, in workers
    trainedmodel_post = thismitigator.fit(X_train_copy, Y_train_copy, sensitive_features=X_train[acolumn])
exponentiated_gradient [[[fairlearn]]] line 140, in fit
    lagrangian = _Lagrangian(
lagrangian [[[fairlearn]]] line 78, in __init__
    self.constraints.load_data(X, y, **kwargs)
[[[mycode]]] line 457, in trainandtestmitigatingmodel
    results = pool.map(workers, params)
pool.py line 367, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
pool.py line 774, in get
    raise self._value
AssertionError: data can be loaded only once

This problem only appears when I train many models. With just a few, it works fine.
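A likely reason the error depends on the number of models: when chunksize is not given, Pool.map splits the task list into chunks, and all tasks in one chunk are run one after another in the same worker process (the mapstar frame in the traceback is that loop). The sketch below reproduces CPython's default chunk-size calculation; this is an implementation detail rather than a documented guarantee, and default_chunksize is a hypothetical helper name. With 4 processes, up to 16 tasks go one per chunk, while larger task lists put several tasks into the same chunk.

```python
def default_chunksize(n_tasks, processes):
    """Default chunk size of multiprocessing.Pool.map when chunksize=None.

    Mirrors CPython's implementation (an assumption for other interpreters):
    ceil(n_tasks / (4 * processes)), or 0 when there are no tasks.
    """
    if n_tasks == 0:
        return 0
    chunksize, extra = divmod(n_tasks, processes * 4)
    if extra:
        chunksize += 1
    return chunksize


# With 4 worker processes: few tasks -> 1 task per chunk, many tasks -> several
for n in (4, 16, 17, 200):
    print(n, default_chunksize(n, processes=4))
```

Passing chunksize=1 explicitly to pool.map forces one task per chunk regardless of the list length.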

How can I make each worker create its own copy of the data before using it? I tried changing workers to:

def workers(params):
    X_train, Y_train, X_test, Y_test, classifier, parameters, sensitive_feature, mitigator, constraint, sensitive_feature_array = params
    X_train_copy = X_train.copy()
    X_test_copy = X_test.copy()
    Y_train_copy = Y_train.copy()
    Y_test_copy = Y_test.copy()

but it had no effect.
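A possible explanation for why copying X_train and Y_train has no effect: according to the traceback, the assertion is raised inside self.constraints.load_data(...), so the "loaded once" state lives on the constraint object, not on the data. Pool.map pickles a whole chunk of tasks as one payload, and pickle preserves shared references within one payload, so tuples in the same chunk that refer to one constraint object arrive in the worker sharing a single copy of it. The sketch below uses a hypothetical stand-in class (LoadOnceMoment) and hypothetical worker functions, and a pickle round trip instead of a real Pool so it runs without fairlearn. It assumes the constraint passed through params is the object being reused; deep-copying it inside the worker (or creating a new constraint per task) gives each fit its own object.

```python
import copy
import pickle


class LoadOnceMoment:
    """Hypothetical stand-in for a fairlearn constraint object.

    Mimics the check that raises "data can be loaded only once".
    """

    def __init__(self):
        self.data_loaded = False

    def load_data(self, X, y):
        assert self.data_loaded is False, "data can be loaded only once"
        self.X, self.y = X, y
        self.data_loaded = True


def worker_shared(task):
    X, y, constraint = task
    constraint.load_data(X, y)  # reuses whatever object arrived with the task
    return "ok"


def worker_fresh(task):
    X, y, constraint = task
    constraint = copy.deepcopy(constraint)  # private copy for this task only
    constraint.load_data(X, y)
    return "ok"


constraint = LoadOnceMoment()
tasks = [([1, 2], [0, 1], constraint) for _ in range(3)]

# One pickle payload (like one Pool.map chunk): the shared reference survives
chunk = pickle.loads(pickle.dumps(tasks))
assert chunk[0][2] is chunk[1][2]

try:
    results_shared = [worker_shared(t) for t in chunk]
except AssertionError as exc:
    results_shared = str(exc)
print(results_shared)

chunk = pickle.loads(pickle.dumps(tasks))
results_fresh = [worker_fresh(t) for t in chunk]
print(results_fresh)
```

The deep copy works here because the constraint in the parent process was never loaded; a constraint that already had data loaded would be copied in that loaded state.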

python-3.x scikit-learn python-multiprocessing fairlearn
1 answer

Adding "* workercount" to the call and building a separate "params" list solved the problem:

params = [(X_train, Y_train, X_test, Y_test, classifier, ...) for classifier, parameters in classifiersAndParameters for mitigator in mitigators for constraint in constraints] * workercount
with multiprocessing.Pool(processes=workercount) as pool:
    params = params[:workercount]
    results = pool.map(workers, params)
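A quick check of what the two params steps in this answer produce, using hypothetical placeholder task names: repeating the list workercount times and then slicing it to workercount keeps only the first workercount tuples, so pool.map never receives more tasks than there are worker processes. With that few tasks, CPython's default chunk size is 1, so each tuple is pickled and run on its own. If the original list is longer than workercount, the remaining combinations are not submitted; if it is shorter, some tuples are repeated.

```python
def answer_params(base_tasks, workercount):
    # Same two steps as in the answer: repeat the list, then slice it
    params = list(base_tasks) * workercount
    return params[:workercount]


tasks = ["t0", "t1", "t2", "t3", "t4", "t5"]
print(answer_params(tasks, 4))      # ['t0', 't1', 't2', 't3']
print(answer_params(tasks[:2], 4))  # ['t0', 't1', 't0', 't1']
```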
© www.soinside.com 2019 - 2024. All rights reserved.