如何在 python ThreadPoolExecutor 中终止/取消/停止运行执行器 future? future.cancel() 返回 False

问题描述 投票:0回答:2

我想使用python ThreadPoolExecutor同时调用两个api(附代码)。 如果这两个 api 调用中的任何一个有响应,我想停止调用另一个。因为对于我的用例,两个 api 之一将需要很长时间才能返回响应,我想避免调用。

def get_rest_api_response(url):
    return requets.get(url)
    
import requests, os
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor, as_completed
with ThreadPoolExecutor(max_workers=4) as executor:
    f1 = executor.submit(get_rest_api_response, url="REST_API_URL_1")
    f2 = executor.submit(get_rest_api_response, url="REST_API_URL_2")
    
    no_future_is_done = True
    while(no_future_is_done):
        if f1.done():
            no_future_is_done = False
            print("f1 is done")
            output = f1.result()
            print(f2.cancel())  ######------> Failing!
        if f2.done():
            no_future_is_done = False
            print("f2 is done")
            output = f2.result()
            print(f1.cancel()) ######-------> Failing!
    print(output)

我正在使用 future.cancel() 但它失败并返回 False。 https://pd.codechef.com/docs/py/3.4.2/library/concurrent.futures.html#concurrent.futures.Future.cancel

还有其他方法可以实现这个目标吗?

python multithreading threadpool python-multithreading threadpoolexecutor
2个回答
8
投票

虽然

ThreadPoolExecutor
不提供停止运行任务的工具,但我们可以更新目标任务函数以在设置线程安全标志时停止运行。这可以使用 threading.Event.

来实现

首先,它要求您首先创建一个

threading.Event
来控制运行任务何时停止。

from threading import Event
...
# create an event to shut down all running tasks
event = Event()

然后可以将此事件作为参数传递给每个目标任务函数。

with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
    # execute the task, passing the event
    future = executor.submit(work, event)

任务运行后,可以通过在

Event
上设置标志来从主线程停止它。

...
# stop running tasks via the event
event.set()

每个目标任务函数必须经常检查事件的状态,例如在任务内循环的每次迭代中。

如果设置了事件,则任务可以停止运行,也许立即返回

...
# check the status of the flag
if event.is_set():
    return

如果您的目标任务函数打开了一些资源,则可能需要在返回之前进行清理。

这种停止运行任务的方法可能需要您更改任务的结构,以便拥有一个允许您检查标志值的循环结构。

例如,如果您的任务从文件或套接字读取数据,您可能需要更改在循环中的数据块中执行的读取操作,以便循环的每次迭代都可以检查标志的状态。

完整示例

# SuperFastPython.com
# example of stopping a running task in a thread pool
from time import sleep
from threading import Event
from concurrent.futures import ThreadPoolExecutor

# mock target task function
def work(event):
    # pretend read data for a long time
    for _ in range(10):
        # pretend to read some data
        sleep(1)
        # check if the task should stop
        if event.is_set():
            return

# create an event used to stop running tasks
event = Event()
# create a thread pool
with ThreadPoolExecutor() as executor:
    # execute one task
    future = executor.submit(work, event)
    # wait a moment
    print('The task is running...')
    sleep(2)
    # cancel the task, just in case it is not yet running
    future.cancel()
    # stop the running task using the flag
    print('Stopping the task...')
    event.set()
    # waiting for all tasks to complete
    print('Waiting...')
print('All done.')

运行示例首先创建一个线程池并调度一个任务。

创建一个事件对象并将其传递给任务,在该任务中每次迭代都会检查它是否已设置,如果已设置则退出任务。

任务开始正常执行两秒钟。首先,我们取消池中的所有任务,以防万一它尚未开始执行。

然后我们设置事件来触发正在运行的任务停止。该任务每秒检查事件的状态,并在事件设置后的下一次迭代中停止执行。

The task is running...
Stopping the task...
Waiting...
All done.

原来的答案可以在这里找到!


0
投票
与@Shahzod1011类似,但在课堂上管理任务和信号

import concurrent.futures import time class MyTask: def __init__(self, task_name): self.task_name = task_name self.if_run = True def run(self): while self.if_run: print(f"Task '{self.task_name}' is running. '{self.if_run}'") time.sleep(1) class ThreadPoolManager: def __init__(self): self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=5) self.task = {} # task_name - MyTask[task_name] self.task_pool_dict = {} # task_name - future def add_task(self, task_name, *args, **kwargs): self.task[task_name] = MyTask(task_name) future = self.executor.submit(self.task[task_name].run, *args, **kwargs) self.task_pool_dict[task_name] = future def stop_task(self, task_name): if task_name in self.task_pool_dict: self.task[task_name].if_run = False self.task.pop(task_name, None) print(f"stopping task'{task_name}'") future = self.task_pool_dict.pop(task_name) future.cancel() if __name__ == "__main__": thread_pool = ThreadPoolManager() # Add tasks thread_pool.add_task("Task1") thread_pool.add_task("Task2") time.sleep(5) print("end of sleep") # Stop tasks thread_pool.stop_task("Task1") thread_pool.stop_task("Task2") thread_pool.executor.shutdown(wait=True)
针对问题中提到的具体情况,可以修改“MyTask”:每个api一个“MyTask”,两个“run”函数

© www.soinside.com 2019 - 2024. All rights reserved.