我正在使用 Python 中的 multiprocessing.Pool 来并行化需要为工作负载中的每个项目进入和退出上下文管理器的任务。我想确保当键盘中断发生时,程序通过允许每个工作进程通过传播中断(或类似的东西)来完成其上下文管理器的exit方法来正常退出。
这是一个简单的代码片段来说明我的想法:
class MyContext:
def __init__(self, name):
self.name = name
def __enter__(self):
print(f'Entering context {self.name}')
return self
def __exit__(self, exc_type, exc_value, traceback):
print(f'Exiting context {self.name}')
time.sleep(5)
print(f'Exited context {self.name}')
def process_name(name):
with MyContext(name):
print(f'Processing {name}')
time.sleep(100)
print(f'Finished {name}')
names = ['Alice', 'Bob']
for name in names:
process_name(name)
这是我期望的结果:
Entering context Alice
Processing Alice
Entering context Bob
Processing Bob
^CExiting context Alice
Exiting context Bob
Exited context Alice
Exited context Bob
KeyboardInterrupt (maybe multiple times)
这是实际输出:
Entering context Alice
Processing Alice
Entering context Bob
Processing Bob
^CExiting context Alice
Exiting context Bob
KeyboardInterrupt
上下文管理器的 exit 方法被调用,但在程序崩溃之前它没有完成。我应该改变什么才能使其按预期工作?
你需要在异常处理程序中加入池,否则Python将杀死所有子工人。
from multiprocessing import pool
import time
class MyContext:
def __init__(self, name):
self.name = name
def __enter__(self):
print(f'Entering context {self.name}')
return self
def __exit__(self, exc_type, exc_value, traceback):
print(f'Exiting context {self.name}')
time.sleep(5)
print(f'Exited context {self.name}')
def process_name(name):
with MyContext(name):
print(f'Processing {name}')
time.sleep(100)
print(f'Finished {name}')
names = ['Alice', 'Bob']
if __name__ == "__main__":
with pool.Pool(2) as pool:
try:
pool.map(process_name, names)
except KeyboardInterrupt as e:
pool.join()
Entering context Alice
Processing Alice
Entering context Bob
Processing Bob
Exiting context Bob
Exiting context Alice
Exited context Alice
Exited context Bob