我正在尝试使用多处理映射函数在我的列表上执行函数。然而,在这个函数中,我执行了一些不能同时执行的 API 调用,因为它会导致 API 过载。然而,我的代码分散在多个文件中,因为它很长。
我试图遵循这个解决方案,但它对我不起作用。我怀疑它不起作用,因为我正在处理多个文件。我得到的具体错误是:NameError: name 'lock' is not Defined.
特此简化我的代码:
main.py
import file2
def evaluate_entry_of_list(entry):
response = file2.get_data(entry)
# some calculations
return results
def init_pool(given_lock):
global lock
lock = given_lock
if __name__ == '__main__':
list = apicaller.get_list()
t_lock = Lock()
with Pool(8, initializer=init_pool, initargs=(t_lock,)) as pool:
results = pool.map(evaluate_entry_of_list, list)
process_results(results)
文件2.py
def make_call(url, body)-> requests.Response:
lock.acquire()
# Make API call
lock.release()
return response
我尝试过的其他解决方案:在 main 中定义变量并导入到 file2 并使用单独的类创建静态变量并导入到 file2 中。
自然有很多方法可以解决此类问题,但我建议使用
Process
对象创建自己的“池”。像 Lock
Queue
等这样的东西只能在进程创建时传递(这就是为什么在 pool
中它们必须传递给初始化器)。如果您控制流程创建,则传递这些对象会更容易一些。
这是一个演示(只是为了了解可以有多简单)
from multiprocessing import Process, Lock, Queue, cpu_count
from api import make_call, task_list
def worker(lock, in_queue, out_queue):
for task in iter(in_queue.get, None):
with lock:
out_queue.put(make_call(task))
if __name__ == "__main__":
api_lock = Lock()
in_queue = Queue()
out_queue = Queue()
workers = [Process(target=worker, args=(api_lock, in_queue, out_queue)) for _ in range(cpu_count())]
for worker in workers: worker.start()
for task in task_list: in_queue.put(task)
results = [out_queue.get() for _ in task_list]
for _ in workers: in_queue.put(None) # worker shutdown sentinel
for worker in workers: worker.join()