SqlAlchemy asyncio orm:如何查询数据库

问题描述 投票:0回答:2

在 SqlAlchemy 异步 orm 引擎中,如何查询表并获取值或全部值?

我从非异步方法中知道我可以这样做

SESSION.query(TableClass).get(x)

但是使用异步方法尝试此操作会引发下一个错误:

AttributeError: 'AsyncSession' object has no attribute 'query'.

这是我定义的 SESSION 变量。 LOOP 变量只是

asyncio.get_event_loop()
用于在加载 sql 模块时启动异步方法并填充用作缓存的变量,以避免每次需要时都缓存数据库:

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, scoped_session

from .. import CONFIG, LOOP

def _build_async_db_uri(uri):
    if "+asyncpg" not in uri:
        return '+asyncpg:'.join(uri.split(":", 1))
    return uri

async def start() -> declarative_base:
    engine = create_async_engine(_build_async_db_uri(CONFIG.general.sqlalchemy_db_uri))
    async with engine.begin() as conn:
        BASE.metadata.bind = engine
        await conn.run_sync(BASE.metadata.create_all)
    return scoped_session(sessionmaker(bind=engine, autoflush=False, class_=AsyncSession))


BASE = declarative_base()
SESSION = LOOP.run_until_complete(start())

以下是表和缓存函数的示例:

class TableClass:
    __tablename__ = "tableclass"
    id = Column(Integer, primary_key = True)
    alias = Column(Integer)

CACHE = {}
async def _load_all():
    global CACHE
    try:
        curr = await SESSION.query(TableClass).all()
        CACHE = {i.id: i.alias for i in curr}

LOOP.run_until_complete(_load_all())
python-3.x sqlalchemy python-asyncio
2个回答
37
投票

session.query 是旧的 API。异步版本使用 select 和随附的方法。

from sqlalchemy.future import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker


engine = create_async_engine(_build_async_db_uri(CONFIG.general.sqlalchemy_db_uri))
async_session = sessionmaker(
    engine, expire_on_commit=False, class_=AsyncSession
)


CACHE = {}
async def _load_all():
    global CACHE
    try:
        async with async_session() as session:
            q = select(TableClass)
            result = await session.execute(q)
            curr = result.scalars()
            CACHE = {i.id: i.alias for i in curr}
    except:
        pass

LOOP.run_until_complete(_load_all())

您可以在这里阅读有关 SqlAlchemy 异步 I/O 的更多信息


12
投票

通过主键查询

版本 1.4 中的新增功能:添加了 Session.get(),该方法是从现已弃用的 Query.get() 方法中移出的。

  id = 1
  user = await session.get(User, id)

Session.get - SQLAlchemy 1.4 文档

其他栏目查询

正确的方法是使用 select 语句。

statement = select(User).where(User.name == 'John')
result = await session.execute(statement)

# This will return a collection of users named 'John'
johns : list[User] = result.scalars().all()

# This will take the first 'John'
first_john : User = result.scalar()

# This will take one result. 
# If your query has more than one result, an exception will be thrown. 
# Use it only if you're expecting a single result
john : User = result.scalars().one()

相关:https://stackoverflow.com/a/63591326/8843585

选择 - SQLAlchemy 1.4 文档

执行

更多示例

© www.soinside.com 2019 - 2024. All rights reserved.