我正在尝试在我的 Flask API 项目中添加一个非常基本的 SQL 查询。我使用 SQLAlchemy 作为数据库操作工具。
我要运行的查询如下:
SELECT * from trip_metadata where trip_id in ('trip_id_1', 'trip_id_2', ..., 'trip_id_n')
所以,在我的代码中,我写道:
trips_ids = ['trip_id_1', 'trip_id_2', ..., 'trip_id_n']
result = session.query(dal.trip_table).filter(dal.trip_table.columns.trip_id.in_(trips_ids)).all()
当 n 较低时,假设 n=10,效果非常好。我得到了预期的结果。然而,当 n 很高时,假设 n > 1000,它就会崩溃。我很惊讶,因为在过滤器中放入许多值似乎很常见。
from sqlalchemy import text
result = session.execute(text(f"SELECT * FROM trip_metadata where trip_id in {trip_ids_tuple}"))
错误日志为:
sqlalchemy.exc.DBAPIError: (pyodbc.Error) ('07002', '[07002] [Microsoft][ODBC Driver 17 for SQL Server]COUNT field incorrect or syntax error (0) (SQLExecDirectW)')')
[SQL: SELECT * FROM trip_metadata
WHERE trip_metadata.trip_id IN (?, ?, ..., ?)]
[parameters: ('ABC12345-XXXX-XXXX-XXXX-000000000000', 'DEF12345-XXXX-XXXX-XXXX-000000000000', ..., 'GHI12345-XXXX-XXXX-XXXX-000000000000')]
(Background on this error at: https://sqlalche.me/e/14/dbapi)
127.0.0.1 - - [05/Jan/2023 10:35:48] "POST /api/v1/tripsAggregates HTTP/1.1" 500 -
但是,当我编写原始请求时,即使 n 非常高,它也能正常工作:
from sqlalchemy import text
trip_ids_tuple = ('trip_id_1', 'trip_id_2', ..., 'trip_id_n')
result = session.execute(text(f"SELECT * FROM trip_metadata where trip_id in {trip_ids_tuple}"))
但我认为这不是一个好方法,因为我有更复杂的请求要编写,并且使用 sqlalchemy 过滤器更适合。
您有什么想法来解决我继续使用 sqlalchemy 库的问题吗?非常感谢你
Microsoft SQL Server 的 ODBC 驱动程序使用服务器上的系统存储过程执行语句(
sp_prepexec
或 sp_prepare
)。 SQL Server 上的存储过程仅限于 2100 个参数值,因此使用如下模型
class Trip(Base):
__tablename__ = "trip"
id = Column(String(32), primary_key=True)
此代码可以工作
with Session(engine) as session:
trips_ids = ["trip_id_1", "trip_id_2"]
q = session.query(Trip).where(Trip.id.in_(trips_ids))
results = q.all()
"""SQL emitted:
SELECT trip.id AS trip_id
FROM trip
WHERE trip.id IN (?, ?)
[generated in 0.00092s] ('trip_id_1', 'trip_id_2')
"""
因为它只有两个参数值。如果
trips_ids
列表的长度增加到数千个值,代码最终将失败。
避免此问题的一种方法是让 SQLAlchemy 使用文字值而不是参数占位符构造 IN 子句:
q = session.query(Trip).where(
Trip.id.in_(bindparam("p1", expanding=True, literal_execute=True))
)
results = q.params(p1=trips_ids).all()
"""SQL emitted:
SELECT trip.id AS trip_id
FROM trip
WHERE trip.id IN ('trip_id_1', 'trip_id_2')
[generated in 0.00135s] ()
"""
从错误来看,它可能表明格式问题(正确转义字符串字符?)。当 N 很小时,这种情况就会发生,破坏格式的数据发生的可能性很小。当 N 变大时,sqlalchemy 尝试放入查询中的“坏数据”的可能性就更大。这里不能确定,这也可能是内存或操作系统问题。
首先要问的是,您是否需要在查询中从外部提供元组?有没有办法通过连接查询
trip_ids
?通常最好将操作推送到 SQL 引擎,但如果您在其他地方获取 id 的元组/列表,则这并不总是可能的。
排除是否存在数据问题导致
execute()
期间出错。研究转义字符串值。您可以尝试将列表分成更小的部分,以缩小可能有问题的值的范围(请参阅下面的附录)
尝试使用不同的方式对查询进行字符串格式化。
sql = f"SELECT * FROM trip_metadata where trip_id in ({','.join(trip_ids_tuple)})"
sql
输出str值:
'SELECT * FROM trip_metadata where trip_id in (trip_id_1,trip_id_2,trip_id_n)'
附录:
您可以构建一个分块机制来破坏。崩溃可能是操作系统或内存问题。例如,您可以使用列表切片:
def chunker(seq, size):
return (seq[pos:pos + size] for pos in range(0, len(seq), size))
# Example usage
my_list = [1, 2, 3, 4, 5, 6, 7, 8]
chunk_size = 3
for chunk in chunker(my_list, chunk_size):
print(chunk)
输出
[1, 2, 3]
[4, 5, 6]
[7, 8]
使用 N 可控的块大小。这也将有助于缩小可能出错的 str 值的范围。
这可能无法回答您的问题,但我发现它解决了听起来相似的问题..
如果您使用 sqlalchemy 和/或 cvs2sql 并收到 sqlite 语法/sqlalchemy 操作错误,并且 select * 打印一切正常,但“select [field name]..”导致错误,您可以使用“select `field name`”注意使用转义的反引号而不是“”或“并且数据将打印祝你好运