如何在启动FastAPI应用程序之前推送后台任务?

问题描述 投票:0回答:1

我想创建一个后台任务,它将在应用程序的整个生命周期中保持运行。

这是我到目前为止尝试过的:

async def my_task():
    # work in background

@app.get('/push_task')
def push_task(background_tasks: BackgroundTasks):
    background_tasks.add_task(my_task)

但是,这样的话,我必须执行一个

GET
请求,才能添加后台任务。

我知道

lifespan
eventns,但我可以
t use 
BackgroundTasks`里面。例如:

@asynccontextmanager
async def lifespan(app: FastAPI, background_tasks: BackgroundTasks):
    background_tasks.add_task(my_task) # don`t work
python asynchronous task fastapi
1个回答
0
投票

来自:https://github.com/tiangolo/fastapi/issues/2713

class BackgroundTask:
    def __init__(self):
        pass

    async def my_task():
        pass #work in background

bgtask = BackgroundTask()

@app.on_event('startup')
def on_startup():
    asyncio.create_task(bgtask.my_task())

根据您的具体用例,您可能需要稍微调整一下。

© www.soinside.com 2019 - 2024. All rights reserved.