我有一个带有子查询的查询。它根据 ID 进行分区,并选择分区中每个分区中最新的一个。效果很好:
subquery = db.query(
func.rank()
.over(
order_by=Table.CreatedAt.desc(),
partition_by=[Table.ID],
)
.label("rank"),
Table,
).subquery()
rows: list[Table] = db.query(subquery).filter(subquery.c.rank == 1).all()
问题在于
rows
的类型不是 list[Table]
而是 list[Rows]
并且它会丢失所有 ORM 信息。
如何让它返回
list[Table]
?
解决方案是查询模型类,将子查询传递给查询的 from_statement 方法:
stmt = db.query(subquery).filter(subquery.c.rank == 1)
rows: List[Table] = db.query(Table).from_statement(stmt).all()
或在 SQLAlchemy 2.0 风格中使用 from_statement:
import sqlalchemy as sa
...
stmt = sa.select(subquery).where(subquery.c.rank == 1)
rows: List[Table] = s.scalars(sa.select(Table).from_statement(stmt)).all()