使用 SQLAlchemy 创建嵌套查询

问题描述 投票:0回答:2

我有一个嵌套查询,需要使用 SQLAlchemy 在 Python 项目中编写,但我尝试的所有内容似乎都不起作用,而且我不断收到错误。 这是 SQL 查询(我使用的是 Snowflake),它返回具有最大订单数的所有帐户(可能有多个帐户具有相同的最大订单数):

-- Return every account whose order count equals the highest per-account
-- order count (several accounts can tie for the maximum).
SELECT ACCOUNT_ID, COUNT(*) as MAX_ORDERS
FROM ORDERS
GROUP BY ACCOUNT_ID
-- Keep only the groups whose size matches the maximum computed below.
HAVING COUNT(*) = (
    -- Largest per-account order count.
    SELECT MAX(COUNT_ORDERS)
    FROM (
        -- Number of orders per account.
        SELECT ACCOUNT_ID, COUNT(ORDER_ID) as COUNT_ORDERS
        FROM ORDERS
        GROUP BY ACCOUNT_ID
        -- NOTE(review): MAX() does not depend on row order, so this
        -- ORDER BY looks redundant.
        ORDER BY COUNT(ORDER_ID) DESC
    )
);

这是用 Python 编写的 Order 类:

class Order(Base):
    """ORM mapping for the ``ORDERS`` table.

    NOTE(review): ``Base`` and ``db`` are defined outside this snippet.
    """

    __tablename__ = "ORDERS"

    # Primary-key column.
    order_id = db.Column(db.INTEGER, primary_key=True)
    # Column the max-orders query groups by.
    account_id = db.Column(db.INTEGER)
    # NOTE(review): 16777216 presumably mirrors Snowflake's maximum VARCHAR
    # length — confirm.
    bank_to = db.Column(db.String(16777216))
    account_to = db.Column(db.INTEGER)
    amount = db.Column(db.Float)
    k_symbol = db.Column(db.String(16777216))

我尝试将查询拆分成 3 个相互关联的查询,但总是解决一个错误后又遇到另一个错误。 有人可以帮我弄清楚这个嵌套查询该怎么写吗?

python sqlalchemy snowflake-cloud-data-platform subquery
2个回答
0
投票

您可以使用两个子查询来做您想做的事情。

from sqlalchemy import create_engine, String, select, func, text, column
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, Session

class Base(DeclarativeBase):
    """Declarative base class that the mapped classes in this example inherit from."""

class Order(Base):
    """Mapped class for the ``ORDERS`` table, one attribute per column."""

    __tablename__ = "ORDERS"

    # Every column is spelled out with an explicit mapped_column() call;
    # the Mapped[...] annotation supplies the Python type.
    order_id: Mapped[int] = mapped_column(primary_key=True)
    account_id: Mapped[int] = mapped_column()
    bank_to: Mapped[str] = mapped_column(String(16777216))
    account_to: Mapped[int] = mapped_column()
    amount: Mapped[float] = mapped_column()
    k_symbol: Mapped[str] = mapped_column(String(16777216))

# A local SQLite file is used so the example can be run as-is; echo=True
# makes SQLAlchemy log every statement it emits.
engine = create_engine("sqlite:///temp.sqlite", echo=True)
Base.metadata.create_all(engine)

with Session(engine) as session:
    # Innermost query: number of orders per account.  The ORDER BY mirrors
    # the original SQL; the MAX() taken over it does not depend on row order.
    subq_1 = (
        select(Order.account_id, func.count(Order.order_id).label("COUNT_ORDERS"))
        .group_by(Order.account_id)
        .order_by(func.count(Order.order_id).desc())
    )
    # Middle query: the largest per-account order count.
    subq_2 = select(func.max(column("COUNT_ORDERS"))).select_from(subq_1.subquery())
    # Outer query: accounts whose order count equals that maximum.
    # func.count() with no argument renders COUNT(*).
    statement = (
        select(Order.account_id, func.count().label("MAX_ORDERS"))
        .group_by(Order.account_id)
        .having(func.count() == subq_2.scalar_subquery())
    )
    # execute() yields full rows; scalars() would keep only the first column
    # (account_id) and silently drop MAX_ORDERS.
    for account_id, max_orders in session.execute(statement):
        print(account_id, max_orders)

这会生成查询

SELECT "ORDERS".account_id, count(*) AS "MAX_ORDERS" 
FROM "ORDERS" GROUP BY "ORDERS".account_id 
HAVING count(*) = (SELECT max("COUNT_ORDERS") AS max_1 
FROM (SELECT "ORDERS".account_id AS account_id, count("ORDERS".order_id) AS "COUNT_ORDERS" 
FROM "ORDERS" GROUP BY "ORDERS".account_id ORDER BY count("ORDERS".order_id) DESC) AS anon_1)

0
投票

使用 `func.count` 和 `func.max`:

from sqlalchemy import func

# Derived table: number of orders per account.
subquery = (
    db.session.query(
        Order.account_id,
        func.count(Order.order_id).label('count_orders')
    )
    .group_by(Order.account_id)
    .subquery()
)
# Largest per-account order count, built as a *scalar* subquery so that it
# is embedded as a single value in the HAVING comparison below.  Comparing
# against a column of a plain .subquery() would reference the derived
# table's alias without the derived table being part of the statement's
# FROM clause.  Query.scalar_subquery() requires SQLAlchemy 1.4 or newer.
max_orders_subquery = (
    db.session.query(
        func.max(subquery.c.count_orders).label('max_count_orders')
    )
    .scalar_subquery()
)
# Accounts whose order count equals that maximum.
query = (
    db.session.query(
        Order.account_id,
        func.count(Order.order_id).label('max_order')
    )
    .group_by(Order.account_id)
    .having(func.count(Order.order_id) == max_orders_subquery)
)
# List of (account_id, max_order) rows.
result = query.all()
© www.soinside.com 2019 - 2024. All rights reserved.