Python在进程之间共享锁定

问题描述 投票:38回答:2

我试图使用部分函数,​​以便pool.map()可以定位具有多个参数的函数(在本例中为Lock()对象)。

这是示例代码(取自我之前的一个问题的答案):

from functools import partial

def target(lock, iterable_item):
    for item in items:
        # Do cool stuff
        if (... some condition here ...):
            lock.acquire()
            # Write to stdout or logfile, etc.
            lock.release()

def main():
    iterable = [1, 2, 3, 4, 5]
    pool = multiprocessing.Pool()
    l = multiprocessing.Lock()
    func = partial(target, l)
    pool.map(func, iterable)
    pool.close()
    pool.join()

但是,当我运行此代码时,我收到错误:

Runtime Error: Lock objects should only be shared between processes through inheritance.

我在这里错过了什么?如何在子进程之间共享锁?

python locking multiprocessing share
2个回答
72
投票

对不起,我应该在回答你的其他问题时发现这一点。你不能将正常的multiprocessing.Lock对象传递给Pool方法,因为它们无法被腌制。有两种方法可以解决这个问题。一个是创建Manager()并通过Manager.Lock()

def main():
    iterable = [1, 2, 3, 4, 5]
    pool = multiprocessing.Pool()
    m = multiprocessing.Manager()
    l = m.Lock()
    func = partial(target, l)
    pool.map(func, iterable)
    pool.close()
    pool.join()

不过,这有点重量级;使用Manager需要生成另一个进程来托管Manager服务器。所有对acquire / release锁的调用都必须通过IPC发送到该服务器。

另一种选择是使用multiprocessing.Lock() kwarg在Pool创建时传递常规initializer。这将使您的锁实例在所有子工作者中全局:

def target(iterable_item):
    for item in items:
        # Do cool stuff
        if (... some condition here ...):
            lock.acquire()
            # Write to stdout or logfile, etc.
            lock.release()
def init(l):
    global lock
    lock = l

def main():
    iterable = [1, 2, 3, 4, 5]
    l = multiprocessing.Lock()
    pool = multiprocessing.Pool(initializer=init, initargs=(l,))
    pool.map(target, iterable)
    pool.close()
    pool.join()

第二种解决方案具有不再需要partial的副作用。


0
投票

这是一个版本(使用Barrier而不是Lock,但你明白了)这也适用于Windows(缺少fork导致额外麻烦):

import multiprocessing as mp

def procs(uid_barrier):
    uid, barrier = uid_barrier
    print(uid, 'waiting')
    barrier.wait()
    print(uid, 'past barrier')    

def main():
    N_PROCS = 10
    with mp.Manager() as man:
        barrier = man.Barrier(N_PROCS)
        with mp.Pool(N_PROCS) as p:
            p.map(procs, ((uid, barrier) for uid in range(N_PROCS)))

if __name__ == '__main__':
    mp.freeze_support()
    main()
© www.soinside.com 2019 - 2024. All rights reserved.