将 psycopg3 异步池与 SQLAlchemy、SQLModel 和 Fastapi 结合使用

问题描述 投票:0回答:1

我想利用 psycopg3、SQLAlchemy、SQLModel 和 FastAPI 对异步的最新改进。我找到了一些关于此类设置的参考资料,但从未涉及到 postgres 的连接池。

肯定缺少一些东西,因为我有一个错误:

“pool-1”中连接错误:连接信息字符串中“postgresql+psycopg://postgres:[email protected]:5432/foo”后缺少“=”

# Database

#CREATE TABLE IF NOT EXISTS hero (
#    id INT PRIMARY KEY,
#    name VARCHAR(255) NOT NULL,
#    country VARCHAR(50) NOT NULL
#);

import asyncio
from fastapi import FastAPI, APIRouter
from functools import lru_cache
from psycopg_pool import AsyncConnectionPool

DATABASE_URL = "postgresql+psycopg://postgres:[email protected]:5432/foo"

class AsyncPoolProxy:
    def __init__(self, pool):
        self.pool = pool

    async def acquire(self):
        return await self.pool.acquire()

    async def release(self, conn):
        await self.pool.release(conn)

    def dispose(self):
        pass


async def check_async_connections(async_pool):
    while True:
        await asyncio.sleep(600)
        print("check connections")
        await async_pool.check()

@lru_cache()
def get_async_pool():
    return AsyncConnectionPool(conninfo=DATABASE_URL)

app = FastAPI(
    title="Test FASTAPI SQLALCHEMY SQLMODEL",
    description="TBD",
    version="0.1.0",
    contact={
        "name": "RD",
    },
)

async_pool = get_async_pool()


@app.on_event("startup")
def startup():
    asyncio.create_task(check_async_connections(async_pool))


engine = create_async_engine(
    DATABASE_URL,
    poolclass=NullPool,  # Pool handled directly
    connect_args={"server_settings": {"jit": "off"}},  # Deactivate JIT for better performances
)

async def get_session() -> AsyncSession:
    pool = get_async_pool()
    async_session = sessionmaker(
        bind=engine,
        class_=AsyncSession,
        expire_on_commit=False,
        pool=AsyncPoolProxy(pool)
    )
    async with async_session() as session:
        yield session


class Hero(SQLModel, table=True):
    __tablename__ = 'Hero'

    id: int = Field(primary_key=True)
    name: str
    country: str | None = None


@app.post("/heroes/")
def create_hero(hero: Hero, session: Session = Depends(get_session)):
    session.add(hero)
    session.commit()
    session.refresh(hero)
    return hero

@app.get("/heroes/{hero_id}")
def read_hero(hero_id: int, session: Session = Depends(get_session)):
    hero = session.get(Hero, hero_id)
    if not hero:
        raise HTTPException(status_code=404, detail="Hero not found")
    return hero

@app.get("/heroes/")
def read_heroes(session: Session = Depends(get_session)):
    heroes = session.exec(select(Hero)).all()
    return heroes
# Requirements
alembic==1.13.2
fastapi==0.112.1
psycopg==3.2.1
psycopg-binary==3.2.1
psycopg-pool==3.2.2
pydantic==2.8.2
pydantic_core==2.20.1
SQLAlchemy==2.0.32
sqlmodel==0.0.21
uvicorn==0.30.6

您有机会构建这样的设置吗?你知道为什么我的连接字符串不正确吗?

python postgresql sqlalchemy sqlmodel psycopg3
1个回答
0
投票

正如 Adrian 在评论中提到的,psycopg 的连接字符串不应包含驱动程序,而对于 sqlalchemy,如果您想避免 psycopg2,它也包含驱动程序

© www.soinside.com 2019 - 2024. All rights reserved.