我想创建一个后台任务,它将在应用程序的整个生命周期中保持运行。
这是我到目前为止尝试过的:
async def my_task():
# work in background
@app.get('/push_task')
def push_task(background_tasks: BackgroundTasks):
background_tasks.add_task(my_task)
但是,这样的话,我必须执行一个
GET
请求,才能添加后台任务。
我知道
lifespan
eventns,但我可以t use
BackgroundTasks`里面。例如:
@asynccontextmanager
async def lifespan(app: FastAPI, background_tasks: BackgroundTasks):
background_tasks.add_task(my_task) # don`t work
来自:https://github.com/tiangolo/fastapi/issues/2713
class BackgroundTask:
def __init__(self):
pass
async def my_task():
pass #work in background
bgtask = BackgroundTask()
@app.on_event('startup')
def on_startup():
asyncio.create_task(bgtask.my_task())
根据您的具体用例,您可能需要稍微调整一下。