我想使用python ThreadPoolExecutor同时调用两个api(附代码)。 如果这两个 api 调用中的任何一个有响应,我想停止调用另一个。因为对于我的用例,两个 api 之一将需要很长时间才能返回响应,我想避免调用。
def get_rest_api_response(url):
return requets.get(url)
import requests, os
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor, as_completed
with ThreadPoolExecutor(max_workers=4) as executor:
f1 = executor.submit(get_rest_api_response, url="REST_API_URL_1")
f2 = executor.submit(get_rest_api_response, url="REST_API_URL_2")
no_future_is_done = True
while(no_future_is_done):
if f1.done():
no_future_is_done = False
print("f1 is done")
output = f1.result()
print(f2.cancel()) ######------> Failing!
if f2.done():
no_future_is_done = False
print("f2 is done")
output = f2.result()
print(f1.cancel()) ######-------> Failing!
print(output)
我正在使用 future.cancel() 但它失败并返回 False。 https://pd.codechef.com/docs/py/3.4.2/library/concurrent.futures.html#concurrent.futures.Future.cancel
还有其他方法可以实现这个目标吗?
虽然
ThreadPoolExecutor
不提供停止运行任务的工具,但我们可以更新目标任务函数以在设置线程安全标志时停止运行。这可以使用 threading.Event. 来实现
首先,它要求您首先创建一个
threading.Event
来控制运行任务何时停止。
from threading import Event
...
# create an event to shut down all running tasks
event = Event()
然后可以将此事件作为参数传递给每个目标任务函数。
with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
# execute the task, passing the event
future = executor.submit(work, event)
任务运行后,可以通过在
Event
上设置标志来从主线程停止它。
...
# stop running tasks via the event
event.set()
每个目标任务函数必须经常检查事件的状态,例如在任务内循环的每次迭代中。
如果设置了事件,则任务可以停止运行,也许立即返回。
...
# check the status of the flag
if event.is_set():
return
如果您的目标任务函数打开了一些资源,则可能需要在返回之前进行清理。
这种停止运行任务的方法可能需要您更改任务的结构,以便拥有一个允许您检查标志值的循环结构。
例如,如果您的任务从文件或套接字读取数据,您可能需要更改在循环中的数据块中执行的读取操作,以便循环的每次迭代都可以检查标志的状态。
# SuperFastPython.com
# example of stopping a running task in a thread pool
from time import sleep
from threading import Event
from concurrent.futures import ThreadPoolExecutor
# mock target task function
def work(event):
# pretend read data for a long time
for _ in range(10):
# pretend to read some data
sleep(1)
# check if the task should stop
if event.is_set():
return
# create an event used to stop running tasks
event = Event()
# create a thread pool
with ThreadPoolExecutor() as executor:
# execute one task
future = executor.submit(work, event)
# wait a moment
print('The task is running...')
sleep(2)
# cancel the task, just in case it is not yet running
future.cancel()
# stop the running task using the flag
print('Stopping the task...')
event.set()
# waiting for all tasks to complete
print('Waiting...')
print('All done.')
运行示例首先创建一个线程池并调度一个任务。
创建一个事件对象并将其传递给任务,在该任务中每次迭代都会检查它是否已设置,如果已设置则退出任务。
任务开始正常执行两秒钟。首先,我们取消池中的所有任务,以防万一它尚未开始执行。
然后我们设置事件来触发正在运行的任务停止。该任务每秒检查事件的状态,并在事件设置后的下一次迭代中停止执行。
The task is running...
Stopping the task...
Waiting...
All done.
原来的答案可以在这里找到!
import concurrent.futures
import time
class MyTask:
def __init__(self, task_name):
self.task_name = task_name
self.if_run = True
def run(self):
while self.if_run:
print(f"Task '{self.task_name}' is running. '{self.if_run}'")
time.sleep(1)
class ThreadPoolManager:
def __init__(self):
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
self.task = {} # task_name - MyTask[task_name]
self.task_pool_dict = {} # task_name - future
def add_task(self, task_name, *args, **kwargs):
self.task[task_name] = MyTask(task_name)
future = self.executor.submit(self.task[task_name].run, *args, **kwargs)
self.task_pool_dict[task_name] = future
def stop_task(self, task_name):
if task_name in self.task_pool_dict:
self.task[task_name].if_run = False
self.task.pop(task_name, None)
print(f"stopping task'{task_name}'")
future = self.task_pool_dict.pop(task_name)
future.cancel()
if __name__ == "__main__":
thread_pool = ThreadPoolManager()
# Add tasks
thread_pool.add_task("Task1")
thread_pool.add_task("Task2")
time.sleep(5)
print("end of sleep")
# Stop tasks
thread_pool.stop_task("Task1")
thread_pool.stop_task("Task2")
thread_pool.executor.shutdown(wait=True)
针对问题中提到的具体情况,可以修改“MyTask”:每个api一个“MyTask”,两个“run”函数