我有一个在 docker 内运行的 FastAPI 应用程序,它是使用 portainer 部署的。几分钟后工作正常,但突然停止接收任何请求。我在日志中没有看到任何请求,相反,当在 docker 桥接端口上执行curl 操作时,它只是永远挂起。
portainer 设置是基本的,只有随机端口映射。
FROM python:3.10-slim
ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1
ENV POETRY_VERSION=1.8.2
WORKDIR /app
RUN apt-get update && apt-get install -y \
curl \
build-essential \
postgresql-client \
libpq-dev \
ca-certificates \
&& rm -rf /var/lib/apt/lists/*
RUN curl -sSL https://install.python-poetry.org | python3 -
ENV PATH="${PATH}:/root/.local/bin"
RUN mkdir -p $HOME/.postgresql
RUN curl --create-dirs -o $HOME/.postgresql/root.crt 'https://cockroachlabs.cloud/clusters/.../cert'
COPY pyproject.toml poetry.lock* ./
RUN poetry config virtualenvs.create false \
&& poetry install --no-interaction --no-ansi
COPY . .
EXPOSE 8121
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8121"]
该项目也被分割成一个模块。
main.py 文件看起来像这样
import secrets
from datetime import datetime
from typing import List
import fastapi
import uvicorn
from fastapi import Depends, HTTPException, Security
from fastapi.security import APIKeyHeader
from gpt_proxy.utils import mask_token
from .config import ADMIN_KEY, log
from .db import USERS, add_user, db_close, db_init, del_user
from .firebase_manager import Firebase
from .models import TokenCreate, TokenResponse, UserToken
from .openai_forward import OpenAiForward
app = fastapi.FastAPI()
forwarder = OpenAiForward()
api_key_header = APIKeyHeader(name="X-Admin-Key", auto_error=True)
def verify_admin_key(api_key: str = Security(api_key_header)):
if api_key != ADMIN_KEY:
raise HTTPException(status_code=403, detail="Invalid admin key")
return api_key
@app.on_event("startup")
async def startup():
log.info("Starting up OpenAI Forward application")
await db_init()
log.info("Application startup complete")
@app.on_event("shutdown")
async def shutdown():
log.info("Shutting down OpenAI Forward application")
if forwarder.client:
await forwarder.client.close()
await db_close()
log.info("Application shutdown complete")
@app.post("/tokens", response_model=TokenResponse)
async def create_token(
token_request: TokenCreate, api_key: str = Depends(verify_admin_key)
):
new_token = f"mn-{secrets.token_urlsafe(32)}"
return await add_user(username=token_request.username, token=new_token)
@app.delete("/tokens/{username}")
async def delete_token(username: str, api_key: str = Depends(verify_admin_key)):
await del_user(username)
return {"message": f"Token for user {username} has been deleted"}
@app.get("/tokens", response_model=List[UserToken])
async def list_users(api_key: str = Depends(verify_admin_key)):
users = []
for user in USERS:
user["token"] = mask_token(user["token"])
users.append(user)
return users
@app.route(
"/{api_path:path}",
methods=["GET", "POST", "PUT", "DELETE", "OPTIONS", "HEAD", "PATCH", "TRACE"],
)
async def _handle_openai_request(request: fastapi.Request):
return await forwarder.reverse_proxy(request)
if __name__ == "__main__":
log.info("Starting OpenAI Forward server")
uvicorn.run(app, host="0.0.0.0", port=8010)
数据库初始化:
import time
from tortoise import Tortoise
from ..config import log
from .models import FirebaseToken, User
USERS = []
async def _do_migration() -> None:
old_users = {
....
}
for k, v in old_users.items():
_ = await User.get_or_create(username=k, token=v, created_at=time.time())
async def _get_all_users() -> None:
users = await User.all()
if not users:
await _do_migration()
await _get_all_users()
for user in users:
USERS.append(dict(user))
async def db_init() -> None:
await Tortoise.init(db_url=DB_URI, modules={"models": ["gpt_proxy.db.models"]})
await Tortoise.generate_schemas()
await _get_all_users()
Openai正向类代码。不确定这是否有任何相关性,因为之前的请求工作得很好。所以我认为阻塞是在其他地方
import asyncio
import time
from functools import wraps
from typing import Any, AsyncGenerator, Callable, Tuple, Type, TypeVar
import aiohttp
import anyio
import fastapi
from fastapi import HTTPException
from starlette.responses import BackgroundTask, StreamingResponse
from .config import log
from .firebase_manager import Firebase
from .models import ClientConfig
from .utils import get_token_user, header_cloudflare_safe, mask_token
T = TypeVar("T")
def async_retry(
max_retries: int = 3,
delay: float = 1.0,
backoff: float = 2.0,
exceptions: Tuple[Type[Exception], ...] = (Exception,),
) -> Callable:
def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
@wraps(func)
async def wrapper(*args: Any, **kwargs: Any) -> T | None:
current_delay = delay
for attempt in range(max_retries + 1):
try:
if attempt > 0:
log.info(
f"Retrying {func.__name__}, attempt {attempt}/{max_retries} "
f"after {current_delay:.2f}s delay"
)
await anyio.sleep(current_delay)
current_delay *= backoff
return await func(*args, **kwargs)
except exceptions as e:
log.warning(
f"Attempt {attempt + 1}/{max_retries + 1} failed for {func.__name__}: "
f"{type(e).__name__}: {str(e)}"
)
if attempt == max_retries:
log.error(
f"All retry attempts failed for {func.__name__}. "
f"Final exception: {type(e).__name__}: {str(e)}"
)
raise
return None
return wrapper
return decorator
class OpenAiForward:
def __init__(self) -> None:
log.info("Initializing OpenAI Forward")
self.base_url = "https://api.openai.com/"
self.client: aiohttp.ClientSession | None = None
self.firebase = Firebase()
async def _init_client(self) -> None:
if self.client is None:
log.info("Initializing aiohttp client session")
tcp_connector = aiohttp.TCPConnector(
limit=500, limit_per_host=0, force_close=False
)
self.client = aiohttp.ClientSession(connector=tcp_connector)
log.info("aiohttp client session initialized")
async def _get_token(self, token: str):
log.info(f"Processing token {mask_token(token)}")
if token.startswith("mn-"):
username = await get_token_user(token)
if not username:
log.info("Using direct token")
return None, token
fb_token = await self.firebase.get_token()
return username, fb_token
else:
log.info("Using direct token")
return None, token
async def iter_bytes(
self, response: aiohttp.ClientResponse, request: fastapi.Request
) -> AsyncGenerator[bytes, Any]:
log.info(f"Streaming response for {request.url.path}")
async for chunk, _ in response.content.iter_chunks():
yield chunk
@async_retry(
max_retries=3,
delay=0.2,
backoff=0.2,
exceptions=(
aiohttp.ServerTimeoutError,
aiohttp.ServerConnectionError,
aiohttp.ServerDisconnectedError,
asyncio.TimeoutError,
anyio.EndOfStream,
RuntimeError,
),
)
async def send(
self, client_config: ClientConfig, data: dict | None = None
) -> aiohttp.client.ClientRequest | Any | None:
if not self.client:
await self._init_client()
log.info(f"Sending {client_config.method} request to {client_config.url}")
if self.client:
return await self.client.request(
method=client_config.method,
url=client_config.url,
data=data,
headers=client_config.headers,
)
return None
async def prepare_config(self, request: fastapi.Request) -> ClientConfig:
headers: dict = header_cloudflare_safe(request)
original_bearer: str = headers.get(
"Authorization", headers.get("authorization")
)
if original_bearer:
token: str = original_bearer.split()[-1].strip()
user, replacement_token = await self._get_token(token)
if replacement_token is None:
raise HTTPException(status_code=401, detail="Invalid token")
auth_header = f"Bearer {replacement_token}"
if "Authorization" in headers:
headers["Authorization"] = auth_header
elif "authorization" in headers:
headers["authorization"] = auth_header
log.info(
f"Token processing: User={user or 'direct'}, "
f"Using={'Firebase' if user else 'direct'} token"
)
url = f"https://api.openai.com/{request.url.path}"
if request.url.query:
url = f"{url}?{request.url.query}"
return ClientConfig(
headers=headers,
method=request.method,
url=url,
)
async def reverse_proxy(self, request: fastapi.Request) -> StreamingResponse:
request_id = str(time.time())
log.info(
f"[{request_id}] Incoming request: {request.method} {request.url.path}"
)
config = await self.prepare_config(request)
body = await request.body()
data = body if body else None
try:
log.info(f"[{request_id}] Forwarding request to OpenAI")
response = await self.send(config, data=data)
log.info(f"[{request_id}] OpenAI response received: {response.status}")
return StreamingResponse(
self.iter_bytes(response, request),
status_code=response.status,
media_type=response.headers.get("content-type"),
background=BackgroundTask(response.release),
)
except aiohttp.ClientError as e:
log.exception(f"[{request_id}] Failed to forward request to OpenAI")
raise fastapi.HTTPException(
status_code=fastapi.status.HTTP_502_BAD_GATEWAY,
detail=f"Failed to forward request: {str(e)}",
)
except Exception as e:
log.exception(f"[{request_id}] Unexpected error during request forwarding")
raise fastapi.HTTPException(
status_code=fastapi.status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Internal server error: {str(e)}",
)
我已经经过了耐火材料处理,问题似乎已经消失了。不知道是什么解决了它。要么是因为我使用内置 fastapi 命令启动了 Web 应用程序,要么是因为我指定了 fastapi 可以使用的工作人员数量。