我正在使用 Python 的
concurrent.futures
库以及 ThreadPoolExecutor
和 ProcessPoolExecutor
。我想实现一种机制,如果任何一个任务失败,则取消所有正在运行或未执行的任务。具体来说,我想:
这是我尝试过的方法:
from concurrent.futures import ProcessPoolExecutor, as_completed
from functools import partial
copy_func = partial(copy_from, table_name=table_name, column_string=column_string)
with ProcessPoolExecutor(max_workers=cores_to_use) as executor:
futures = {executor.submit(copy_func, file_path): file_path for file_path in file_path_list}
for f in as_completed(futures):
try:
f.result()
except Exception as e:
executor.shutdown(wait=False) # Attempt to stop the executor
for future in futures:
future.cancel() # Cancel all futures
raise e # Raise the exception
ThreadPoolExecutor
和 ProcessPoolExecutor
中处理任务取消的正确方法吗?concurrent.futures
使用的所有资源?谢谢!
你的方法几乎是正确的,但有一些重要的事情需要注意:
executor.shutdown(wait=False)
仅阻止提交新任务。它实际上并没有取消正在运行的任务。已经开始执行的任务将继续执行直至完成。future.cancel()
仅取消仍在队列中(未启动)的任务。如果任务已经开始,future.cancel()
不会停止它。要有效地取消正在运行的任务,您需要通过检查标志或捕获异常来中断您的任务。
更好的方法是结合使用:
try/except
块与工作函数(copy_from
)本身一起使用来检测故障并传播它。这是代码:
from concurrent.futures import ProcessPoolExecutor, as_completed
from functools import partial
import threading
# Cancellation event that worker tasks can check
cancel_event = threading.Event()
def copy_from(file_path, table_name, column_string):
if cancel_event.is_set():
print(f"Task {file_path} cancelled.")
return
# Simulate some work or copying logic
try:
# Your actual copy operation here
if file_path == "some_bad_file":
raise ValueError("Simulated task failure")
print(f"Processing {file_path}")
except Exception as e:
cancel_event.set() # Set the cancellation flag
raise e
# Function to handle task execution
def run_tasks(file_path_list, table_name, column_string, cores_to_use):
copy_func = partial(copy_from, table_name=table_name, column_string=column_string)
with ProcessPoolExecutor(max_workers=cores_to_use) as executor:
futures = {executor.submit(copy_func, file_path): file_path for file_path in file_path_list}
try:
for f in as_completed(futures):
if cancel_event.is_set():
# If cancellation has been triggered, skip waiting for further results
break
# Attempt to get the result and raise any exception if task failed
f.result()
except Exception as e:
cancel_event.set() # Cancel all tasks
for future in futures:
future.cancel() # Attempt to cancel futures that haven't started yet
raise e # Re-raise the exception
file_path_list = ["file1", "file2", "some_bad_file", "file4"]
table_name = "my_table"
column_string = "col1, col2"
try:
run_tasks(file_path_list, table_name, column_string, cores_to_use=4)
except Exception as ex:
print(f"An error occurred: {ex}")
为了确保异常不会被默默忽略,至关重要的是:
f.result()
循环中调用 for f in as_completed(futures)
。这将引发任务执行期间发生的任何异常。raise a
传播异常,就像您所做的那样。concurrent.futures
使用的所有资源?默认情况下,当
with ProcessPoolExecutor
块存在时,它会调用executor.shutdown()
,从而释放资源。这是由 Python 的上下文管理器自动处理的。但是,添加 executor.shutdown(wait=False)
是不必要的,因为如果您正确处理了取消,则不需要提前关闭执行器。
为了确保资源得到正确清洁:
future.cancel()
取消。希望这对你有一点帮助。