我正在尝试向我的 FastAPI 应用程序添加一些功能,它将异步更新 ML 模型(从 S3 存储桶中提取)。这个想法是每小时更新一次此模型,而不会阻止 API 响应 CPU 密集型任务的能力,例如模型推理请求。
# Global model variable
ml_model = None
# Async function to download model from S3
async def download_model_from_s3(path="integration-tests/artifacts/MULTI.joblib"):
global ml_model
s3_client = boto3.client("s3")
bucket = os.environ.get("BUCKET_BUCKET", "artifacts_bucket")
try:
local_model_path = './model.joblib'
download_coroutine = s3_client.download_file(bucket, path, local_model_path)
await download_coroutine
ml_model = joblib.load(local_model_path)
http://logging.info(f"Model updated.")
except Exception as e:
logging.exception(f"Error downloading or loading model: {e}")
# Asynchronous scheduler function that updates the model every interval
async def scheduler(bucket_name: str, model_key: str, interval=60):
while True:
# Sleep for the specified interval (in minutes)
await asyncio.sleep(interval * 60)
# Call the download function to update the model
await download_model_from_s3(bucket_name, model_key)
app = FastAPI()
# Startup event to start the scheduler
@app.on_event("startup")
async def startup_event():
# BLOCKING: Download the model once at startup to ensure it is available
download_model_from_s3() # Blocking, ensures model is available
# Start the scheduler to update the model every 60 minutes (async, non-blocking)
await scheduler(bucket_name, model_key, interval=60)
我熟悉 FastAPI,但对异步编程相对较新。我的问题:这是从 S3 异步提取数据的正确方法吗?是否需要单独的异步 S3 客户端?
我选择使用 while true 语句,而不是显式地每小时安排一次作业。然而,我不确定这种技术是否合适。
乍一看,这看起来是正确的,假设
s3_client.download_file()
本身内部是非阻塞的。事实上,它看起来是一个第三方库,而你调用的API函数确实是async
(因为你必须await
),这表明这一切都很好。
在循环中使用
asyncio.sleep()
是完全没问题的,因为每次你 await
睡眠时,你都可以有效地将控制权交还给事件循环,然后事件循环就能够处理其他协程(这就是异步并发的原因)。
有趣的技巧:调用
asyncio.sleep(0)
根本不会休眠,而只会将控制权交还给事件循环。这对于执行同步调用但希望能够在每次迭代后放弃控制的 while True
循环很有帮助。