我想在 FastAPI 中添加计划作业,并且我已将计划程序设置和关闭代码放在 FastAPI 应用程序的
lifespan
中。但是,我注意到两个工作人员都独立运行调度程序。
有什么方法可以确保只有一名工人从事预定的工作吗?
示例代码:
from fastapi import FastAPI
from datetime import datetime
from contextlib import asynccontextmanager
from apscheduler.schedulers.asyncio import AsyncIOScheduler
import uvicorn
def test():
print(f"Test scheduler {datetime.now()}")
@asynccontextmanager
async def lifespan(app: FastAPI):
scheduler = AsyncIOScheduler()
scheduler.add_job(test, trigger="cron", second="0-30")
scheduler.start()
yield
scheduler.shutdown()
app = FastAPI(lifespan=lifespan)
if __name__ == "__main__":
uvicorn.run("main:app", workers=2)
输出,其中
test()
每秒运行两次,而我只希望它每秒运行一次:
INFO: Uvicorn running on http://127.0.0.1:5000 (Press CTRL+C to quit)
INFO: Started parent process [34952]
INFO: Started server process [34957]
INFO: Waiting for application startup.
INFO: Started server process [34958]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Application startup complete.
Test scheduler 2024-02-21 14:51:15.001191
Test scheduler 2024-02-21 14:51:15.001314
Test scheduler 2024-02-21 14:51:16.000480
Test scheduler 2024-02-21 14:51:16.001643
Test scheduler 2024-02-21 14:51:17.000901
Test scheduler 2024-02-21 14:51:17.002765
INFO: Shutting down
INFO: Shutting down
INFO: Waiting for application shutdown.
INFO: Waiting for application shutdown.
INFO: Application shutdown complete.
INFO: Application shutdown complete.
INFO: Finished server process [34958]
INFO: Finished server process [34957]
INFO: Stopping parent process [34952]
我发现对我来说启动一个
BackgroundScheduler
而不是 AsyncIOScheduler
可能是一个更好的主意,并在 main 函数中启动它,以便在父进程中配置调度程序。在这种情况下,只会创建 1 个调度程序。
from fastapi import FastAPI
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
import uvicorn
def test():
print(f"Test scheduler {datetime.now()}")
app = FastAPI()
if __name__ == "__main__":
scheduler = BackgroundScheduler()
scheduler.add_job(test, trigger="cron", second="0-30")
scheduler.start()
uvicorn.run("main:app", workers=2)
scheduler.shutdown()