在其他文件中的Python进程之间共享锁

问题描述 投票:0回答:1

我正在尝试使用多处理映射函数在我的列表上执行函数。然而,在这个函数中,我执行了一些不能同时执行的 API 调用,因为它会导致 API 过载。然而,我的代码分散在多个文件中,因为它很长。

我试图遵循这个解决方案,但它对我不起作用。我怀疑它不起作用,因为我正在处理多个文件。我得到的具体错误是:NameError: name 'lock' is not Defined.

特此简化我的代码:

main.py

import file2

def evaluate_entry_of_list(entry):
    response = file2.get_data(entry)
    # some calculations
    return results

def init_pool(given_lock):
    global lock
    lock = given_lock

if __name__ == '__main__':

    list = apicaller.get_list()
    t_lock = Lock()
    with Pool(8, initializer=init_pool, initargs=(t_lock,)) as pool:
        results = pool.map(evaluate_entry_of_list, list)
        process_results(results)

文件2.py

def make_call(url, body)-> requests.Response:
    lock.acquire()
    # Make API call
    lock.release()
    return response

我尝试过的其他解决方案:在 main 中定义变量并导入到 file2 并使用单独的类创建静态变量并导入到 file2 中。

python parallel-processing multiprocessing locking
1个回答
0
投票

自然有很多方法可以解决此类问题,但我建议使用

Process
对象创建自己的“池”。像
Lock
Queue
等这样的东西只能在进程创建时传递(这就是为什么在
pool
中它们必须传递给初始化器)。如果您控制流程创建,则传递这些对象会更容易一些。

这是一个演示(只是为了了解可以有多简单)

from multiprocessing import Process, Lock, Queue, cpu_count

from api import make_call, task_list

def worker(lock, in_queue, out_queue):
    for task in iter(in_queue.get, None):
        with lock:
            out_queue.put(make_call(task))

if __name__ == "__main__":
    api_lock = Lock()
    in_queue = Queue()
    out_queue = Queue()
    
    workers = [Process(target=worker, args=(api_lock, in_queue, out_queue)) for _ in range(cpu_count())]
    for worker in workers: worker.start()
    for task in task_list: in_queue.put(task)
    results = [out_queue.get() for _ in task_list]
    for _ in workers: in_queue.put(None)  # worker shutdown sentinel
    for worker in workers: worker.join()
© www.soinside.com 2019 - 2024. All rights reserved.