In my FastAPI application running under gunicorn/uvicorn, I need to execute some long-running tasks in a completely non-blocking way, so that the main asyncio event loop remains unaffected. The only way I can think of is to spin off separate processes to fire the tasks, then somehow collect the results and signal the main loop. So basically, my workflow should look something like:
1. Fire task in separate process (do ffmpeg video encoding and save files/data).
2. Forget about the running process and do other stuff in a normal way.
3. Get "I'm done" signal from the process(es) and check for errors.
4. Handle results.
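To make the workflow above concrete, here is a rough sketch of steps 1–4 using plain asyncio task primitives (encode_video() is just a hypothetical placeholder for the actual encoding work):

import asyncio

async def encode_video(src: str) -> str:
    # hypothetical placeholder for the real ffmpeg work (step 1)
    proc = await asyncio.create_subprocess_exec(
        'ffmpeg', '-y', '-i', src, src + '.out.mp4',
        stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
    _, stderr = await proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError(stderr.decode())
    return src + '.out.mp4'

def on_done(task: asyncio.Task) -> None:
    # step 3: the "I'm done" signal, with error checking
    if task.exception() is not None:
        print('encoding failed:', task.exception())
    else:
        print('encoded:', task.result())  # step 4: handle results

async def fire_and_forget(src: str) -> None:
    # step 1: fire; NB: keep a reference to the task somewhere,
    # or it may be garbage-collected before it finishes
    task = asyncio.create_task(encode_video(src))
    task.add_done_callback(on_done)  # steps 3-4 via callback
    # step 2: return immediately and carry on as normal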
My long-running task calls the ffmpeg video encoder on some files; it actually uses asyncio.subprocess to fork the external ffmpeg process. It then performs some file operations on the resulting files and stores some data in the application's database. Here is the code (simplified):
import asyncio
from typing import Optional, Union

import ffmpeg  # ffmpeg-python (https://kkroening.github.io/ffmpeg-python/)
from pydantic import BaseModel

class ProcessResultModel(BaseModel):
    returncode: Optional[int] = None
    stdout: str = ''
    stderr: str = ''

    class Config:
        arbitrary_types_allowed = True

@ffmpeg.nodes.output_operator()
async def run_async_async(stream_spec, cmd='ffmpeg', pipe_stdin=False, pipe_stdout=False,
                          pipe_stderr=False, quiet=False, overwrite_output=False,
                          run: bool = True) -> Union[asyncio.subprocess.Process, ProcessResultModel]:
    # compile ffmpeg args
    args = ffmpeg._run.compile(stream_spec, cmd, overwrite_output=overwrite_output)
    # pipe streams as required
    stdin_stream = asyncio.subprocess.PIPE if pipe_stdin else None
    stdout_stream = asyncio.subprocess.PIPE if pipe_stdout or quiet else None
    stderr_stream = asyncio.subprocess.PIPE if pipe_stderr or quiet else None
    # create subprocess (ffmpeg)
    process = await asyncio.create_subprocess_exec(*args, stdin=stdin_stream,
                                                   stdout=stdout_stream, stderr=stderr_stream)
    # if not told to run, simply return the process object
    if not run:
        return process
    # run process and collect results
    stdout, stderr = await process.communicate()
    # return results in a nice object
    return ProcessResultModel(returncode=process.returncode,
                              stdout=stdout.decode('utf-8') if stdout else '',
                              stderr=stderr.decode('utf-8') if stderr else '')
If I just call it as-is in a FastAPI CRUD function:
async def fire_task(stream):
    res = await stream.run_async_async(run=True)
it will call process.communicate() and effectively block my main event loop until the whole task completes. If I call it with run=False, it just returns the initialized process, and I'd have to drive it to completion myself from somewhere.

Is there a way to "fire and forget" the process without blocking the event loop, and then, at some point, have the process signal that it is done so I can collect the results in a safe and reliable way?
So, after carefully reading the replies to a similar question mentioned here (thanks @Chris), I finally managed to put together a working solution. See below.
Instantiate a global singleton thread pool during the application's lifespan:
from concurrent.futures import ThreadPoolExecutor
from contextlib import asynccontextmanager

from fastapi import FastAPI

POOL_MAX_THREADS = 20

@asynccontextmanager
async def lifespan(app: FastAPI):
    # create thread pool for long-runners
    pool = ThreadPoolExecutor(max_workers=POOL_MAX_THREADS)
    # do other initialization / regular tasks
    await on_startup_single(app)
    await regular_tasks_5min()
    # yield globals (will be accessible in requests)
    yield {'pool': pool}
    # do shutdown activities
    await on_shutdown_single(app)
    pool.shutdown()
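(Note: yielding a dict from the lifespan handler relies on Starlette's lifespan state feature, available in recent FastAPI/Starlette versions; the yielded mapping then shows up on each request as request.state, which is what makes the next step work.)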
Pass the global pool from the endpoint down to the underlying CRUD function via Request:
from fastapi import APIRouter, Request, Body

import crud  # app's CRUD functions

router = APIRouter()

@router.post('/do-stuff')
async def do_stuff(request: Request, data: dict = Body(...)):
    # pass global thread pool in request state
    await crud.do_stuff(data, request.state.pool)
Prepare the arguments and fire the long-running task in the thread pool:
import asyncio
from concurrent.futures import Executor, ThreadPoolExecutor

POOL_MAX_THREADS = 20

# the long-running worker we need to run in a non-blocking fashion
async def very_long_task(data):
    await one()
    await two()
    return await three()

# the task-spawning function
async def do_stuff(data: dict, pool: Executor = None):
    # we may also choose to execute in a default pool
    if pool is None:
        pool = ThreadPoolExecutor(max_workers=POOL_MAX_THREADS)
    # get main event loop
    loop = asyncio.get_running_loop()
    # fire task and return immediately, leaving the task to run in thread pool
    loop.run_in_executor(pool, lambda: asyncio.run(very_long_task(data)))
That's basically it!
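If you also want the "I'm done" signal and error check from steps 3–4, one option (my own sketch on top of the solution above, not part of it) is to keep the Future that run_in_executor() returns and attach a done callback, which runs on the main event loop once the worker thread finishes:

import asyncio
from concurrent.futures import Executor

async def do_stuff_with_signal(data: dict, pool: Executor):
    loop = asyncio.get_running_loop()
    # fire the task exactly as before, but keep the returned Future
    future = loop.run_in_executor(pool, lambda: asyncio.run(very_long_task(data)))

    def on_done(fut: asyncio.Future) -> None:
        # called on the main event loop when the worker thread finishes
        if fut.exception() is not None:
            print('long task failed:', fut.exception())    # check for errors
        else:
            print('long task result:', fut.result())       # handle results

    future.add_done_callback(on_done)
    # return immediately; on_done fires later without blocking the loop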
I couldn't get the code to work with the other suggested solutions, including using a ProcessPoolExecutor instead of the ThreadPoolExecutor (the lambda passed to run_in_executor() can't be pickled, which would explain that failure), or using an asyncio.Queue to pop the incoming tasks off a queue.