I want to create a FastAPI service with a single /get endpoint that returns ML model inference results. That part is easy to implement, but the catch is that I need to periodically update the model to a newer version (fetched via a request to another server that hosts the models, but that's beside the point), and here I see a problem!
What happens if a request hits the old model while it is in the middle of being replaced by the new one? How can I implement that kind of locking with asyncio?
Here is the code:
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

from fastapi import FastAPI, Request
from sentence_transformers import SentenceTransformer

app = FastAPI()
sbertmodel = None


def create_model():
    global sbertmodel
    sbertmodel = SentenceTransformer('multi-qa-MiniLM-L6-cos-v1')


# if you try to run all predicts concurrently, it will result in CPU thrashing
pool = ProcessPoolExecutor(max_workers=1, initializer=create_model)


def model_predict():
    vector = sbertmodel.encode('How big is London')
    return vector


async def vector_search(vector):
    # simulate an I/O call (e.g. Vector Similarity Search using a VectorDB)
    await asyncio.sleep(0.005)


@app.get("/")
async def entrypoint(request: Request):
    loop = asyncio.get_event_loop()
    ts = time.time()
    # worker should be initialized outside the endpoint to avoid a cold start
    vector = await loop.run_in_executor(pool, model_predict)
    print(f"Model : {int((time.time() - ts) * 1000)}ms")
    ts = time.time()
    await vector_search(vector)
    print(f"io task: {int((time.time() - ts) * 1000)}ms")
    return "ok"
My model update will be done with a repeated task (but that's not important right now): https://fastapi-utils.davidmontague.xyz/user-guide/repeated-tasks/
Here is the model-serving idea: https://luis-sena.medium.com/how-to-optimize-fastapi-for-ml-model-serving-6f75fb9e040d
EDIT: running multiple requests concurrently is important; while the model is being updated, a lock should be acquired so that requests don't fail - they should just wait a bit longer, since it is a small model.
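To illustrate the locking behavior the question asks about, here is a minimal asyncio.Lock sketch, assuming (hypothetically) that the model lived in the event-loop process itself; all names here are illustrative, not taken from the code above. Requests that arrive during an update simply wait on the lock instead of failing:

```python
import asyncio

# Hypothetical in-process "model": a string stands in for real weights.
model = "model-v1"
model_lock = asyncio.Lock()


async def predict(text):
    async with model_lock:  # waits while an update holds the lock
        return f"{model} -> {text}"


async def update_model(new_version):
    global model
    async with model_lock:  # blocks new predictions during the swap
        await asyncio.sleep(0.01)  # simulate downloading/loading weights
        model = new_version


async def main():
    print(await predict("How big is London"))
    await update_model("model-v2")
    print(await predict("How big is London"))


asyncio.run(main())
```

With a ProcessPoolExecutor worker, though, an asyncio lock in the main process cannot by itself coordinate the model living inside the subprocess, which is why a cross-process mechanism is needed.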
Thanks for the snippet. With it visible, it is possible to write a proposal with what you need.
Since you are using ProcessPool workers, you need a way for the root process to expose a variable that the pool workers can "see" - in Python that takes the form of a multiprocessing.Manager object.
Below I take your code and add the parts needed for a non-disruptive, non-conflicting update of the model while it is in use. It turns out that once we have a namespace that is visible across the worker processes, all that is needed is a check in the model-runner method itself to see whether the model needs updating.
I have not run this snippet - so there may be some typos in variable names, or even a missing bracket here or there - use it as a model, not as "copy + paste" (but I did test the "moving parts": a Manager.Namespace() object works when passed as an initargs parameter to the ProcessPoolExecutor).
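Those "moving parts" can be checked in isolation. Here is a minimal, self-contained sketch (with illustrative names, separate from the answer's code) showing that a Manager.Namespace created in the parent, handed to a worker through initargs, reflects attribute writes made by the parent afterwards. The fork start method is pinned for determinism, so this demo assumes a POSIX system:

```python
import multiprocessing
from concurrent.futures import ProcessPoolExecutor

shared = None  # set in each worker by the initializer


def init_worker(ns):
    # runs once per worker process; stash the namespace proxy there
    global shared
    shared = ns


def read_version():
    # executed inside the worker: reads whatever the parent last wrote
    return shared.model_version


def demo():
    ctx = multiprocessing.get_context("fork")  # POSIX-only; for the demo
    with ctx.Manager() as manager:
        ns = manager.Namespace()
        ns.model_version = 1
        with ProcessPoolExecutor(max_workers=1, mp_context=ctx,
                                 initializer=init_worker,
                                 initargs=(ns,)) as pool:
            first = pool.submit(read_version).result()
            ns.model_version = 2  # update made by the parent process
            second = pool.submit(read_version).result()
    return first, second


if __name__ == "__main__":
    print(demo())  # (1, 2): the worker sees the parent's later write
```

The namespace attribute reads go through the manager's proxy, so no extra synchronization is needed just to observe the new value in the worker.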
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor
from contextlib import asynccontextmanager
from multiprocessing import Manager

from fastapi import FastAPI, Request
from sentence_transformers import SentenceTransformer

sbertmodel = None
local_model_iteration = -1
shared_namespace = None


# The pool and other multiprocessing objects can't simply be created
# at the top level of the module body, or they'd be re-created in
# each subprocess!
# check https://fastapi.tiangolo.com/advanced/events/#lifespan
@asynccontextmanager
async def lifespan(app: FastAPI):
    global pool, root_namespace
    manager = Manager()
    root_namespace = manager.Namespace()
    # Values assigned to the "namespace" object are visible in the
    # subprocesses created by the pool (as long as we send the
    # namespace object to each subprocess and store it there)
    root_namespace.model_iteration = 0
    root_namespace.model_parameters = "multi-qa-MiniLM-L6-cos-v1"
    pool = ProcessPoolExecutor(max_workers=1,
                               initializer=initialize_subprocess,
                               initargs=(root_namespace,))
    with pool, manager:
        # pass control to fastapi: all the app is executed
        yield
    # end of the "with" block:
    # both the pool and the manager are shut down when the fastapi server exits!


app = FastAPI(lifespan=lifespan)


def initialize_subprocess(shared_namespace_arg):
    global shared_namespace
    # Store the shared namespace in _this_ process:
    shared_namespace = shared_namespace_arg
    update_model()


def update_model():
    "called on worker-subprocess start, and whenever the model is outdated"
    global local_model_iteration, sbertmodel
    local_model_iteration = shared_namespace.model_iteration
    # retrieve the parameters posted by the root process:
    sbertmodel = SentenceTransformer(shared_namespace.model_parameters)


# if you try to run all predicts concurrently, it will result in CPU thrashing
def model_predict():
    # verify whether the model was updated from the root process
    if shared_namespace.model_iteration > local_model_iteration:
        # if so, just update the model
        update_model()
    # model is synchronized, just do our job:
    vector = sbertmodel.encode('How big is London')
    return vector


async def vector_search(vector):
    # simulate an I/O call (e.g. Vector Similarity Search using a VectorDB)
    await asyncio.sleep(0.005)


@app.get("/")
async def entrypoint(request: Request):
    loop = asyncio.get_event_loop()
    ts = time.time()
    # worker should be initialized outside the endpoint to avoid a cold start
    vector = await loop.run_in_executor(pool, model_predict)
    print(f"Model : {int((time.time() - ts) * 1000)}ms")
    ts = time.time()
    await vector_search(vector)
    print(f"io task: {int((time.time() - ts) * 1000)}ms")
    return "ok"


@app.get("/update_model")
async def update_model_endpoint(request: Request):
    # extract from the request the parameters needed for the new model
    ...
    new_model_parameters = ...
    # update the model parameters and the model iteration so they are
    # visible in the worker(s)
    root_namespace.model_parameters = new_model_parameters
    # This increment taking place _after_ "model_parameters" is set
    # is all that is needed to keep things running in order here:
    root_namespace.model_iteration += 1
    return {}  # whatever response is needed by the endpoint
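The ordering invariant the endpoint relies on (publish the new parameters first, bump the iteration counter last) can be simulated in a single process. This is a sketch with illustrative names, not the answer's code; the worker-side check only "reloads" when the counter has moved:

```python
# Single-process simulation of the update protocol: a plain object
# stands in for the Manager.Namespace.
class NS:
    pass


ns = NS()
ns.model_iteration = 0
ns.model_parameters = "v1"

local_iteration = -1
loaded = None


def predict():
    global local_iteration, loaded
    if ns.model_iteration > local_iteration:  # worker-side staleness check
        local_iteration = ns.model_iteration
        loaded = ns.model_parameters          # "reload" the model
    return loaded


print(predict())             # loads "v1" on the first call
ns.model_parameters = "v2"   # publish the parameters first...
ns.model_iteration += 1      # ...then bump the counter
print(predict())             # sees the bump and reloads "v2"
```

Because the counter is incremented only after the parameters are written, a worker that observes the new iteration number is guaranteed to read the new parameters, never a half-updated state.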