Adding nested lists to a shared dictionary in a multiprocessing pool, and saving it to JSON after the context


I am trying to run a process in a multiprocessing pool:

import os
import json
from functools import partial
from multiprocessing import Manager, cpu_count
from multiprocessing.pool import Pool

from tqdm import tqdm

manager = Manager()
lock = manager.Lock()
data_dict = manager.dict({data_subset: {}})
with Pool(processes=cpu_count() - 2) as p:
    with tqdm(total=len(paths)) as pbar:
        for v in p.imap_unordered(
            partial(extract_video, root_dir=opt.data_path, dataset=dataset, output_path=opt.output_path, data_dict=data_dict, data_subset=data_subset),
            paths
        ):
            pbar.update()

print(data_dict.copy())
# Save the data_dict to a JSON file
with open(os.path.join(opt.output_path, "data_dict.json"), 'w') as json_file:
    print('writing to json')
    json.dump(data_dict.copy(), json_file)

Here is the extract_video function:

def extract_video(video, root_dir, dataset, output_path, data_dict, data_subset):
    try:
        # some code to create image crops and other variables
        # ....
        for j, crop in enumerate(crops):
            image_path = os.path.join(output_path, id, "{}_{}.png".format(i, j))
            cv2.imwrite(image_path, crop)

            # Update data_dict
            label = 0  # Assuming label is 0, you can modify this based on your logic

            # Tried without lock as well
            with lock:
                if id not in data_dict[data_subset]:
                    data_dict[data_subset][id] = {'label': label, 'list': []}  # tried manager.list() as well
                data_dict[data_subset][id]['list'] += [image_path]  # tried append method on list as well
                # data_dict[data_subset][id] = {'label': label, 'list': [image_path]}  # Tried this by adding image_path directly to the list as well
            print(data_dict)
    except Exception as e:
        print("Error:", e)

Now, the problem is that if I comment out the lines that add the nested objects to data_dict, all the code inside the pool context works fine. But when I uncomment those lines, I get some exceptions. I tried debugging this snippet with some dummy dictionaries and lists, as discussed in this thread, but had no luck.

python json multiprocessing python-multiprocessing
1 Answer

Imagine that d is a managed dictionary created with:

inner_dict_key = 'nested_dict'

...
with Manager() as manager:
    d = manager.dict({inner_dict_key: {}})

d is actually a proxy for the real dictionary, which lives in the process created when Manager() is executed. Now consider:

d[inner_dict_key]['x'] = 1

This is equivalent to:

inner_dict = d[inner_dict_key]
inner_dict['x'] = 1

Making the call d[inner_dict_key] on the proxy marshals the method call and its arguments and sends them to the manager's process, where they are executed on the actual dictionary. The inner_dict returned to the caller is the nested dictionary, but it is a copy of the dictionary that exists in the manager's process; that is, inner_dict resides in the current process's memory. Consequently, the assignment inner_dict['x'] = 1 only updates that local copy, and you see the behavior you are seeing.
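The copy semantics described above can be verified with a minimal standard-library sketch (the key name 'nested_dict' is just an example):

```python
from multiprocessing import Manager

# Indexing a managed dict returns a local copy of the nested dict,
# so mutating that copy never reaches the manager's process.
if __name__ == "__main__":
    with Manager() as manager:
        d = manager.dict({'nested_dict': {}})
        inner_dict = d['nested_dict']   # a plain dict copied into this process
        inner_dict['x'] = 1             # updates only the local copy
        print(d['nested_dict'])         # prints {} -- the managed dict is unchanged
```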

However, if you have:

inner_dict = d[inner_dict_key]
inner_dict['x'] = 1
d[inner_dict_key] = inner_dict

Now you are updating the copy that resides in the manager's process (the shareable dictionary) by replacing the nested dictionary entirely. That can be expensive. But worse, consider the following complete program:

from multiprocessing import Pool, Manager
from functools import partial
import time

inner_dict_key = 'nested_dict'

def worker(d, new_key, new_value):
    inner_dict = d[inner_dict_key]
    inner_dict[new_key] = new_value
    # Give a second pool process a chance to process a task:
    time.sleep(1)
    d[inner_dict_key] = inner_dict

def main():
    with Manager() as manager, Pool() as pool:
        d = manager.dict({inner_dict_key: {}})
        pool.starmap(partial(worker, d), (('x', 1), ('y', 2)))
        print(d)

if __name__ == "__main__":
    main()

This prints:

{'nested_dict': {'y': 2}}

Because of a race condition, only one of the keys was added to the nested dictionary. This is the same problem as having two processes execute x.value += 1 where x is a shared multiprocessing.Value instance. In both cases, fetching the nested value, updating it, and storing it back is not an atomic operation. Locking is required:

from multiprocessing import Pool, Manager, Lock
from functools import partial
import time

inner_dict_key = 'nested_dict'

def init_pool_processes(the_lock):
    global lock

    lock = the_lock

def worker(d, new_key, new_value):
    with lock:
        inner_dict = d[inner_dict_key]
        inner_dict[new_key] = new_value
        # Give a second pool process a chance to process a task:
        time.sleep(1)
        d[inner_dict_key] = inner_dict

def main():
    with Manager() as manager, \
    Pool(initializer=init_pool_processes, initargs=(Lock(),)) as pool:
        d = manager.dict({inner_dict_key: {}})
        pool.starmap(partial(worker, d), (('x', 1), ('y', 2)))
        print(d)

if __name__ == "__main__":
    main()

This prints:

{'nested_dict': {'x': 1, 'y': 2}}

An alternative approach is to make the nested dictionary a managed dictionary as well:

from multiprocessing import Pool, Manager
from functools import partial

inner_dict_key = 'nested_dict'

def worker(d, new_key, new_value):
    # There is no longer a race condition:
    d[inner_dict_key][new_key] = new_value

def main():
    with Manager() as manager, Pool() as pool:
        inner_dict = manager.dict()
        d = manager.dict({inner_dict_key: inner_dict})
        pool.starmap(partial(worker, d), (('x', 1), ('y', 2)))
        d_copy = d.copy()
        d_copy[inner_dict_key] = inner_dict.copy()
        print(d_copy)

if __name__ == "__main__":
    main()

This prints:

{'nested_dict': {'x': 1, 'y': 2}}

But now every update of the nested dictionary requires two proxy calls.

Pick your poison.
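Applied to the question, either approach might look like the sketch below (a hedged illustration, not the asker's actual code: record_crop stands in for the relevant part of extract_video, and the entries list is made-up data). It uses a manager lock so the lock can be shared with pool workers, keeps the nested dict managed, and converts the proxies to plain dicts before serializing, since proxy objects are not JSON serializable:

```python
import json
from functools import partial
from multiprocessing import Manager
from multiprocessing.pool import Pool

def record_crop(entry, data_dict, data_subset, lock):
    """Record one (video_id, image_path) pair in the shared nested dict.

    The lock makes the read-modify-write on the inner managed dict atomic.
    """
    video_id, image_path = entry
    with lock:
        subset = data_dict[data_subset]  # proxy to the inner managed dict
        info = subset.get(video_id, {'label': 0, 'list': []})  # local copy
        info['list'].append(image_path)
        subset[video_id] = info          # write the updated entry back through the proxy

def main():
    with Manager() as manager, Pool(processes=2) as pool:
        lock = manager.Lock()            # manager locks can be passed to pool workers
        data_dict = manager.dict({'train': manager.dict()})
        entries = [('vid1', '0_0.png'), ('vid1', '0_1.png'), ('vid2', '0_0.png')]
        pool.map(partial(record_crop, data_dict=data_dict,
                         data_subset='train', lock=lock), entries)
        # Proxies are not JSON serializable; convert to plain dicts first.
        plain = {k: v.copy() for k, v in data_dict.items()}
        print(json.dumps(plain))

if __name__ == "__main__":
    main()
```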
