我正在尝试使用 SQLAlchemy 进行更新插入。我知道 SQL 中没有 upsert 这样的东西 但 SQLAlchemy 提供了这个
https://docs.SQLAlchemy.org/en/20/dialects/mysql.html#insert-on-duplicate-key-update-upsert
现在我正在尝试使用 SQLAlchemy ORM 会话执行同样的操作
这是我的代码:
from sqlalchemy.orm import sessionmaker
Session = sessionmaker(engine)
with Session() as session:
"""Here upsert functionality"""
session.insert_or_update(company)
session.commit()
session.merge(company)
我还没有尝试过,因为我也在寻找多条记录
编辑:session.merge(company)对我来说效果很好,因为我只需要检查主键而不是其他唯一值
这就是文档所说的:
Session.merge() 检查源实例的主键属性,并尝试将其与会话中具有相同主键的实例进行协调。如果本地找不到,则尝试根据主键从数据库加载对象,如果找不到,则创建一个新实例。然后,将源实例上每个属性的状态复制到目标实例。然后该方法返回生成的目标实例;原始源实例保持不变,并且如果尚未与会话关联,则不关联。
所以现在如果有人知道如何对多个对象执行相同的操作,那将非常有帮助。
正如您所注意到的,
Session.merge()
将在逐个对象的基础上完成任务。例如,如果我们有
class Thing(Base):
__tablename__ = "thing"
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=False)
txt: Mapped[str] = mapped_column(String(50))
my_thing = Thing(id=1, txt="foo")
我们能做到
with Session(engine) as sess:
sess.merge(my_thing)
sess.commit()
但是,我们不想做类似的事情
rows_to_upsert = 4000
things = [Thing(id=i, txt=f"txt_{i}") for i in range(rows_to_upsert)]
with Session(engine, autoflush=False) as sess:
t0 = time.perf_counter()
for thing in things:
sess.merge(thing)
sess.commit()
print(
f"merge: {rows_to_upsert:,} rows upserted in {(time.perf_counter() - t0):0.1f} seconds"
)
因为这将导致 4000 次往返服务器来为每个对象执行 SELECT,而且速度会很慢。 (在我的测试中,更新插入 4000 行大约需要 40 秒,或者大约 100 行/秒。)
相反,我们应该将对象列表转换为
dict
列表
list_of_dict = [dict(id=thing.id, txt=thing.txt) for thing in things]
然后使用
INSERT … ON DUPLICATE KEY
语句
from sqlalchemy.dialects.mysql import insert
insert_stmt = insert(Thing).values(list_of_dict)
on_duplicate_stmt = insert_stmt.on_duplicate_key_update(
dict(txt=insert_stmt.inserted.txt)
)
with Session(engine) as sess:
t0 = time.perf_counter()
sess.execute(on_duplicate_stmt)
sess.commit()
print(
f"insert: {rows_to_upsert:,} rows upserted in {(time.perf_counter() - t0):0.1f} seconds"
)
只花费了大约 2 秒,或大约 2000 行/秒。