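I'm trying to parallelize finding least-cost paths through a raster cost surface, but I keep running into the same error: PicklingError: Could not pickle the task to send it to the workers.
Here is a code example that reproduces the problem: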
import numpy as np
from skimage.graph import MCP_Geometric
import timeit
from joblib import Parallel, delayed
np.random.seed(123)
cost_surface = np.random.rand(1000, 1000)
mcp = MCP_Geometric(cost_surface)
pois = [(np.random.randint(0, 1000), np.random.randint(0, 1000)) for _ in range(20)]
def task(poi):
    costs_array, traceback = mcp.find_costs(starts=[poi], ends=pois)
    ends_idx = tuple(np.asarray(pois).T.tolist())
    costs = costs_array[ends_idx]
    tracebacks = [mcp.traceback(end) for end in pois]
Parallel(n_jobs=6)(delayed(task)(poi) for poi in pois)
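I'm fairly new to parallelizing tasks, but the code I'm running could take weeks if run sequentially, so I'd like to take advantage of parallelism. I understand that some complex objects can't be pickled, so I'm also open to alternatives.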
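This happens because MCP_Geometric is not picklable, so the global mcp object cannot be sent to the worker processes along with the task. You need to move the initialization of this class into the task function: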
import numpy as np
from skimage.graph import MCP_Geometric
import timeit
from joblib import Parallel, delayed
np.random.seed(123)
cost_surface = np.random.rand(1000, 1000)
pois = [(np.random.randint(0, 1000), np.random.randint(0, 1000)) for _ in range(20)]
def task(poi):
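    # Build the MCP object inside the task so it never has to be pickled;
    # each call (and therefore each worker process) gets its own instance.
    mcp = MCP_Geometric(cost_surface)
    costs_array, traceback = mcp.find_costs(starts=[poi], ends=pois)
    ends_idx = tuple(np.asarray(pois).T.tolist())
    costs = costs_array[ends_idx]
    tracebacks = [mcp.traceback(end) for end in pois]
    # Return the results so that Parallel(...) can collect them.
    return costs, tracebacks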
Parallel(n_jobs=6)(delayed(task)(poi) for poi in pois)
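If you want to check in advance whether an object can be sent to a worker, one rough approach is to try pickling it yourself. The sketch below uses only the standard library. FakeSolver is a hypothetical stand-in that merely mimics an unpicklable object (it is not MCP_Geometric), and is_picklable is a helper name made up for this example. As far as I know, joblib's default process backend serializes with cloudpickle, which accepts more than plain pickle, so treat this as a conservative check rather than an exact prediction of what joblib will do.

```python
import pickle
import threading


class FakeSolver:
    """Hypothetical stand-in for an object that cannot be pickled.

    It holds a threading.Lock, which the standard pickle module rejects.
    It is NOT MCP_Geometric; it only mimics the same failure mode.
    """

    def __init__(self, grid):
        self.grid = grid
        self._lock = threading.Lock()


def is_picklable(obj):
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, TypeError, AttributeError):
        return False
    return True


grid = [[1.0, 2.0], [3.0, 4.0]]

# The solver object itself cannot cross the process boundary...
solver_ok = is_picklable(FakeSolver(grid))

# ...but the plain data needed to rebuild it inside the worker can.
data_ok = is_picklable(grid)

print("solver picklable:", solver_ok)
print("grid picklable:", data_ok)
```

This is the same idea as the fix above: ship only plain, picklable data (the cost surface and the points) to the workers, and construct the unpicklable object on the worker side.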