我想利用 psycopg3、SQLAlchemy、SQLModel 和 FastAPI 在异步支持方面的最新改进。我找到了一些关于此类配置的参考资料,但它们都没有涉及 PostgreSQL 的连接池。
我的代码里肯定遗漏了什么,因为运行时出现了如下错误:
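在“pool-1”中建立连接时出错:连接信息字符串中 "postgresql+psycopg://postgres:[email protected]:5432/foo" 之后缺少 "="(原始报错:error connecting in 'pool-1': missing "=" after "postgresql+psycopg://…" in connection info string)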
# Database
#CREATE TABLE IF NOT EXISTS hero (
# id INT PRIMARY KEY,
# name VARCHAR(255) NOT NULL,
# country VARCHAR(50) NOT NULL
#);
import asyncio
from fastapi import FastAPI, APIRouter
from functools import lru_cache
from psycopg_pool import AsyncConnectionPool
# Database URL in SQLAlchemy form: the "+psycopg" suffix on the scheme names
# the driver. In this file the same value is used both for the psycopg
# connection pool and for the SQLAlchemy engine.
# NOTE(review): the "[email protected]" fragment looks like an e-mail
# obfuscation artifact from wherever this snippet was copied; presumably it
# stood for "<password>@<host>" -- restore the real value before use.
DATABASE_URL = "postgresql+psycopg://postgres:[email protected]:5432/foo"
class AsyncPoolProxy:
    """Thin adapter exposing ``acquire``/``release`` on top of a psycopg pool.

    Args:
        pool: Object providing awaitable ``getconn()`` and ``putconn(conn)``
            methods, such as ``psycopg_pool.AsyncConnectionPool``.
    """

    def __init__(self, pool):
        self.pool = pool

    async def acquire(self):
        """Borrow a connection from the wrapped pool and return it."""
        # psycopg_pool has no ``acquire`` method; checkout is ``getconn``.
        return await self.pool.getconn()

    async def release(self, conn):
        """Hand ``conn`` back to the wrapped pool."""
        # Likewise there is no ``release``; connections go back via ``putconn``.
        await self.pool.putconn(conn)

    def dispose(self):
        """No-op: this proxy does not close the wrapped pool."""
async def check_async_connections(async_pool, interval: float = 600):
    """Periodically ask the pool to check its connections, forever.

    Args:
        async_pool: Object with an awaitable ``check()`` method, such as
            ``psycopg_pool.AsyncConnectionPool``.
        interval: Seconds to sleep before each check. Defaults to 600, the
            value that used to be hard-coded.

    This coroutine never returns normally; it ends only when it is cancelled
    or when ``check()`` raises.
    """
    while True:
        # Sleep first, so the first check runs one full interval after start.
        await asyncio.sleep(interval)
        print("check connections")
        await async_pool.check()
@lru_cache()
def get_async_pool():
    """Return the shared psycopg connection pool, creating it on first call.

    ``lru_cache`` on this zero-argument function means every call returns the
    same ``AsyncConnectionPool`` instance.

    Returns:
        AsyncConnectionPool: pool configured from ``DATABASE_URL``.
    """
    # psycopg hands conninfo to libpq, which accepts "postgresql://..." URIs
    # but not SQLAlchemy's "dialect+driver" scheme. Passing the URL unchanged
    # is what produces: missing "=" after "postgresql+psycopg://..." in
    # connection info string. Drop the "+driver" part of the scheme only.
    scheme, separator, remainder = DATABASE_URL.partition("://")
    conninfo = scheme.split("+", 1)[0] + separator + remainder
    return AsyncConnectionPool(conninfo=conninfo)
# ASGI application object; the metadata below is passed straight to FastAPI.
app = FastAPI(
    title="Test FASTAPI SQLALCHEMY SQLMODEL",
    description="TBD",
    version="0.1.0",
    contact={"name": "RD"},
)

# Module-level handle on the cached pool returned by get_async_pool().
async_pool = get_async_pool()
@app.on_event("startup")
def startup():
    """Start the background pool-check loop when the application starts."""
    # The event loop keeps only weak references to tasks, so a task whose
    # result is discarded may be garbage-collected mid-run. Keep a strong
    # reference on the application state for the lifetime of the app.
    app.state.pool_check_task = asyncio.create_task(
        check_async_connections(async_pool)
    )
# SQLAlchemy async engine built from DATABASE_URL. NullPool disables
# SQLAlchemy's own connection pooling.
engine = create_async_engine(
    DATABASE_URL,
    poolclass=NullPool,  # Pool handled directly
    # Deactivate JIT for better performance. "server_settings" is an asyncpg
    # argument; the psycopg driver forwards connect_args to libpq, where
    # per-session settings are supplied through the "options" parameter.
    connect_args={"options": "-c jit=off"},
)
async def get_session() -> AsyncSession:
    """FastAPI dependency that yields an ``AsyncSession`` bound to ``engine``.

    The session is created with ``expire_on_commit=False`` and is closed when
    the ``async with`` block exits, after the dependant has finished with it.

    Yields:
        AsyncSession: the session to use for the current request.
    """
    # ``pool=`` is not a session argument: sessionmaker forwards unknown
    # keywords to the session constructor, which rejects them with TypeError.
    # Connection handling belongs to the engine the session is bound to.
    async_session = sessionmaker(
        bind=engine,
        class_=AsyncSession,
        expire_on_commit=False,
    )
    async with async_session() as session:
        yield session
class Hero(SQLModel, table=True):
    """Table model (and request/response schema) for one hero row."""

    # The DDL at the top of this file creates the table with the unquoted
    # name ``hero``, which PostgreSQL folds to lower case. A mixed-case name
    # here is emitted quoted ("Hero") by SQLAlchemy and would not match it.
    __tablename__ = "hero"

    # Primary key; the DDL declares a plain INT, so callers supply the value.
    id: int = Field(primary_key=True)
    name: str
    # NOTE(review): the DDL declares ``country`` NOT NULL while the model
    # allows None -- confirm which of the two is intended.
    country: str | None = None
@app.post("/heroes/")
async def create_hero(
    hero: Hero,
    session: AsyncSession = Depends(get_session),
):
    """Persist ``hero`` and return it as stored.

    Args:
        hero: Hero parsed from the request body.
        session: Async session provided by ``get_session``.

    Returns:
        Hero: the same object, refreshed from the database after commit.
    """
    session.add(hero)
    # get_session yields an async session, so its I/O methods are coroutines
    # and must be awaited; called from a plain ``def`` they would never run.
    await session.commit()
    await session.refresh(hero)
    return hero
@app.get("/heroes/{hero_id}")
async def read_hero(
    hero_id: int,
    session: AsyncSession = Depends(get_session),
):
    """Return the hero whose primary key is ``hero_id``.

    Args:
        hero_id: Primary key taken from the URL path.
        session: Async session provided by ``get_session``.

    Raises:
        HTTPException: 404 when no such hero exists.
    """
    # The session is asynchronous: without ``await`` this would be a coroutine
    # object (always truthy), so the 404 branch could never trigger.
    hero = await session.get(Hero, hero_id)
    if hero is None:
        raise HTTPException(status_code=404, detail="Hero not found")
    return hero
@app.get("/heroes/")
async def read_heroes(session: AsyncSession = Depends(get_session)):
    """Return every hero in the table.

    Args:
        session: Async session provided by ``get_session``.

    Returns:
        The list of all ``Hero`` rows.
    """
    # The query must be awaited before ``.all()`` can be called on its result.
    # NOTE(review): ``exec`` is the SQLModel session method; if AsyncSession
    # is imported from SQLAlchemy instead, use ``scalars`` -- confirm import.
    result = await session.exec(select(Hero))
    return result.all()
# Requirements
alembic==1.13.2
fastapi==0.112.1
psycopg==3.2.1
psycopg-binary==3.2.1
psycopg-pool==3.2.2
pydantic==2.8.2
pydantic_core==2.20.1
SQLAlchemy==2.0.32
sqlmodel==0.0.21
uvicorn==0.30.6
有人搭建过这样的配置吗?您知道我的连接字符串为什么不正确吗?
正如 Adrian 在评论中提到的,传给 psycopg(包括 psycopg_pool 连接池)的连接字符串不应包含驱动程序名,应写成 postgresql://…;而传给 SQLAlchemy 的 URL 如果想避免默认使用 psycopg2,则必须包含驱动程序名,即 postgresql+psycopg://…。因此需要为两者分别准备连接字符串。