我有一个使用 FastAPI 构建的 API,该端点将任务提交给 celery 工作人员,等待工作人员完成其工作并将结果返回给用户。
问题是等待结果的正确方法是什么?
端点代码
from tasks import celery_application, some_task
from celery.result import AsyncResult
@api.post('/submit')
async def submit(data: str):
task = some_task.apply_async(kwargs={'data': data}, queue='some_queue')
result = AsyncResult(id=task.task_id, app=celery_application).get()
return {'task_result': result}
AsyncResult
的问题是get
方法会阻塞应用程序,它会同步等待结果,同时API会冻结。
我想出的解决方案之一是循环检查 n 秒的结果
from tasks import celery_application, some_task
import asyncio
import redis
r = redis.Redis.from_url(REDIS_CONN_URI)
@api.post('/submit')
async def submit(data: str):
task = some_task.apply_async(kwargs={'data': data}, queue='some_queue')
result = None
for _ in range(100):
if r.exists(task.task_id):
result = r.get(task.task_id)
break
await asyncio.sleep(0.3)
return {'task_result': result}
但它只能部分起作用。虽然端点没有被阻止并且可以访问。当端点尝试再次到达发送任务时,它会被阻止。
是的,那个redis接口是阻塞的。 您应该切换到使用
aioredis
进行此池化,以便它可以与异步代码一起工作,或者将 redis 任务卡在 ThreadPoolExecutor 中 - 这样主线程就不会阻塞等待 .get
的返回.
第二种方法不需要您对代码或必要条件进行任何更改 - 只需创建一个可以按流程使用的合适的工作池。这应该与您每秒收到的请求数成正比,并且不受您拥有的 CPU 核心数量的限制:工作线程中的大多数线程在等待结果时不会执行任何操作。 (不幸的是,Python 执行器模型不提供动态池,否则,根据负载调整池大小可能是一个不错的选择)。无论如何,如果 celery 工作线程是本地的,则您可以同时获得的结果数量受到限制 - 因此标准 2X CPU 核心数可能就足够了。否则,我相信,最多 100 或 200 个线程的线程池执行器可以适用于中型虚拟机,并为您提供每秒大约 1000 秒请求的良好吞吐量。
from tasks import celery_application, some_task
import asyncio
import redis
MAXWORKERS = 24 # check text.
executor = concurrent.futures.ThreadPoolExecutor(MAXWORKERS)
r = redis.Redis.from_url(REDIS_CONN_URI)
@api.post('/submit')
async def submit(data: str):
task = some_task.apply_async(kwargs={'data': data}, queue='some_queue')
result_call = AsyncResult(id=task.task_id, app=celery_application).get
result = await asyncio.run_in_executor(executor, result_call)
return {'task_result': result}
return {'task_result': result}
当然,这假设您的前端可以轻松等待
.get()
返回 - 我假设最多需要几秒钟。这不会阻止您的 HTTP API。.get
返回时运行 - 在这种情况下,您可以调用 asyncio.create_task
,将 .run_in_executor
调用传递给它,然后对返回的任务调用 .add_done_callback
。 (此外,您将需要一个数据结构(set
)来跟踪正在进行的任务,否则它们可能会被事件循环取消引用)