I am trying to run a process in a multiprocessing pool:
from multiprocessing.pool import Pool
from multiprocessing import Manager

manager = Manager()
lock = manager.Lock()
data_dict = manager.dict({data_subset: {}})

with Pool(processes=cpu_count()-2) as p:
    with tqdm(total=len(paths)) as pbar:
        for v in p.imap_unordered(
            partial(extract_video, root_dir=opt.data_path, dataset=dataset, output_path=opt.output_path, data_dict=data_dict, data_subset=data_subset),
            paths
        ):
            pbar.update()

print(data_dict.copy())
# Save the data_dict to a JSON file
with open(os.path.join(opt.output_path, "data_dict.json"), 'w') as json_file:
    print('writing to json')
    json.dump(data_dict.copy(), json_file)
Here is the extract_video function:
def extract_video(video, root_dir, dataset, output_path, data_dict, data_subset):
    try:
        # some code to create image crops and other variables
        # ....
        for j, crop in enumerate(crops):
            image_path = os.path.join(output_path, id, "{}_{}.png".format(i, j))
            cv2.imwrite(image_path, crop)
            # Update data_dict
            label = 0  # Assuming label is 0, you can modify this based on your logic
            # Tried without lock as well
            with lock:
                if id not in data_dict[data_subset]:
                    data_dict[data_subset][id] = {'label': label, 'list': []}  # tried manager.list() as well
                data_dict[data_subset][id]['list'] += [image_path]  # tried append method on list as well
                # data_dict[data_subset][id] = {'label': label, 'list': [image_path]}  # Tried this by adding image_path directly to the list as well
            print(data_dict)
    except Exception as e:
        print("Error:", e)
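The problem is that if I comment out the lines that add nested objects to data_dict, all the code in the pool context works fine. But when I uncomment those lines, I get exceptions. I tried debugging the snippet with some dummy dictionaries and lists, as discussed in this thread, but without success.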
Suppose d is a managed dictionary created with:
inner_dict_key = 'nested_dict'
...
with Manager() as manager:
    d = manager.dict({inner_dict_key: {}})
d is actually a proxy for the real dictionary, which lives in the process that was created when Manager() was executed. Now consider:
d[inner_dict_key]['x'] = 1
This is equivalent to:
inner_dict = d[inner_dict_key]
inner_dict['x'] = 1
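The call on the proxy, d[inner_dict_key], is marshalled (the method name and its arguments) and sent to the manager's process, where it is executed on the real dictionary. The inner_dict returned to the caller is the nested dictionary, but it is a copy of the dictionary that exists in the manager's process. That is, inner_dict resides in the current process's memory. So the assignment inner_dict['x'] = 1 only updates the local copy, and you see what you see.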
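The lost update can be reproduced in a single process. The following is a minimal sketch; the function name demo_lost_update is made up for illustration, and the key name simply mirrors the example above:

```python
from multiprocessing import Manager


def demo_lost_update():
    """Mutate the nested dict fetched through the proxy (hypothetical helper)."""
    with Manager() as manager:
        d = manager.dict({"nested_dict": {}})
        # The proxy call returns a plain dict that is a copy living in this process.
        inner = d["nested_dict"]
        inner["x"] = 1  # only the local copy changes
        # Snapshot the shared dictionary before the manager shuts down.
        shared = d.copy()
    return inner, shared


if __name__ == "__main__":
    local_copy, shared_state = demo_lost_update()
    print(local_copy)
    print(shared_state)
```

The first value returned is the modified local copy; the second is the shared dictionary, whose nested dictionary is still empty.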
However, if you have:
inner_dict = d[inner_dict_key]
inner_dict['x'] = 1
d[inner_dict_key] = inner_dict
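Now you are updating the copy that resides in the manager's process (the sharable dictionary) by completely replacing the nested dictionary. This can be expensive. But worse, consider the following complete program: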
from multiprocessing import Pool, Manager
from functools import partial
import time

inner_dict_key = 'nested_dict'

def worker(d, new_key, new_value):
    inner_dict = d[inner_dict_key]
    inner_dict[new_key] = new_value
    # Give a second pool process a chance to process a task:
    time.sleep(1)
    d[inner_dict_key] = inner_dict

def main():
    with Manager() as manager, Pool() as pool:
        d = manager.dict({inner_dict_key: {}})
        pool.starmap(partial(worker, d), (('x', 1), ('y', 2)))
        print(d)

if __name__ == "__main__":
    main()
Prints:
{'nested_dict': {'y': 2}}
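Because of a race condition, only one key was added to the nested dictionary. This is the same as having two processes execute x.value += 1, where x is a shared multiprocessing.Value instance. The problem in both cases is that fetching the nested dictionary, updating it and storing it back is not an atomic operation. Locking is required: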
from multiprocessing import Pool, Manager, Lock
from functools import partial
import time

inner_dict_key = 'nested_dict'

def init_pool_processes(the_lock):
    global lock
    lock = the_lock

def worker(d, new_key, new_value):
    with lock:
        inner_dict = d[inner_dict_key]
        inner_dict[new_key] = new_value
        # Give a second pool process a chance to process a task:
        time.sleep(1)
        d[inner_dict_key] = inner_dict

def main():
    with Manager() as manager, \
        Pool(initializer=init_pool_processes, initargs=(Lock(),)) as pool:
        d = manager.dict({inner_dict_key: {}})
        pool.starmap(partial(worker, d), (('x', 1), ('y', 2)))
        print(d)

if __name__ == "__main__":
    main()
Prints:
{'nested_dict': {'x': 1, 'y': 2}}
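For comparison, the x.value += 1 analogy mentioned earlier can be sketched the same way. This is only an illustration: add_many and run_counter are hypothetical names, and the process and iteration counts are arbitrary. The lock that comes with a multiprocessing.Value makes the read-modify-write atomic:

```python
from multiprocessing import Process, Value


def add_many(counter, n):
    """Increment a shared counter n times (hypothetical helper)."""
    for _ in range(n):
        # counter.value += 1 is a read followed by a write,
        # so both steps are done while holding the Value's own lock.
        with counter.get_lock():
            counter.value += 1


def run_counter(num_procs=4, n=1000):
    """Start num_procs processes that each add n to a shared counter."""
    counter = Value("i", 0)
    procs = [Process(target=add_many, args=(counter, n)) for _ in range(num_procs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value


if __name__ == "__main__":
    print(run_counter())
```

With the lock held around each increment, the final count equals num_procs * n; without it, some increments can be lost in the same way the nested dictionary key was.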
An alternative is to make the nested dictionary a managed dictionary as well:
from multiprocessing import Pool, Manager
from functools import partial

inner_dict_key = 'nested_dict'

def worker(d, new_key, new_value):
    # There is no longer a race condition:
    d[inner_dict_key][new_key] = new_value

def main():
    with Manager() as manager, Pool() as pool:
        inner_dict = manager.dict()
        d = manager.dict({inner_dict_key: inner_dict})
        pool.starmap(partial(worker, d), (('x', 1), ('y', 2)))
        d_copy = d.copy()
        d_copy[inner_dict_key] = inner_dict.copy()
        print(d_copy)

if __name__ == "__main__":
    main()
Prints:
{'nested_dict': {'x': 1, 'y': 2}}
But now every update to the nested dictionary requires two proxy calls.
Pick your poison.
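Applied to the data_dict from the question, the fetch, update and store-back pattern under a lock might look roughly like the sketch below. The helper names record_image and demo_record_images, and the sample values "train" and "vid0", are invented for illustration; the lock is passed in explicitly as an argument rather than used as a global:

```python
from multiprocessing import Manager


def record_image(data_dict, lock, data_subset, video_id, label, image_path):
    """Append image_path to the nested entry for video_id (hypothetical helper)."""
    with lock:
        # Fetch a local copy of the nested dictionary through the proxy.
        subset = data_dict[data_subset]
        entry = subset.setdefault(video_id, {"label": label, "list": []})
        entry["list"].append(image_path)
        # Store the whole nested dictionary back so the manager sees the change.
        data_dict[data_subset] = subset


def demo_record_images():
    """Exercise record_image in a single process with made-up values."""
    with Manager() as manager:
        lock = manager.Lock()
        data_dict = manager.dict({"train": {}})
        record_image(data_dict, lock, "train", "vid0", 0, "vid0/0_0.png")
        record_image(data_dict, lock, "train", "vid0", 0, "vid0/0_1.png")
        result = data_dict.copy()
    return result


if __name__ == "__main__":
    print(demo_record_images())
```

Note that every call ships the entire nested dictionary to and from the manager's process, which is the cost mentioned earlier for this approach.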