我正在尝试跨多个进程分配工作。另一个进程中发生的异常应该传播回来并在主进程中处理。这似乎适用于工作人员抛出的异常,但不适用于
Ctrl-C
。
import time
from concurrent.futures import ProcessPoolExecutor, Future, wait
import traceback
def worker():
time.sleep(5)
# raise RuntimeError("Some Error")
return True
def main():
with ProcessPoolExecutor() as executor:
stop = False
tasks = set()
while True:
try:
# keep submitting new tasks
if not stop:
time.sleep(0.1)
print("Submitting task to worker")
future = executor.submit(worker)
tasks.add(future)
done, not_done = wait(tasks, timeout=0)
tasks = not_done
# get the result
for future in done:
try:
result = future.result()
except Exception as e:
print(f"Exception in worker: {type(e).__name__}: {e}")
else:
print(f"Worker finished with result: {result}")
# exit loop if there are no tasks left and loop was stopped
if stop:
print(f"Waiting for {len(tasks)} to finish.")
if not len(tasks):
print("Finished all remaining tasks")
break
time.sleep(1)
except KeyboardInterrupt:
print("Recieved Ctrl-C")
stop = True
except Exception as e:
print(f"Caught {e}")
stop = True
if __name__ == "__main__":
main()
我的一些观察:
我怀疑如果在生成进程时按下 Ctrl-C,可能会导致一些意外的行为。
编辑:这些问题在 Linux 和 Windows 上都会出现。理想情况下,两者都有一个解决方案,但如果有疑问,该解决方案应该在 Linux 上工作
我不清楚您是否希望
worker
函数继续运行直到完成(正常或异常),忽略任何 Ctrl-C 事件。假设是这样,下面的代码应该在 Linux 和 Windows 下都能工作。
这个想法是使用“池初始化程序”,即在执行提交的任务之前在每个池进程中运行的函数。这里初始化程序执行代码来忽略 Ctrl-C 事件(
KeyboardInterrupt
例外)。
请注意,我还进行了一些其他代码调整(用注释标记)。
import time
from concurrent.futures import ProcessPoolExecutor, Future, wait
import traceback
import signal
def init_pool_processes():
# Ignore Ctrl-C
signal.signal(signal.SIGINT, signal.SIG_IGN)
def worker():
time.sleep(5)
# raise RuntimeError("Some Error")
return True
def main():
# Create pool child processes, which will now
# ignore Ctrl-C
with ProcessPoolExecutor(initializer=init_pool_processes) as executor:
stop = False
tasks = set()
while True:
try:
# keep submitting new tasks
if not stop:
print("Submitting task to worker")
future = executor.submit(worker)
tasks.add(future)
# Move to here:
time.sleep(0.1)
done, not_done = wait(tasks, timeout=0)
tasks = not_done
# get the result
for future in done:
try:
result = future.result()
except Exception as e:
print(f"Exception in worker: {type(e).__name__}: {e}")
else:
print(f"Worker finished with result: {result}")
# exit loop if there are no tasks left and loop was stopped
if stop:
print(f"Waiting for {len(tasks)} to finish.")
if not len(tasks):
print("Finished all remaining tasks")
break
time.sleep(1)
except KeyboardInterrupt:
# Ignore future Ctrl-C events:
signal.signal(signal.SIGINT, signal.SIG_IGN)
print("Received Ctrl-C") # spelling
stop = True
except Exception as e:
print(f"Caught {e}")
# Ignore future Ctrl-C events:
if not stop:
signal.signal(signal.SIGINT, signal.SIG_IGN)
stop = True
if __name__ == "__main__":
main()