在 Python 中使用 `concurrent.futures` 取消所有失败的任务

问题描述 投票:0回答:1

我正在使用 Python 的

concurrent.futures
库以及
ThreadPoolExecutor
ProcessPoolExecutor
。我想实现一种机制,如果任何一个任务失败,则取消所有正在运行或未执行的任务。具体来说,我想:

  1. 当任务失败时取消所有 future(包括正在运行的和未执行的)。
  2. 如果错误被静默忽略,则引发导致第一个任务失败的错误;否则,就让Python自然提升吧。

这是我尝试过的方法:

from concurrent.futures import ProcessPoolExecutor, as_completed
from functools import partial

copy_func = partial(copy_from, table_name=table_name, column_string=column_string)
with ProcessPoolExecutor(max_workers=cores_to_use) as executor:
    futures = {executor.submit(copy_func, file_path): file_path for file_path in file_path_list}
    for f in as_completed(futures):
        try:
            f.result()
        except Exception as e:
            executor.shutdown(wait=False)  # Attempt to stop the executor
            for future in futures:
                future.cancel()  # Cancel all futures
            raise e  # Raise the exception

问题:

  • 这是在
    ThreadPoolExecutor
    ProcessPoolExecutor
    中处理任务取消的正确方法吗?
  • 有没有更好的方法来实现这个功能?
  • 如何确保引发的异常不会被默默忽略?
  • 发生异常后如何释放
    concurrent.futures
    使用的所有资源?

谢谢!

python error-handling concurrency multiprocessing concurrent.futures
1个回答
0
投票

这是 ThreadPoolExecutor 和 ProcessPoolExecutor 中处理任务取消的正确方法吗?

你的方法几乎是正确的,但有一些重要的事情需要注意:

  • executor.shutdown(wait=False)
    仅阻止提交新任务。它实际上并没有取消正在运行的任务。已经开始执行的任务将继续执行直至完成。
  • future.cancel()
    仅取消仍在队列中(未启动)的任务。如果任务已经开始,
    future.cancel()
    不会停止它。

要有效地取消正在运行的任务,您需要通过检查标志或捕获异常来中断您的任务。

有没有更好的方法来实现这个功能?

更好的方法是结合使用:

  • 优雅的任务取消:您的任务应定期检查标志或信号以自行取消。
  • 正确处理异常:您可以将
    try/except
    块与工作函数(
    copy_from
    )本身一起使用来检测故障并传播它。

这是代码:

from concurrent.futures import ProcessPoolExecutor, as_completed
from functools import partial
import threading

# Cancellation event that worker tasks can check
cancel_event = threading.Event()

def copy_from(file_path, table_name, column_string):
    if cancel_event.is_set():
        print(f"Task {file_path} cancelled.")
        return
    # Simulate some work or copying logic
    try:
        # Your actual copy operation here
        if file_path == "some_bad_file":
            raise ValueError("Simulated task failure")
        print(f"Processing {file_path}")
    except Exception as e:
        cancel_event.set()  # Set the cancellation flag
        raise e

# Function to handle task execution
def run_tasks(file_path_list, table_name, column_string, cores_to_use):
    copy_func = partial(copy_from, table_name=table_name, column_string=column_string)

    with ProcessPoolExecutor(max_workers=cores_to_use) as executor:
        futures = {executor.submit(copy_func, file_path): file_path for file_path in file_path_list}

        try:
            for f in as_completed(futures):
                if cancel_event.is_set():
                    # If cancellation has been triggered, skip waiting for further results
                    break
                # Attempt to get the result and raise any exception if task failed
                f.result()
        except Exception as e:
            cancel_event.set()  # Cancel all tasks
            for future in futures:
                future.cancel()  # Attempt to cancel futures that haven't started yet
            raise e  # Re-raise the exception

file_path_list = ["file1", "file2", "some_bad_file", "file4"]
table_name = "my_table"
column_string = "col1, col2"

try:
    run_tasks(file_path_list, table_name, column_string, cores_to_use=4)
except Exception as ex:
    print(f"An error occurred: {ex}")

如何确保引发的异常不会被默默忽略?

为了确保异常不会被默默忽略,至关重要的是:

  • 始终在
    f.result()
    循环中调用
    for f in as_completed(futures)
    。这将引发任务执行期间发生的任何异常。
  • 在取消其他任务后,使用
    raise a
    传播异常,就像您所做的那样。

发生异常后如何释放
concurrent.futures
使用的所有资源?

默认情况下,当

with ProcessPoolExecutor
块存在时,它会调用
executor.shutdown()
,从而释放资源。这是由 Python 的上下文管理器自动处理的。但是,添加
executor.shutdown(wait=False)
是不必要的,因为如果您正确处理了取消,则不需要提前关闭执行器。

为了确保资源得到正确清洁:

  • 尚未开始的任务应使用
    future.cancel()
    取消。
  • 已经启动的任务将通过设置取消标志来停止,当所有任务完成时,上下文管理器将清理池并释放资源。

希望这对你有一点帮助。

© www.soinside.com 2019 - 2024. All rights reserved.