I'm trying to train quite a few scikit-learn models on larger datasets (25k-250k rows, ~100 columns). The code looks roughly like this:
def workers(params):
    X_train, X_test, Y_train, Y_test, classifier, ... = params
    # training...
    # assessing...
    return assessment, trainedmodel
def wrapper(X_train, X_test, Y_train, Y_test, classifier, ...):
    params = [(X_train, X_test, Y_train, Y_test, classifier, ...), ...]  # one tuple per task
    with multiprocessing.Pool(processes=workercount) as pool:
        results = pool.map(workers, params)
        # no pool.close() needed; the "with" block closes the pool
    assessments, trainedmodels = zip(*results)
    return df, models
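For context, the pool.map pattern above can be sketched with a toy task (the names `workers`/`wrapper` mirror my code; the arithmetic stands in for training and assessing):

```python
import multiprocessing

def workers(params):
    # Unpack one task's arguments; each element of the params list
    # is a tuple describing a single (data, model) combination.
    x, y = params
    return x + y, x * y  # stand-ins for "assessment" and "trained model"

def wrapper(tasks, workercount=2):
    # pool.map expects an iterable of argument tuples, one per task;
    # the "with" block closes and joins the pool automatically.
    with multiprocessing.Pool(processes=workercount) as pool:
        results = pool.map(workers, tasks)
    assessments, trainedmodels = zip(*results)
    return assessments, trainedmodels

if __name__ == "__main__":
    print(wrapper([(1, 2), (3, 4)]))  # ((3, 7), (2, 12))
```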
def anotherwrapper(X_train, X_test, Y_train, Y_test, classifier, ...):
    # other code...
    results2, models2 = differentwrapper(X_train, X_test, Y_train, Y_test, classifier, ...)
    # ...other code...
    return ... resultscombined, models2 ...
def main():
    # dataprep...
    resultscombined, ..., models2, ... = anotherwrapper(X_train, X_test, Y_train, Y_test, classifier, ...)
and it gives me a multiprocessing.pool.RemoteTraceback:
Traceback (most recent call last):
pool.py line 125, in worker
result = (True, func(*args, **kwds))
pool.py line 48, in mapstar
return list(map(*args))
[[[mycode]]] line 336, in workers
trainedmodel_post = thismitigator.fit(X_train_copy, Y_train_copy, sensitive_features=X_train[acolumn])
exponentiated_gradient [[[fairlearn]]] line 140, in fit
lagrangian = _Lagrangian(
lagrangian [[[fairlearn]]] line 78, in __init__
self.constraints.load_data(X, y, **kwargs)
[[[mycode]]] line 457, in trainandtestmitigatingmodel
results = pool.map(workers, params)
pool.py line 367, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
pool.py line 774, in get
raise self._value
AssertionError: data can be loaded only once
This only happens when I train many models; with just a few it works fine.
How do I make the workers create their own copy of the data before using it? I tried changing the workers to:
def workers(params):
X_train, Y_train, X_test, Y_test, classifier, parameters, sensitive_feature, mitigator, constraint, sensitive_feature_array = params
X_train_copy = X_train.copy()
X_test_copy = X_test.copy()
Y_train_copy = Y_train.copy()
Y_test_copy = Y_test.copy()
But that had no effect.
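Copying X/Y inside the worker doesn't touch the thing the assertion actually guards: judging from the traceback, it is the fairlearn constraints object, whose `load_data` refuses to run twice on the same instance. A minimal stand-in (an assumption, not fairlearn's real class) reproduces the failure and shows that copying the unfitted *object* per task, rather than the data, sidesteps it:

```python
import copy

class Constraint:
    """Toy stand-in for a constraints object whose load_data()
    may only ever be called once per instance."""
    def __init__(self):
        self._loaded = False

    def load_data(self, X, y):
        # fairlearn's constraint guards itself with a similar assertion
        assert not self._loaded, "data can be loaded only once"
        self._loaded = True

shared = Constraint()
shared.load_data([1], [0])        # first fit: fine
try:
    shared.load_data([1], [0])    # reusing the same object: blows up
except AssertionError as e:
    print(e)                      # data can be loaded only once

# Deep-copying the unfitted template per fit (not the data) avoids it:
template = Constraint()
for _ in range(3):
    copy.deepcopy(template).load_data([1], [0])   # each copy loads once
```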
What did solve the problem was appending `* workercount` and building a separate `params`:
params = [(X_train, Y_train, X_test, Y_test, classifier, ...)
          for classifier, parameters in classifiersAndParameters
          for mitigator in mitigators
          for constraint in constraints] * workercount
with multiprocessing.Pool(processes=workercount) as pool:
    params = params[:workercount]
    results = pool.map(workers, params)
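An alternative to duplicating the params list, sketched here with hypothetical classes: pass a zero-argument factory (e.g. the class itself) into each task, so every fit constructs a fresh, unfitted object inside the worker instead of sharing one instance across the whole list:

```python
class DummyConstraint:
    # Hypothetical stand-in for a single-use constraints object.
    def __init__(self):
        self._loaded = False

    def load_data(self):
        assert not self._loaded, "data can be loaded only once"
        self._loaded = True
        return "loaded"

def worker(task):
    factory = task["constraint_factory"]
    constraint = factory()           # fresh instance inside each task
    return constraint.load_data()

# Each task carries the factory, not a shared instance, so repeated
# fits never trip the "loaded only once" assertion.
tasks = [{"constraint_factory": DummyConstraint} for _ in range(4)]
print([worker(t) for t in tasks])
```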